Create integration tests for modbus (#14395)

Co-authored-by: J. Nick Koston <nick@home-assistant.io>
This commit is contained in:
Bonne Eggleston
2026-03-02 00:51:27 -08:00
committed by GitHub
parent 590ee81f7a
commit 3160457ca6
7 changed files with 409 additions and 7 deletions

View File

@@ -1,8 +1,28 @@
from esphome import automation
import esphome.codegen as cg
from esphome.components import uart
from esphome.components.uart import CONF_DATA_BITS, CONF_PARITY, CONF_STOP_BITS
from esphome.components.uart import (
CONF_RX_FULL_THRESHOLD,
CONF_RX_TIMEOUT,
debug_to_code,
maybe_empty_debug,
validate_raw_data,
)
import esphome.config_validation as cv
from esphome.const import CONF_BAUD_RATE, CONF_DATA, CONF_DELAY, CONF_ID, CONF_INTERVAL
from esphome.const import (
CONF_BAUD_RATE,
CONF_DATA,
CONF_DATA_BITS,
CONF_DEBUG,
CONF_DELAY,
CONF_ID,
CONF_INTERVAL,
CONF_PARITY,
CONF_RX_BUFFER_SIZE,
CONF_STOP_BITS,
CONF_TRIGGER_ID,
)
from esphome.core import ID
CODEOWNERS = ["@esphome/tests"]
MULTI_CONF = True
@@ -11,12 +31,20 @@ uart_mock_ns = cg.esphome_ns.namespace("uart_mock")
MockUartComponent = uart_mock_ns.class_(
"MockUartComponent", uart.UARTComponent, cg.Component
)
MockUartInjectRXAction = uart_mock_ns.class_(
"MockUartInjectRXAction", automation.Action
)
MockUartTXTrigger = uart_mock_ns.class_(
"MockUartTXTrigger",
automation.Trigger.template(cg.std_vector.template(cg.uint8)),
)
CONF_INJECTIONS = "injections"
CONF_RESPONSES = "responses"
CONF_INJECT_RX = "inject_rx"
CONF_EXPECT_TX = "expect_tx"
CONF_PERIODIC_RX = "periodic_rx"
CONF_ON_TX = "on_tx"
UART_PARITY_OPTIONS = {
"NONE": uart.UARTParityOptions.UART_CONFIG_PARITY_NONE,
@@ -31,6 +59,15 @@ INJECTION_SCHEMA = cv.Schema(
}
)
CONFIG_INJECT_RX_SCHEMA = cv.maybe_simple_value(
{
cv.GenerateID(): cv.use_id(MockUartComponent),
cv.Required("data"): cv.templatable(validate_raw_data),
},
key=CONF_DATA,
)
RESPONSE_SCHEMA = cv.Schema(
{
cv.Required(CONF_EXPECT_TX): [cv.hex_uint8_t],
@@ -49,6 +86,9 @@ CONFIG_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.declare_id(MockUartComponent),
cv.Required(CONF_BAUD_RATE): cv.int_range(min=1),
cv.Optional(CONF_RX_BUFFER_SIZE, default=256): cv.validate_bytes,
cv.Optional(CONF_RX_FULL_THRESHOLD, default=10): cv.int_range(min=1, max=120),
cv.Optional(CONF_RX_TIMEOUT, default=2): cv.int_range(min=0, max=92),
cv.Optional(CONF_STOP_BITS, default=1): cv.one_of(1, 2, int=True),
cv.Optional(CONF_DATA_BITS, default=8): cv.int_range(min=5, max=8),
cv.Optional(CONF_PARITY, default="NONE"): cv.enum(
@@ -57,15 +97,45 @@ CONFIG_SCHEMA = cv.Schema(
cv.Optional(CONF_INJECTIONS, default=[]): cv.ensure_list(INJECTION_SCHEMA),
cv.Optional(CONF_RESPONSES, default=[]): cv.ensure_list(RESPONSE_SCHEMA),
cv.Optional(CONF_PERIODIC_RX, default=[]): cv.ensure_list(PERIODIC_RX_SCHEMA),
cv.Optional(CONF_ON_TX): automation.validate_automation(
{
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(MockUartTXTrigger),
}
),
cv.Optional(CONF_DEBUG): maybe_empty_debug,
}
).extend(cv.COMPONENT_SCHEMA)
@automation.register_action(
"uart_mock.inject_rx", MockUartInjectRXAction, CONFIG_INJECT_RX_SCHEMA
)
async def inject_rx_to_code(config, action_id, template_arg, args):
var = cg.new_Pvariable(action_id, template_arg)
await cg.register_parented(var, config[CONF_ID])
data = config[CONF_DATA]
if isinstance(data, bytes):
data = list(data)
if cg.is_template(data):
templ = await cg.templatable(data, args, cg.std_vector.template(cg.uint8))
cg.add(var.set_data_template(templ))
else:
# Generate static array in flash to avoid RAM copy
arr_id = ID(f"{action_id}_data", is_declaration=True, type=cg.uint8)
arr = cg.static_const_array(arr_id, cg.ArrayInitializer(*data))
cg.add(var.set_data_static(arr, len(data)))
return var
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)
cg.add(var.set_baud_rate(config[CONF_BAUD_RATE]))
cg.add(var.set_rx_buffer_size(config[CONF_RX_BUFFER_SIZE]))
cg.add(var.set_rx_full_threshold(config[CONF_RX_FULL_THRESHOLD]))
cg.add(var.set_rx_timeout(config[CONF_RX_TIMEOUT]))
cg.add(var.set_stop_bits(config[CONF_STOP_BITS]))
cg.add(var.set_data_bits(config[CONF_DATA_BITS]))
cg.add(var.set_parity(config[CONF_PARITY]))
@@ -84,3 +154,12 @@ async def to_code(config):
data = periodic[CONF_DATA]
interval = periodic[CONF_INTERVAL]
cg.add(var.add_periodic_rx(data, interval))
for conf in config.get(CONF_ON_TX, []):
trigger = cg.new_Pvariable(conf[CONF_TRIGGER_ID], var)
await automation.build_automation(
trigger, [(cg.std_vector.template(cg.uint8), "data")], conf
)
if CONF_DEBUG in config:
await debug_to_code(config[CONF_DEBUG], var)

View File

@@ -0,0 +1,51 @@
#pragma once
#include "esphome/core/component.h"
#include "esphome/core/automation.h"
#include "uart_mock.h"
namespace esphome::uart_mock {
// This pattern is similar to UARTWriteAction but calls inject_rx instead of write_array, and is parented to VirtualUART
// instead of UARTComponent
template<typename... Ts> class MockUartInjectRXAction : public Action<Ts...>, public Parented<MockUartComponent> {
public:
void set_data_template(std::vector<uint8_t> (*func)(Ts...)) {
// Stateless lambdas (generated by ESPHome) implicitly convert to function pointers
this->code_.func = func;
this->len_ = -1; // Sentinel value indicates template mode
}
// Store pointer to static data in flash (no RAM copy)
void set_data_static(const uint8_t *data, size_t len) {
this->code_.data = data;
this->len_ = len; // Length >= 0 indicates static mode
}
void play(const Ts &...x) override {
if (this->len_ >= 0) {
// Static mode: use pointer and length
this->parent_->inject_to_rx_buffer(this->code_.data, static_cast<size_t>(this->len_));
} else {
// Template mode: call function
auto val = this->code_.func(x...);
this->parent_->inject_to_rx_buffer(val);
}
}
protected:
ssize_t len_{-1}; // -1 = template mode, >=0 = static mode with length
union Code {
std::vector<uint8_t> (*func)(Ts...); // Function pointer (stateless lambdas)
const uint8_t *data; // Pointer to static data in flash
} code_;
};
class MockUartTXTrigger : public Trigger<std::vector<uint8_t>> {
public:
explicit MockUartTXTrigger(MockUartComponent *parent) {
parent->set_tx_hook([this](std::vector<uint8_t> data) { this->trigger(data); });
}
};
} // namespace esphome::uart_mock

View File

@@ -35,7 +35,7 @@ void MockUartComponent::loop() {
uint32_t target_time = this->scenario_start_ms_ + this->cumulative_delay_ms_ + injection.delay_ms;
if (now >= target_time) {
ESP_LOGD(TAG, "Injecting %zu RX bytes (injection %u)", injection.rx_data.size(), this->injection_index_);
this->inject_to_rx_buffer_(injection.rx_data);
this->inject_to_rx_buffer(injection.rx_data);
this->cumulative_delay_ms_ += injection.delay_ms;
this->injection_index_++;
}
@@ -44,7 +44,7 @@ void MockUartComponent::loop() {
// Process periodic RX
for (auto &periodic : this->periodic_rx_) {
if (now - periodic.last_inject_ms >= periodic.interval_ms) {
this->inject_to_rx_buffer_(periodic.data);
this->inject_to_rx_buffer(periodic.data);
periodic.last_inject_ms = now;
}
}
@@ -79,6 +79,12 @@ void MockUartComponent::write_array(const uint8_t *data, size_t len) {
#endif
this->try_match_response_();
// This directly calls a tx_hook (lambda) as an alternative to the simpler match_response mechanism.
if (this->tx_hook_) {
std::vector<uint8_t> buf(data, data + len);
this->tx_hook_(buf);
}
}
bool MockUartComponent::peek_byte(uint8_t *data) {
@@ -114,6 +120,12 @@ void MockUartComponent::flush() {
// Nothing to flush in mock
}
void MockUartComponent::set_rx_full_threshold(size_t rx_full_threshold) {
this->rx_full_threshold_ = rx_full_threshold;
}
void MockUartComponent::set_rx_timeout(size_t rx_timeout) { this->rx_timeout_ = rx_timeout; }
void MockUartComponent::add_injection(const std::vector<uint8_t> &rx_data, uint32_t delay_ms) {
this->injections_.push_back({rx_data, delay_ms});
}
@@ -135,14 +147,19 @@ void MockUartComponent::try_match_response_() {
size_t offset = this->tx_buffer_.size() - response.expect_tx.size();
if (std::equal(response.expect_tx.begin(), response.expect_tx.end(), this->tx_buffer_.begin() + offset)) {
ESP_LOGD(TAG, "TX match found, injecting %zu RX bytes", response.inject_rx.size());
this->inject_to_rx_buffer_(response.inject_rx);
this->inject_to_rx_buffer(response.inject_rx);
this->tx_buffer_.clear();
return;
}
}
}
void MockUartComponent::inject_to_rx_buffer_(const std::vector<uint8_t> &data) {
void MockUartComponent::inject_to_rx_buffer(const uint8_t *data, size_t len) {
std::vector<uint8_t> vec(data, data + len);
this->inject_to_rx_buffer(vec);
}
void MockUartComponent::inject_to_rx_buffer(const std::vector<uint8_t> &data) {
// Log injected RX data so tests can see what's being fed to the component
if (!data.empty() && data.size() <= 64) {
char hex_buf[format_hex_pretty_size(64)];

View File

@@ -29,16 +29,21 @@ class MockUartComponent : public uart::UARTComponent, public Component {
bool read_array(uint8_t *data, size_t len) override;
size_t available() override;
void flush() override;
void set_rx_full_threshold(size_t rx_full_threshold) override;
void set_rx_timeout(size_t rx_timeout) override;
// Scenario configuration - called from generated code
void add_injection(const std::vector<uint8_t> &rx_data, uint32_t delay_ms);
void add_response(const std::vector<uint8_t> &expect_tx, const std::vector<uint8_t> &inject_rx);
void add_periodic_rx(const std::vector<uint8_t> &data, uint32_t interval_ms);
void set_tx_hook(std::function<void(const std::vector<uint8_t> &)> &&cb) { this->tx_hook_ = std::move(cb); }
void inject_to_rx_buffer(const std::vector<uint8_t> &data);
void inject_to_rx_buffer(const uint8_t *data, size_t len);
protected:
void check_logger_conflict() override {}
void try_match_response_();
void inject_to_rx_buffer_(const std::vector<uint8_t> &data);
// Timed injections
struct Injection {
@@ -73,6 +78,9 @@ class MockUartComponent : public uart::UARTComponent, public Component {
// Observability
uint32_t tx_count_{0};
uint32_t rx_count_{0};
// Direct TX hook for tests that want to bypass the response-matching logic
std::function<void(const std::vector<uint8_t> &)> tx_hook_;
};
} // namespace esphome::uart_mock

View File

@@ -0,0 +1,40 @@
esphome:
name: uart-mock-modbus-test
host:
api:
logger:
level: VERBOSE
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH
# Dummy uart entry to satisfy modbus's DEPENDENCIES = ["uart"]
# The actual UART bus used is the uart_mock component below
uart:
baud_rate: 115200
port: /dev/null
uart_mock:
- id: virtual_uart_dev
baud_rate: 9600
rx_full_threshold: 120
rx_timeout: 2
debug:
responses:
- expect_tx: [0x01, 0x03, 0x00, 0x03, 0x00, 0x01, 0x74, 0x0A] # Read holding register 1 on device 1
inject_rx: [0x01, 0x03, 0x02, 0x01, 0x03, 0xF9, 0xD5] # Return value 0x0103 (hex) = 259 (dec)
modbus:
uart_id: virtual_uart_dev
modbus_controller:
address: 1
sensor:
- platform: modbus_controller
name: "basic_register"
address: 0x03
register_type: holding

View File

@@ -0,0 +1,54 @@
esphome:
name: uart-mock-modbus-test
host:
api:
logger:
level: VERBOSE
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH
# Dummy uart entry to satisfy modbus's DEPENDENCIES = ["uart"]
# The actual UART bus used is the uart_mock component below
uart:
baud_rate: 115200
port: /dev/null
uart_mock:
- id: virtual_uart_dev
baud_rate: 9600
rx_full_threshold: 120
rx_timeout: 2
debug:
on_tx:
- then:
- if:
condition: #Read 80 input registers on device 2, starting at address 0 (SDM meter request)
lambda: "return data == std::vector<uint8_t>({0x02,0x04,0x00,0x00,0x00,0x50,0xF0,0x05});"
then:
- uart_mock.inject_rx: # Good response from SDM meter with CRC. Voltage is 243V
!lambda return {0x02,0x04,0xA0,0x43,0x73,0x19,0x9A,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x3F,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00};
- delay: 120ms # Simulate some delay for UART buffer to fill before next part of response (1ms = 1 byte at 9600 baud)
- uart_mock.inject_rx: # Rest of response (including CRC)
!lambda return{0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x42,0x6F,0xCC,0xCD,0x43,0x7C,0xB8,0x10,0x3D,0x38,0x51,0xEC,
0x43,0x81,0x1B,0xE7,0x3B,0x03,0x12,0x6F,0x50,0x1B};
modbus:
uart_id: virtual_uart_dev
sensor:
- platform: sdm_meter
address: 2
phase_a:
voltage:
name: sdm_voltage

View File

@@ -0,0 +1,153 @@
"""Integration test for modbus component with virtual UART.
Tests:
test_uart_mock_modbus :
1. Read a single register and parse successfully (basic_register)
2. Read multiple registers from SDM meter and parse successfully (sdm_voltage), with some intermediate delay to simulate UART buffer time.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from aioesphomeapi import EntityState, SensorState
import pytest
from .state_utils import InitialStateHelper, build_key_to_entity_mapping
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_uart_mock_modbus(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test basic modbus data parsing."""
# Replace external component path placeholder
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
)
yaml_config = yaml_config.replace(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
loop = asyncio.get_running_loop()
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"basic_register": [],
}
basic_register_changed = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
if (
sensor_name == "basic_register"
and state.state == 259.0
and not basic_register_changed.done()
):
basic_register_changed.set_result(True)
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
# Build key mappings for all sensor types
all_names = list(sensor_states.keys())
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
# Set up initial state helper
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Wait for basic register to be updated with successful parse
try:
await asyncio.wait_for(basic_register_changed, timeout=15.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for Basic Register change. Received sensor states:\n"
f" basic_register: {sensor_states['basic_register']}\n"
)
@pytest.mark.asyncio
@pytest.mark.xfail(
reason="There is a bug in UART which will timeout for long responses."
)
async def test_uart_mock_modbus_timing(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test basic modbus data parsing."""
# Replace external component path placeholder
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
)
yaml_config = yaml_config.replace(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
loop = asyncio.get_running_loop()
# Track sensor state updates (after initial state is swallowed)
sensor_states: dict[str, list[float]] = {
"sdm_voltage": [],
}
voltage_changed = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, SensorState) and not state.missing_state:
sensor_name = key_to_sensor.get(state.key)
if sensor_name and sensor_name in sensor_states:
sensor_states[sensor_name].append(state.state)
# Check if this is a good voltage reading (243V)
if (
sensor_name == "sdm_voltage"
and state.state > 200.0
and not voltage_changed.done()
):
voltage_changed.set_result(True)
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
entities, _ = await client.list_entities_services()
# Build key mappings for all sensor types
all_names = list(sensor_states.keys())
key_to_sensor = build_key_to_entity_mapping(entities, all_names)
# Set up initial state helper
initial_state_helper = InitialStateHelper(entities)
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Wait for voltage to be updated with successful parse
try:
await asyncio.wait_for(voltage_changed, timeout=15.0)
except TimeoutError:
pytest.fail(
f"Timeout waiting for SDM voltage change. Received sensor states:\n"
f" sdm_voltage: {sensor_states['sdm_voltage']}\n"
)