web_server: ensure fair network sharing + prevent lost state changes via deferred publish at high event load (#7538)

Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
Co-authored-by: Keith Burzinski <kbx81x@gmail.com>
This commit is contained in:
Nick Kinnan
2025-02-24 18:19:31 -08:00
committed by GitHub
parent c424fea524
commit 5e44a035a3
9 changed files with 832 additions and 200 deletions

View File

@@ -8,6 +8,10 @@
#include "esp_tls_crypto.h"
#include "utils.h"
#include "esphome/components/web_server/web_server.h"
#include "esphome/components/web_server/list_entities.h"
#include "web_server_idf.h"
namespace esphome {
@@ -276,21 +280,37 @@ AsyncEventSource::~AsyncEventSource() {
}
void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
auto *rsp = new AsyncEventSourceResponse(request, this); // NOLINT(cppcoreguidelines-owning-memory)
auto *rsp = // NOLINT(cppcoreguidelines-owning-memory)
new AsyncEventSourceResponse(request, this, this->web_server_);
if (this->on_connect_) {
this->on_connect_(rsp);
}
this->sessions_.insert(rsp);
}
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
void AsyncEventSource::loop() {
for (auto *ses : this->sessions_) {
ses->send(message, event, id, reconnect);
ses->loop();
}
}
AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest *request, AsyncEventSource *server)
: server_(server) {
void AsyncEventSource::try_send_nodefer(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
for (auto *ses : this->sessions_) {
ses->try_send_nodefer(message, event, id, reconnect);
}
}
void AsyncEventSource::deferrable_send_state(void *source, const char *event_type,
message_generator_t *message_generator) {
for (auto *ses : this->sessions_) {
ses->deferrable_send_state(source, event_type, message_generator);
}
}
AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest *request,
esphome::web_server_idf::AsyncEventSource *server,
esphome::web_server::WebServer *ws)
: server_(server), web_server_(ws), entities_iterator_(new esphome::web_server::ListEntitiesIterator(ws, server)) {
httpd_req_t *req = *request;
httpd_resp_set_status(req, HTTPD_200);
@@ -309,6 +329,30 @@ AsyncEventSourceResponse::AsyncEventSourceResponse(const AsyncWebServerRequest *
this->hd_ = req->handle;
this->fd_ = httpd_req_to_sockfd(req);
// Configure reconnect timeout and send config
// this should always go through since the tcp send buffer is empty on connect
std::string message = ws->get_config_json();
this->try_send_nodefer(message.c_str(), "ping", millis(), 30000);
for (auto &group : ws->sorting_groups_) {
message = json::build_json([group](JsonObject root) {
root["name"] = group.second.name;
root["sorting_weight"] = group.second.weight;
});
// a (very) large number of these should be able to be queued initially without defer
// since the only thing in the send buffer at this point is the initial ping/config
this->try_send_nodefer(message.c_str(), "sorting_group");
}
this->entities_iterator_->begin(ws->include_internal_);
// just dump them all up-front and take advantage of the deferred queue
// on second thought that takes too long, but leaving the commented code here for debug purposes
// while(!this->entities_iterator_->completed()) {
// this->entities_iterator_->advance();
//}
}
void AsyncEventSourceResponse::destroy(void *ptr) {
@@ -317,52 +361,155 @@ void AsyncEventSourceResponse::destroy(void *ptr) {
delete rsp; // NOLINT(cppcoreguidelines-owning-memory)
}
void AsyncEventSourceResponse::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
if (this->fd_ == 0) {
// helper for allowing only unique entries in the queue
void AsyncEventSourceResponse::deq_push_back_with_dedup_(void *source, message_generator_t *message_generator) {
DeferredEvent item(source, message_generator);
auto iter = std::find_if(this->deferred_queue_.begin(), this->deferred_queue_.end(),
[&item](const DeferredEvent &test) -> bool { return test == item; });
if (iter != this->deferred_queue_.end()) {
(*iter) = item;
} else {
this->deferred_queue_.push_back(item);
}
}
void AsyncEventSourceResponse::process_deferred_queue_() {
while (!deferred_queue_.empty()) {
DeferredEvent &de = deferred_queue_.front();
std::string message = de.message_generator_(web_server_, de.source_);
if (this->try_send_nodefer(message.c_str(), "state")) {
// O(n) but memory efficiency is more important than speed here which is why std::vector was chosen
deferred_queue_.erase(deferred_queue_.begin());
} else {
break;
}
}
}
void AsyncEventSourceResponse::process_buffer_() {
if (event_buffer_.empty()) {
return;
}
if (event_bytes_sent_ == event_buffer_.size()) {
event_buffer_.resize(0);
event_bytes_sent_ = 0;
return;
}
std::string ev;
int bytes_sent = httpd_socket_send(this->hd_, this->fd_, event_buffer_.c_str() + event_bytes_sent_,
event_buffer_.size() - event_bytes_sent_, 0);
if (bytes_sent == HTTPD_SOCK_ERR_TIMEOUT || bytes_sent == HTTPD_SOCK_ERR_FAIL) {
return;
}
event_bytes_sent_ += bytes_sent;
if (event_bytes_sent_ == event_buffer_.size()) {
event_buffer_.resize(0);
event_bytes_sent_ = 0;
}
}
void AsyncEventSourceResponse::loop() {
process_buffer_();
process_deferred_queue_();
if (!this->entities_iterator_->completed())
this->entities_iterator_->advance();
}
bool AsyncEventSourceResponse::try_send_nodefer(const char *message, const char *event, uint32_t id,
uint32_t reconnect) {
if (this->fd_ == 0) {
return false;
}
process_buffer_();
if (!event_buffer_.empty()) {
// there is still pending event data to send first
return false;
}
// 8 spaces are standing in for the hexidecimal chunk length to print later
const char chunk_len_header[] = " " CRLF_STR;
const int chunk_len_header_len = sizeof(chunk_len_header) - 1;
event_buffer_.append(chunk_len_header);
if (reconnect) {
ev.append("retry: ", sizeof("retry: ") - 1);
ev.append(to_string(reconnect));
ev.append(CRLF_STR, CRLF_LEN);
event_buffer_.append("retry: ", sizeof("retry: ") - 1);
event_buffer_.append(to_string(reconnect));
event_buffer_.append(CRLF_STR, CRLF_LEN);
}
if (id) {
ev.append("id: ", sizeof("id: ") - 1);
ev.append(to_string(id));
ev.append(CRLF_STR, CRLF_LEN);
event_buffer_.append("id: ", sizeof("id: ") - 1);
event_buffer_.append(to_string(id));
event_buffer_.append(CRLF_STR, CRLF_LEN);
}
if (event && *event) {
ev.append("event: ", sizeof("event: ") - 1);
ev.append(event);
ev.append(CRLF_STR, CRLF_LEN);
event_buffer_.append("event: ", sizeof("event: ") - 1);
event_buffer_.append(event);
event_buffer_.append(CRLF_STR, CRLF_LEN);
}
if (message && *message) {
ev.append("data: ", sizeof("data: ") - 1);
ev.append(message);
ev.append(CRLF_STR, CRLF_LEN);
event_buffer_.append("data: ", sizeof("data: ") - 1);
event_buffer_.append(message);
event_buffer_.append(CRLF_STR, CRLF_LEN);
}
if (ev.empty()) {
if (event_buffer_.empty()) {
return true;
}
event_buffer_.append(CRLF_STR, CRLF_LEN);
event_buffer_.append(CRLF_STR, CRLF_LEN);
// chunk length header itself and the final chunk terminating CRLF are not counted as part of the chunk
int chunk_len = event_buffer_.size() - CRLF_LEN - chunk_len_header_len;
char chunk_len_str[9];
snprintf(chunk_len_str, 9, "%08x", chunk_len);
std::memcpy(&event_buffer_[0], chunk_len_str, 8);
event_bytes_sent_ = 0;
process_buffer_();
return true;
}
void AsyncEventSourceResponse::deferrable_send_state(void *source, const char *event_type,
message_generator_t *message_generator) {
// allow all json "details_all" to go through before publishing bare state events, this avoids unnamed entries showing
// up in the web GUI and reduces event load during initial connect
if (!entities_iterator_->completed() && 0 != strcmp(event_type, "state_detail_all"))
return;
if (source == nullptr)
return;
if (event_type == nullptr)
return;
if (message_generator == nullptr)
return;
if (0 != strcmp(event_type, "state_detail_all") && 0 != strcmp(event_type, "state")) {
ESP_LOGE(TAG, "Can't defer non-state event");
}
ev.append(CRLF_STR, CRLF_LEN);
process_buffer_();
process_deferred_queue_();
// Sending chunked content prelude
auto cs = str_snprintf("%x" CRLF_STR, 4 * sizeof(ev.size()) + CRLF_LEN, ev.size());
httpd_socket_send(this->hd_, this->fd_, cs.c_str(), cs.size(), 0);
// Sendiing content chunk
httpd_socket_send(this->hd_, this->fd_, ev.c_str(), ev.size(), 0);
// Indicate end of chunk
httpd_socket_send(this->hd_, this->fd_, CRLF_STR, CRLF_LEN, 0);
if (!event_buffer_.empty() || !deferred_queue_.empty()) {
// outgoing event buffer or deferred queue still not empty which means downstream tcp send buffer full, no point
// trying to send first
deq_push_back_with_dedup_(source, message_generator);
} else {
std::string message = message_generator(web_server_, source);
if (!this->try_send_nodefer(message.c_str(), "state")) {
deq_push_back_with_dedup_(source, message_generator);
}
}
}
} // namespace web_server_idf