diff --git a/esphome/components/mixer/speaker/__init__.py b/esphome/components/mixer/speaker/__init__.py index c4069851af..a3025d7121 100644 --- a/esphome/components/mixer/speaker/__init__.py +++ b/esphome/components/mixer/speaker/__init__.py @@ -1,6 +1,6 @@ from esphome import automation import esphome.codegen as cg -from esphome.components import audio, esp32, speaker +from esphome.components import audio, esp32, socket, speaker import esphome.config_validation as cv from esphome.const import ( CONF_BITS_PER_SAMPLE, @@ -61,7 +61,7 @@ def _set_stream_limits(config): def _validate_source_speaker(config): fconf = fv.full_config.get() - # Get ID for the output speaker and add it to the source speakrs config to easily inherit properties + # Get ID for the output speaker and add it to the source speakers config to easily inherit properties path = fconf.get_path_for_id(config[CONF_ID])[:-3] path.append(CONF_OUTPUT_SPEAKER) output_speaker_id = fconf.get_config_for_path(path) @@ -111,6 +111,9 @@ FINAL_VALIDATE_SCHEMA = cv.All( async def to_code(config): + # Enable wake_loop_threadsafe for immediate command processing from other tasks + socket.require_wake_loop_threadsafe() + var = cg.new_Pvariable(config[CONF_ID]) await cg.register_component(var, config) @@ -127,6 +130,9 @@ async def to_code(config): "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY", True ) + # Initialize FixedVector with exact count of source speakers + cg.add(var.init_source_speakers(len(config[CONF_SOURCE_SPEAKERS]))) + for speaker_config in config[CONF_SOURCE_SPEAKERS]: source_speaker = cg.new_Pvariable(speaker_config[CONF_ID]) diff --git a/esphome/components/mixer/speaker/automation.h b/esphome/components/mixer/speaker/automation.h index 2fb2f49373..4fa3853583 100644 --- a/esphome/components/mixer/speaker/automation.h +++ b/esphome/components/mixer/speaker/automation.h @@ -8,8 +8,8 @@ namespace esphome { namespace mixer_speaker { template class DuckingApplyAction : public Action, public Parented { - TEMPLATABLE_VALUE(uint8_t, decibel_reduction) - TEMPLATABLE_VALUE(uint32_t, duration) + TEMPLATABLE_VALUE(uint8_t, decibel_reduction); + TEMPLATABLE_VALUE(uint32_t, duration); void play(const Ts &...x) override { this->parent_->apply_ducking(this->decibel_reduction_.value(x...), this->duration_.value(x...)); } diff --git a/esphome/components/mixer/speaker/mixer_speaker.cpp b/esphome/components/mixer/speaker/mixer_speaker.cpp index 043b629cf1..100acbebc3 100644 --- a/esphome/components/mixer/speaker/mixer_speaker.cpp +++ b/esphome/components/mixer/speaker/mixer_speaker.cpp @@ -2,11 +2,13 @@ #ifdef USE_ESP32 +#include "esphome/core/application.h" #include "esphome/core/hal.h" #include "esphome/core/helpers.h" #include "esphome/core/log.h" #include +#include #include namespace esphome { @@ -14,6 +16,7 @@ namespace mixer_speaker { static const UBaseType_t MIXER_TASK_PRIORITY = 10; +static const uint32_t STOPPING_TIMEOUT_MS = 5000; static const uint32_t TRANSFER_BUFFER_DURATION_MS = 50; static const uint32_t TASK_DELAY_MS = 25; @@ -27,21 +30,53 @@ static const char *const TAG = "speaker_mixer"; // Gives the Q15 fixed point scaling factor to reduce by 0 dB, 1dB, ..., 50 dB // dB to PCM scaling factor formula: floating_point_scale_factor = 2^(-db/6.014) // float to Q15 fixed point formula: q15_scale_factor = floating_point_scale_factor * 2^(15) -static const std::vector DECIBEL_REDUCTION_TABLE = { +static const std::array DECIBEL_REDUCTION_TABLE = { 32767, 29201, 26022, 23189, 20665, 18415, 16410, 14624, 13032, 11613, 10349, 9222, 8218, 7324, 6527, 5816, 5183, 4619, 4116, 3668, 3269, 2913, 2596, 2313, 2061, 1837, 1637, 1459, 1300, 1158, 1032, 920, 820, 731, 651, 580, 517, 461, 411, 366, 326, 291, 259, 231, 206, 183, 163, 146, 130, 116, 103}; -enum MixerEventGroupBits : uint32_t { - COMMAND_STOP = (1 << 0), // stops the mixer task - STATE_STARTING = (1 << 10), - STATE_RUNNING = (1 << 11), - STATE_STOPPING = (1 << 12), - STATE_STOPPED = (1 << 13), - ERR_ESP_NO_MEM = (1 << 19), - ALL_BITS = 0x00FFFFFF, // All valid FreeRTOS event group bits +// Event bits for SourceSpeaker command processing +enum SourceSpeakerEventBits : uint32_t { + SOURCE_SPEAKER_COMMAND_START = (1 << 0), + SOURCE_SPEAKER_COMMAND_STOP = (1 << 1), + SOURCE_SPEAKER_COMMAND_FINISH = (1 << 2), }; +// Event bits for mixer task control and state +enum MixerTaskEventBits : uint32_t { + MIXER_TASK_COMMAND_START = (1 << 0), + MIXER_TASK_COMMAND_STOP = (1 << 1), + MIXER_TASK_STATE_STARTING = (1 << 10), + MIXER_TASK_STATE_RUNNING = (1 << 11), + MIXER_TASK_STATE_STOPPING = (1 << 12), + MIXER_TASK_STATE_STOPPED = (1 << 13), + MIXER_TASK_ERR_ESP_NO_MEM = (1 << 19), + MIXER_TASK_ALL_BITS = 0x00FFFFFF, // All valid FreeRTOS event group bits +}; + +static inline uint32_t atomic_subtract_clamped(std::atomic &var, uint32_t amount) { + uint32_t current = var.load(std::memory_order_acquire); + uint32_t subtracted = 0; + if (current > 0) { + uint32_t new_value; + do { + subtracted = std::min(amount, current); + new_value = current - subtracted; + } while (!var.compare_exchange_weak(current, new_value, std::memory_order_release, std::memory_order_acquire)); + } + return subtracted; +} + +static bool create_event_group(EventGroupHandle_t &event_group, Component *component) { + event_group = xEventGroupCreate(); + if (event_group == nullptr) { + ESP_LOGE(TAG, "Failed to create event group"); + component->mark_failed(); + return false; + } + return true; +} + void SourceSpeaker::dump_config() { ESP_LOGCONFIG(TAG, "Mixer Source Speaker\n" @@ -55,22 +90,70 @@ void SourceSpeaker::dump_config() { } void SourceSpeaker::setup() { - this->parent_->get_output_speaker()->add_audio_output_callback([this](uint32_t new_frames, int64_t write_timestamp) { - // The SourceSpeaker may not have included any audio in the mixed output, so verify there were pending frames - uint32_t speakers_playback_frames = std::min(new_frames, this->pending_playback_frames_); - this->pending_playback_frames_ -= speakers_playback_frames; + if (!create_event_group(this->event_group_, this)) { + return; + } - if (speakers_playback_frames > 0) { - this->audio_output_callback_(speakers_playback_frames, write_timestamp); + // Start with loop disabled since we begin in STATE_STOPPED with no pending commands + this->disable_loop(); + + this->parent_->get_output_speaker()->add_audio_output_callback([this](uint32_t new_frames, int64_t write_timestamp) { + // First, drain the playback delay (frames in pipeline before this source started contributing) + uint32_t delay_to_drain = atomic_subtract_clamped(this->playback_delay_frames_, new_frames); + uint32_t remaining_frames = new_frames - delay_to_drain; + + // Then, count towards this source's pending playback frames + if (remaining_frames > 0) { + uint32_t speakers_playback_frames = atomic_subtract_clamped(this->pending_playback_frames_, remaining_frames); + if (speakers_playback_frames > 0) { + this->audio_output_callback_(speakers_playback_frames, write_timestamp); + } } }); } void SourceSpeaker::loop() { + uint32_t event_bits = xEventGroupGetBits(this->event_group_); + + // Process commands with priority: STOP > FINISH > START + // This ensures stop commands take precedence over conflicting start commands + if (event_bits & SOURCE_SPEAKER_COMMAND_STOP) { + if (this->state_ == speaker::STATE_RUNNING) { + // Clear both STOP and START bits - stop takes precedence + xEventGroupClearBits(this->event_group_, SOURCE_SPEAKER_COMMAND_STOP | SOURCE_SPEAKER_COMMAND_START); + this->enter_stopping_state_(); + } else if (this->state_ == speaker::STATE_STOPPED) { + // Already stopped, just clear the command bits + xEventGroupClearBits(this->event_group_, SOURCE_SPEAKER_COMMAND_STOP | SOURCE_SPEAKER_COMMAND_START); + } + // Leave bits set if transitioning states (STARTING/STOPPING) - will be processed once state allows + } else if (event_bits & SOURCE_SPEAKER_COMMAND_FINISH) { + if (this->state_ == speaker::STATE_RUNNING) { + xEventGroupClearBits(this->event_group_, SOURCE_SPEAKER_COMMAND_FINISH); + this->stop_gracefully_ = true; + } else if (this->state_ == speaker::STATE_STOPPED) { + // Already stopped, just clear the command bit + xEventGroupClearBits(this->event_group_, SOURCE_SPEAKER_COMMAND_FINISH); + } + // Leave bit set if transitioning states - will be processed once state allows + } else if (event_bits & SOURCE_SPEAKER_COMMAND_START) { + if (this->state_ == speaker::STATE_STOPPED) { + xEventGroupClearBits(this->event_group_, SOURCE_SPEAKER_COMMAND_START); + this->state_ = speaker::STATE_STARTING; + } else if (this->state_ == speaker::STATE_RUNNING) { + // Already running, just clear the command bit + xEventGroupClearBits(this->event_group_, SOURCE_SPEAKER_COMMAND_START); + } + // Leave bit set if transitioning states - will be processed once state allows + } + // Process state machine switch (this->state_) { case speaker::STATE_STARTING: { esp_err_t err = this->start_(); if (err == ESP_OK) { + this->pending_playback_frames_.store(0, std::memory_order_release); // reset pending playback frames + this->playback_delay_frames_.store(0, std::memory_order_release); // reset playback delay + this->has_contributed_.store(false, std::memory_order_release); // reset contribution tracking this->state_ = speaker::STATE_RUNNING; this->stop_gracefully_ = false; this->last_seen_data_ms_ = millis(); @@ -78,41 +161,62 @@ void SourceSpeaker::loop() { } else { switch (err) { case ESP_ERR_NO_MEM: - this->status_set_error(LOG_STR("Failed to start mixer: not enough memory")); + this->status_set_error(LOG_STR("Not enough memory")); break; case ESP_ERR_NOT_SUPPORTED: - this->status_set_error(LOG_STR("Failed to start mixer: unsupported bits per sample")); + this->status_set_error(LOG_STR("Unsupported bit depth")); break; case ESP_ERR_INVALID_ARG: - this->status_set_error( - LOG_STR("Failed to start mixer: audio stream isn't compatible with the other audio stream.")); + this->status_set_error(LOG_STR("Incompatible audio streams")); break; case ESP_ERR_INVALID_STATE: - this->status_set_error(LOG_STR("Failed to start mixer: mixer task failed to start")); + this->status_set_error(LOG_STR("Task failed")); break; default: - this->status_set_error(LOG_STR("Failed to start mixer")); + this->status_set_error(LOG_STR("Failed")); break; } - this->state_ = speaker::STATE_STOPPING; + this->enter_stopping_state_(); } break; } case speaker::STATE_RUNNING: - if (!this->transfer_buffer_->has_buffered_data()) { + if (!this->transfer_buffer_->has_buffered_data() && + (this->pending_playback_frames_.load(std::memory_order_acquire) == 0)) { + // No audio data in buffer waiting to get mixed and no frames are pending playback if ((this->timeout_ms_.has_value() && ((millis() - this->last_seen_data_ms_) > this->timeout_ms_.value())) || this->stop_gracefully_) { - this->state_ = speaker::STATE_STOPPING; + // Timeout exceeded or graceful stop requested + this->enter_stopping_state_(); } } break; - case speaker::STATE_STOPPING: - this->stop_(); - this->stop_gracefully_ = false; - this->state_ = speaker::STATE_STOPPED; + case speaker::STATE_STOPPING: { + if ((this->parent_->get_output_speaker()->get_pause_state()) || + ((millis() - this->stopping_start_ms_) > STOPPING_TIMEOUT_MS)) { + // If parent speaker is paused or if the stopping timeout is exceeded, force stop the output speaker + this->parent_->get_output_speaker()->stop(); + } + + if (this->parent_->get_output_speaker()->is_stopped() || + (this->pending_playback_frames_.load(std::memory_order_acquire) == 0)) { + // Output speaker is stopped OR all pending playback frames have played + this->pending_playback_frames_.store(0, std::memory_order_release); + this->stop_gracefully_ = false; + + this->state_ = speaker::STATE_STOPPED; + } break; + } case speaker::STATE_STOPPED: + // Re-check event bits for any new commands that may have arrived + event_bits = xEventGroupGetBits(this->event_group_); + if (!(event_bits & + (SOURCE_SPEAKER_COMMAND_START | SOURCE_SPEAKER_COMMAND_STOP | SOURCE_SPEAKER_COMMAND_FINISH))) { + // No pending commands, disable loop to save CPU cycles + this->disable_loop(); + } break; } } @@ -122,17 +226,34 @@ size_t SourceSpeaker::play(const uint8_t *data, size_t length, TickType_t ticks_ this->start(); } size_t bytes_written = 0; - if (this->ring_buffer_.use_count() == 1) { - std::shared_ptr temp_ring_buffer = this->ring_buffer_.lock(); + std::shared_ptr temp_ring_buffer = this->ring_buffer_.lock(); + if (temp_ring_buffer.use_count() > 0) { + // Only write to the ring buffer if the reference is valid bytes_written = temp_ring_buffer->write_without_replacement(data, length, ticks_to_wait); if (bytes_written > 0) { this->last_seen_data_ms_ = millis(); } + } else { + // Delay to avoid repeatedly hammering while waiting for the speaker to start + vTaskDelay(ticks_to_wait); } return bytes_written; } -void SourceSpeaker::start() { this->state_ = speaker::STATE_STARTING; } +void SourceSpeaker::send_command_(uint32_t command_bit, bool wake_loop) { + this->enable_loop_soon_any_context(); + uint32_t event_bits = xEventGroupGetBits(this->event_group_); + if (!(event_bits & command_bit)) { + xEventGroupSetBits(this->event_group_, command_bit); +#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE) + if (wake_loop) { + App.wake_loop_threadsafe(); + } +#endif + } +} + +void SourceSpeaker::start() { this->send_command_(SOURCE_SPEAKER_COMMAND_START, true); } esp_err_t SourceSpeaker::start_() { const size_t ring_buffer_size = this->audio_stream_info_.ms_to_bytes(this->buffer_duration_ms_); @@ -143,35 +264,26 @@ esp_err_t SourceSpeaker::start_() { if (this->transfer_buffer_ == nullptr) { return ESP_ERR_NO_MEM; } - std::shared_ptr temp_ring_buffer; - if (!this->ring_buffer_.use_count()) { + std::shared_ptr temp_ring_buffer = this->ring_buffer_.lock(); + if (!temp_ring_buffer) { temp_ring_buffer = RingBuffer::create(ring_buffer_size); this->ring_buffer_ = temp_ring_buffer; } - if (!this->ring_buffer_.use_count()) { + if (!temp_ring_buffer) { return ESP_ERR_NO_MEM; } else { this->transfer_buffer_->set_source(temp_ring_buffer); } } - this->pending_playback_frames_ = 0; // reset return this->parent_->start(this->audio_stream_info_); } -void SourceSpeaker::stop() { - if (this->state_ != speaker::STATE_STOPPED) { - this->state_ = speaker::STATE_STOPPING; - } -} +void SourceSpeaker::stop() { this->send_command_(SOURCE_SPEAKER_COMMAND_STOP); } -void SourceSpeaker::stop_() { - this->transfer_buffer_.reset(); // deallocates the transfer buffer -} - -void SourceSpeaker::finish() { this->stop_gracefully_ = true; } +void SourceSpeaker::finish() { this->send_command_(SOURCE_SPEAKER_COMMAND_FINISH); } bool SourceSpeaker::has_buffered_data() const { return ((this->transfer_buffer_.use_count() > 0) && this->transfer_buffer_->has_buffered_data()); @@ -191,19 +303,16 @@ void SourceSpeaker::set_volume(float volume) { float SourceSpeaker::get_volume() { return this->parent_->get_output_speaker()->get_volume(); } -size_t SourceSpeaker::process_data_from_source(TickType_t ticks_to_wait) { - if (!this->transfer_buffer_.use_count()) { - return 0; - } - +size_t SourceSpeaker::process_data_from_source(std::shared_ptr &transfer_buffer, + TickType_t ticks_to_wait) { // Store current offset, as these samples are already ducked - const size_t current_length = this->transfer_buffer_->available(); + const size_t current_length = transfer_buffer->available(); - size_t bytes_read = this->transfer_buffer_->transfer_data_from_source(ticks_to_wait); + size_t bytes_read = transfer_buffer->transfer_data_from_source(ticks_to_wait); uint32_t samples_to_duck = this->audio_stream_info_.bytes_to_samples(bytes_read); if (samples_to_duck > 0) { - int16_t *current_buffer = reinterpret_cast(this->transfer_buffer_->get_buffer_start() + current_length); + int16_t *current_buffer = reinterpret_cast(transfer_buffer->get_buffer_start() + current_length); duck_samples(current_buffer, samples_to_duck, &this->current_ducking_db_reduction_, &this->ducking_transition_samples_remaining_, this->samples_per_ducking_step_, @@ -215,10 +324,13 @@ size_t SourceSpeaker::process_data_from_source(TickType_t ticks_to_wait) { void SourceSpeaker::apply_ducking(uint8_t decibel_reduction, uint32_t duration) { if (this->target_ducking_db_reduction_ != decibel_reduction) { + // Start transition from the previous target (which becomes the new current level) this->current_ducking_db_reduction_ = this->target_ducking_db_reduction_; this->target_ducking_db_reduction_ = decibel_reduction; + // Calculate the number of intermediate dB steps for the transition timing. + // Subtract 1 because the first step is taken immediately after this calculation. uint8_t total_ducking_steps = 0; if (this->target_ducking_db_reduction_ > this->current_ducking_db_reduction_) { // The dB reduction level is increasing (which results in quieter audio) @@ -234,7 +346,7 @@ void SourceSpeaker::apply_ducking(uint8_t decibel_reduction, uint32_t duration) this->samples_per_ducking_step_ = this->ducking_transition_samples_remaining_ / total_ducking_steps; this->ducking_transition_samples_remaining_ = - this->samples_per_ducking_step_ * total_ducking_steps; // Adjust for integer division rounding + this->samples_per_ducking_step_ * total_ducking_steps; // adjust for integer division rounding this->current_ducking_db_reduction_ += this->db_change_per_ducking_step_; } else { @@ -293,6 +405,12 @@ void SourceSpeaker::duck_samples(int16_t *input_buffer, uint32_t input_samples_t } } +void SourceSpeaker::enter_stopping_state_() { + this->state_ = speaker::STATE_STOPPING; + this->stopping_start_ms_ = millis(); + this->transfer_buffer_.reset(); +} + void MixerSpeaker::dump_config() { ESP_LOGCONFIG(TAG, "Speaker Mixer:\n" @@ -301,42 +419,74 @@ void MixerSpeaker::dump_config() { } void MixerSpeaker::setup() { - this->event_group_ = xEventGroupCreate(); - - if (this->event_group_ == nullptr) { - ESP_LOGE(TAG, "Failed to create event group"); - this->mark_failed(); + if (!create_event_group(this->event_group_, this)) { return; } + + // Register callback to track frames in the output pipeline + this->output_speaker_->add_audio_output_callback([this](uint32_t new_frames, int64_t write_timestamp) { + atomic_subtract_clamped(this->frames_in_pipeline_, new_frames); + }); + + // Start with loop disabled since no task is running and no commands are pending + this->disable_loop(); } void MixerSpeaker::loop() { uint32_t event_group_bits = xEventGroupGetBits(this->event_group_); - if (event_group_bits & MixerEventGroupBits::STATE_STARTING) { - ESP_LOGD(TAG, "Starting speaker mixer"); - xEventGroupClearBits(this->event_group_, MixerEventGroupBits::STATE_STARTING); + // Handle pending start request + if (event_group_bits & MIXER_TASK_COMMAND_START) { + // Only start the task if it's fully stopped and cleaned up + if (!this->status_has_error() && (this->task_handle_ == nullptr) && (this->task_stack_buffer_ == nullptr)) { + esp_err_t err = this->start_task_(); + switch (err) { + case ESP_OK: + xEventGroupClearBits(this->event_group_, MIXER_TASK_COMMAND_START); + break; + case ESP_ERR_NO_MEM: + ESP_LOGE(TAG, "Failed to start; retrying in 1 second"); + this->status_momentary_error("memory-failure", 1000); + return; + case ESP_ERR_INVALID_STATE: + ESP_LOGE(TAG, "Failed to start; retrying in 1 second"); + this->status_momentary_error("task-failure", 1000); + return; + default: + ESP_LOGE(TAG, "Failed to start; retrying in 1 second"); + this->status_momentary_error("failure", 1000); + return; + } + } } - if (event_group_bits & MixerEventGroupBits::ERR_ESP_NO_MEM) { - this->status_set_error(LOG_STR("Failed to allocate the mixer's internal buffer")); - xEventGroupClearBits(this->event_group_, MixerEventGroupBits::ERR_ESP_NO_MEM); + + if (event_group_bits & MIXER_TASK_STATE_STARTING) { + ESP_LOGD(TAG, "Starting"); + xEventGroupClearBits(this->event_group_, MIXER_TASK_STATE_STARTING); } - if (event_group_bits & MixerEventGroupBits::STATE_RUNNING) { - ESP_LOGD(TAG, "Started speaker mixer"); + if (event_group_bits & MIXER_TASK_ERR_ESP_NO_MEM) { + this->status_set_error(LOG_STR("Not enough memory")); + xEventGroupClearBits(this->event_group_, MIXER_TASK_ERR_ESP_NO_MEM); + } + if (event_group_bits & MIXER_TASK_STATE_RUNNING) { + ESP_LOGV(TAG, "Started"); this->status_clear_error(); - xEventGroupClearBits(this->event_group_, MixerEventGroupBits::STATE_RUNNING); + xEventGroupClearBits(this->event_group_, MIXER_TASK_STATE_RUNNING); } - if (event_group_bits & MixerEventGroupBits::STATE_STOPPING) { - ESP_LOGD(TAG, "Stopping speaker mixer"); - xEventGroupClearBits(this->event_group_, MixerEventGroupBits::STATE_STOPPING); + if (event_group_bits & MIXER_TASK_STATE_STOPPING) { + ESP_LOGV(TAG, "Stopping"); + xEventGroupClearBits(this->event_group_, MIXER_TASK_STATE_STOPPING); } - if (event_group_bits & MixerEventGroupBits::STATE_STOPPED) { + if (event_group_bits & MIXER_TASK_STATE_STOPPED) { if (this->delete_task_() == ESP_OK) { - xEventGroupClearBits(this->event_group_, MixerEventGroupBits::ALL_BITS); + ESP_LOGD(TAG, "Stopped"); + xEventGroupClearBits(this->event_group_, MIXER_TASK_ALL_BITS); } } if (this->task_handle_ != nullptr) { + // If the mixer task is running, check if all source speakers are stopped + bool all_stopped = true; for (auto &speaker : this->source_speakers_) { @@ -344,7 +494,15 @@ void MixerSpeaker::loop() { } if (all_stopped) { - this->stop(); + // Send stop command signal to the mixer task since no source speakers are active + xEventGroupSetBits(this->event_group_, MIXER_TASK_COMMAND_STOP); + } + } else if (this->task_stack_buffer_ == nullptr) { + // Task is fully stopped and cleaned up, check if we can disable loop + event_group_bits = xEventGroupGetBits(this->event_group_); + if (event_group_bits == 0) { + // No pending events, disable loop to save CPU cycles + this->disable_loop(); } } } @@ -366,7 +524,18 @@ esp_err_t MixerSpeaker::start(audio::AudioStreamInfo &stream_info) { } } - return this->start_task_(); + this->enable_loop_soon_any_context(); // ensure loop processes command + + uint32_t event_bits = xEventGroupGetBits(this->event_group_); + if (!(event_bits & MIXER_TASK_COMMAND_START)) { + // Set MIXER_TASK_COMMAND_START bit if not already set, and then immediately wake for low latency + xEventGroupSetBits(this->event_group_, MIXER_TASK_COMMAND_START); +#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE) + App.wake_loop_threadsafe(); +#endif + } + + return ESP_OK; } esp_err_t MixerSpeaker::start_task_() { @@ -397,28 +566,31 @@ esp_err_t MixerSpeaker::start_task_() { } esp_err_t MixerSpeaker::delete_task_() { - if (!this->task_created_) { + if (this->task_handle_ != nullptr) { + // Delete the task + vTaskDelete(this->task_handle_); this->task_handle_ = nullptr; - - if (this->task_stack_buffer_ != nullptr) { - if (this->task_stack_in_psram_) { - RAMAllocator stack_allocator(RAMAllocator::ALLOC_EXTERNAL); - stack_allocator.deallocate(this->task_stack_buffer_, TASK_STACK_SIZE); - } else { - RAMAllocator stack_allocator(RAMAllocator::ALLOC_INTERNAL); - stack_allocator.deallocate(this->task_stack_buffer_, TASK_STACK_SIZE); - } - - this->task_stack_buffer_ = nullptr; - } - - return ESP_OK; } - return ESP_ERR_INVALID_STATE; -} + if ((this->task_handle_ == nullptr) && (this->task_stack_buffer_ != nullptr)) { + // Deallocate the task stack buffer + if (this->task_stack_in_psram_) { + RAMAllocator stack_allocator(RAMAllocator::ALLOC_EXTERNAL); + stack_allocator.deallocate(this->task_stack_buffer_, TASK_STACK_SIZE); + } else { + RAMAllocator stack_allocator(RAMAllocator::ALLOC_INTERNAL); + stack_allocator.deallocate(this->task_stack_buffer_, TASK_STACK_SIZE); + } -void MixerSpeaker::stop() { xEventGroupSetBits(this->event_group_, MixerEventGroupBits::COMMAND_STOP); } + this->task_stack_buffer_ = nullptr; + } + + if ((this->task_handle_ != nullptr) || (this->task_stack_buffer_ != nullptr)) { + return ESP_ERR_INVALID_STATE; + } + + return ESP_OK; +} void MixerSpeaker::copy_frames(const int16_t *input_buffer, audio::AudioStreamInfo input_stream_info, int16_t *output_buffer, audio::AudioStreamInfo output_stream_info, @@ -472,32 +644,34 @@ void MixerSpeaker::mix_audio_samples(const int16_t *primary_buffer, audio::Audio } void MixerSpeaker::audio_mixer_task(void *params) { - MixerSpeaker *this_mixer = (MixerSpeaker *) params; + MixerSpeaker *this_mixer = static_cast(params); - xEventGroupSetBits(this_mixer->event_group_, MixerEventGroupBits::STATE_STARTING); - - this_mixer->task_created_ = true; + xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STARTING); std::unique_ptr output_transfer_buffer = audio::AudioSinkTransferBuffer::create( this_mixer->audio_stream_info_.value().ms_to_bytes(TRANSFER_BUFFER_DURATION_MS)); if (output_transfer_buffer == nullptr) { - xEventGroupSetBits(this_mixer->event_group_, - MixerEventGroupBits::STATE_STOPPED | MixerEventGroupBits::ERR_ESP_NO_MEM); + xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPED | MIXER_TASK_ERR_ESP_NO_MEM); - this_mixer->task_created_ = false; - vTaskDelete(nullptr); + vTaskSuspend(nullptr); // Suspend this task indefinitely until the loop method deletes it } output_transfer_buffer->set_sink(this_mixer->output_speaker_); - xEventGroupSetBits(this_mixer->event_group_, MixerEventGroupBits::STATE_RUNNING); + xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_RUNNING); bool sent_finished = false; + // Pre-allocate vectors to avoid heap allocation in the loop (max 8 source speakers per schema) + FixedVector speakers_with_data; + FixedVector> transfer_buffers_with_data; + speakers_with_data.init(this_mixer->source_speakers_.size()); + transfer_buffers_with_data.init(this_mixer->source_speakers_.size()); + while (true) { uint32_t event_group_bits = xEventGroupGetBits(this_mixer->event_group_); - if (event_group_bits & MixerEventGroupBits::COMMAND_STOP) { + if (event_group_bits & MIXER_TASK_COMMAND_STOP) { break; } @@ -507,15 +681,20 @@ void MixerSpeaker::audio_mixer_task(void *params) { const uint32_t output_frames_free = this_mixer->audio_stream_info_.value().bytes_to_frames(output_transfer_buffer->free()); - std::vector speakers_with_data; - std::vector> transfer_buffers_with_data; + speakers_with_data.clear(); + transfer_buffers_with_data.clear(); for (auto &speaker : this_mixer->source_speakers_) { - if (speaker->get_transfer_buffer().use_count() > 0) { + if (speaker->is_running() && !speaker->get_pause_state()) { + // Speaker is running and not paused, so it possibly can provide audio data std::shared_ptr transfer_buffer = speaker->get_transfer_buffer().lock(); - speaker->process_data_from_source(0); // Transfers and ducks audio from source ring buffers + if (transfer_buffer.use_count() == 0) { + // No transfer buffer allocated, so skip processing this speaker + continue; + } + speaker->process_data_from_source(transfer_buffer, 0); // Transfers and ducks audio from source ring buffers - if ((transfer_buffer->available() > 0) && !speaker->get_pause_state()) { + if (transfer_buffer->available() > 0) { // Store the locked transfer buffers in their own vector to avoid releasing ownership until after the loop transfer_buffers_with_data.push_back(transfer_buffer); speakers_with_data.push_back(speaker); @@ -547,13 +726,21 @@ void MixerSpeaker::audio_mixer_task(void *params) { reinterpret_cast(output_transfer_buffer->get_buffer_end()), this_mixer->audio_stream_info_.value(), frames_to_mix); - // Update source speaker buffer length - transfer_buffers_with_data[0]->decrease_buffer_length(active_stream_info.frames_to_bytes(frames_to_mix)); - speakers_with_data[0]->pending_playback_frames_ += frames_to_mix; + // Set playback delay for newly contributing source + if (!speakers_with_data[0]->has_contributed_.load(std::memory_order_acquire)) { + speakers_with_data[0]->playback_delay_frames_.store( + this_mixer->frames_in_pipeline_.load(std::memory_order_acquire), std::memory_order_release); + speakers_with_data[0]->has_contributed_.store(true, std::memory_order_release); + } - // Update output transfer buffer length + // Update source speaker pending frames + speakers_with_data[0]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release); + transfer_buffers_with_data[0]->decrease_buffer_length(active_stream_info.frames_to_bytes(frames_to_mix)); + + // Update output transfer buffer length and pipeline frame count output_transfer_buffer->increase_buffer_length( this_mixer->audio_stream_info_.value().frames_to_bytes(frames_to_mix)); + this_mixer->frames_in_pipeline_.fetch_add(frames_to_mix, std::memory_order_release); } else { // Speaker's stream info doesn't match the output speaker's, so it's a new source speaker if (!this_mixer->output_speaker_->is_stopped()) { @@ -568,6 +755,8 @@ void MixerSpeaker::audio_mixer_task(void *params) { active_stream_info.get_sample_rate()); this_mixer->output_speaker_->set_audio_stream_info(this_mixer->audio_stream_info_.value()); this_mixer->output_speaker_->start(); + // Reset pipeline frame count since we're starting fresh with a new sample rate + this_mixer->frames_in_pipeline_.store(0, std::memory_order_release); sent_finished = false; } } @@ -596,26 +785,39 @@ void MixerSpeaker::audio_mixer_task(void *params) { } } + // Get current pipeline depth for delay calculation (before incrementing) + uint32_t current_pipeline_frames = this_mixer->frames_in_pipeline_.load(std::memory_order_acquire); + // Update source transfer buffer lengths and add new audio durations to the source speaker pending playbacks for (size_t i = 0; i < transfer_buffers_with_data.size(); ++i) { + // Set playback delay for newly contributing sources + if (!speakers_with_data[i]->has_contributed_.load(std::memory_order_acquire)) { + speakers_with_data[i]->playback_delay_frames_.store(current_pipeline_frames, std::memory_order_release); + speakers_with_data[i]->has_contributed_.store(true, std::memory_order_release); + } + + speakers_with_data[i]->pending_playback_frames_.fetch_add(frames_to_mix, std::memory_order_release); transfer_buffers_with_data[i]->decrease_buffer_length( speakers_with_data[i]->get_audio_stream_info().frames_to_bytes(frames_to_mix)); - speakers_with_data[i]->pending_playback_frames_ += frames_to_mix; } - // Update output transfer buffer length + // Update output transfer buffer length and pipeline frame count (once, not per source) output_transfer_buffer->increase_buffer_length( this_mixer->audio_stream_info_.value().frames_to_bytes(frames_to_mix)); + this_mixer->frames_in_pipeline_.fetch_add(frames_to_mix, std::memory_order_release); } } - xEventGroupSetBits(this_mixer->event_group_, MixerEventGroupBits::STATE_STOPPING); + xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPING); + + // Reset pipeline frame count since the task is stopping + this_mixer->frames_in_pipeline_.store(0, std::memory_order_release); output_transfer_buffer.reset(); - xEventGroupSetBits(this_mixer->event_group_, MixerEventGroupBits::STATE_STOPPED); - this_mixer->task_created_ = false; - vTaskDelete(nullptr); + xEventGroupSetBits(this_mixer->event_group_, MIXER_TASK_STATE_STOPPED); + + vTaskSuspend(nullptr); // Suspend this task indefinitely until the loop method deletes it } } // namespace mixer_speaker diff --git a/esphome/components/mixer/speaker/mixer_speaker.h b/esphome/components/mixer/speaker/mixer_speaker.h index 48bacc4471..e920f9895a 100644 --- a/esphome/components/mixer/speaker/mixer_speaker.h +++ b/esphome/components/mixer/speaker/mixer_speaker.h @@ -7,26 +7,31 @@ #include "esphome/components/speaker/speaker.h" #include "esphome/core/component.h" +#include "esphome/core/helpers.h" -#include #include +#include + +#include namespace esphome { namespace mixer_speaker { /* Classes for mixing several source speaker audio streams and writing it to another speaker component. * - Volume controls are passed through to the output speaker + * - Source speaker commands are signaled via event group bits and processed in its loop function to ensure thread + * safety * - Directly handles pausing at the SourceSpeaker level; pause state is not passed through to the output speaker. - * - Audio sent to the SourceSpeaker's must have 16 bits per sample. + * - Audio sent to the SourceSpeaker must have 16 bits per sample. * - Audio sent to the SourceSpeaker can have any number of channels. They are duplicated or ignored as needed to match * the number of channels required for the output speaker. - * - In queue mode, the audio sent to the SoureSpeakers can have different sample rates. + * - In queue mode, the audio sent to the SourceSpeakers can have different sample rates. * - In non-queue mode, the audio sent to the SourceSpeakers must have the same sample rates. * - SourceSpeaker has an internal ring buffer. It also allocates a shared_ptr for an AudioTranserBuffer object. * - Audio Data Flow: * - Audio data played on a SourceSpeaker first writes to its internal ring buffer. * - MixerSpeaker task temporarily takes shared ownership of each SourceSpeaker's AudioTransferBuffer. - * - MixerSpeaker calls SourceSpeaker's `process_data_from_source`, which tranfers audio from the SourceSpeaker's + * - MixerSpeaker calls SourceSpeaker's `process_data_from_source`, which transfers audio from the SourceSpeaker's * ring buffer to its AudioTransferBuffer. Audio ducking is applied at this step. * - In queue mode, MixerSpeaker prioritizes the earliest configured SourceSpeaker with audio data. Audio data is * sent to the output speaker. @@ -63,13 +68,15 @@ class SourceSpeaker : public speaker::Speaker, public Component { bool get_pause_state() const override { return this->pause_state_; } /// @brief Transfers audio from the ring buffer into the transfer buffer. Ducks audio while transferring. + /// @param transfer_buffer Locked shared_ptr to the transfer buffer (must be valid, not null) /// @param ticks_to_wait FreeRTOS ticks to wait while waiting to read from the ring buffer. /// @return Number of bytes transferred from the ring buffer. - size_t process_data_from_source(TickType_t ticks_to_wait); + size_t process_data_from_source(std::shared_ptr &transfer_buffer, + TickType_t ticks_to_wait); /// @brief Sets the ducking level for the source speaker. - /// @param decibel_reduction (uint8_t) The dB reduction level. For example, 0 is no change, 10 is a reduction by 10 dB - /// @param duration (uint32_t) The number of milliseconds to transition from the current level to the new level + /// @param decibel_reduction The dB reduction level. For example, 0 is no change, 10 is a reduction by 10 dB + /// @param duration The number of milliseconds to transition from the current level to the new level void apply_ducking(uint8_t decibel_reduction, uint32_t duration); void set_buffer_duration(uint32_t buffer_duration_ms) { this->buffer_duration_ms_ = buffer_duration_ms; } @@ -81,14 +88,15 @@ class SourceSpeaker : public speaker::Speaker, public Component { protected: friend class MixerSpeaker; esp_err_t start_(); - void stop_(); + void enter_stopping_state_(); + void send_command_(uint32_t command_bit, bool wake_loop = false); /// @brief Ducks audio samples by a specified amount. When changing the ducking amount, it can transition gradually /// over a specified amount of samples. /// @param input_buffer buffer with audio samples to be ducked in place /// @param input_samples_to_duck number of samples to process in ``input_buffer`` /// @param current_ducking_db_reduction pointer to the current dB reduction - /// @param ducking_transition_samples_remaining pointer to the total number of samples left before the the + /// @param ducking_transition_samples_remaining pointer to the total number of samples left before the /// transition is finished /// @param samples_per_ducking_step total number of samples per ducking step for the transition /// @param db_change_per_ducking_step the change in dB reduction per step @@ -114,7 +122,12 @@ class SourceSpeaker : public speaker::Speaker, public Component { uint32_t ducking_transition_samples_remaining_{0}; uint32_t samples_per_ducking_step_{0}; - uint32_t pending_playback_frames_{0}; + std::atomic pending_playback_frames_{0}; + std::atomic playback_delay_frames_{0}; // Frames in output pipeline when this source started contributing + std::atomic has_contributed_{false}; // Tracks if source has contributed during this session + + EventGroupHandle_t event_group_{nullptr}; + uint32_t stopping_start_ms_{0}; }; class MixerSpeaker : public Component { @@ -123,10 +136,11 @@ class MixerSpeaker : public Component { void setup() override; void loop() override; + void init_source_speakers(size_t count) { this->source_speakers_.init(count); } void add_source_speaker(SourceSpeaker *source_speaker) { this->source_speakers_.push_back(source_speaker); } /// @brief Starts the mixer task. Called by a source speaker giving the current audio stream information - /// @param stream_info The calling source speakers audio stream information + /// @param stream_info The calling source speaker's audio stream information /// @return ESP_ERR_NOT_SUPPORTED if the incoming stream is incompatible due to unsupported bits per sample /// ESP_ERR_INVALID_ARG if the incoming stream is incompatible to be mixed with the other input audio stream /// ESP_ERR_NO_MEM if there isn't enough memory for the task's stack @@ -134,8 +148,6 @@ class MixerSpeaker : public Component { /// ESP_OK if the incoming stream is compatible and the mixer task starts esp_err_t start(audio::AudioStreamInfo &stream_info); - void stop(); - void set_output_channels(uint8_t output_channels) { this->output_channels_ = output_channels; } void set_output_speaker(speaker::Speaker *speaker) { this->output_speaker_ = speaker; } void set_queue_mode(bool queue_mode) { this->queue_mode_ = queue_mode; } @@ -143,6 +155,9 @@ class MixerSpeaker : public Component { speaker::Speaker *get_output_speaker() const { return this->output_speaker_; } + /// @brief Returns the current number of frames in the output pipeline (written but not yet played) + uint32_t get_frames_in_pipeline() const { return this->frames_in_pipeline_.load(std::memory_order_acquire); } + protected: /// @brief Copies audio frames from the input buffer to the output buffer taking into account the number of channels /// in each stream. If the output stream has more channels, the input samples are duplicated. If the output stream has @@ -159,11 +174,11 @@ class MixerSpeaker : public Component { /// and secondary samples are duplicated or dropped as necessary to ensure the output stream has the configured number /// of channels. Output samples are clamped to the corresponding int16 min or max values if the mixed sample /// overflows. - /// @param primary_buffer (int16_t *) samples buffer for the primary stream + /// @param primary_buffer samples buffer for the primary stream /// @param primary_stream_info stream info for the primary stream - /// @param secondary_buffer (int16_t *) samples buffer for secondary stream + /// @param secondary_buffer samples buffer for secondary stream /// @param secondary_stream_info stream info for the secondary stream - /// @param output_buffer (int16_t *) buffer for the mixed samples + /// @param output_buffer buffer for the mixed samples /// @param output_stream_info stream info for the output buffer /// @param frames_to_mix number of frames in the primary and secondary buffers to mix together static void mix_audio_samples(const int16_t *primary_buffer, audio::AudioStreamInfo primary_stream_info, @@ -185,20 +200,20 @@ class MixerSpeaker : public Component { EventGroupHandle_t event_group_{nullptr}; - std::vector source_speakers_; + FixedVector source_speakers_; speaker::Speaker *output_speaker_{nullptr}; uint8_t output_channels_; bool queue_mode_; bool task_stack_in_psram_{false}; - bool task_created_{false}; - TaskHandle_t task_handle_{nullptr}; StaticTask_t task_stack_; StackType_t *task_stack_buffer_{nullptr}; optional audio_stream_info_; + + std::atomic frames_in_pipeline_{0}; // Frames written to output but not yet played }; } // namespace mixer_speaker