diff --git a/src/api/api_lib.c b/src/api/api_lib.c index bee73f79..42edc57c 100644 --- a/src/api/api_lib.c +++ b/src/api/api_lib.c @@ -94,10 +94,14 @@ #if LWIP_NETCONN_FULLDUPLEX #define NETCONN_RECVMBOX_WAITABLE(conn) (sys_mbox_valid(&(conn)->recvmbox) && (((conn)->flags & NETCONN_FLAG_MBOXINVALID) == 0)) #define NETCONN_ACCEPTMBOX_WAITABLE(conn) (sys_mbox_valid(&(conn)->acceptmbox) && (((conn)->flags & (NETCONN_FLAG_MBOXCLOSED|NETCONN_FLAG_MBOXINVALID)) == 0)) -#else +#define NETCONN_MBOX_WAITING_INC(conn) SYS_ARCH_INC(conn->mbox_threads_waiting, 1) +#define NETCONN_MBOX_WAITING_DEC(conn) SYS_ARCH_INC(conn->mbox_threads_waiting, 1) +#else /* LWIP_NETCONN_FULLDUPLEX */ #define NETCONN_RECVMBOX_WAITABLE(conn) sys_mbox_valid(&(conn)->recvmbox) #define NETCONN_ACCEPTMBOX_WAITABLE(conn) (sys_mbox_valid(&(conn)->acceptmbox) && (((conn)->flags & NETCONN_FLAG_MBOXCLOSED) == 0)) -#endif +#define NETCONN_MBOX_WAITING_INC(conn) +#define NETCONN_MBOX_WAITING_DEC(conn) +#endif /* LWIP_NETCONN_FULLDUPLEX */ static err_t netconn_close_shutdown(struct netconn *conn, u8_t how); @@ -494,21 +498,35 @@ netconn_accept(struct netconn *conn, struct netconn **new_conn) API_MSG_VAR_ALLOC_ACCEPT(msg); + NETCONN_MBOX_WAITING_INC(conn); if (netconn_is_nonblocking(conn)) { if (sys_arch_mbox_tryfetch(&conn->acceptmbox, &accept_ptr) == SYS_ARCH_TIMEOUT) { API_MSG_VAR_FREE_ACCEPT(msg); + NETCONN_MBOX_WAITING_DEC(conn); return ERR_WOULDBLOCK; } } else { #if LWIP_SO_RCVTIMEO if (sys_arch_mbox_fetch(&conn->acceptmbox, &accept_ptr, conn->recv_timeout) == SYS_ARCH_TIMEOUT) { API_MSG_VAR_FREE_ACCEPT(msg); + NETCONN_MBOX_WAITING_DEC(conn); return ERR_TIMEOUT; } #else sys_arch_mbox_fetch(&conn->acceptmbox, &accept_ptr, 0); #endif /* LWIP_SO_RCVTIMEO*/ } + NETCONN_MBOX_WAITING_DEC(conn); +#if LWIP_NETCONN_FULLDUPLEX + if (conn->flags & NETCONN_FLAG_MBOXINVALID) { + if (lwip_netconn_is_deallocated_msg(accept_ptr)) { + /* the netconn has been closed from another thread */ + API_MSG_VAR_FREE_ACCEPT(msg); + return ERR_CONN; + } + } +#endif + /* Register event with callback */ API_EVENT(conn, NETCONN_EVT_RCVMINUS, 0); @@ -576,10 +594,13 @@ netconn_recv_data(struct netconn *conn, void **new_buf, u8_t apiflags) return ERR_CONN; } + NETCONN_MBOX_WAITING_INC(conn); if (netconn_is_nonblocking(conn) || (apiflags & NETCONN_DONTBLOCK) || (conn->flags & NETCONN_FLAG_MBOXCLOSED) || (conn->pending_err != ERR_OK)) { if (sys_arch_mbox_tryfetch(&conn->recvmbox, &buf) == SYS_ARCH_TIMEOUT) { - err_t err = netconn_err(conn); + err_t err; + NETCONN_MBOX_WAITING_DEC(conn); + err = netconn_err(conn); if (err != ERR_OK) { /* return pending error */ return err; @@ -592,12 +613,23 @@ netconn_recv_data(struct netconn *conn, void **new_buf, u8_t apiflags) } else { #if LWIP_SO_RCVTIMEO if (sys_arch_mbox_fetch(&conn->recvmbox, &buf, conn->recv_timeout) == SYS_ARCH_TIMEOUT) { + NETCONN_MBOX_WAITING_DEC(conn); return ERR_TIMEOUT; } #else sys_arch_mbox_fetch(&conn->recvmbox, &buf, 0); #endif /* LWIP_SO_RCVTIMEO*/ } + NETCONN_MBOX_WAITING_DEC(conn); +#if LWIP_NETCONN_FULLDUPLEX + if (conn->flags & NETCONN_FLAG_MBOXINVALID) { + if (lwip_netconn_is_deallocated_msg(buf)) { + /* the netconn has been closed from another thread */ + API_MSG_VAR_FREE_ACCEPT(msg); + return ERR_CONN; + } + } +#endif #if LWIP_TCP #if (LWIP_UDP || LWIP_RAW) diff --git a/src/api/api_msg.c b/src/api/api_msg.c index 254b7559..614ca641 100644 --- a/src/api/api_msg.c +++ b/src/api/api_msg.c @@ -92,6 +92,19 @@ static void netconn_drain(struct netconn *conn); #define TCPIP_APIMSG_ACK(m) do { sys_sem_signal(LWIP_API_MSG_SEM(m)); } while(0) #endif /* LWIP_TCPIP_CORE_LOCKING */ +#if LWIP_NETCONN_FULLDUPLEX +const u8_t netconn_deleted = 0; + +int +lwip_netconn_is_deallocated_msg(void *msg) +{ + if (msg == &netconn_deleted) { + return 1; + } + return 0; +} +#endif /* LWIP_NETCONN_FULLDUPLEX */ + #if LWIP_TCP const u8_t netconn_aborted = 0; const u8_t netconn_reset = 0; @@ -823,16 +836,21 @@ netconn_drain(struct netconn *conn) /* Delete and drain the recvmbox. */ if (sys_mbox_valid(&conn->recvmbox)) { while (sys_mbox_tryfetch(&conn->recvmbox, &mem) != SYS_MBOX_EMPTY) { -#if LWIP_TCP - if (NETCONNTYPE_GROUP(conn->type) == NETCONN_TCP) { - err_t err; - if (!lwip_netconn_is_err_msg(mem, &err)) { - pbuf_free((struct pbuf *)mem); - } - } else -#endif /* LWIP_TCP */ +#if LWIP_NETCONN_FULLDUPLEX + if (!lwip_netconn_is_deallocated_msg(mem)) +#endif /* LWIP_NETCONN_FULLDUPLEX */ { - netbuf_delete((struct netbuf *)mem); +#if LWIP_TCP + if (NETCONNTYPE_GROUP(conn->type) == NETCONN_TCP) { + err_t err; + if (!lwip_netconn_is_err_msg(mem, &err)) { + pbuf_free((struct pbuf *)mem); + } + } else +#endif /* LWIP_TCP */ + { + netbuf_delete((struct netbuf *)mem); + } } } sys_mbox_free(&conn->recvmbox); @@ -843,18 +861,23 @@ netconn_drain(struct netconn *conn) #if LWIP_TCP if (sys_mbox_valid(&conn->acceptmbox)) { while (sys_mbox_tryfetch(&conn->acceptmbox, &mem) != SYS_MBOX_EMPTY) { - err_t err; - if (!lwip_netconn_is_err_msg(mem, &err)) { - struct netconn *newconn = (struct netconn *)mem; - /* Only tcp pcbs have an acceptmbox, so no need to check conn->type */ - /* pcb might be set to NULL already by err_tcp() */ - /* drain recvmbox */ - netconn_drain(newconn); - if (newconn->pcb.tcp != NULL) { - tcp_abort(newconn->pcb.tcp); - newconn->pcb.tcp = NULL; +#if LWIP_NETCONN_FULLDUPLEX + if (!lwip_netconn_is_deallocated_msg(mem)) +#endif /* LWIP_NETCONN_FULLDUPLEX */ + { + err_t err; + if (!lwip_netconn_is_err_msg(mem, &err)) { + struct netconn *newconn = (struct netconn *)mem; + /* Only tcp pcbs have an acceptmbox, so no need to check conn->type */ + /* pcb might be set to NULL already by err_tcp() */ + /* drain recvmbox */ + netconn_drain(newconn); + if (newconn->pcb.tcp != NULL) { + tcp_abort(newconn->pcb.tcp); + newconn->pcb.tcp = NULL; + } + netconn_free(newconn); } - netconn_free(newconn); } } sys_mbox_free(&conn->acceptmbox); @@ -867,8 +890,20 @@ netconn_drain(struct netconn *conn) static void netconn_mark_mbox_invalid(struct netconn *conn) { + int i, num_waiting; + void *msg = LWIP_CONST_CAST(void *, &netconn_deleted); + /* Prevent new calls/threads from reading from the mbox */ conn->flags |= NETCONN_FLAG_MBOXINVALID; + + SYS_ARCH_LOCKED(num_waiting = conn->mbox_threads_waiting); + for (i = 0; i < num_waiting; i++) { + if (sys_mbox_valid_val(conn->recvmbox)) { + sys_mbox_trypost(&conn->recvmbox, msg); + } else { + sys_mbox_trypost(&conn->acceptmbox, msg); + } + } } #endif /* LWIP_NETCONN_FULLDUPLEX */ diff --git a/src/include/lwip/api.h b/src/include/lwip/api.h index fb1e05a7..f8426a7d 100644 --- a/src/include/lwip/api.h +++ b/src/include/lwip/api.h @@ -241,6 +241,11 @@ struct netconn { by the application thread */ sys_mbox_t acceptmbox; #endif /* LWIP_TCP */ +#if LWIP_NETCONN_FULLDUPLEX + /** number of threads waiting on an mbox. This is required to unblock + all threads when closing while threads are waiting. */ + int mbox_threads_waiting; +#endif /** only used for socket layer */ #if LWIP_SOCKET int socket; diff --git a/src/include/lwip/priv/api_msg.h b/src/include/lwip/priv/api_msg.h index 9dfd088f..9e8ffc9e 100644 --- a/src/include/lwip/priv/api_msg.h +++ b/src/include/lwip/priv/api_msg.h @@ -187,6 +187,9 @@ struct dns_api_msg { }; #endif /* LWIP_DNS */ +#if LWIP_NETCONN_FULLDUPLEX +int lwip_netconn_is_deallocated_msg(void *msg); +#endif int lwip_netconn_is_err_msg(void *msg, err_t *err); void lwip_netconn_do_newconn (void *m); void lwip_netconn_do_delconn (void *m);