diff --git a/esphome/components/logger/__init__.py b/esphome/components/logger/__init__.py index 0a6035f8d1..79a9a4208c 100644 --- a/esphome/components/logger/__init__.py +++ b/esphome/components/logger/__init__.py @@ -310,6 +310,10 @@ async def to_code(config): if task_log_buffer_size > 0: cg.add_define("USE_ESPHOME_TASK_LOG_BUFFER") cg.add(log.init_log_buffer(task_log_buffer_size)) + elif CORE.is_host: + cg.add(log.create_pthread_key()) + cg.add_define("USE_ESPHOME_TASK_LOG_BUFFER") + cg.add(log.init_log_buffer(64)) # Fixed 64 slots for host cg.add(log.set_log_level(initial_level)) if CONF_HARDWARE_UART in config: @@ -520,10 +524,11 @@ FILTER_SOURCE_FILES = filter_source_files_from_platform( PlatformFramework.LN882X_ARDUINO, }, "logger_zephyr.cpp": {PlatformFramework.NRF52_ZEPHYR}, - "task_log_buffer.cpp": { + "task_log_buffer_esp32.cpp": { PlatformFramework.ESP32_ARDUINO, PlatformFramework.ESP32_IDF, }, + "task_log_buffer_host.cpp": {PlatformFramework.HOST_NATIVE}, } ) diff --git a/esphome/components/logger/logger.cpp b/esphome/components/logger/logger.cpp index 474eb9ec38..e633f9fd7d 100644 --- a/esphome/components/logger/logger.cpp +++ b/esphome/components/logger/logger.cpp @@ -12,14 +12,14 @@ namespace esphome::logger { static const char *const TAG = "logger"; -#ifdef USE_ESP32 -// Implementation for ESP32 (multi-task platform with task-specific tracking) -// Main task always uses direct buffer access for console output and callbacks +#if defined(USE_ESP32) || defined(USE_HOST) +// Implementation for multi-threaded platforms (ESP32 with FreeRTOS, Host with pthreads) +// Main thread/task always uses direct buffer access for console output and callbacks // -// For non-main tasks: +// For non-main threads/tasks: // - WITH task log buffer: Prefer sending to ring buffer for async processing // - Avoids allocating stack memory for console output in normal operation -// - Prevents console corruption from concurrent writes by multiple tasks +// - Prevents console corruption from concurrent writes by multiple threads // - Messages are serialized through main loop for proper console output // - Fallback to emergency console logging only if ring buffer is full // - WITHOUT task log buffer: Only emergency console output, no callbacks @@ -27,15 +27,20 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch if (level > this->level_for(tag)) return; +#ifdef USE_ESP32 TaskHandle_t current_task = xTaskGetCurrentTaskHandle(); bool is_main_task = (current_task == main_task_); +#else // USE_HOST + pthread_t current_thread = pthread_self(); + bool is_main_task = pthread_equal(current_thread, main_thread_); +#endif - // Check and set recursion guard - uses pthread TLS for per-task state + // Check and set recursion guard - uses pthread TLS for per-thread/task state if (this->check_and_set_task_log_recursion_(is_main_task)) { return; // Recursion detected } - // Main task uses the shared buffer for efficiency + // Main thread/task uses the shared buffer for efficiency if (is_main_task) { this->log_message_to_buffer_and_send_(level, tag, line, format, args); this->reset_task_log_recursion_(is_main_task); @@ -44,9 +49,13 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch bool message_sent = false; #ifdef USE_ESPHOME_TASK_LOG_BUFFER - // For non-main tasks, queue the message for callbacks - but only if we have any callbacks registered + // For non-main threads/tasks, queue the message for callbacks +#ifdef USE_ESP32 message_sent = this->log_buffer_->send_message_thread_safe(level, tag, static_cast(line), current_task, format, args); +#else // USE_HOST + message_sent = this->log_buffer_->send_message_thread_safe(level, tag, static_cast(line), format, args); +#endif if (message_sent) { // Enable logger loop to process the buffered message // This is safe to call from any context including ISRs @@ -54,13 +63,19 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch } #endif // USE_ESPHOME_TASK_LOG_BUFFER - // Emergency console logging for non-main tasks when ring buffer is full or disabled + // Emergency console logging for non-main threads when ring buffer is full or disabled // This is a fallback mechanism to ensure critical log messages are visible - // Note: This may cause interleaved/corrupted console output if multiple tasks + // Note: This may cause interleaved/corrupted console output if multiple threads // log simultaneously, but it's better than losing important messages entirely +#ifdef USE_HOST + if (!message_sent) { + // Host always has console output - no baud_rate check needed + static const size_t MAX_CONSOLE_LOG_MSG_SIZE = 512; +#else if (!message_sent && this->baud_rate_ > 0) { // If logging is enabled, write to console // Maximum size for console log messages (includes null terminator) static const size_t MAX_CONSOLE_LOG_MSG_SIZE = 144; +#endif char console_buffer[MAX_CONSOLE_LOG_MSG_SIZE]; // MUST be stack allocated for thread safety uint16_t buffer_at = 0; // Initialize buffer position this->format_log_to_buffer_with_terminator_(level, tag, line, format, args, console_buffer, &buffer_at, @@ -70,7 +85,7 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch this->write_msg_(console_buffer, buffer_at); } - // Reset the recursion guard for this task + // Reset the recursion guard for this thread/task this->reset_task_log_recursion_(is_main_task); } #else @@ -86,7 +101,7 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch global_recursion_guard_ = false; } -#endif // !USE_ESP32 +#endif // USE_ESP32 / USE_HOST #ifdef USE_STORE_LOG_STR_IN_FLASH // Implementation for ESP8266 with flash string support. @@ -167,15 +182,24 @@ Logger::Logger(uint32_t baud_rate, size_t tx_buffer_size) : baud_rate_(baud_rate this->main_task_ = xTaskGetCurrentTaskHandle(); #elif defined(USE_ZEPHYR) this->main_task_ = k_current_get(); +#elif defined(USE_HOST) + this->main_thread_ = pthread_self(); #endif } #ifdef USE_ESPHOME_TASK_LOG_BUFFER void Logger::init_log_buffer(size_t total_buffer_size) { +#ifdef USE_HOST + // Host uses slot count instead of byte size + this->log_buffer_ = esphome::make_unique(total_buffer_size); +#else this->log_buffer_ = esphome::make_unique(total_buffer_size); +#endif +#ifdef USE_ESP32 // Start with loop disabled when using task buffer (unless using USB CDC) // The loop will be enabled automatically when messages arrive this->disable_loop_when_buffer_empty_(); +#endif } #endif @@ -187,41 +211,37 @@ void Logger::process_messages_() { #ifdef USE_ESPHOME_TASK_LOG_BUFFER // Process any buffered messages when available if (this->log_buffer_->has_messages()) { +#ifdef USE_HOST + logger::TaskLogBufferHost::LogMessage *message; + while (this->log_buffer_->get_message_main_loop(&message)) { + const char *thread_name = message->thread_name[0] != '\0' ? message->thread_name : nullptr; + this->format_buffered_message_and_notify_(message->level, message->tag, message->line, thread_name, message->text, + message->text_length); + this->log_buffer_->release_message_main_loop(); + this->write_tx_buffer_to_console_(); + } +#else // USE_ESP32 logger::TaskLogBuffer::LogMessage *message; const char *text; void *received_token; - - // Process messages from the buffer while (this->log_buffer_->borrow_message_main_loop(&message, &text, &received_token)) { - this->tx_buffer_at_ = 0; - // Use the thread name that was stored when the message was created - // This avoids potential crashes if the task no longer exists const char *thread_name = message->thread_name[0] != '\0' ? message->thread_name : nullptr; - this->write_header_to_buffer_(message->level, message->tag, message->line, thread_name, this->tx_buffer_, - &this->tx_buffer_at_, this->tx_buffer_size_); - this->write_body_to_buffer_(text, message->text_length, this->tx_buffer_, &this->tx_buffer_at_, - this->tx_buffer_size_); - this->write_footer_to_buffer_(this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_); - this->tx_buffer_[this->tx_buffer_at_] = '\0'; - size_t msg_len = this->tx_buffer_at_; // We already know the length from tx_buffer_at_ - for (auto *listener : this->log_listeners_) - listener->on_log(message->level, message->tag, this->tx_buffer_, msg_len); - // At this point all the data we need from message has been transferred to the tx_buffer - // so we can release the message to allow other tasks to use it as soon as possible. + this->format_buffered_message_and_notify_(message->level, message->tag, message->line, thread_name, text, + message->text_length); + // Release the message to allow other tasks to use it as soon as possible this->log_buffer_->release_message_main_loop(received_token); - - // Write to console from the main loop to prevent corruption from concurrent writes - // This ensures all log messages appear on the console in a clean, serialized manner - // Note: Messages may appear slightly out of order due to async processing, but - // this is preferred over corrupted/interleaved console output this->write_tx_buffer_to_console_(); } - } else { +#endif + } +#ifdef USE_ESP32 + else { // No messages to process, disable loop if appropriate // This reduces overhead when there's no async logging activity this->disable_loop_when_buffer_empty_(); } #endif +#endif // USE_ESPHOME_TASK_LOG_BUFFER } void Logger::set_baud_rate(uint32_t baud_rate) { this->baud_rate_ = baud_rate; } @@ -271,7 +291,11 @@ void Logger::dump_config() { #endif #ifdef USE_ESPHOME_TASK_LOG_BUFFER if (this->log_buffer_) { - ESP_LOGCONFIG(TAG, " Task Log Buffer Size: %u", this->log_buffer_->size()); +#ifdef USE_HOST + ESP_LOGCONFIG(TAG, " Task Log Buffer Slots: %u", static_cast(this->log_buffer_->size())); +#else + ESP_LOGCONFIG(TAG, " Task Log Buffer Size: %u bytes", static_cast(this->log_buffer_->size())); +#endif } #endif diff --git a/esphome/components/logger/logger.h b/esphome/components/logger/logger.h index ba8d4667b6..86d2943135 100644 --- a/esphome/components/logger/logger.h +++ b/esphome/components/logger/logger.h @@ -2,7 +2,7 @@ #include #include -#ifdef USE_ESP32 +#if defined(USE_ESP32) || defined(USE_HOST) #include #endif #include "esphome/core/automation.h" @@ -12,7 +12,11 @@ #include "esphome/core/log.h" #ifdef USE_ESPHOME_TASK_LOG_BUFFER -#include "task_log_buffer.h" +#ifdef USE_HOST +#include "task_log_buffer_host.h" +#elif defined(USE_ESP32) +#include "task_log_buffer_esp32.h" +#endif #endif #ifdef USE_ARDUINO @@ -181,6 +185,9 @@ class Logger : public Component { uart_port_t get_uart_num() const { return uart_num_; } void create_pthread_key() { pthread_key_create(&log_recursion_key_, nullptr); } #endif +#ifdef USE_HOST + void create_pthread_key() { pthread_key_create(&log_recursion_key_, nullptr); } +#endif #if defined(USE_ESP32) || defined(USE_ESP8266) || defined(USE_RP2040) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR) void set_uart_selection(UARTSelection uart_selection) { uart_ = uart_selection; } /// Get the UART used by the logger. @@ -228,7 +235,7 @@ class Logger : public Component { inline void HOT format_log_to_buffer_with_terminator_(uint8_t level, const char *tag, int line, const char *format, va_list args, char *buffer, uint16_t *buffer_at, uint16_t buffer_size) { -#if defined(USE_ESP32) || defined(USE_LIBRETINY) +#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_HOST) this->write_header_to_buffer_(level, tag, line, this->get_thread_name_(), buffer, buffer_at, buffer_size); #elif defined(USE_ZEPHYR) char buff[MAX_POINTER_REPRESENTATION]; @@ -291,6 +298,22 @@ class Logger : public Component { this->write_tx_buffer_to_console_(); } +#ifdef USE_ESPHOME_TASK_LOG_BUFFER + // Helper to format a pre-formatted message from the task log buffer and notify listeners + // Used by process_messages_ to avoid code duplication between ESP32 and host platforms + inline void HOT format_buffered_message_and_notify_(uint8_t level, const char *tag, uint16_t line, + const char *thread_name, const char *text, size_t text_length) { + this->tx_buffer_at_ = 0; + this->write_header_to_buffer_(level, tag, line, thread_name, this->tx_buffer_, &this->tx_buffer_at_, + this->tx_buffer_size_); + this->write_body_to_buffer_(text, text_length, this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_); + this->write_footer_to_buffer_(this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_); + this->tx_buffer_[this->tx_buffer_at_] = '\0'; + for (auto *listener : this->log_listeners_) + listener->on_log(level, tag, this->tx_buffer_, this->tx_buffer_at_); + } +#endif + // Write the body of the log message to the buffer inline void write_body_to_buffer_(const char *value, size_t length, char *buffer, uint16_t *buffer_at, uint16_t buffer_size) { @@ -325,6 +348,9 @@ class Logger : public Component { #if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR) void *main_task_ = nullptr; // Only used for thread name identification #endif +#ifdef USE_HOST + pthread_t main_thread_{}; // Main thread for identification +#endif #ifdef USE_ESP32 // Task-specific recursion guards: // - Main task uses a dedicated member variable for efficiency @@ -332,6 +358,10 @@ class Logger : public Component { pthread_key_t log_recursion_key_; // 4 bytes uart_port_t uart_num_; // 4 bytes (enum defaults to int size) #endif +#ifdef USE_HOST + // Thread-specific recursion guards using pthread TLS + pthread_key_t log_recursion_key_; +#endif // Large objects (internally aligned) #ifdef USE_LOGGER_RUNTIME_TAG_LEVELS @@ -342,7 +372,11 @@ class Logger : public Component { std::vector level_listeners_; // Log level change listeners #endif #ifdef USE_ESPHOME_TASK_LOG_BUFFER +#ifdef USE_HOST + std::unique_ptr log_buffer_; // Will be initialized with init_log_buffer +#elif defined(USE_ESP32) std::unique_ptr log_buffer_; // Will be initialized with init_log_buffer +#endif #endif // Group smaller types together at the end @@ -355,7 +389,7 @@ class Logger : public Component { #ifdef USE_LIBRETINY UARTSelection uart_{UART_SELECTION_DEFAULT}; #endif -#ifdef USE_ESP32 +#if defined(USE_ESP32) || defined(USE_HOST) bool main_task_recursion_guard_{false}; #else bool global_recursion_guard_{false}; // Simple global recursion guard for single-task platforms @@ -392,7 +426,7 @@ class Logger : public Component { } #endif -#ifdef USE_ESP32 +#if defined(USE_ESP32) || defined(USE_HOST) inline bool HOT check_and_set_task_log_recursion_(bool is_main_task) { if (is_main_task) { const bool was_recursive = main_task_recursion_guard_; @@ -418,6 +452,22 @@ class Logger : public Component { } #endif +#ifdef USE_HOST + const char *HOT get_thread_name_() { + pthread_t current_thread = pthread_self(); + if (pthread_equal(current_thread, main_thread_)) { + return nullptr; // Main thread + } + // For non-main threads, return the thread name + // We store it in thread-local storage to avoid allocation + static thread_local char thread_name_buf[32]; + if (pthread_getname_np(current_thread, thread_name_buf, sizeof(thread_name_buf)) == 0) { + return thread_name_buf; + } + return nullptr; + } +#endif + static inline void copy_string(char *buffer, uint16_t &pos, const char *str) { const size_t len = strlen(str); // Intentionally no null terminator, building larger string @@ -475,7 +525,7 @@ class Logger : public Component { buffer[pos++] = '0' + (remainder - tens * 10); buffer[pos++] = ']'; -#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR) +#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR) || defined(USE_HOST) if (thread_name != nullptr) { write_ansi_color_for_level(buffer, pos, 1); // Always use bold red for thread name buffer[pos++] = '['; diff --git a/esphome/components/logger/logger_host.cpp b/esphome/components/logger/logger_host.cpp index cbca06e431..874cdabd22 100644 --- a/esphome/components/logger/logger_host.cpp +++ b/esphome/components/logger/logger_host.cpp @@ -10,8 +10,9 @@ void HOT Logger::write_msg_(const char *msg, size_t len) { time_t rawtime; time(&rawtime); - struct tm *timeinfo = localtime(&rawtime); - size_t pos = strftime(buffer, TIMESTAMP_LEN + 1, "[%H:%M:%S]", timeinfo); + struct tm timeinfo; + localtime_r(&rawtime, &timeinfo); // Thread-safe version + size_t pos = strftime(buffer, TIMESTAMP_LEN + 1, "[%H:%M:%S]", &timeinfo); // Copy message (with newline already included by caller) size_t copy_len = std::min(len, sizeof(buffer) - pos); diff --git a/esphome/components/logger/task_log_buffer.cpp b/esphome/components/logger/task_log_buffer_esp32.cpp similarity index 98% rename from esphome/components/logger/task_log_buffer.cpp rename to esphome/components/logger/task_log_buffer_esp32.cpp index b5dd9f0239..b9dfe45b7f 100644 --- a/esphome/components/logger/task_log_buffer.cpp +++ b/esphome/components/logger/task_log_buffer_esp32.cpp @@ -1,5 +1,6 @@ +#ifdef USE_ESP32 -#include "task_log_buffer.h" +#include "task_log_buffer_esp32.h" #include "esphome/core/helpers.h" #include "esphome/core/log.h" @@ -134,3 +135,4 @@ bool TaskLogBuffer::send_message_thread_safe(uint8_t level, const char *tag, uin } // namespace esphome::logger #endif // USE_ESPHOME_TASK_LOG_BUFFER +#endif // USE_ESP32 diff --git a/esphome/components/logger/task_log_buffer.h b/esphome/components/logger/task_log_buffer_esp32.h similarity index 77% rename from esphome/components/logger/task_log_buffer.h rename to esphome/components/logger/task_log_buffer_esp32.h index fdda07190d..fde9bd60d5 100644 --- a/esphome/components/logger/task_log_buffer.h +++ b/esphome/components/logger/task_log_buffer_esp32.h @@ -1,5 +1,7 @@ #pragma once +#ifdef USE_ESP32 + #include "esphome/core/defines.h" #include "esphome/core/helpers.h" @@ -13,6 +15,22 @@ namespace esphome::logger { +/** + * @brief Task log buffer for ESP32 platform using FreeRTOS ring buffer. + * + * Threading Model: Multi-Producer Single-Consumer (MPSC) + * - Multiple FreeRTOS tasks can safely call send_message_thread_safe() concurrently + * - Only the main loop task calls borrow_message_main_loop() and release_message_main_loop() + * + * This uses the FreeRTOS ring buffer (RINGBUF_TYPE_NOSPLIT) which provides + * built-in thread-safety for the MPSC pattern. The ring buffer ensures + * message integrity - each message is stored contiguously. + * + * Design: + * - Variable-size messages with header + text stored contiguously + * - FreeRTOS ring buffer handles synchronization internally + * - Atomic counter for fast has_messages() check without ring buffer lock + */ class TaskLogBuffer { public: // Structure for a log message header (text data follows immediately after) @@ -65,3 +83,4 @@ class TaskLogBuffer { } // namespace esphome::logger #endif // USE_ESPHOME_TASK_LOG_BUFFER +#endif // USE_ESP32 diff --git a/esphome/components/logger/task_log_buffer_host.cpp b/esphome/components/logger/task_log_buffer_host.cpp new file mode 100644 index 0000000000..0660aeb061 --- /dev/null +++ b/esphome/components/logger/task_log_buffer_host.cpp @@ -0,0 +1,157 @@ +#ifdef USE_HOST + +#include "task_log_buffer_host.h" + +#ifdef USE_ESPHOME_TASK_LOG_BUFFER + +#include "esphome/core/log.h" +#include +#include + +namespace esphome::logger { + +TaskLogBufferHost::TaskLogBufferHost(size_t slot_count) : slot_count_(slot_count) { + // Allocate message slots + this->slots_ = std::make_unique(slot_count); +} + +TaskLogBufferHost::~TaskLogBufferHost() { + // unique_ptr handles cleanup automatically +} + +int TaskLogBufferHost::acquire_write_slot_() { + // Try to reserve a slot using compare-and-swap + size_t current_reserve = this->reserve_index_.load(std::memory_order_relaxed); + + while (true) { + // Calculate next index (with wrap-around) + size_t next_reserve = (current_reserve + 1) % this->slot_count_; + + // Check if buffer would be full + // Buffer is full when next write position equals read position + size_t current_read = this->read_index_.load(std::memory_order_acquire); + if (next_reserve == current_read) { + return -1; // Buffer full + } + + // Try to claim this slot + if (this->reserve_index_.compare_exchange_weak(current_reserve, next_reserve, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + return static_cast(current_reserve); + } + // If CAS failed, current_reserve was updated, retry with new value + } +} + +void TaskLogBufferHost::commit_write_slot_(int slot_index) { + // Mark the slot as ready for reading + this->slots_[slot_index].ready.store(true, std::memory_order_release); + + // Try to advance the write_index if we're the next expected commit + // This ensures messages are read in order + size_t expected = slot_index; + size_t next = (slot_index + 1) % this->slot_count_; + + // We only advance write_index if this slot is the next one expected + // This handles out-of-order commits correctly + while (true) { + if (!this->write_index_.compare_exchange_weak(expected, next, std::memory_order_release, + std::memory_order_relaxed)) { + // Someone else advanced it or we're not next in line, that's fine + break; + } + + // Successfully advanced, check if next slot is also ready + expected = next; + next = (next + 1) % this->slot_count_; + if (!this->slots_[expected].ready.load(std::memory_order_acquire)) { + break; + } + } +} + +bool TaskLogBufferHost::send_message_thread_safe(uint8_t level, const char *tag, uint16_t line, const char *format, + va_list args) { + // Acquire a slot + int slot_index = this->acquire_write_slot_(); + if (slot_index < 0) { + return false; // Buffer full + } + + LogMessage &msg = this->slots_[slot_index]; + + // Fill in the message header + msg.level = level; + msg.tag = tag; + msg.line = line; + + // Get thread name using pthread + char thread_name_buf[LogMessage::MAX_THREAD_NAME_SIZE]; + // pthread_getname_np works the same on Linux and macOS + if (pthread_getname_np(pthread_self(), thread_name_buf, sizeof(thread_name_buf)) == 0) { + strncpy(msg.thread_name, thread_name_buf, sizeof(msg.thread_name) - 1); + msg.thread_name[sizeof(msg.thread_name) - 1] = '\0'; + } else { + msg.thread_name[0] = '\0'; + } + + // Format the message text + int ret = vsnprintf(msg.text, sizeof(msg.text), format, args); + if (ret < 0) { + // Formatting error - still commit the slot but with empty text + msg.text[0] = '\0'; + msg.text_length = 0; + } else { + msg.text_length = static_cast(std::min(static_cast(ret), sizeof(msg.text) - 1)); + } + + // Remove trailing newlines + while (msg.text_length > 0 && msg.text[msg.text_length - 1] == '\n') { + msg.text_length--; + } + msg.text[msg.text_length] = '\0'; + + // Commit the slot + this->commit_write_slot_(slot_index); + + return true; +} + +bool TaskLogBufferHost::get_message_main_loop(LogMessage **message) { + if (message == nullptr) { + return false; + } + + size_t current_read = this->read_index_.load(std::memory_order_relaxed); + size_t current_write = this->write_index_.load(std::memory_order_acquire); + + // Check if buffer is empty + if (current_read == current_write) { + return false; + } + + // Check if the slot is ready (should always be true if write_index advanced) + LogMessage &msg = this->slots_[current_read]; + if (!msg.ready.load(std::memory_order_acquire)) { + return false; + } + + *message = &msg; + return true; +} + +void TaskLogBufferHost::release_message_main_loop() { + size_t current_read = this->read_index_.load(std::memory_order_relaxed); + + // Clear the ready flag + this->slots_[current_read].ready.store(false, std::memory_order_release); + + // Advance read index + size_t next_read = (current_read + 1) % this->slot_count_; + this->read_index_.store(next_read, std::memory_order_release); +} + +} // namespace esphome::logger + +#endif // USE_ESPHOME_TASK_LOG_BUFFER +#endif // USE_HOST diff --git a/esphome/components/logger/task_log_buffer_host.h b/esphome/components/logger/task_log_buffer_host.h new file mode 100644 index 0000000000..d421d50ec6 --- /dev/null +++ b/esphome/components/logger/task_log_buffer_host.h @@ -0,0 +1,122 @@ +#pragma once + +#ifdef USE_HOST + +#include "esphome/core/defines.h" +#include "esphome/core/helpers.h" + +#ifdef USE_ESPHOME_TASK_LOG_BUFFER + +#include +#include +#include +#include +#include +#include + +namespace esphome::logger { + +/** + * @brief Lock-free task log buffer for host platform. + * + * Threading Model: Multi-Producer Single-Consumer (MPSC) + * - Multiple threads can safely call send_message_thread_safe() concurrently + * - Only the main loop thread calls get_message_main_loop() and release_message_main_loop() + * + * Producers (multiple threads) Consumer (main loop only) + * │ │ + * ▼ ▼ + * acquire_write_slot_() get_message_main_loop() + * CAS on reserve_index_ read write_index_ + * │ check ready flag + * ▼ │ + * write to slot (exclusive) ▼ + * │ read slot data + * ▼ │ + * commit_write_slot_() ▼ + * set ready=true release_message_main_loop() + * advance write_index_ set ready=false + * advance read_index_ + * + * This implements a lock-free ring buffer for log messages on the host platform. + * It uses atomic compare-and-swap (CAS) operations for thread-safe slot reservation + * without requiring mutexes in the hot path. + * + * Design: + * - Fixed number of pre-allocated message slots to avoid dynamic allocation + * - Each slot contains a header and fixed-size text buffer + * - Atomic CAS for slot reservation allows multiple producers without locks + * - Single consumer (main loop) processes messages in order + */ +class TaskLogBufferHost { + public: + // Default number of message slots - host has plenty of memory + static constexpr size_t DEFAULT_SLOT_COUNT = 64; + + // Structure for a log message (fixed size for lock-free operation) + struct LogMessage { + // Size constants + static constexpr size_t MAX_THREAD_NAME_SIZE = 32; + static constexpr size_t MAX_TEXT_SIZE = 512; + + const char *tag; // Pointer to static tag string + char thread_name[MAX_THREAD_NAME_SIZE]; // Thread name (copied) + char text[MAX_TEXT_SIZE + 1]; // Message text with null terminator + uint16_t text_length; // Actual length of text + uint16_t line; // Source line number + uint8_t level; // Log level + std::atomic ready; // Message is ready to be consumed + + LogMessage() : tag(nullptr), text_length(0), line(0), level(0), ready(false) { + thread_name[0] = '\0'; + text[0] = '\0'; + } + }; + + /// Constructor that takes the number of message slots + explicit TaskLogBufferHost(size_t slot_count); + ~TaskLogBufferHost(); + + // NOT thread-safe - get next message from buffer, only call from main loop + // Returns true if a message was retrieved, false if buffer is empty + bool get_message_main_loop(LogMessage **message); + + // NOT thread-safe - release the message after processing, only call from main loop + void release_message_main_loop(); + + // Thread-safe - send a message to the buffer from any thread + // Returns true if message was queued, false if buffer is full + bool send_message_thread_safe(uint8_t level, const char *tag, uint16_t line, const char *format, va_list args); + + // Check if there are messages ready to be processed + inline bool HOT has_messages() const { + return read_index_.load(std::memory_order_acquire) != write_index_.load(std::memory_order_acquire); + } + + // Get the buffer size (number of slots) + inline size_t size() const { return slot_count_; } + + private: + // Acquire a slot for writing (thread-safe) + // Returns slot index or -1 if buffer is full + int acquire_write_slot_(); + + // Commit a slot after writing (thread-safe) + void commit_write_slot_(int slot_index); + + std::unique_ptr slots_; // Pre-allocated message slots + size_t slot_count_; // Number of slots + + // Lock-free indices using atomics + // - reserve_index_: Next slot to reserve (producers CAS this to claim slots) + // - write_index_: Boundary of committed/ready slots (consumer reads up to this) + // - read_index_: Next slot to read (only consumer modifies this) + std::atomic reserve_index_{0}; // Next slot to reserve for writing + std::atomic write_index_{0}; // Last committed slot boundary + std::atomic read_index_{0}; // Next slot to read from +}; + +} // namespace esphome::logger + +#endif // USE_ESPHOME_TASK_LOG_BUFFER +#endif // USE_HOST diff --git a/tests/integration/fixtures/host_logger_thread_safety.yaml b/tests/integration/fixtures/host_logger_thread_safety.yaml new file mode 100644 index 0000000000..e44a217b2b --- /dev/null +++ b/tests/integration/fixtures/host_logger_thread_safety.yaml @@ -0,0 +1,91 @@ +esphome: + name: host-logger-thread-test +host: +api: +logger: + +button: + - platform: template + name: "Start Thread Race Test" + id: start_test_button + on_press: + - lambda: |- + // Number of threads and messages per thread + static const int NUM_THREADS = 3; + static const int MESSAGES_PER_THREAD = 100; + + // Counters + static std::atomic total_messages_logged{0}; + + // Thread function - must be a regular function pointer for pthread + struct ThreadTest { + static void *thread_func(void *arg) { + int thread_id = *static_cast(arg); + + // Set thread name (different signatures on macOS vs Linux) + char thread_name[16]; + snprintf(thread_name, sizeof(thread_name), "LogThread%d", thread_id); + #ifdef __APPLE__ + pthread_setname_np(thread_name); + #else + pthread_setname_np(pthread_self(), thread_name); + #endif + + // Log messages with different log levels + for (int i = 0; i < MESSAGES_PER_THREAD; i++) { + switch (i % 4) { + case 0: + ESP_LOGI("thread_test", "THREAD%d_MSG%03d_INFO_MESSAGE_WITH_DATA_%08X", + thread_id, i, i * 12345); + break; + case 1: + ESP_LOGD("thread_test", "THREAD%d_MSG%03d_DEBUG_MESSAGE_WITH_DATA_%08X", + thread_id, i, i * 12345); + break; + case 2: + ESP_LOGW("thread_test", "THREAD%d_MSG%03d_WARN_MESSAGE_WITH_DATA_%08X", + thread_id, i, i * 12345); + break; + case 3: + ESP_LOGE("thread_test", "THREAD%d_MSG%03d_ERROR_MESSAGE_WITH_DATA_%08X", + thread_id, i, i * 12345); + break; + } + total_messages_logged.fetch_add(1, std::memory_order_relaxed); + + // Small busy loop to vary timing between threads + int delay_count = (thread_id + 1) * 10; + while (delay_count-- > 0) { + asm volatile("" ::: "memory"); // Prevent optimization + } + } + return nullptr; + } + }; + + ESP_LOGI("thread_test", "RACE_TEST_START: Starting %d threads with %d messages each", + NUM_THREADS, MESSAGES_PER_THREAD); + + // Reset counter for this test run + total_messages_logged.store(0, std::memory_order_relaxed); + + pthread_t threads[NUM_THREADS]; + int thread_ids[NUM_THREADS]; + + // Create all threads + for (int i = 0; i < NUM_THREADS; i++) { + thread_ids[i] = i; + int ret = pthread_create(&threads[i], nullptr, ThreadTest::thread_func, &thread_ids[i]); + if (ret != 0) { + ESP_LOGE("thread_test", "RACE_TEST_ERROR: Failed to create thread %d", i); + return; + } + } + + // Wait for all threads to complete + for (int i = 0; i < NUM_THREADS; i++) { + pthread_join(threads[i], nullptr); + } + + ESP_LOGI("thread_test", "RACE_TEST_COMPLETE: All threads finished, total messages: %d", + total_messages_logged.load(std::memory_order_relaxed)); diff --git a/tests/integration/test_host_logger_thread_safety.py b/tests/integration/test_host_logger_thread_safety.py new file mode 100644 index 0000000000..922ce00155 --- /dev/null +++ b/tests/integration/test_host_logger_thread_safety.py @@ -0,0 +1,182 @@ +"""Integration test for host logger thread safety. + +This test verifies that the logger's MPSC ring buffer correctly handles +multiple threads racing to log messages without corruption or data loss. +""" + +from __future__ import annotations + +import asyncio +import re + +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + +# Expected pattern for log messages from threads +# Format: THREADn_MSGnnn_LEVEL_MESSAGE_WITH_DATA_xxxxxxxx +THREAD_MSG_PATTERN = re.compile( + r"THREAD(\d+)_MSG(\d{3})_(INFO|DEBUG|WARN|ERROR)_MESSAGE_WITH_DATA_([0-9A-F]{8})" +) + +# Pattern for test start/complete markers +TEST_START_PATTERN = re.compile(r"RACE_TEST_START.*Starting (\d+) threads") +TEST_COMPLETE_PATTERN = re.compile(r"RACE_TEST_COMPLETE.*total messages: (\d+)") + +# Expected values +NUM_THREADS = 3 +MESSAGES_PER_THREAD = 100 +EXPECTED_TOTAL_MESSAGES = NUM_THREADS * MESSAGES_PER_THREAD + + +@pytest.mark.asyncio +async def test_host_logger_thread_safety( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that multiple threads can log concurrently without corruption. + + This test: + 1. Spawns 3 threads that each log 100 messages + 2. Collects all log output + 3. Verifies no lines are corrupted (partially written or interleaved) + 4. Verifies all expected messages were received + """ + collected_lines: list[str] = [] + test_complete_event = asyncio.Event() + + def line_callback(line: str) -> None: + """Collect log lines and detect test completion.""" + collected_lines.append(line) + if "RACE_TEST_COMPLETE" in line: + test_complete_event.set() + + # Run the test binary and collect output + async with ( + run_compiled(yaml_config, line_callback=line_callback), + api_client_connected() as client, + ): + # Verify connection works + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "host-logger-thread-test" + + # Get the button entity - find by name + entities, _ = await client.list_entities_services() + button_entities = [e for e in entities if e.name == "Start Thread Race Test"] + assert button_entities, "Could not find Start Thread Race Test button" + button_key = button_entities[0].key + + # Press the button to start the thread race test + client.button_command(button_key) + + # Wait for test to complete (with timeout) + try: + await asyncio.wait_for(test_complete_event.wait(), timeout=30.0) + except TimeoutError: + pytest.fail( + "Test did not complete within timeout. " + f"Collected {len(collected_lines)} lines." + ) + + # Give a bit more time for any remaining buffered messages + await asyncio.sleep(0.5) + + # Analyze collected log lines + thread_messages: dict[int, set[int]] = {i: set() for i in range(NUM_THREADS)} + corrupted_lines: list[str] = [] + test_started = False + test_completed = False + reported_total = 0 + + for line in collected_lines: + # Check for test start + start_match = TEST_START_PATTERN.search(line) + if start_match: + test_started = True + assert int(start_match.group(1)) == NUM_THREADS, ( + f"Unexpected thread count: {start_match.group(1)}" + ) + continue + + # Check for test completion + complete_match = TEST_COMPLETE_PATTERN.search(line) + if complete_match: + test_completed = True + reported_total = int(complete_match.group(1)) + continue + + # Check for thread messages + msg_match = THREAD_MSG_PATTERN.search(line) + if msg_match: + thread_id = int(msg_match.group(1)) + msg_num = int(msg_match.group(2)) + # level = msg_match.group(3) # INFO, DEBUG, WARN, ERROR + data_hex = msg_match.group(4) + + # Verify data value matches expected calculation + expected_data = f"{msg_num * 12345:08X}" + if data_hex != expected_data: + corrupted_lines.append( + f"Data mismatch in line: {line} " + f"(expected {expected_data}, got {data_hex})" + ) + continue + + # Track which messages we received from each thread + if 0 <= thread_id < NUM_THREADS: + thread_messages[thread_id].add(msg_num) + else: + corrupted_lines.append(f"Invalid thread ID in line: {line}") + continue + + # Check for partial/corrupted thread messages + # If a line contains part of a thread message pattern but doesn't match fully + # This could indicate line corruption from interleaving + if ( + "THREAD" in line + and "MSG" in line + and not msg_match + and "_MESSAGE_WITH_DATA_" in line + ): + corrupted_lines.append(f"Possibly corrupted line: {line}") + + # Assertions + assert test_started, "Test start marker not found in output" + assert test_completed, "Test completion marker not found in output" + assert reported_total == EXPECTED_TOTAL_MESSAGES, ( + f"Reported total {reported_total} != expected {EXPECTED_TOTAL_MESSAGES}" + ) + + # Check for corrupted lines + assert not corrupted_lines, ( + f"Found {len(corrupted_lines)} corrupted lines:\n" + + "\n".join(corrupted_lines[:10]) # Show first 10 + ) + + # Count total messages received + total_received = sum(len(msgs) for msgs in thread_messages.values()) + + # We may not receive all messages due to ring buffer overflow when buffer is full + # The test primarily verifies no corruption, not that we receive every message + # However, we should receive a reasonable number of messages + min_expected = EXPECTED_TOTAL_MESSAGES // 2 # At least 50% + assert total_received >= min_expected, ( + f"Received only {total_received} messages, expected at least {min_expected}. " + f"Per-thread breakdown: " + + ", ".join(f"Thread{i}: {len(msgs)}" for i, msgs in thread_messages.items()) + ) + + # Verify we got messages from all threads (proves concurrent logging worked) + for thread_id in range(NUM_THREADS): + assert thread_messages[thread_id], ( + f"No messages received from thread {thread_id}" + ) + + # Log summary for debugging + print("\nThread safety test summary:") + print(f" Total messages received: {total_received}/{EXPECTED_TOTAL_MESSAGES}") + for thread_id in range(NUM_THREADS): + received = len(thread_messages[thread_id]) + print(f" Thread {thread_id}: {received}/{MESSAGES_PER_THREAD} messages")