[script][wait_until] Fix FIFO ordering and reentrancy bugs (#12049)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
#include <forward_list>
|
||||
#include "esphome/core/automation.h"
|
||||
#include "esphome/core/component.h"
|
||||
#include "esphome/core/helpers.h"
|
||||
@@ -290,10 +290,10 @@ template<class C, typename... Ts> class ScriptWaitAction : public Action<Ts...>,
|
||||
}
|
||||
|
||||
// Store parameters for later execution
|
||||
this->param_queue_.emplace_front(x...);
|
||||
// Enable loop now that we have work to do
|
||||
this->param_queue_.emplace_back(x...);
|
||||
// Enable loop now that we have work to do - don't call loop() synchronously!
|
||||
// Let the event loop call it to avoid reentrancy issues
|
||||
this->enable_loop();
|
||||
this->loop();
|
||||
}
|
||||
|
||||
void loop() override {
|
||||
@@ -303,13 +303,17 @@ template<class C, typename... Ts> class ScriptWaitAction : public Action<Ts...>,
|
||||
if (this->script_->is_running())
|
||||
return;
|
||||
|
||||
while (!this->param_queue_.empty()) {
|
||||
// Only process ONE queued item per loop iteration
|
||||
// Processing all items in a while loop causes infinite loops because
|
||||
// play_next_() can trigger more items to be queued
|
||||
if (!this->param_queue_.empty()) {
|
||||
auto ¶ms = this->param_queue_.front();
|
||||
this->play_next_tuple_(params, typename gens<sizeof...(Ts)>::type());
|
||||
this->param_queue_.pop_front();
|
||||
} else {
|
||||
// Queue is now empty - disable loop until next play_complex
|
||||
this->disable_loop();
|
||||
}
|
||||
// Queue is now empty - disable loop until next play_complex
|
||||
this->disable_loop();
|
||||
}
|
||||
|
||||
void play(const Ts &...x) override { /* ignore - see play_complex */
|
||||
@@ -326,7 +330,7 @@ template<class C, typename... Ts> class ScriptWaitAction : public Action<Ts...>,
|
||||
}
|
||||
|
||||
C *script_;
|
||||
std::forward_list<std::tuple<Ts...>> param_queue_;
|
||||
std::list<std::tuple<Ts...>> param_queue_;
|
||||
};
|
||||
|
||||
} // namespace script
|
||||
|
||||
@@ -9,8 +9,8 @@
|
||||
#include "esphome/core/application.h"
|
||||
#include "esphome/core/helpers.h"
|
||||
|
||||
#include <list>
|
||||
#include <vector>
|
||||
#include <forward_list>
|
||||
|
||||
namespace esphome {
|
||||
|
||||
@@ -445,9 +445,10 @@ template<typename... Ts> class WaitUntilAction : public Action<Ts...>, public Co
|
||||
// Store for later processing
|
||||
auto now = millis();
|
||||
auto timeout = this->timeout_value_.optional_value(x...);
|
||||
this->var_queue_.emplace_front(now, timeout, std::make_tuple(x...));
|
||||
this->var_queue_.emplace_back(now, timeout, std::make_tuple(x...));
|
||||
|
||||
// Do immediate check with fresh timestamp
|
||||
// Do immediate check with fresh timestamp - don't call loop() synchronously!
|
||||
// Let the event loop call it to avoid reentrancy issues
|
||||
if (this->process_queue_(now)) {
|
||||
// Only enable loop if we still have pending items
|
||||
this->enable_loop();
|
||||
@@ -499,7 +500,7 @@ template<typename... Ts> class WaitUntilAction : public Action<Ts...>, public Co
|
||||
}
|
||||
|
||||
Condition<Ts...> *condition_;
|
||||
std::forward_list<std::tuple<uint32_t, optional<uint32_t>, std::tuple<Ts...>>> var_queue_{};
|
||||
std::list<std::tuple<uint32_t, optional<uint32_t>, std::tuple<Ts...>>> var_queue_{};
|
||||
};
|
||||
|
||||
template<typename... Ts> class UpdateComponentAction : public Action<Ts...> {
|
||||
|
||||
131
tests/integration/fixtures/script_delay_with_params.yaml
Normal file
131
tests/integration/fixtures/script_delay_with_params.yaml
Normal file
@@ -0,0 +1,131 @@
|
||||
esphome:
|
||||
name: test-script-delay-params
|
||||
|
||||
host:
|
||||
|
||||
api:
|
||||
actions:
|
||||
# Test case from issue #12044: parent script with repeat calling child with delay
|
||||
- action: test_repeat_with_delay
|
||||
then:
|
||||
- logger.log: "=== TEST: Repeat loop calling script with delay and parameters ==="
|
||||
- script.execute: father_script
|
||||
|
||||
# Test case from issue #12043: script.wait with delayed child script
|
||||
- action: test_script_wait
|
||||
then:
|
||||
- logger.log: "=== TEST: script.wait with delayed child script ==="
|
||||
- script.execute: show_start_page
|
||||
- script.wait: show_start_page
|
||||
- logger.log: "After wait: script completed successfully"
|
||||
|
||||
# Test: Delay with different parameter types
|
||||
- action: test_delay_param_types
|
||||
then:
|
||||
- logger.log: "=== TEST: Delay with various parameter types ==="
|
||||
- script.execute:
|
||||
id: delay_with_int
|
||||
val: 42
|
||||
- delay: 50ms
|
||||
- script.execute:
|
||||
id: delay_with_string
|
||||
msg: "test message"
|
||||
- delay: 50ms
|
||||
- script.execute:
|
||||
id: delay_with_float
|
||||
num: 3.14
|
||||
|
||||
logger:
|
||||
level: DEBUG
|
||||
|
||||
script:
|
||||
# Reproduces issue #12044: child script with conditional delay
|
||||
- id: son_script
|
||||
mode: single
|
||||
parameters:
|
||||
iteration: int
|
||||
then:
|
||||
- logger.log:
|
||||
format: "Son script started with iteration %d"
|
||||
args: ['iteration']
|
||||
- if:
|
||||
condition:
|
||||
lambda: 'return iteration >= 5;'
|
||||
then:
|
||||
- logger.log:
|
||||
format: "Son script delaying for iteration %d"
|
||||
args: ['iteration']
|
||||
- delay: 100ms
|
||||
- logger.log:
|
||||
format: "Son script finished with iteration %d"
|
||||
args: ['iteration']
|
||||
|
||||
# Reproduces issue #12044: parent script with repeat loop
|
||||
- id: father_script
|
||||
mode: single
|
||||
then:
|
||||
- repeat:
|
||||
count: 10
|
||||
then:
|
||||
- logger.log:
|
||||
format: "Father iteration %d: calling son"
|
||||
args: ['iteration']
|
||||
- script.execute:
|
||||
id: son_script
|
||||
iteration: !lambda 'return iteration;'
|
||||
- script.wait: son_script
|
||||
- logger.log:
|
||||
format: "Father iteration %d: son finished, wait returned"
|
||||
args: ['iteration']
|
||||
|
||||
# Reproduces issue #12043: script.wait hangs
|
||||
- id: show_start_page
|
||||
mode: single
|
||||
then:
|
||||
- logger.log: "Start page: beginning"
|
||||
- delay: 100ms
|
||||
- logger.log: "Start page: after delay"
|
||||
- delay: 100ms
|
||||
- logger.log: "Start page: completed"
|
||||
|
||||
# Test delay with int parameter
|
||||
- id: delay_with_int
|
||||
mode: single
|
||||
parameters:
|
||||
val: int
|
||||
then:
|
||||
- logger.log:
|
||||
format: "Int test: before delay, val=%d"
|
||||
args: ['val']
|
||||
- delay: 50ms
|
||||
- logger.log:
|
||||
format: "Int test: after delay, val=%d"
|
||||
args: ['val']
|
||||
|
||||
# Test delay with string parameter
|
||||
- id: delay_with_string
|
||||
mode: single
|
||||
parameters:
|
||||
msg: string
|
||||
then:
|
||||
- logger.log:
|
||||
format: "String test: before delay, msg=%s"
|
||||
args: ['msg.c_str()']
|
||||
- delay: 50ms
|
||||
- logger.log:
|
||||
format: "String test: after delay, msg=%s"
|
||||
args: ['msg.c_str()']
|
||||
|
||||
# Test delay with float parameter
|
||||
- id: delay_with_float
|
||||
mode: single
|
||||
parameters:
|
||||
num: float
|
||||
then:
|
||||
- logger.log:
|
||||
format: "Float test: before delay, num=%.2f"
|
||||
args: ['num']
|
||||
- delay: 50ms
|
||||
- logger.log:
|
||||
format: "Float test: after delay, num=%.2f"
|
||||
args: ['num']
|
||||
82
tests/integration/fixtures/wait_until_fifo_ordering.yaml
Normal file
82
tests/integration/fixtures/wait_until_fifo_ordering.yaml
Normal file
@@ -0,0 +1,82 @@
|
||||
esphome:
|
||||
name: test-wait-until-ordering
|
||||
|
||||
host:
|
||||
|
||||
api:
|
||||
actions:
|
||||
- action: test_wait_until_fifo
|
||||
then:
|
||||
- logger.log: "=== TEST: wait_until should execute in FIFO order ==="
|
||||
- globals.set:
|
||||
id: gate_open
|
||||
value: 'false'
|
||||
- delay: 100ms
|
||||
# Start multiple parallel executions of coordinator script
|
||||
# Each will call the shared waiter script, queueing in same wait_until
|
||||
- script.execute: coordinator_0
|
||||
- script.execute: coordinator_1
|
||||
- script.execute: coordinator_2
|
||||
- script.execute: coordinator_3
|
||||
- script.execute: coordinator_4
|
||||
# Give scripts time to reach wait_until and queue
|
||||
- delay: 200ms
|
||||
- logger.log: "Opening gate - all wait_until should complete now"
|
||||
- globals.set:
|
||||
id: gate_open
|
||||
value: 'true'
|
||||
- delay: 500ms
|
||||
- logger.log: "Test complete"
|
||||
|
||||
globals:
|
||||
- id: gate_open
|
||||
type: bool
|
||||
initial_value: 'false'
|
||||
|
||||
script:
|
||||
# Shared waiter with single wait_until action (all coordinators call this)
|
||||
- id: waiter
|
||||
mode: parallel
|
||||
parameters:
|
||||
iter: int
|
||||
then:
|
||||
- lambda: 'ESP_LOGD("main", "Queueing iteration %d", iter);'
|
||||
- wait_until:
|
||||
condition:
|
||||
lambda: 'return id(gate_open);'
|
||||
timeout: 5s
|
||||
- lambda: 'ESP_LOGD("main", "Completed iteration %d", iter);'
|
||||
|
||||
# Coordinator scripts - each calls shared waiter with different iteration number
|
||||
- id: coordinator_0
|
||||
then:
|
||||
- script.execute:
|
||||
id: waiter
|
||||
iter: 0
|
||||
|
||||
- id: coordinator_1
|
||||
then:
|
||||
- script.execute:
|
||||
id: waiter
|
||||
iter: 1
|
||||
|
||||
- id: coordinator_2
|
||||
then:
|
||||
- script.execute:
|
||||
id: waiter
|
||||
iter: 2
|
||||
|
||||
- id: coordinator_3
|
||||
then:
|
||||
- script.execute:
|
||||
id: waiter
|
||||
iter: 3
|
||||
|
||||
- id: coordinator_4
|
||||
then:
|
||||
- script.execute:
|
||||
id: waiter
|
||||
iter: 4
|
||||
|
||||
logger:
|
||||
level: DEBUG
|
||||
121
tests/integration/test_script_delay_params.py
Normal file
121
tests/integration/test_script_delay_params.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Integration test for script.wait FIFO ordering (issues #12043, #12044).
|
||||
|
||||
This test verifies that ScriptWaitAction processes queued items in FIFO order.
|
||||
|
||||
PR #7972 introduced bugs in ScriptWaitAction:
|
||||
- Used emplace_front() causing LIFO ordering instead of FIFO
|
||||
- Called loop() synchronously causing reentrancy issues
|
||||
- Used while loop processing entire queue causing infinite loops
|
||||
|
||||
These bugs manifested as:
|
||||
- Scripts becoming "zombies" (stuck in running state)
|
||||
- script.wait hanging forever
|
||||
- Incorrect execution order
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_script_delay_with_params(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test that script.wait processes queued items in FIFO order.
|
||||
|
||||
This reproduces issues #12043 and #12044 where scripts would hang or become
|
||||
zombies due to LIFO ordering bugs in ScriptWaitAction from PR #7972.
|
||||
"""
|
||||
test_complete = asyncio.Event()
|
||||
|
||||
# Patterns to match in logs
|
||||
father_calling_pattern = re.compile(r"Father iteration (\d+): calling son")
|
||||
son_started_pattern = re.compile(r"Son script started with iteration (\d+)")
|
||||
son_delaying_pattern = re.compile(r"Son script delaying for iteration (\d+)")
|
||||
son_finished_pattern = re.compile(r"Son script finished with iteration (\d+)")
|
||||
father_wait_returned_pattern = re.compile(
|
||||
r"Father iteration (\d+): son finished, wait returned"
|
||||
)
|
||||
|
||||
# Track which iterations completed
|
||||
father_calling = set()
|
||||
son_started = set()
|
||||
son_delaying = set()
|
||||
son_finished = set()
|
||||
wait_returned = set()
|
||||
|
||||
def check_output(line: str) -> None:
|
||||
"""Check log output for expected messages."""
|
||||
if test_complete.is_set():
|
||||
return
|
||||
|
||||
if mo := father_calling_pattern.search(line):
|
||||
father_calling.add(int(mo.group(1)))
|
||||
elif mo := son_started_pattern.search(line):
|
||||
son_started.add(int(mo.group(1)))
|
||||
elif mo := son_delaying_pattern.search(line):
|
||||
son_delaying.add(int(mo.group(1)))
|
||||
elif mo := son_finished_pattern.search(line):
|
||||
son_finished.add(int(mo.group(1)))
|
||||
elif mo := father_wait_returned_pattern.search(line):
|
||||
iteration = int(mo.group(1))
|
||||
wait_returned.add(iteration)
|
||||
# Test completes when iteration 9 finishes
|
||||
if iteration == 9:
|
||||
test_complete.set()
|
||||
|
||||
# Run with log monitoring
|
||||
async with (
|
||||
run_compiled(yaml_config, line_callback=check_output),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
# Verify device info
|
||||
device_info = await client.device_info()
|
||||
assert device_info is not None
|
||||
assert device_info.name == "test-script-delay-params"
|
||||
|
||||
# Get services
|
||||
_, services = await client.list_entities_services()
|
||||
test_service = next(
|
||||
(s for s in services if s.name == "test_repeat_with_delay"), None
|
||||
)
|
||||
assert test_service is not None, "test_repeat_with_delay service not found"
|
||||
|
||||
# Execute the test
|
||||
client.execute_service(test_service, {})
|
||||
|
||||
# Wait for test to complete (10 iterations * ~100ms each + margin)
|
||||
try:
|
||||
await asyncio.wait_for(test_complete.wait(), timeout=5.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test timed out. Completed iterations: {sorted(wait_returned)}. "
|
||||
f"This likely indicates the script became a zombie (issue #12044)."
|
||||
)
|
||||
|
||||
# Verify all 10 iterations completed successfully
|
||||
expected_iterations = set(range(10))
|
||||
assert father_calling == expected_iterations, "Not all iterations started"
|
||||
assert son_started == expected_iterations, (
|
||||
"Son script not started for all iterations"
|
||||
)
|
||||
assert son_finished == expected_iterations, (
|
||||
"Son script not finished for all iterations"
|
||||
)
|
||||
assert wait_returned == expected_iterations, (
|
||||
"script.wait did not return for all iterations"
|
||||
)
|
||||
|
||||
# Verify delays were triggered for iterations >= 5
|
||||
expected_delays = set(range(5, 10))
|
||||
assert son_delaying == expected_delays, (
|
||||
"Delays not triggered for iterations >= 5"
|
||||
)
|
||||
90
tests/integration/test_wait_until_ordering.py
Normal file
90
tests/integration/test_wait_until_ordering.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""Integration test for wait_until FIFO ordering.
|
||||
|
||||
This test verifies that when multiple wait_until actions are queued,
|
||||
they execute in FIFO (First In First Out) order, not LIFO.
|
||||
|
||||
PR #7972 introduced a bug where emplace_front() was used, causing
|
||||
LIFO ordering which is incorrect.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_until_fifo_ordering(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test that wait_until executes queued items in FIFO order.
|
||||
|
||||
With the bug (using emplace_front), the order would be 4,3,2,1,0 (LIFO).
|
||||
With the fix (using emplace_back), the order should be 0,1,2,3,4 (FIFO).
|
||||
"""
|
||||
test_complete = asyncio.Event()
|
||||
|
||||
# Track completion order
|
||||
completed_order = []
|
||||
|
||||
# Patterns to match
|
||||
queuing_pattern = re.compile(r"Queueing iteration (\d+)")
|
||||
completed_pattern = re.compile(r"Completed iteration (\d+)")
|
||||
|
||||
def check_output(line: str) -> None:
|
||||
"""Check log output for completion order."""
|
||||
if test_complete.is_set():
|
||||
return
|
||||
|
||||
if mo := queuing_pattern.search(line):
|
||||
iteration = int(mo.group(1))
|
||||
|
||||
elif mo := completed_pattern.search(line):
|
||||
iteration = int(mo.group(1))
|
||||
completed_order.append(iteration)
|
||||
|
||||
# Test completes when all 5 have completed
|
||||
if len(completed_order) == 5:
|
||||
test_complete.set()
|
||||
|
||||
# Run with log monitoring
|
||||
async with (
|
||||
run_compiled(yaml_config, line_callback=check_output),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
# Verify device info
|
||||
device_info = await client.device_info()
|
||||
assert device_info is not None
|
||||
assert device_info.name == "test-wait-until-ordering"
|
||||
|
||||
# Get services
|
||||
_, services = await client.list_entities_services()
|
||||
test_service = next(
|
||||
(s for s in services if s.name == "test_wait_until_fifo"), None
|
||||
)
|
||||
assert test_service is not None, "test_wait_until_fifo service not found"
|
||||
|
||||
# Execute the test
|
||||
client.execute_service(test_service, {})
|
||||
|
||||
# Wait for test to complete
|
||||
try:
|
||||
await asyncio.wait_for(test_complete.wait(), timeout=5.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test timed out. Completed order: {completed_order}. "
|
||||
f"Expected 5 completions but got {len(completed_order)}."
|
||||
)
|
||||
|
||||
# Verify FIFO order
|
||||
expected_order = [0, 1, 2, 3, 4]
|
||||
assert completed_order == expected_order, (
|
||||
f"Unexpected order: {completed_order}. "
|
||||
f"Expected FIFO order: {expected_order}"
|
||||
)
|
||||
Reference in New Issue
Block a user