mirror of
https://github.com/esphome/esphome.git
synced 2026-02-18 15:35:59 -07:00
tests
This commit is contained in:
@@ -34,6 +34,7 @@ from esphome.__main__ import (
|
||||
has_non_ip_address,
|
||||
has_resolvable_address,
|
||||
mqtt_get_ip,
|
||||
run_miniterm,
|
||||
show_logs,
|
||||
upload_program,
|
||||
upload_using_esptool,
|
||||
@@ -41,11 +42,13 @@ from esphome.__main__ import (
|
||||
from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32
|
||||
from esphome.const import (
|
||||
CONF_API,
|
||||
CONF_BAUD_RATE,
|
||||
CONF_BROKER,
|
||||
CONF_DISABLED,
|
||||
CONF_ESPHOME,
|
||||
CONF_LEVEL,
|
||||
CONF_LOG_TOPIC,
|
||||
CONF_LOGGER,
|
||||
CONF_MDNS,
|
||||
CONF_MQTT,
|
||||
CONF_NAME,
|
||||
@@ -838,6 +841,7 @@ class MockArgs:
|
||||
configuration: str | None = None
|
||||
name: str | None = None
|
||||
dashboard: bool = False
|
||||
reset: bool = False
|
||||
|
||||
|
||||
def test_upload_program_serial_esp32(
|
||||
@@ -2802,3 +2806,256 @@ def test_compile_program_no_build_info_when_json_missing_keys(
|
||||
|
||||
assert result == 0
|
||||
assert "Build Info:" not in caplog.text
|
||||
|
||||
|
||||
# Tests for run_miniterm serial log batching
|
||||
|
||||
|
||||
class MockSerial:
|
||||
"""Mock serial port for testing run_miniterm."""
|
||||
|
||||
def __init__(self, chunks: list[bytes]) -> None:
|
||||
"""Initialize with a list of chunks to return from read().
|
||||
|
||||
Args:
|
||||
chunks: List of byte chunks to return sequentially.
|
||||
Empty bytes at the end signal end of data.
|
||||
"""
|
||||
self.chunks = list(chunks)
|
||||
self.chunk_index = 0
|
||||
self.baudrate = 0
|
||||
self.port = ""
|
||||
self.dtr = True
|
||||
self.rts = True
|
||||
self.timeout = 0.1
|
||||
self._is_open = False
|
||||
|
||||
def __enter__(self) -> MockSerial:
|
||||
self._is_open = True
|
||||
return self
|
||||
|
||||
def __exit__(self, *args: Any) -> None:
|
||||
self._is_open = False
|
||||
|
||||
@property
|
||||
def in_waiting(self) -> int:
|
||||
"""Return number of bytes available."""
|
||||
if self.chunk_index < len(self.chunks):
|
||||
return len(self.chunks[self.chunk_index])
|
||||
return 0
|
||||
|
||||
def read(self, size: int = 1) -> bytes:
|
||||
"""Read next chunk of data."""
|
||||
if self.chunk_index < len(self.chunks):
|
||||
chunk = self.chunks[self.chunk_index]
|
||||
self.chunk_index += 1
|
||||
if not chunk:
|
||||
# Empty chunk means we're done - simulate end
|
||||
import serial
|
||||
|
||||
raise serial.SerialException("Port closed")
|
||||
return chunk
|
||||
import serial
|
||||
|
||||
raise serial.SerialException("Port closed")
|
||||
|
||||
|
||||
def test_run_miniterm_batches_lines_with_same_timestamp(
|
||||
capfd: CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Test that lines from the same chunk get the same timestamp."""
|
||||
# Simulate receiving multiple log lines in a single chunk
|
||||
# This is how data arrives over USB - many lines at once
|
||||
chunk = b"[I][app:100]: Line 1\r\n[I][app:100]: Line 2\r\n[I][app:100]: Line 3\r\n"
|
||||
|
||||
mock_serial = MockSerial([chunk, b""])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(platformio_api, "process_stacktrace") as mock_bt,
|
||||
):
|
||||
mock_bt.return_value = False
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 0
|
||||
|
||||
captured = capfd.readouterr()
|
||||
lines = [line for line in captured.out.strip().split("\n") if line]
|
||||
|
||||
# All 3 lines should have the same timestamp (first 13 chars like "[HH:MM:SS.mmm]")
|
||||
assert len(lines) == 3
|
||||
timestamps = [line[:13] for line in lines]
|
||||
assert timestamps[0] == timestamps[1] == timestamps[2], (
|
||||
f"Lines from same chunk should have same timestamp: {timestamps}"
|
||||
)
|
||||
|
||||
|
||||
def test_run_miniterm_different_chunks_different_timestamps(
|
||||
capfd: CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Test that lines from different chunks can have different timestamps."""
|
||||
# Two separate chunks - could have different timestamps
|
||||
chunk1 = b"[I][app:100]: Chunk 1 Line\r\n"
|
||||
chunk2 = b"[I][app:100]: Chunk 2 Line\r\n"
|
||||
|
||||
mock_serial = MockSerial([chunk1, chunk2, b""])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(platformio_api, "process_stacktrace") as mock_bt,
|
||||
):
|
||||
mock_bt.return_value = False
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 0
|
||||
|
||||
captured = capfd.readouterr()
|
||||
lines = [line for line in captured.out.strip().split("\n") if line]
|
||||
assert len(lines) == 2
|
||||
|
||||
|
||||
def test_run_miniterm_handles_split_lines() -> None:
|
||||
"""Test that partial lines are buffered until complete."""
|
||||
# Line split across two chunks
|
||||
chunk1 = b"[I][app:100]: Start of "
|
||||
chunk2 = b"line\r\n"
|
||||
|
||||
mock_serial = MockSerial([chunk1, chunk2, b""])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(platformio_api, "process_stacktrace") as mock_bt,
|
||||
patch("esphome.__main__.safe_print") as mock_print,
|
||||
):
|
||||
mock_bt.return_value = False
|
||||
run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
# Should have printed exactly one complete line
|
||||
assert mock_print.call_count == 1
|
||||
printed_line = mock_print.call_args[0][0]
|
||||
assert "Start of line" in printed_line
|
||||
|
||||
|
||||
def test_run_miniterm_backtrace_state_maintained() -> None:
|
||||
"""Test that backtrace_state is properly maintained across lines.
|
||||
|
||||
ESP8266 backtraces span multiple lines between >>>stack>>> and <<<stack<<<.
|
||||
The backtrace_state must persist correctly when lines arrive in the same chunk.
|
||||
"""
|
||||
# Simulate ESP8266 multi-line backtrace arriving in a single chunk
|
||||
backtrace_chunk = (
|
||||
b">>>stack>>>\r\n"
|
||||
b"3ffffe90: 40220ef8 b66aa8c0 3fff0a4c 40204c84\r\n"
|
||||
b"3ffffea0: 00000005 0000a635 3fff191c 4020413c\r\n"
|
||||
b"<<<stack<<<\r\n"
|
||||
)
|
||||
|
||||
mock_serial = MockSerial([backtrace_chunk, b""])
|
||||
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 115200,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
backtrace_states: list[tuple[str, bool]] = []
|
||||
|
||||
def track_backtrace_state(
|
||||
config: dict[str, Any], line: str, backtrace_state: bool
|
||||
) -> bool:
|
||||
"""Track the backtrace_state progression."""
|
||||
backtrace_states.append((line, backtrace_state))
|
||||
# Simulate actual behavior
|
||||
if ">>>stack>>>" in line:
|
||||
return True
|
||||
if "<<<stack<<<" in line:
|
||||
return False
|
||||
return backtrace_state
|
||||
|
||||
with (
|
||||
patch("serial.Serial", return_value=mock_serial),
|
||||
patch.object(
|
||||
platformio_api,
|
||||
"process_stacktrace",
|
||||
side_effect=track_backtrace_state,
|
||||
),
|
||||
):
|
||||
run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
# Verify the state progression
|
||||
assert len(backtrace_states) == 4
|
||||
|
||||
# Line 1: >>>stack>>> - state should be False (before processing)
|
||||
assert ">>>stack>>>" in backtrace_states[0][0]
|
||||
assert backtrace_states[0][1] is False
|
||||
|
||||
# Line 2: stack data - state should be True (after >>>stack>>>)
|
||||
assert "40220ef8" in backtrace_states[1][0]
|
||||
assert backtrace_states[1][1] is True
|
||||
|
||||
# Line 3: more stack data - state should be True
|
||||
assert "4020413c" in backtrace_states[2][0]
|
||||
assert backtrace_states[2][1] is True
|
||||
|
||||
# Line 4: <<<stack<<< - state should be True (before processing end marker)
|
||||
assert "<<<stack<<<" in backtrace_states[3][0]
|
||||
assert backtrace_states[3][1] is True
|
||||
|
||||
|
||||
def test_run_miniterm_no_logger_returns_early(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that run_miniterm returns early if logger is not configured."""
|
||||
config: dict[str, Any] = {} # No logger config
|
||||
args = MockArgs()
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 1
|
||||
assert "Logger is not enabled" in caplog.text
|
||||
|
||||
|
||||
def test_run_miniterm_baud_rate_zero_returns_early(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that run_miniterm returns early if baud_rate is 0."""
|
||||
config = {
|
||||
CONF_LOGGER: {
|
||||
CONF_BAUD_RATE: 0,
|
||||
"deassert_rts_dtr": False,
|
||||
}
|
||||
}
|
||||
args = MockArgs()
|
||||
|
||||
with caplog.at_level(logging.INFO):
|
||||
result = run_miniterm(config, "/dev/ttyUSB0", args)
|
||||
|
||||
assert result == 1
|
||||
assert "UART logging is disabled" in caplog.text
|
||||
|
||||
Reference in New Issue
Block a user