mirror of
https://github.com/esphome/esphome.git
synced 2026-03-04 11:48:21 -07:00
[ld2410] Add UART mock integration test for LD2410 component (#14377)
This commit is contained in:
@@ -0,0 +1,86 @@
|
||||
import esphome.codegen as cg
|
||||
from esphome.components import uart
|
||||
from esphome.components.uart import CONF_DATA_BITS, CONF_PARITY, CONF_STOP_BITS
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import CONF_BAUD_RATE, CONF_DATA, CONF_DELAY, CONF_ID, CONF_INTERVAL
|
||||
|
||||
CODEOWNERS = ["@esphome/tests"]
|
||||
MULTI_CONF = True
|
||||
|
||||
uart_mock_ns = cg.esphome_ns.namespace("uart_mock")
|
||||
MockUartComponent = uart_mock_ns.class_(
|
||||
"MockUartComponent", uart.UARTComponent, cg.Component
|
||||
)
|
||||
|
||||
CONF_INJECTIONS = "injections"
|
||||
CONF_RESPONSES = "responses"
|
||||
CONF_INJECT_RX = "inject_rx"
|
||||
CONF_EXPECT_TX = "expect_tx"
|
||||
CONF_PERIODIC_RX = "periodic_rx"
|
||||
|
||||
UART_PARITY_OPTIONS = {
|
||||
"NONE": uart.UARTParityOptions.UART_CONFIG_PARITY_NONE,
|
||||
"EVEN": uart.UARTParityOptions.UART_CONFIG_PARITY_EVEN,
|
||||
"ODD": uart.UARTParityOptions.UART_CONFIG_PARITY_ODD,
|
||||
}
|
||||
|
||||
INJECTION_SCHEMA = cv.Schema(
|
||||
{
|
||||
cv.Required(CONF_INJECT_RX): [cv.hex_uint8_t],
|
||||
cv.Optional(CONF_DELAY, default="0ms"): cv.positive_time_period_milliseconds,
|
||||
}
|
||||
)
|
||||
|
||||
RESPONSE_SCHEMA = cv.Schema(
|
||||
{
|
||||
cv.Required(CONF_EXPECT_TX): [cv.hex_uint8_t],
|
||||
cv.Required(CONF_INJECT_RX): [cv.hex_uint8_t],
|
||||
}
|
||||
)
|
||||
|
||||
PERIODIC_RX_SCHEMA = cv.Schema(
|
||||
{
|
||||
cv.Required(CONF_DATA): [cv.hex_uint8_t],
|
||||
cv.Required(CONF_INTERVAL): cv.positive_time_period_milliseconds,
|
||||
}
|
||||
)
|
||||
|
||||
CONFIG_SCHEMA = cv.Schema(
|
||||
{
|
||||
cv.GenerateID(): cv.declare_id(MockUartComponent),
|
||||
cv.Required(CONF_BAUD_RATE): cv.int_range(min=1),
|
||||
cv.Optional(CONF_STOP_BITS, default=1): cv.one_of(1, 2, int=True),
|
||||
cv.Optional(CONF_DATA_BITS, default=8): cv.int_range(min=5, max=8),
|
||||
cv.Optional(CONF_PARITY, default="NONE"): cv.enum(
|
||||
UART_PARITY_OPTIONS, upper=True
|
||||
),
|
||||
cv.Optional(CONF_INJECTIONS, default=[]): cv.ensure_list(INJECTION_SCHEMA),
|
||||
cv.Optional(CONF_RESPONSES, default=[]): cv.ensure_list(RESPONSE_SCHEMA),
|
||||
cv.Optional(CONF_PERIODIC_RX, default=[]): cv.ensure_list(PERIODIC_RX_SCHEMA),
|
||||
}
|
||||
).extend(cv.COMPONENT_SCHEMA)
|
||||
|
||||
|
||||
async def to_code(config):
|
||||
var = cg.new_Pvariable(config[CONF_ID])
|
||||
await cg.register_component(var, config)
|
||||
|
||||
cg.add(var.set_baud_rate(config[CONF_BAUD_RATE]))
|
||||
cg.add(var.set_stop_bits(config[CONF_STOP_BITS]))
|
||||
cg.add(var.set_data_bits(config[CONF_DATA_BITS]))
|
||||
cg.add(var.set_parity(config[CONF_PARITY]))
|
||||
|
||||
for injection in config[CONF_INJECTIONS]:
|
||||
rx_data = injection[CONF_INJECT_RX]
|
||||
delay_ms = injection[CONF_DELAY]
|
||||
cg.add(var.add_injection(rx_data, delay_ms))
|
||||
|
||||
for response in config[CONF_RESPONSES]:
|
||||
tx_data = response[CONF_EXPECT_TX]
|
||||
rx_data = response[CONF_INJECT_RX]
|
||||
cg.add(var.add_response(tx_data, rx_data))
|
||||
|
||||
for periodic in config[CONF_PERIODIC_RX]:
|
||||
data = periodic[CONF_DATA]
|
||||
interval = periodic[CONF_INTERVAL]
|
||||
cg.add(var.add_periodic_rx(data, interval))
|
||||
@@ -0,0 +1,159 @@
|
||||
// Host-only test component — do not copy to production code.
|
||||
// See uart_mock.h for details.
|
||||
|
||||
#include "uart_mock.h"
|
||||
#include "esphome/core/application.h"
|
||||
#include "esphome/core/helpers.h"
|
||||
#include "esphome/core/log.h"
|
||||
|
||||
namespace esphome::uart_mock {
|
||||
|
||||
static const char *const TAG = "uart_mock";
|
||||
|
||||
void MockUartComponent::setup() {
|
||||
ESP_LOGI(TAG, "Mock UART initialized with %zu injections, %zu responses, %zu periodic", this->injections_.size(),
|
||||
this->responses_.size(), this->periodic_rx_.size());
|
||||
}
|
||||
|
||||
void MockUartComponent::loop() {
|
||||
uint32_t now = App.get_loop_component_start_time();
|
||||
|
||||
// Initialize scenario start time on first loop() call, after all components have
|
||||
// finished setup(). This prevents injection delays from being consumed during setup.
|
||||
if (!this->loop_started_) {
|
||||
this->loop_started_ = true;
|
||||
this->scenario_start_ms_ = now;
|
||||
this->cumulative_delay_ms_ = 0;
|
||||
ESP_LOGD(TAG, "Scenario started at %u ms", now);
|
||||
}
|
||||
|
||||
// Process at most ONE timed injection per loop iteration.
|
||||
// This ensures each injection is in a separate loop cycle, giving the consuming
|
||||
// component (e.g., LD2410) a chance to process each batch independently.
|
||||
if (this->injection_index_ < this->injections_.size()) {
|
||||
auto &injection = this->injections_[this->injection_index_];
|
||||
uint32_t target_time = this->scenario_start_ms_ + this->cumulative_delay_ms_ + injection.delay_ms;
|
||||
if (now >= target_time) {
|
||||
ESP_LOGD(TAG, "Injecting %zu RX bytes (injection %u)", injection.rx_data.size(), this->injection_index_);
|
||||
this->inject_to_rx_buffer_(injection.rx_data);
|
||||
this->cumulative_delay_ms_ += injection.delay_ms;
|
||||
this->injection_index_++;
|
||||
}
|
||||
}
|
||||
|
||||
// Process periodic RX
|
||||
for (auto &periodic : this->periodic_rx_) {
|
||||
if (now - periodic.last_inject_ms >= periodic.interval_ms) {
|
||||
this->inject_to_rx_buffer_(periodic.data);
|
||||
periodic.last_inject_ms = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void MockUartComponent::dump_config() {
|
||||
ESP_LOGCONFIG(TAG,
|
||||
"Mock UART Component:\n"
|
||||
" Baud Rate: %u\n"
|
||||
" Injections: %zu\n"
|
||||
" Responses: %zu\n"
|
||||
" Periodic RX: %zu",
|
||||
this->baud_rate_, this->injections_.size(), this->responses_.size(), this->periodic_rx_.size());
|
||||
}
|
||||
|
||||
void MockUartComponent::write_array(const uint8_t *data, size_t len) {
|
||||
this->tx_count_ += len;
|
||||
this->tx_buffer_.insert(this->tx_buffer_.end(), data, data + len);
|
||||
|
||||
// Log all TX data so tests can verify what the component sends
|
||||
if (len > 0 && len <= 64) {
|
||||
char hex_buf[format_hex_pretty_size(64)];
|
||||
ESP_LOGD(TAG, "TX %zu bytes: %s", len, format_hex_pretty_to(hex_buf, sizeof(hex_buf), data, len));
|
||||
} else if (len > 64) {
|
||||
ESP_LOGD(TAG, "TX %zu bytes (too large to log)", len);
|
||||
}
|
||||
|
||||
#ifdef USE_UART_DEBUGGER
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
this->debug_callback_.call(uart::UART_DIRECTION_TX, data[i]);
|
||||
}
|
||||
#endif
|
||||
|
||||
this->try_match_response_();
|
||||
}
|
||||
|
||||
bool MockUartComponent::peek_byte(uint8_t *data) {
|
||||
if (this->rx_buffer_.empty()) {
|
||||
return false;
|
||||
}
|
||||
*data = this->rx_buffer_.front();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MockUartComponent::read_array(uint8_t *data, size_t len) {
|
||||
if (this->rx_buffer_.size() < len) {
|
||||
return false;
|
||||
}
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
data[i] = this->rx_buffer_.front();
|
||||
this->rx_buffer_.pop_front();
|
||||
}
|
||||
this->rx_count_ += len;
|
||||
|
||||
#ifdef USE_UART_DEBUGGER
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
this->debug_callback_.call(uart::UART_DIRECTION_RX, data[i]);
|
||||
}
|
||||
#endif
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t MockUartComponent::available() { return this->rx_buffer_.size(); }
|
||||
|
||||
void MockUartComponent::flush() {
|
||||
// Nothing to flush in mock
|
||||
}
|
||||
|
||||
void MockUartComponent::add_injection(const std::vector<uint8_t> &rx_data, uint32_t delay_ms) {
|
||||
this->injections_.push_back({rx_data, delay_ms});
|
||||
}
|
||||
|
||||
void MockUartComponent::add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx) {
|
||||
this->responses_.push_back({expect_tx, inject_rx});
|
||||
}
|
||||
|
||||
void MockUartComponent::add_periodic_rx(const std::vector<uint8_t> &data, uint32_t interval_ms) {
|
||||
this->periodic_rx_.push_back({data, interval_ms, 0});
|
||||
}
|
||||
|
||||
void MockUartComponent::try_match_response_() {
|
||||
for (auto &response : this->responses_) {
|
||||
if (this->tx_buffer_.size() < response.expect_tx.size()) {
|
||||
continue;
|
||||
}
|
||||
// Check if tx_buffer_ ends with expect_tx
|
||||
size_t offset = this->tx_buffer_.size() - response.expect_tx.size();
|
||||
if (std::equal(response.expect_tx.begin(), response.expect_tx.end(), this->tx_buffer_.begin() + offset)) {
|
||||
ESP_LOGD(TAG, "TX match found, injecting %zu RX bytes", response.inject_rx.size());
|
||||
this->inject_to_rx_buffer_(response.inject_rx);
|
||||
this->tx_buffer_.clear();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void MockUartComponent::inject_to_rx_buffer_(const std::vector<uint8_t> &data) {
|
||||
// Log injected RX data so tests can see what's being fed to the component
|
||||
if (!data.empty() && data.size() <= 64) {
|
||||
char hex_buf[format_hex_pretty_size(64)];
|
||||
ESP_LOGD(TAG, "RX inject %zu bytes: %s", data.size(),
|
||||
format_hex_pretty_to(hex_buf, sizeof(hex_buf), data.data(), data.size()));
|
||||
} else if (data.size() > 64) {
|
||||
ESP_LOGD(TAG, "RX inject %zu bytes (too large to log inline)", data.size());
|
||||
}
|
||||
for (uint8_t byte : data) {
|
||||
this->rx_buffer_.push_back(byte);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace esphome::uart_mock
|
||||
@@ -0,0 +1,78 @@
|
||||
#pragma once
|
||||
|
||||
// ============================================================================
|
||||
// HOST-ONLY TEST COMPONENT — DO NOT COPY TO PRODUCTION CODE
|
||||
//
|
||||
// This component runs exclusively on the host platform for integration testing.
|
||||
// It intentionally uses std::vector, std::deque, and dynamic allocation which
|
||||
// would be inappropriate for production embedded components. Do not use this
|
||||
// code as a reference for writing ESPHome components targeting real hardware.
|
||||
// ============================================================================
|
||||
|
||||
#include "esphome/core/component.h"
|
||||
#include "esphome/components/uart/uart_component.h"
|
||||
#include <deque>
|
||||
#include <vector>
|
||||
|
||||
namespace esphome::uart_mock {
|
||||
|
||||
class MockUartComponent : public uart::UARTComponent, public Component {
|
||||
public:
|
||||
void setup() override;
|
||||
void loop() override;
|
||||
void dump_config() override;
|
||||
float get_setup_priority() const override { return setup_priority::BUS; }
|
||||
|
||||
// UARTComponent interface
|
||||
void write_array(const uint8_t *data, size_t len) override;
|
||||
bool peek_byte(uint8_t *data) override;
|
||||
bool read_array(uint8_t *data, size_t len) override;
|
||||
size_t available() override;
|
||||
void flush() override;
|
||||
|
||||
// Scenario configuration - called from generated code
|
||||
void add_injection(const std::vector<uint8_t> &rx_data, uint32_t delay_ms);
|
||||
void add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx);
|
||||
void add_periodic_rx(const std::vector<uint8_t> &data, uint32_t interval_ms);
|
||||
|
||||
protected:
|
||||
void check_logger_conflict() override {}
|
||||
void try_match_response_();
|
||||
void inject_to_rx_buffer_(const std::vector<uint8_t> &data);
|
||||
|
||||
// Timed injections
|
||||
struct Injection {
|
||||
std::vector<uint8_t> rx_data;
|
||||
uint32_t delay_ms;
|
||||
};
|
||||
std::vector<Injection> injections_;
|
||||
uint32_t injection_index_{0};
|
||||
uint32_t scenario_start_ms_{0};
|
||||
uint32_t cumulative_delay_ms_{0};
|
||||
bool loop_started_{false};
|
||||
|
||||
// TX-triggered responses
|
||||
struct Response {
|
||||
std::vector<uint8_t> expect_tx;
|
||||
std::vector<uint8_t> inject_rx;
|
||||
};
|
||||
std::vector<Response> responses_;
|
||||
std::vector<uint8_t> tx_buffer_;
|
||||
|
||||
// RX buffer
|
||||
std::deque<uint8_t> rx_buffer_;
|
||||
|
||||
// Periodic RX
|
||||
struct PeriodicRx {
|
||||
std::vector<uint8_t> data;
|
||||
uint32_t interval_ms;
|
||||
uint32_t last_inject_ms{0};
|
||||
};
|
||||
std::vector<PeriodicRx> periodic_rx_;
|
||||
|
||||
// Observability
|
||||
uint32_t tx_count_{0};
|
||||
uint32_t rx_count_{0};
|
||||
};
|
||||
|
||||
} // namespace esphome::uart_mock
|
||||
145
tests/integration/fixtures/uart_mock_ld2410.yaml
Normal file
145
tests/integration/fixtures/uart_mock_ld2410.yaml
Normal file
@@ -0,0 +1,145 @@
|
||||
esphome:
|
||||
name: uart-mock-ld2410-test
|
||||
|
||||
host:
|
||||
api:
|
||||
logger:
|
||||
level: VERBOSE
|
||||
|
||||
external_components:
|
||||
- source:
|
||||
type: local
|
||||
path: EXTERNAL_COMPONENT_PATH
|
||||
|
||||
# Dummy uart entry to satisfy ld2410's DEPENDENCIES = ["uart"]
|
||||
# The actual UART bus used is the uart_mock component below
|
||||
uart:
|
||||
baud_rate: 115200
|
||||
port: /dev/null
|
||||
|
||||
uart_mock:
|
||||
id: mock_uart
|
||||
baud_rate: 256000
|
||||
injections:
|
||||
# Phase 1 (t=100ms): Valid LD2410 normal mode data frame - happy path
|
||||
# The buffer is clean at this point, so this frame should parse correctly.
|
||||
# Moving target: 100cm, energy 50
|
||||
# Still target: 120cm, energy 25
|
||||
# Detection distance: 300cm
|
||||
# Target state: 0x03 (moving + still)
|
||||
#
|
||||
# Note: LD2410's two_byte_to_int() uses signed char, so low bytes must be
|
||||
# <=127 to produce correct values. Values >127 wrap negative.
|
||||
#
|
||||
# Frame layout (24 bytes):
|
||||
# [0-3] F4 F3 F2 F1 = data frame header
|
||||
# [4-5] 0D 00 = length 13
|
||||
# [6] 02 = data type (normal)
|
||||
# [7] AA = data header marker
|
||||
# [8] 03 = target states (moving+still)
|
||||
# [9-10] 64 00 = moving distance 100 (0x0064)
|
||||
# [11] 32 = moving energy 50
|
||||
# [12-13] 78 00 = still distance 120 (0x0078)
|
||||
# [14] 19 = still energy 25
|
||||
# [15-16] 2C 01 = detection distance 300 (0x012C)
|
||||
# [17] 00 = padding
|
||||
# [18] 55 = data footer marker
|
||||
# [19] 00 = CRC/check
|
||||
# [20-23] F8 F7 F6 F5 = data frame footer
|
||||
- delay: 100ms
|
||||
inject_rx:
|
||||
[
|
||||
0xF4, 0xF3, 0xF2, 0xF1,
|
||||
0x0D, 0x00,
|
||||
0x02, 0xAA,
|
||||
0x03,
|
||||
0x64, 0x00,
|
||||
0x32,
|
||||
0x78, 0x00,
|
||||
0x19,
|
||||
0x2C, 0x01,
|
||||
0x00,
|
||||
0x55, 0x00,
|
||||
0xF8, 0xF7, 0xF6, 0xF5,
|
||||
]
|
||||
|
||||
# Phase 2 (t=200ms): Garbage bytes - corrupt the buffer
|
||||
# These random bytes will accumulate in the LD2410's internal buffer.
|
||||
# No footer will be found, so the buffer just grows.
|
||||
- delay: 100ms
|
||||
inject_rx: [0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x11, 0x22]
|
||||
|
||||
# Phase 3 (t=300ms): Truncated frame (header + partial data, no footer)
|
||||
# More bytes accumulating in the buffer without a footer match.
|
||||
- delay: 100ms
|
||||
inject_rx: [0xF4, 0xF3, 0xF2, 0xF1, 0x0D, 0x00, 0x02, 0xAA]
|
||||
|
||||
# Phase 4 (t=500ms): Overflow - inject 85 bytes of 0xFF (MAX_LINE_LENGTH=50)
|
||||
# Buffer has 15 bytes from phases 2+3.
|
||||
# Overflow math: need 35 bytes to trigger first overflow (pos 15->49),
|
||||
# then 50 more to trigger second overflow (pos 0->49). Total = 85 bytes.
|
||||
# After two overflows, buffer_pos_ = 0 with a completely clean buffer.
|
||||
# The LD2410 logs "Max command length exceeded" at each overflow.
|
||||
- delay: 200ms
|
||||
inject_rx:
|
||||
[
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
]
|
||||
|
||||
# Phase 5 (t=600ms): Valid frame after overflow - recovery test
|
||||
# Buffer was reset by overflow. This valid frame should parse correctly.
|
||||
# Moving target: 50cm, energy 100
|
||||
# Still target: 75cm, energy 80
|
||||
# Detection distance: 127cm
|
||||
- delay: 100ms
|
||||
inject_rx:
|
||||
[
|
||||
0xF4, 0xF3, 0xF2, 0xF1,
|
||||
0x0D, 0x00,
|
||||
0x02, 0xAA,
|
||||
0x03,
|
||||
0x32, 0x00,
|
||||
0x64,
|
||||
0x4B, 0x00,
|
||||
0x50,
|
||||
0x7F, 0x00,
|
||||
0x00,
|
||||
0x55, 0x00,
|
||||
0xF8, 0xF7, 0xF6, 0xF5,
|
||||
]
|
||||
|
||||
ld2410:
|
||||
id: ld2410_dev
|
||||
uart_id: mock_uart
|
||||
|
||||
sensor:
|
||||
- platform: ld2410
|
||||
ld2410_id: ld2410_dev
|
||||
moving_distance:
|
||||
name: "Moving Distance"
|
||||
still_distance:
|
||||
name: "Still Distance"
|
||||
moving_energy:
|
||||
name: "Moving Energy"
|
||||
still_energy:
|
||||
name: "Still Energy"
|
||||
detection_distance:
|
||||
name: "Detection Distance"
|
||||
|
||||
binary_sensor:
|
||||
- platform: ld2410
|
||||
ld2410_id: ld2410_dev
|
||||
has_target:
|
||||
name: "Has Target"
|
||||
has_moving_target:
|
||||
name: "Has Moving Target"
|
||||
has_still_target:
|
||||
name: "Has Still Target"
|
||||
156
tests/integration/fixtures/uart_mock_ld2410_engineering.yaml
Normal file
156
tests/integration/fixtures/uart_mock_ld2410_engineering.yaml
Normal file
@@ -0,0 +1,156 @@
|
||||
esphome:
|
||||
name: uart-mock-ld2410-eng-test
|
||||
|
||||
host:
|
||||
api:
|
||||
logger:
|
||||
level: VERBOSE
|
||||
|
||||
external_components:
|
||||
- source:
|
||||
type: local
|
||||
path: EXTERNAL_COMPONENT_PATH
|
||||
|
||||
# Dummy uart entry to satisfy ld2410's DEPENDENCIES = ["uart"]
|
||||
uart:
|
||||
baud_rate: 115200
|
||||
port: /dev/null
|
||||
|
||||
uart_mock:
|
||||
id: mock_uart
|
||||
baud_rate: 256000
|
||||
injections:
|
||||
# Phase 1 (t=100ms): Valid LD2410 engineering mode data frame
|
||||
# Captured from a real Screek Human Presence Sensor 1U with LD2410 firmware 2.4.x
|
||||
#
|
||||
# Engineering mode frame layout (45 bytes):
|
||||
# [0-3] F4 F3 F2 F1 = data frame header
|
||||
# [4-5] 23 00 = length 35
|
||||
# [6] 01 = data type (engineering mode)
|
||||
# [7] AA = data header marker
|
||||
# [8] 03 = target states (moving+still)
|
||||
# [9-10] 1E 00 = moving distance 30 (0x001E)
|
||||
# [11] 64 = moving energy 100
|
||||
# [12-13] 1E 00 = still distance 30 (0x001E)
|
||||
# [14] 64 = still energy 100
|
||||
# [15-16] 00 00 = detection distance 0
|
||||
# [17] 08 = max moving distance gate
|
||||
# [18] 08 = max still distance gate
|
||||
# [19-27] gate moving energies (gates 0-8)
|
||||
# [28-36] gate still energies (gates 0-8)
|
||||
# [37] 57 = light sensor value 87
|
||||
# [38] 01 = out pin presence (HIGH)
|
||||
# [39] 55 = data footer marker
|
||||
# [40] 00 = check
|
||||
# [41-44] F8 F7 F6 F5 = data frame footer
|
||||
- delay: 100ms
|
||||
inject_rx:
|
||||
[
|
||||
0xF4, 0xF3, 0xF2, 0xF1,
|
||||
0x23, 0x00,
|
||||
0x01, 0xAA,
|
||||
0x03,
|
||||
0x1E, 0x00,
|
||||
0x64,
|
||||
0x1E, 0x00,
|
||||
0x64,
|
||||
0x00, 0x00,
|
||||
0x08, 0x08,
|
||||
0x64, 0x41, 0x06, 0x0E, 0x2B, 0x16, 0x03, 0x03, 0x07,
|
||||
0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64,
|
||||
0x57, 0x01,
|
||||
0x55, 0x00,
|
||||
0xF8, 0xF7, 0xF6, 0xF5,
|
||||
]
|
||||
|
||||
# Phase 2 (t=200ms): Second engineering mode frame with different values
|
||||
# Real capture: moving at 73cm, still at 30cm, detection at 33cm
|
||||
- delay: 100ms
|
||||
inject_rx:
|
||||
[
|
||||
0xF4, 0xF3, 0xF2, 0xF1,
|
||||
0x23, 0x00,
|
||||
0x01, 0xAA,
|
||||
0x03,
|
||||
0x49, 0x00,
|
||||
0x64,
|
||||
0x1E, 0x00,
|
||||
0x64,
|
||||
0x21, 0x00,
|
||||
0x08, 0x08,
|
||||
0x11, 0x64, 0x05, 0x29, 0x39, 0x10, 0x03, 0x11, 0x0E,
|
||||
0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64,
|
||||
0x57, 0x01,
|
||||
0x55, 0x00,
|
||||
0xF8, 0xF7, 0xF6, 0xF5,
|
||||
]
|
||||
|
||||
# Phase 3 (t=300ms): Frame with still target at 291cm (multi-byte distance)
|
||||
# This tests the two_byte_to_int function with high byte > 0
|
||||
# Note: low byte 0x2F < 0x80 so it avoids the signed char bug
|
||||
- delay: 100ms
|
||||
inject_rx:
|
||||
[
|
||||
0xF4, 0xF3, 0xF2, 0xF1,
|
||||
0x23, 0x00,
|
||||
0x01, 0xAA,
|
||||
0x03,
|
||||
0x2F, 0x00,
|
||||
0x36,
|
||||
0x23, 0x01,
|
||||
0x64,
|
||||
0x21, 0x00,
|
||||
0x08, 0x08,
|
||||
0x2F, 0x36, 0x09, 0x0D, 0x15, 0x0B, 0x06, 0x06, 0x08,
|
||||
0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64, 0x5A, 0x3D,
|
||||
0x57, 0x01,
|
||||
0x55, 0x00,
|
||||
0xF8, 0xF7, 0xF6, 0xF5,
|
||||
]
|
||||
|
||||
ld2410:
|
||||
id: ld2410_dev
|
||||
uart_id: mock_uart
|
||||
|
||||
sensor:
|
||||
- platform: ld2410
|
||||
ld2410_id: ld2410_dev
|
||||
moving_distance:
|
||||
name: "Moving Distance"
|
||||
still_distance:
|
||||
name: "Still Distance"
|
||||
moving_energy:
|
||||
name: "Moving Energy"
|
||||
still_energy:
|
||||
name: "Still Energy"
|
||||
detection_distance:
|
||||
name: "Detection Distance"
|
||||
light:
|
||||
name: "Light"
|
||||
g0:
|
||||
move_energy:
|
||||
name: "Gate 0 Move Energy"
|
||||
still_energy:
|
||||
name: "Gate 0 Still Energy"
|
||||
g1:
|
||||
move_energy:
|
||||
name: "Gate 1 Move Energy"
|
||||
still_energy:
|
||||
name: "Gate 1 Still Energy"
|
||||
g2:
|
||||
move_energy:
|
||||
name: "Gate 2 Move Energy"
|
||||
still_energy:
|
||||
name: "Gate 2 Still Energy"
|
||||
|
||||
binary_sensor:
|
||||
- platform: ld2410
|
||||
ld2410_id: ld2410_dev
|
||||
has_target:
|
||||
name: "Has Target"
|
||||
has_moving_target:
|
||||
name: "Has Moving Target"
|
||||
has_still_target:
|
||||
name: "Has Still Target"
|
||||
out_pin_presence_status:
|
||||
name: "Out Pin Presence"
|
||||
392
tests/integration/test_uart_mock_ld2410.py
Normal file
392
tests/integration/test_uart_mock_ld2410.py
Normal file
@@ -0,0 +1,392 @@
|
||||
"""Integration test for LD2410 component with mock UART.
|
||||
|
||||
Tests:
|
||||
test_uart_mock_ld2410 (normal mode):
|
||||
1. Happy path - valid data frame publishes correct sensor values
|
||||
2. Garbage resilience - random bytes don't crash the component
|
||||
3. Truncated frame handling - partial frame doesn't corrupt state
|
||||
4. Buffer overflow recovery - overflow resets the parser
|
||||
5. Post-overflow parsing - next valid frame after overflow is parsed correctly
|
||||
6. TX logging - verifies LD2410 sends expected setup commands
|
||||
|
||||
test_uart_mock_ld2410_engineering (engineering mode):
|
||||
1. Engineering mode frames with per-gate energy data and light sensor
|
||||
2. Multi-byte still distance (291cm) using high byte > 0
|
||||
3. Out pin presence binary sensor
|
||||
4. Gate energy sensor values from real device captures
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
from aioesphomeapi import (
|
||||
BinarySensorInfo,
|
||||
BinarySensorState,
|
||||
EntityState,
|
||||
SensorInfo,
|
||||
SensorState,
|
||||
)
|
||||
import pytest
|
||||
|
||||
from .state_utils import InitialStateHelper, build_key_to_entity_mapping, find_entity
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_uart_mock_ld2410(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test LD2410 data parsing with happy path, garbage, overflow, and recovery."""
|
||||
# Replace external component path placeholder
|
||||
external_components_path = str(
|
||||
Path(__file__).parent / "fixtures" / "external_components"
|
||||
)
|
||||
yaml_config = yaml_config.replace(
|
||||
"EXTERNAL_COMPONENT_PATH", external_components_path
|
||||
)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# Track overflow warning in logs
|
||||
overflow_seen = loop.create_future()
|
||||
|
||||
# Track TX data logged by the mock for assertions
|
||||
tx_log_lines: list[str] = []
|
||||
|
||||
def line_callback(line: str) -> None:
|
||||
if "Max command length exceeded" in line and not overflow_seen.done():
|
||||
overflow_seen.set_result(True)
|
||||
# Capture all TX log lines from uart_mock
|
||||
if "uart_mock" in line and "TX " in line:
|
||||
tx_log_lines.append(line)
|
||||
|
||||
# Track sensor state updates (after initial state is swallowed)
|
||||
sensor_states: dict[str, list[float]] = {
|
||||
"moving_distance": [],
|
||||
"still_distance": [],
|
||||
"moving_energy": [],
|
||||
"still_energy": [],
|
||||
"detection_distance": [],
|
||||
}
|
||||
binary_states: dict[str, list[bool]] = {
|
||||
"has_target": [],
|
||||
"has_moving_target": [],
|
||||
"has_still_target": [],
|
||||
}
|
||||
|
||||
# Signal when we see recovery frame values
|
||||
recovery_received = loop.create_future()
|
||||
|
||||
def on_state(state: EntityState) -> None:
|
||||
if isinstance(state, SensorState) and not state.missing_state:
|
||||
sensor_name = key_to_sensor.get(state.key)
|
||||
if sensor_name and sensor_name in sensor_states:
|
||||
sensor_states[sensor_name].append(state.state)
|
||||
# Check if this is the recovery frame (moving_distance = 50)
|
||||
if (
|
||||
sensor_name == "moving_distance"
|
||||
and state.state == pytest.approx(50.0)
|
||||
and not recovery_received.done()
|
||||
):
|
||||
recovery_received.set_result(True)
|
||||
elif isinstance(state, BinarySensorState):
|
||||
sensor_name = key_to_sensor.get(state.key)
|
||||
if sensor_name and sensor_name in binary_states:
|
||||
binary_states[sensor_name].append(state.state)
|
||||
|
||||
async with (
|
||||
run_compiled(yaml_config, line_callback=line_callback),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
entities, _ = await client.list_entities_services()
|
||||
|
||||
# Build key mappings for all sensor types
|
||||
all_names = list(sensor_states.keys()) + list(binary_states.keys())
|
||||
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
|
||||
|
||||
# Set up initial state helper
|
||||
initial_state_helper = InitialStateHelper(entities)
|
||||
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
|
||||
|
||||
try:
|
||||
await initial_state_helper.wait_for_initial_states()
|
||||
except TimeoutError:
|
||||
pytest.fail("Timeout waiting for initial states")
|
||||
|
||||
# Phase 1 values are in the initial states (swallowed by InitialStateHelper).
|
||||
# Verify them via initial_states dict.
|
||||
moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo)
|
||||
assert moving_dist_entity is not None
|
||||
initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key)
|
||||
assert initial_moving is not None and isinstance(initial_moving, SensorState)
|
||||
assert initial_moving.state == pytest.approx(100.0), (
|
||||
f"Initial moving distance should be 100, got {initial_moving.state}"
|
||||
)
|
||||
|
||||
still_dist_entity = find_entity(entities, "still_distance", SensorInfo)
|
||||
assert still_dist_entity is not None
|
||||
initial_still = initial_state_helper.initial_states.get(still_dist_entity.key)
|
||||
assert initial_still is not None and isinstance(initial_still, SensorState)
|
||||
assert initial_still.state == pytest.approx(120.0), (
|
||||
f"Initial still distance should be 120, got {initial_still.state}"
|
||||
)
|
||||
|
||||
moving_energy_entity = find_entity(entities, "moving_energy", SensorInfo)
|
||||
assert moving_energy_entity is not None
|
||||
initial_me = initial_state_helper.initial_states.get(moving_energy_entity.key)
|
||||
assert initial_me is not None and isinstance(initial_me, SensorState)
|
||||
assert initial_me.state == pytest.approx(50.0), (
|
||||
f"Initial moving energy should be 50, got {initial_me.state}"
|
||||
)
|
||||
|
||||
still_energy_entity = find_entity(entities, "still_energy", SensorInfo)
|
||||
assert still_energy_entity is not None
|
||||
initial_se = initial_state_helper.initial_states.get(still_energy_entity.key)
|
||||
assert initial_se is not None and isinstance(initial_se, SensorState)
|
||||
assert initial_se.state == pytest.approx(25.0), (
|
||||
f"Initial still energy should be 25, got {initial_se.state}"
|
||||
)
|
||||
|
||||
detect_dist_entity = find_entity(entities, "detection_distance", SensorInfo)
|
||||
assert detect_dist_entity is not None
|
||||
initial_dd = initial_state_helper.initial_states.get(detect_dist_entity.key)
|
||||
assert initial_dd is not None and isinstance(initial_dd, SensorState)
|
||||
assert initial_dd.state == pytest.approx(300.0), (
|
||||
f"Initial detection distance should be 300, got {initial_dd.state}"
|
||||
)
|
||||
|
||||
# Wait for the recovery frame (Phase 5) to be parsed
|
||||
# This proves the component survived garbage + truncated + overflow
|
||||
try:
|
||||
await asyncio.wait_for(recovery_received, timeout=15.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Timeout waiting for recovery frame. Received sensor states:\n"
|
||||
f" moving_distance: {sensor_states['moving_distance']}\n"
|
||||
f" still_distance: {sensor_states['still_distance']}\n"
|
||||
f" moving_energy: {sensor_states['moving_energy']}\n"
|
||||
f" still_energy: {sensor_states['still_energy']}\n"
|
||||
f" detection_distance: {sensor_states['detection_distance']}"
|
||||
)
|
||||
|
||||
# Verify overflow warning was logged
|
||||
assert overflow_seen.done(), (
|
||||
"Expected 'Max command length exceeded' warning in logs"
|
||||
)
|
||||
|
||||
# Verify LD2410 sent setup commands (TX logging)
|
||||
# LD2410 sends 7 commands during setup: FF (config on), A0 (version),
|
||||
# A5 (MAC), AB (distance res), AE (light), 61 (params), FE (config off)
|
||||
assert len(tx_log_lines) > 0, "Expected TX log lines from uart_mock"
|
||||
tx_data = " ".join(tx_log_lines)
|
||||
# Verify command frame header appears (FD:FC:FB:FA)
|
||||
assert "FD:FC:FB:FA" in tx_data, (
|
||||
"Expected LD2410 command frame header FD:FC:FB:FA in TX log"
|
||||
)
|
||||
# Verify command frame footer appears (04:03:02:01)
|
||||
assert "04:03:02:01" in tx_data, (
|
||||
"Expected LD2410 command frame footer 04:03:02:01 in TX log"
|
||||
)
|
||||
|
||||
# Recovery frame values (Phase 5, after overflow)
|
||||
assert len(sensor_states["moving_distance"]) >= 1, (
|
||||
f"Expected recovery moving_distance, got: {sensor_states['moving_distance']}"
|
||||
)
|
||||
# Find the recovery value (moving_distance = 50)
|
||||
recovery_values = [
|
||||
v for v in sensor_states["moving_distance"] if v == pytest.approx(50.0)
|
||||
]
|
||||
assert len(recovery_values) >= 1, (
|
||||
f"Expected moving_distance=50 in recovery, got: {sensor_states['moving_distance']}"
|
||||
)
|
||||
|
||||
# Recovery frame: moving=50, still=75, energy=100/80, detect=127
|
||||
recovery_idx = next(
|
||||
i
|
||||
for i, v in enumerate(sensor_states["moving_distance"])
|
||||
if v == pytest.approx(50.0)
|
||||
)
|
||||
assert sensor_states["still_distance"][recovery_idx] == pytest.approx(75.0), (
|
||||
f"Recovery still distance should be 75, got {sensor_states['still_distance'][recovery_idx]}"
|
||||
)
|
||||
assert sensor_states["moving_energy"][recovery_idx] == pytest.approx(100.0), (
|
||||
f"Recovery moving energy should be 100, got {sensor_states['moving_energy'][recovery_idx]}"
|
||||
)
|
||||
assert sensor_states["still_energy"][recovery_idx] == pytest.approx(80.0), (
|
||||
f"Recovery still energy should be 80, got {sensor_states['still_energy'][recovery_idx]}"
|
||||
)
|
||||
assert sensor_states["detection_distance"][recovery_idx] == pytest.approx(
|
||||
127.0
|
||||
), (
|
||||
f"Recovery detection distance should be 127, got {sensor_states['detection_distance'][recovery_idx]}"
|
||||
)
|
||||
|
||||
# Verify binary sensors detected targets
|
||||
# Binary sensors could be in initial states or forwarded states
|
||||
has_target_entity = find_entity(entities, "has_target", BinarySensorInfo)
|
||||
assert has_target_entity is not None
|
||||
initial_ht = initial_state_helper.initial_states.get(has_target_entity.key)
|
||||
assert initial_ht is not None and isinstance(initial_ht, BinarySensorState)
|
||||
assert initial_ht.state is True, "Has target should be True"
|
||||
|
||||
has_moving_entity = find_entity(entities, "has_moving_target", BinarySensorInfo)
|
||||
assert has_moving_entity is not None
|
||||
initial_hm = initial_state_helper.initial_states.get(has_moving_entity.key)
|
||||
assert initial_hm is not None and isinstance(initial_hm, BinarySensorState)
|
||||
assert initial_hm.state is True, "Has moving target should be True"
|
||||
|
||||
has_still_entity = find_entity(entities, "has_still_target", BinarySensorInfo)
|
||||
assert has_still_entity is not None
|
||||
initial_hs = initial_state_helper.initial_states.get(has_still_entity.key)
|
||||
assert initial_hs is not None and isinstance(initial_hs, BinarySensorState)
|
||||
assert initial_hs.state is True, "Has still target should be True"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_uart_mock_ld2410_engineering(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test LD2410 engineering mode with per-gate energy, light, and multi-byte distance."""
|
||||
external_components_path = str(
|
||||
Path(__file__).parent / "fixtures" / "external_components"
|
||||
)
|
||||
yaml_config = yaml_config.replace(
|
||||
"EXTERNAL_COMPONENT_PATH", external_components_path
|
||||
)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# Track sensor state updates (after initial state is swallowed)
|
||||
sensor_states: dict[str, list[float]] = {
|
||||
"moving_distance": [],
|
||||
"still_distance": [],
|
||||
"moving_energy": [],
|
||||
"still_energy": [],
|
||||
"detection_distance": [],
|
||||
"light": [],
|
||||
"gate_0_move_energy": [],
|
||||
"gate_1_move_energy": [],
|
||||
"gate_2_move_energy": [],
|
||||
"gate_0_still_energy": [],
|
||||
"gate_1_still_energy": [],
|
||||
"gate_2_still_energy": [],
|
||||
}
|
||||
binary_states: dict[str, list[bool]] = {
|
||||
"has_target": [],
|
||||
"has_moving_target": [],
|
||||
"has_still_target": [],
|
||||
"out_pin_presence": [],
|
||||
}
|
||||
|
||||
# Signal when we see Phase 3 frame (still_distance = 291)
|
||||
phase3_received = loop.create_future()
|
||||
|
||||
def on_state(state: EntityState) -> None:
|
||||
if isinstance(state, SensorState) and not state.missing_state:
|
||||
sensor_name = key_to_sensor.get(state.key)
|
||||
if sensor_name and sensor_name in sensor_states:
|
||||
sensor_states[sensor_name].append(state.state)
|
||||
if (
|
||||
sensor_name == "still_distance"
|
||||
and state.state == pytest.approx(291.0)
|
||||
and not phase3_received.done()
|
||||
):
|
||||
phase3_received.set_result(True)
|
||||
elif isinstance(state, BinarySensorState):
|
||||
sensor_name = key_to_sensor.get(state.key)
|
||||
if sensor_name and sensor_name in binary_states:
|
||||
binary_states[sensor_name].append(state.state)
|
||||
|
||||
async with (
|
||||
run_compiled(yaml_config),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
entities, _ = await client.list_entities_services()
|
||||
|
||||
all_names = list(sensor_states.keys()) + list(binary_states.keys())
|
||||
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
|
||||
|
||||
initial_state_helper = InitialStateHelper(entities)
|
||||
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
|
||||
|
||||
try:
|
||||
await initial_state_helper.wait_for_initial_states()
|
||||
except TimeoutError:
|
||||
pytest.fail("Timeout waiting for initial states")
|
||||
|
||||
# Phase 1 initial values (engineering mode frame):
|
||||
# moving=30, energy=100, still=30, energy=100, detect=0
|
||||
moving_dist_entity = find_entity(entities, "moving_distance", SensorInfo)
|
||||
assert moving_dist_entity is not None
|
||||
initial_moving = initial_state_helper.initial_states.get(moving_dist_entity.key)
|
||||
assert initial_moving is not None and isinstance(initial_moving, SensorState)
|
||||
assert initial_moving.state == pytest.approx(30.0), (
|
||||
f"Initial moving distance should be 30, got {initial_moving.state}"
|
||||
)
|
||||
|
||||
still_dist_entity = find_entity(entities, "still_distance", SensorInfo)
|
||||
assert still_dist_entity is not None
|
||||
initial_still = initial_state_helper.initial_states.get(still_dist_entity.key)
|
||||
assert initial_still is not None and isinstance(initial_still, SensorState)
|
||||
assert initial_still.state == pytest.approx(30.0), (
|
||||
f"Initial still distance should be 30, got {initial_still.state}"
|
||||
)
|
||||
|
||||
# Verify engineering mode sensors from initial state
|
||||
# Gate 0 moving energy = 0x64 = 100
|
||||
gate0_move_entity = find_entity(entities, "gate_0_move_energy", SensorInfo)
|
||||
assert gate0_move_entity is not None
|
||||
initial_g0m = initial_state_helper.initial_states.get(gate0_move_entity.key)
|
||||
assert initial_g0m is not None and isinstance(initial_g0m, SensorState)
|
||||
assert initial_g0m.state == pytest.approx(100.0), (
|
||||
f"Gate 0 move energy should be 100, got {initial_g0m.state}"
|
||||
)
|
||||
|
||||
# Gate 1 moving energy = 0x41 = 65
|
||||
gate1_move_entity = find_entity(entities, "gate_1_move_energy", SensorInfo)
|
||||
assert gate1_move_entity is not None
|
||||
initial_g1m = initial_state_helper.initial_states.get(gate1_move_entity.key)
|
||||
assert initial_g1m is not None and isinstance(initial_g1m, SensorState)
|
||||
assert initial_g1m.state == pytest.approx(65.0), (
|
||||
f"Gate 1 move energy should be 65, got {initial_g1m.state}"
|
||||
)
|
||||
|
||||
# Light sensor = 0x57 = 87
|
||||
light_entity = find_entity(entities, "light", SensorInfo)
|
||||
assert light_entity is not None
|
||||
initial_light = initial_state_helper.initial_states.get(light_entity.key)
|
||||
assert initial_light is not None and isinstance(initial_light, SensorState)
|
||||
assert initial_light.state == pytest.approx(87.0), (
|
||||
f"Light sensor should be 87, got {initial_light.state}"
|
||||
)
|
||||
|
||||
# Out pin presence = 0x01 = True
|
||||
out_pin_entity = find_entity(entities, "out_pin_presence", BinarySensorInfo)
|
||||
assert out_pin_entity is not None
|
||||
initial_out = initial_state_helper.initial_states.get(out_pin_entity.key)
|
||||
assert initial_out is not None and isinstance(initial_out, BinarySensorState)
|
||||
assert initial_out.state is True, "Out pin presence should be True"
|
||||
|
||||
# Wait for Phase 3 frame (still_distance = 291cm, multi-byte)
|
||||
try:
|
||||
await asyncio.wait_for(phase3_received, timeout=15.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Timeout waiting for Phase 3 frame. Received sensor states:\n"
|
||||
f" still_distance: {sensor_states['still_distance']}\n"
|
||||
f" moving_distance: {sensor_states['moving_distance']}"
|
||||
)
|
||||
|
||||
# Phase 3: still distance = 0x0123 = 291cm (multi-byte distance test)
|
||||
phase3_still = [
|
||||
v for v in sensor_states["still_distance"] if v == pytest.approx(291.0)
|
||||
]
|
||||
assert len(phase3_still) >= 1, (
|
||||
f"Expected still_distance=291, got: {sensor_states['still_distance']}"
|
||||
)
|
||||
Reference in New Issue
Block a user