[logger] Add thread-safe logging for host platform (#13010)

This commit is contained in:
J. Nick Koston
2026-01-07 08:29:50 -10:00
committed by GitHub
parent d86d1f9f52
commit 25ac89e9b5
10 changed files with 698 additions and 45 deletions

View File

@@ -310,6 +310,10 @@ async def to_code(config):
if task_log_buffer_size > 0:
cg.add_define("USE_ESPHOME_TASK_LOG_BUFFER")
cg.add(log.init_log_buffer(task_log_buffer_size))
elif CORE.is_host:
cg.add(log.create_pthread_key())
cg.add_define("USE_ESPHOME_TASK_LOG_BUFFER")
cg.add(log.init_log_buffer(64)) # Fixed 64 slots for host
cg.add(log.set_log_level(initial_level))
if CONF_HARDWARE_UART in config:
@@ -520,10 +524,11 @@ FILTER_SOURCE_FILES = filter_source_files_from_platform(
PlatformFramework.LN882X_ARDUINO,
},
"logger_zephyr.cpp": {PlatformFramework.NRF52_ZEPHYR},
"task_log_buffer.cpp": {
"task_log_buffer_esp32.cpp": {
PlatformFramework.ESP32_ARDUINO,
PlatformFramework.ESP32_IDF,
},
"task_log_buffer_host.cpp": {PlatformFramework.HOST_NATIVE},
}
)

View File

@@ -12,14 +12,14 @@ namespace esphome::logger {
static const char *const TAG = "logger";
#ifdef USE_ESP32
// Implementation for ESP32 (multi-task platform with task-specific tracking)
// Main task always uses direct buffer access for console output and callbacks
#if defined(USE_ESP32) || defined(USE_HOST)
// Implementation for multi-threaded platforms (ESP32 with FreeRTOS, Host with pthreads)
// Main thread/task always uses direct buffer access for console output and callbacks
//
// For non-main tasks:
// For non-main threads/tasks:
// - WITH task log buffer: Prefer sending to ring buffer for async processing
// - Avoids allocating stack memory for console output in normal operation
// - Prevents console corruption from concurrent writes by multiple tasks
// - Prevents console corruption from concurrent writes by multiple threads
// - Messages are serialized through main loop for proper console output
// - Fallback to emergency console logging only if ring buffer is full
// - WITHOUT task log buffer: Only emergency console output, no callbacks
@@ -27,15 +27,20 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
if (level > this->level_for(tag))
return;
#ifdef USE_ESP32
TaskHandle_t current_task = xTaskGetCurrentTaskHandle();
bool is_main_task = (current_task == main_task_);
#else // USE_HOST
pthread_t current_thread = pthread_self();
bool is_main_task = pthread_equal(current_thread, main_thread_);
#endif
// Check and set recursion guard - uses pthread TLS for per-task state
// Check and set recursion guard - uses pthread TLS for per-thread/task state
if (this->check_and_set_task_log_recursion_(is_main_task)) {
return; // Recursion detected
}
// Main task uses the shared buffer for efficiency
// Main thread/task uses the shared buffer for efficiency
if (is_main_task) {
this->log_message_to_buffer_and_send_(level, tag, line, format, args);
this->reset_task_log_recursion_(is_main_task);
@@ -44,9 +49,13 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
bool message_sent = false;
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
// For non-main tasks, queue the message for callbacks - but only if we have any callbacks registered
// For non-main threads/tasks, queue the message for callbacks
#ifdef USE_ESP32
message_sent =
this->log_buffer_->send_message_thread_safe(level, tag, static_cast<uint16_t>(line), current_task, format, args);
#else // USE_HOST
message_sent = this->log_buffer_->send_message_thread_safe(level, tag, static_cast<uint16_t>(line), format, args);
#endif
if (message_sent) {
// Enable logger loop to process the buffered message
// This is safe to call from any context including ISRs
@@ -54,13 +63,19 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
}
#endif // USE_ESPHOME_TASK_LOG_BUFFER
// Emergency console logging for non-main tasks when ring buffer is full or disabled
// Emergency console logging for non-main threads when ring buffer is full or disabled
// This is a fallback mechanism to ensure critical log messages are visible
// Note: This may cause interleaved/corrupted console output if multiple tasks
// Note: This may cause interleaved/corrupted console output if multiple threads
// log simultaneously, but it's better than losing important messages entirely
#ifdef USE_HOST
if (!message_sent) {
// Host always has console output - no baud_rate check needed
static const size_t MAX_CONSOLE_LOG_MSG_SIZE = 512;
#else
if (!message_sent && this->baud_rate_ > 0) { // If logging is enabled, write to console
// Maximum size for console log messages (includes null terminator)
static const size_t MAX_CONSOLE_LOG_MSG_SIZE = 144;
#endif
char console_buffer[MAX_CONSOLE_LOG_MSG_SIZE]; // MUST be stack allocated for thread safety
uint16_t buffer_at = 0; // Initialize buffer position
this->format_log_to_buffer_with_terminator_(level, tag, line, format, args, console_buffer, &buffer_at,
@@ -70,7 +85,7 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
this->write_msg_(console_buffer, buffer_at);
}
// Reset the recursion guard for this task
// Reset the recursion guard for this thread/task
this->reset_task_log_recursion_(is_main_task);
}
#else
@@ -86,7 +101,7 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
global_recursion_guard_ = false;
}
#endif // !USE_ESP32
#endif // USE_ESP32 / USE_HOST
#ifdef USE_STORE_LOG_STR_IN_FLASH
// Implementation for ESP8266 with flash string support.
@@ -167,15 +182,24 @@ Logger::Logger(uint32_t baud_rate, size_t tx_buffer_size) : baud_rate_(baud_rate
this->main_task_ = xTaskGetCurrentTaskHandle();
#elif defined(USE_ZEPHYR)
this->main_task_ = k_current_get();
#elif defined(USE_HOST)
this->main_thread_ = pthread_self();
#endif
}
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
void Logger::init_log_buffer(size_t total_buffer_size) {
#ifdef USE_HOST
// Host uses slot count instead of byte size
this->log_buffer_ = esphome::make_unique<logger::TaskLogBufferHost>(total_buffer_size);
#else
this->log_buffer_ = esphome::make_unique<logger::TaskLogBuffer>(total_buffer_size);
#endif
#ifdef USE_ESP32
// Start with loop disabled when using task buffer (unless using USB CDC)
// The loop will be enabled automatically when messages arrive
this->disable_loop_when_buffer_empty_();
#endif
}
#endif
@@ -187,41 +211,37 @@ void Logger::process_messages_() {
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
// Process any buffered messages when available
if (this->log_buffer_->has_messages()) {
#ifdef USE_HOST
logger::TaskLogBufferHost::LogMessage *message;
while (this->log_buffer_->get_message_main_loop(&message)) {
const char *thread_name = message->thread_name[0] != '\0' ? message->thread_name : nullptr;
this->format_buffered_message_and_notify_(message->level, message->tag, message->line, thread_name, message->text,
message->text_length);
this->log_buffer_->release_message_main_loop();
this->write_tx_buffer_to_console_();
}
#else // USE_ESP32
logger::TaskLogBuffer::LogMessage *message;
const char *text;
void *received_token;
// Process messages from the buffer
while (this->log_buffer_->borrow_message_main_loop(&message, &text, &received_token)) {
this->tx_buffer_at_ = 0;
// Use the thread name that was stored when the message was created
// This avoids potential crashes if the task no longer exists
const char *thread_name = message->thread_name[0] != '\0' ? message->thread_name : nullptr;
this->write_header_to_buffer_(message->level, message->tag, message->line, thread_name, this->tx_buffer_,
&this->tx_buffer_at_, this->tx_buffer_size_);
this->write_body_to_buffer_(text, message->text_length, this->tx_buffer_, &this->tx_buffer_at_,
this->tx_buffer_size_);
this->write_footer_to_buffer_(this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_);
this->tx_buffer_[this->tx_buffer_at_] = '\0';
size_t msg_len = this->tx_buffer_at_; // We already know the length from tx_buffer_at_
for (auto *listener : this->log_listeners_)
listener->on_log(message->level, message->tag, this->tx_buffer_, msg_len);
// At this point all the data we need from message has been transferred to the tx_buffer
// so we can release the message to allow other tasks to use it as soon as possible.
this->format_buffered_message_and_notify_(message->level, message->tag, message->line, thread_name, text,
message->text_length);
// Release the message to allow other tasks to use it as soon as possible
this->log_buffer_->release_message_main_loop(received_token);
// Write to console from the main loop to prevent corruption from concurrent writes
// This ensures all log messages appear on the console in a clean, serialized manner
// Note: Messages may appear slightly out of order due to async processing, but
// this is preferred over corrupted/interleaved console output
this->write_tx_buffer_to_console_();
}
} else {
#endif
}
#ifdef USE_ESP32
else {
// No messages to process, disable loop if appropriate
// This reduces overhead when there's no async logging activity
this->disable_loop_when_buffer_empty_();
}
#endif
#endif // USE_ESPHOME_TASK_LOG_BUFFER
}
void Logger::set_baud_rate(uint32_t baud_rate) { this->baud_rate_ = baud_rate; }
@@ -271,7 +291,11 @@ void Logger::dump_config() {
#endif
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
if (this->log_buffer_) {
ESP_LOGCONFIG(TAG, " Task Log Buffer Size: %u", this->log_buffer_->size());
#ifdef USE_HOST
ESP_LOGCONFIG(TAG, " Task Log Buffer Slots: %u", static_cast<unsigned int>(this->log_buffer_->size()));
#else
ESP_LOGCONFIG(TAG, " Task Log Buffer Size: %u bytes", static_cast<unsigned int>(this->log_buffer_->size()));
#endif
}
#endif

View File

@@ -2,7 +2,7 @@
#include <cstdarg>
#include <map>
#ifdef USE_ESP32
#if defined(USE_ESP32) || defined(USE_HOST)
#include <pthread.h>
#endif
#include "esphome/core/automation.h"
@@ -12,7 +12,11 @@
#include "esphome/core/log.h"
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#include "task_log_buffer.h"
#ifdef USE_HOST
#include "task_log_buffer_host.h"
#elif defined(USE_ESP32)
#include "task_log_buffer_esp32.h"
#endif
#endif
#ifdef USE_ARDUINO
@@ -181,6 +185,9 @@ class Logger : public Component {
uart_port_t get_uart_num() const { return uart_num_; }
void create_pthread_key() { pthread_key_create(&log_recursion_key_, nullptr); }
#endif
#ifdef USE_HOST
void create_pthread_key() { pthread_key_create(&log_recursion_key_, nullptr); }
#endif
#if defined(USE_ESP32) || defined(USE_ESP8266) || defined(USE_RP2040) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR)
void set_uart_selection(UARTSelection uart_selection) { uart_ = uart_selection; }
/// Get the UART used by the logger.
@@ -228,7 +235,7 @@ class Logger : public Component {
inline void HOT format_log_to_buffer_with_terminator_(uint8_t level, const char *tag, int line, const char *format,
va_list args, char *buffer, uint16_t *buffer_at,
uint16_t buffer_size) {
#if defined(USE_ESP32) || defined(USE_LIBRETINY)
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_HOST)
this->write_header_to_buffer_(level, tag, line, this->get_thread_name_(), buffer, buffer_at, buffer_size);
#elif defined(USE_ZEPHYR)
char buff[MAX_POINTER_REPRESENTATION];
@@ -291,6 +298,22 @@ class Logger : public Component {
this->write_tx_buffer_to_console_();
}
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
// Helper to format a pre-formatted message from the task log buffer and notify listeners
// Used by process_messages_ to avoid code duplication between ESP32 and host platforms
inline void HOT format_buffered_message_and_notify_(uint8_t level, const char *tag, uint16_t line,
const char *thread_name, const char *text, size_t text_length) {
this->tx_buffer_at_ = 0;
this->write_header_to_buffer_(level, tag, line, thread_name, this->tx_buffer_, &this->tx_buffer_at_,
this->tx_buffer_size_);
this->write_body_to_buffer_(text, text_length, this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_);
this->write_footer_to_buffer_(this->tx_buffer_, &this->tx_buffer_at_, this->tx_buffer_size_);
this->tx_buffer_[this->tx_buffer_at_] = '\0';
for (auto *listener : this->log_listeners_)
listener->on_log(level, tag, this->tx_buffer_, this->tx_buffer_at_);
}
#endif
// Write the body of the log message to the buffer
inline void write_body_to_buffer_(const char *value, size_t length, char *buffer, uint16_t *buffer_at,
uint16_t buffer_size) {
@@ -325,6 +348,9 @@ class Logger : public Component {
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR)
void *main_task_ = nullptr; // Only used for thread name identification
#endif
#ifdef USE_HOST
pthread_t main_thread_{}; // Main thread for identification
#endif
#ifdef USE_ESP32
// Task-specific recursion guards:
// - Main task uses a dedicated member variable for efficiency
@@ -332,6 +358,10 @@ class Logger : public Component {
pthread_key_t log_recursion_key_; // 4 bytes
uart_port_t uart_num_; // 4 bytes (enum defaults to int size)
#endif
#ifdef USE_HOST
// Thread-specific recursion guards using pthread TLS
pthread_key_t log_recursion_key_;
#endif
// Large objects (internally aligned)
#ifdef USE_LOGGER_RUNTIME_TAG_LEVELS
@@ -342,7 +372,11 @@ class Logger : public Component {
std::vector<LoggerLevelListener *> level_listeners_; // Log level change listeners
#endif
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#ifdef USE_HOST
std::unique_ptr<logger::TaskLogBufferHost> log_buffer_; // Will be initialized with init_log_buffer
#elif defined(USE_ESP32)
std::unique_ptr<logger::TaskLogBuffer> log_buffer_; // Will be initialized with init_log_buffer
#endif
#endif
// Group smaller types together at the end
@@ -355,7 +389,7 @@ class Logger : public Component {
#ifdef USE_LIBRETINY
UARTSelection uart_{UART_SELECTION_DEFAULT};
#endif
#ifdef USE_ESP32
#if defined(USE_ESP32) || defined(USE_HOST)
bool main_task_recursion_guard_{false};
#else
bool global_recursion_guard_{false}; // Simple global recursion guard for single-task platforms
@@ -392,7 +426,7 @@ class Logger : public Component {
}
#endif
#ifdef USE_ESP32
#if defined(USE_ESP32) || defined(USE_HOST)
inline bool HOT check_and_set_task_log_recursion_(bool is_main_task) {
if (is_main_task) {
const bool was_recursive = main_task_recursion_guard_;
@@ -418,6 +452,22 @@ class Logger : public Component {
}
#endif
#ifdef USE_HOST
const char *HOT get_thread_name_() {
pthread_t current_thread = pthread_self();
if (pthread_equal(current_thread, main_thread_)) {
return nullptr; // Main thread
}
// For non-main threads, return the thread name
// We store it in thread-local storage to avoid allocation
static thread_local char thread_name_buf[32];
if (pthread_getname_np(current_thread, thread_name_buf, sizeof(thread_name_buf)) == 0) {
return thread_name_buf;
}
return nullptr;
}
#endif
static inline void copy_string(char *buffer, uint16_t &pos, const char *str) {
const size_t len = strlen(str);
// Intentionally no null terminator, building larger string
@@ -475,7 +525,7 @@ class Logger : public Component {
buffer[pos++] = '0' + (remainder - tens * 10);
buffer[pos++] = ']';
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR)
#if defined(USE_ESP32) || defined(USE_LIBRETINY) || defined(USE_ZEPHYR) || defined(USE_HOST)
if (thread_name != nullptr) {
write_ansi_color_for_level(buffer, pos, 1); // Always use bold red for thread name
buffer[pos++] = '[';

View File

@@ -10,8 +10,9 @@ void HOT Logger::write_msg_(const char *msg, size_t len) {
time_t rawtime;
time(&rawtime);
struct tm *timeinfo = localtime(&rawtime);
size_t pos = strftime(buffer, TIMESTAMP_LEN + 1, "[%H:%M:%S]", timeinfo);
struct tm timeinfo;
localtime_r(&rawtime, &timeinfo); // Thread-safe version
size_t pos = strftime(buffer, TIMESTAMP_LEN + 1, "[%H:%M:%S]", &timeinfo);
// Copy message (with newline already included by caller)
size_t copy_len = std::min(len, sizeof(buffer) - pos);

View File

@@ -1,5 +1,6 @@
#ifdef USE_ESP32
#include "task_log_buffer.h"
#include "task_log_buffer_esp32.h"
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"
@@ -134,3 +135,4 @@ bool TaskLogBuffer::send_message_thread_safe(uint8_t level, const char *tag, uin
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_ESP32

View File

@@ -1,5 +1,7 @@
#pragma once
#ifdef USE_ESP32
#include "esphome/core/defines.h"
#include "esphome/core/helpers.h"
@@ -13,6 +15,22 @@
namespace esphome::logger {
/**
* @brief Task log buffer for ESP32 platform using FreeRTOS ring buffer.
*
* Threading Model: Multi-Producer Single-Consumer (MPSC)
* - Multiple FreeRTOS tasks can safely call send_message_thread_safe() concurrently
* - Only the main loop task calls borrow_message_main_loop() and release_message_main_loop()
*
* This uses the FreeRTOS ring buffer (RINGBUF_TYPE_NOSPLIT) which provides
* built-in thread-safety for the MPSC pattern. The ring buffer ensures
* message integrity - each message is stored contiguously.
*
* Design:
* - Variable-size messages with header + text stored contiguously
* - FreeRTOS ring buffer handles synchronization internally
* - Atomic counter for fast has_messages() check without ring buffer lock
*/
class TaskLogBuffer {
public:
// Structure for a log message header (text data follows immediately after)
@@ -65,3 +83,4 @@ class TaskLogBuffer {
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_ESP32

View File

@@ -0,0 +1,157 @@
#ifdef USE_HOST
#include "task_log_buffer_host.h"
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#include "esphome/core/log.h"
#include <algorithm>
#include <cstdio>
namespace esphome::logger {
TaskLogBufferHost::TaskLogBufferHost(size_t slot_count) : slot_count_(slot_count) {
// Allocate message slots
this->slots_ = std::make_unique<LogMessage[]>(slot_count);
}
TaskLogBufferHost::~TaskLogBufferHost() {
// unique_ptr handles cleanup automatically
}
int TaskLogBufferHost::acquire_write_slot_() {
// Try to reserve a slot using compare-and-swap
size_t current_reserve = this->reserve_index_.load(std::memory_order_relaxed);
while (true) {
// Calculate next index (with wrap-around)
size_t next_reserve = (current_reserve + 1) % this->slot_count_;
// Check if buffer would be full
// Buffer is full when next write position equals read position
size_t current_read = this->read_index_.load(std::memory_order_acquire);
if (next_reserve == current_read) {
return -1; // Buffer full
}
// Try to claim this slot
if (this->reserve_index_.compare_exchange_weak(current_reserve, next_reserve, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
return static_cast<int>(current_reserve);
}
// If CAS failed, current_reserve was updated, retry with new value
}
}
void TaskLogBufferHost::commit_write_slot_(int slot_index) {
// Mark the slot as ready for reading
this->slots_[slot_index].ready.store(true, std::memory_order_release);
// Try to advance the write_index if we're the next expected commit
// This ensures messages are read in order
size_t expected = slot_index;
size_t next = (slot_index + 1) % this->slot_count_;
// We only advance write_index if this slot is the next one expected
// This handles out-of-order commits correctly
while (true) {
if (!this->write_index_.compare_exchange_weak(expected, next, std::memory_order_release,
std::memory_order_relaxed)) {
// Someone else advanced it or we're not next in line, that's fine
break;
}
// Successfully advanced, check if next slot is also ready
expected = next;
next = (next + 1) % this->slot_count_;
if (!this->slots_[expected].ready.load(std::memory_order_acquire)) {
break;
}
}
}
bool TaskLogBufferHost::send_message_thread_safe(uint8_t level, const char *tag, uint16_t line, const char *format,
va_list args) {
// Acquire a slot
int slot_index = this->acquire_write_slot_();
if (slot_index < 0) {
return false; // Buffer full
}
LogMessage &msg = this->slots_[slot_index];
// Fill in the message header
msg.level = level;
msg.tag = tag;
msg.line = line;
// Get thread name using pthread
char thread_name_buf[LogMessage::MAX_THREAD_NAME_SIZE];
// pthread_getname_np works the same on Linux and macOS
if (pthread_getname_np(pthread_self(), thread_name_buf, sizeof(thread_name_buf)) == 0) {
strncpy(msg.thread_name, thread_name_buf, sizeof(msg.thread_name) - 1);
msg.thread_name[sizeof(msg.thread_name) - 1] = '\0';
} else {
msg.thread_name[0] = '\0';
}
// Format the message text
int ret = vsnprintf(msg.text, sizeof(msg.text), format, args);
if (ret < 0) {
// Formatting error - still commit the slot but with empty text
msg.text[0] = '\0';
msg.text_length = 0;
} else {
msg.text_length = static_cast<uint16_t>(std::min(static_cast<size_t>(ret), sizeof(msg.text) - 1));
}
// Remove trailing newlines
while (msg.text_length > 0 && msg.text[msg.text_length - 1] == '\n') {
msg.text_length--;
}
msg.text[msg.text_length] = '\0';
// Commit the slot
this->commit_write_slot_(slot_index);
return true;
}
bool TaskLogBufferHost::get_message_main_loop(LogMessage **message) {
if (message == nullptr) {
return false;
}
size_t current_read = this->read_index_.load(std::memory_order_relaxed);
size_t current_write = this->write_index_.load(std::memory_order_acquire);
// Check if buffer is empty
if (current_read == current_write) {
return false;
}
// Check if the slot is ready (should always be true if write_index advanced)
LogMessage &msg = this->slots_[current_read];
if (!msg.ready.load(std::memory_order_acquire)) {
return false;
}
*message = &msg;
return true;
}
void TaskLogBufferHost::release_message_main_loop() {
size_t current_read = this->read_index_.load(std::memory_order_relaxed);
// Clear the ready flag
this->slots_[current_read].ready.store(false, std::memory_order_release);
// Advance read index
size_t next_read = (current_read + 1) % this->slot_count_;
this->read_index_.store(next_read, std::memory_order_release);
}
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_HOST

View File

@@ -0,0 +1,122 @@
#pragma once
#ifdef USE_HOST
#include "esphome/core/defines.h"
#include "esphome/core/helpers.h"
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
#include <atomic>
#include <cstdarg>
#include <cstddef>
#include <cstring>
#include <memory>
#include <pthread.h>
namespace esphome::logger {
/**
* @brief Lock-free task log buffer for host platform.
*
* Threading Model: Multi-Producer Single-Consumer (MPSC)
* - Multiple threads can safely call send_message_thread_safe() concurrently
* - Only the main loop thread calls get_message_main_loop() and release_message_main_loop()
*
* Producers (multiple threads) Consumer (main loop only)
* │ │
* ▼ ▼
* acquire_write_slot_() get_message_main_loop()
* CAS on reserve_index_ read write_index_
* │ check ready flag
* ▼ │
* write to slot (exclusive) ▼
* │ read slot data
* ▼ │
* commit_write_slot_() ▼
* set ready=true release_message_main_loop()
* advance write_index_ set ready=false
* advance read_index_
*
* This implements a lock-free ring buffer for log messages on the host platform.
* It uses atomic compare-and-swap (CAS) operations for thread-safe slot reservation
* without requiring mutexes in the hot path.
*
* Design:
* - Fixed number of pre-allocated message slots to avoid dynamic allocation
* - Each slot contains a header and fixed-size text buffer
* - Atomic CAS for slot reservation allows multiple producers without locks
* - Single consumer (main loop) processes messages in order
*/
class TaskLogBufferHost {
public:
// Default number of message slots - host has plenty of memory
static constexpr size_t DEFAULT_SLOT_COUNT = 64;
// Structure for a log message (fixed size for lock-free operation)
struct LogMessage {
// Size constants
static constexpr size_t MAX_THREAD_NAME_SIZE = 32;
static constexpr size_t MAX_TEXT_SIZE = 512;
const char *tag; // Pointer to static tag string
char thread_name[MAX_THREAD_NAME_SIZE]; // Thread name (copied)
char text[MAX_TEXT_SIZE + 1]; // Message text with null terminator
uint16_t text_length; // Actual length of text
uint16_t line; // Source line number
uint8_t level; // Log level
std::atomic<bool> ready; // Message is ready to be consumed
LogMessage() : tag(nullptr), text_length(0), line(0), level(0), ready(false) {
thread_name[0] = '\0';
text[0] = '\0';
}
};
/// Constructor that takes the number of message slots
explicit TaskLogBufferHost(size_t slot_count);
~TaskLogBufferHost();
// NOT thread-safe - get next message from buffer, only call from main loop
// Returns true if a message was retrieved, false if buffer is empty
bool get_message_main_loop(LogMessage **message);
// NOT thread-safe - release the message after processing, only call from main loop
void release_message_main_loop();
// Thread-safe - send a message to the buffer from any thread
// Returns true if message was queued, false if buffer is full
bool send_message_thread_safe(uint8_t level, const char *tag, uint16_t line, const char *format, va_list args);
// Check if there are messages ready to be processed
inline bool HOT has_messages() const {
return read_index_.load(std::memory_order_acquire) != write_index_.load(std::memory_order_acquire);
}
// Get the buffer size (number of slots)
inline size_t size() const { return slot_count_; }
private:
// Acquire a slot for writing (thread-safe)
// Returns slot index or -1 if buffer is full
int acquire_write_slot_();
// Commit a slot after writing (thread-safe)
void commit_write_slot_(int slot_index);
std::unique_ptr<LogMessage[]> slots_; // Pre-allocated message slots
size_t slot_count_; // Number of slots
// Lock-free indices using atomics
// - reserve_index_: Next slot to reserve (producers CAS this to claim slots)
// - write_index_: Boundary of committed/ready slots (consumer reads up to this)
// - read_index_: Next slot to read (only consumer modifies this)
std::atomic<size_t> reserve_index_{0}; // Next slot to reserve for writing
std::atomic<size_t> write_index_{0}; // Last committed slot boundary
std::atomic<size_t> read_index_{0}; // Next slot to read from
};
} // namespace esphome::logger
#endif // USE_ESPHOME_TASK_LOG_BUFFER
#endif // USE_HOST

View File

@@ -0,0 +1,91 @@
esphome:
name: host-logger-thread-test
host:
api:
logger:
button:
- platform: template
name: "Start Thread Race Test"
id: start_test_button
on_press:
- lambda: |-
// Number of threads and messages per thread
static const int NUM_THREADS = 3;
static const int MESSAGES_PER_THREAD = 100;
// Counters
static std::atomic<int> total_messages_logged{0};
// Thread function - must be a regular function pointer for pthread
struct ThreadTest {
static void *thread_func(void *arg) {
int thread_id = *static_cast<int *>(arg);
// Set thread name (different signatures on macOS vs Linux)
char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "LogThread%d", thread_id);
#ifdef __APPLE__
pthread_setname_np(thread_name);
#else
pthread_setname_np(pthread_self(), thread_name);
#endif
// Log messages with different log levels
for (int i = 0; i < MESSAGES_PER_THREAD; i++) {
switch (i % 4) {
case 0:
ESP_LOGI("thread_test", "THREAD%d_MSG%03d_INFO_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
case 1:
ESP_LOGD("thread_test", "THREAD%d_MSG%03d_DEBUG_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
case 2:
ESP_LOGW("thread_test", "THREAD%d_MSG%03d_WARN_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
case 3:
ESP_LOGE("thread_test", "THREAD%d_MSG%03d_ERROR_MESSAGE_WITH_DATA_%08X",
thread_id, i, i * 12345);
break;
}
total_messages_logged.fetch_add(1, std::memory_order_relaxed);
// Small busy loop to vary timing between threads
int delay_count = (thread_id + 1) * 10;
while (delay_count-- > 0) {
asm volatile("" ::: "memory"); // Prevent optimization
}
}
return nullptr;
}
};
ESP_LOGI("thread_test", "RACE_TEST_START: Starting %d threads with %d messages each",
NUM_THREADS, MESSAGES_PER_THREAD);
// Reset counter for this test run
total_messages_logged.store(0, std::memory_order_relaxed);
pthread_t threads[NUM_THREADS];
int thread_ids[NUM_THREADS];
// Create all threads
for (int i = 0; i < NUM_THREADS; i++) {
thread_ids[i] = i;
int ret = pthread_create(&threads[i], nullptr, ThreadTest::thread_func, &thread_ids[i]);
if (ret != 0) {
ESP_LOGE("thread_test", "RACE_TEST_ERROR: Failed to create thread %d", i);
return;
}
}
// Wait for all threads to complete
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], nullptr);
}
ESP_LOGI("thread_test", "RACE_TEST_COMPLETE: All threads finished, total messages: %d",
total_messages_logged.load(std::memory_order_relaxed));

View File

@@ -0,0 +1,182 @@
"""Integration test for host logger thread safety.
This test verifies that the logger's MPSC ring buffer correctly handles
multiple threads racing to log messages without corruption or data loss.
"""
from __future__ import annotations
import asyncio
import re
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
# Expected pattern for log messages from threads
# Format: THREADn_MSGnnn_LEVEL_MESSAGE_WITH_DATA_xxxxxxxx
THREAD_MSG_PATTERN = re.compile(
r"THREAD(\d+)_MSG(\d{3})_(INFO|DEBUG|WARN|ERROR)_MESSAGE_WITH_DATA_([0-9A-F]{8})"
)
# Pattern for test start/complete markers
TEST_START_PATTERN = re.compile(r"RACE_TEST_START.*Starting (\d+) threads")
TEST_COMPLETE_PATTERN = re.compile(r"RACE_TEST_COMPLETE.*total messages: (\d+)")
# Expected values
NUM_THREADS = 3
MESSAGES_PER_THREAD = 100
EXPECTED_TOTAL_MESSAGES = NUM_THREADS * MESSAGES_PER_THREAD
@pytest.mark.asyncio
async def test_host_logger_thread_safety(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that multiple threads can log concurrently without corruption.
This test:
1. Spawns 3 threads that each log 100 messages
2. Collects all log output
3. Verifies no lines are corrupted (partially written or interleaved)
4. Verifies all expected messages were received
"""
collected_lines: list[str] = []
test_complete_event = asyncio.Event()
def line_callback(line: str) -> None:
"""Collect log lines and detect test completion."""
collected_lines.append(line)
if "RACE_TEST_COMPLETE" in line:
test_complete_event.set()
# Run the test binary and collect output
async with (
run_compiled(yaml_config, line_callback=line_callback),
api_client_connected() as client,
):
# Verify connection works
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "host-logger-thread-test"
# Get the button entity - find by name
entities, _ = await client.list_entities_services()
button_entities = [e for e in entities if e.name == "Start Thread Race Test"]
assert button_entities, "Could not find Start Thread Race Test button"
button_key = button_entities[0].key
# Press the button to start the thread race test
client.button_command(button_key)
# Wait for test to complete (with timeout)
try:
await asyncio.wait_for(test_complete_event.wait(), timeout=30.0)
except TimeoutError:
pytest.fail(
"Test did not complete within timeout. "
f"Collected {len(collected_lines)} lines."
)
# Give a bit more time for any remaining buffered messages
await asyncio.sleep(0.5)
# Analyze collected log lines
thread_messages: dict[int, set[int]] = {i: set() for i in range(NUM_THREADS)}
corrupted_lines: list[str] = []
test_started = False
test_completed = False
reported_total = 0
for line in collected_lines:
# Check for test start
start_match = TEST_START_PATTERN.search(line)
if start_match:
test_started = True
assert int(start_match.group(1)) == NUM_THREADS, (
f"Unexpected thread count: {start_match.group(1)}"
)
continue
# Check for test completion
complete_match = TEST_COMPLETE_PATTERN.search(line)
if complete_match:
test_completed = True
reported_total = int(complete_match.group(1))
continue
# Check for thread messages
msg_match = THREAD_MSG_PATTERN.search(line)
if msg_match:
thread_id = int(msg_match.group(1))
msg_num = int(msg_match.group(2))
# level = msg_match.group(3) # INFO, DEBUG, WARN, ERROR
data_hex = msg_match.group(4)
# Verify data value matches expected calculation
expected_data = f"{msg_num * 12345:08X}"
if data_hex != expected_data:
corrupted_lines.append(
f"Data mismatch in line: {line} "
f"(expected {expected_data}, got {data_hex})"
)
continue
# Track which messages we received from each thread
if 0 <= thread_id < NUM_THREADS:
thread_messages[thread_id].add(msg_num)
else:
corrupted_lines.append(f"Invalid thread ID in line: {line}")
continue
# Check for partial/corrupted thread messages
# If a line contains part of a thread message pattern but doesn't match fully
# This could indicate line corruption from interleaving
if (
"THREAD" in line
and "MSG" in line
and not msg_match
and "_MESSAGE_WITH_DATA_" in line
):
corrupted_lines.append(f"Possibly corrupted line: {line}")
# Assertions
assert test_started, "Test start marker not found in output"
assert test_completed, "Test completion marker not found in output"
assert reported_total == EXPECTED_TOTAL_MESSAGES, (
f"Reported total {reported_total} != expected {EXPECTED_TOTAL_MESSAGES}"
)
# Check for corrupted lines
assert not corrupted_lines, (
f"Found {len(corrupted_lines)} corrupted lines:\n"
+ "\n".join(corrupted_lines[:10]) # Show first 10
)
# Count total messages received
total_received = sum(len(msgs) for msgs in thread_messages.values())
# We may not receive all messages due to ring buffer overflow when buffer is full
# The test primarily verifies no corruption, not that we receive every message
# However, we should receive a reasonable number of messages
min_expected = EXPECTED_TOTAL_MESSAGES // 2 # At least 50%
assert total_received >= min_expected, (
f"Received only {total_received} messages, expected at least {min_expected}. "
f"Per-thread breakdown: "
+ ", ".join(f"Thread{i}: {len(msgs)}" for i, msgs in thread_messages.items())
)
# Verify we got messages from all threads (proves concurrent logging worked)
for thread_id in range(NUM_THREADS):
assert thread_messages[thread_id], (
f"No messages received from thread {thread_id}"
)
# Log summary for debugging
print("\nThread safety test summary:")
print(f" Total messages received: {total_received}/{EXPECTED_TOTAL_MESSAGES}")
for thread_id in range(NUM_THREADS):
received = len(thread_messages[thread_id])
print(f" Thread {thread_id}: {received}/{MESSAGES_PER_THREAD} messages")