Add non-blocking support for connect (partly from patch #6860) plus many cleanups in socket & netconn API

This commit is contained in:
goldsimon
2010-01-29 22:09:31 +00:00
parent 1dd8300e69
commit e58f4c567a
8 changed files with 203 additions and 132 deletions

View File

@@ -98,7 +98,8 @@ recv_raw(void *arg, struct raw_pcb *pcb, struct pbuf *p,
}
}
if(q != NULL) {
if (q != NULL) {
u16_t len;
buf = (struct netbuf *)memp_malloc(MEMP_NETBUF);
if (buf == NULL) {
pbuf_free(q);
@@ -110,13 +111,14 @@ recv_raw(void *arg, struct raw_pcb *pcb, struct pbuf *p,
buf->addr = &(((struct ip_hdr*)(q->payload))->src);
buf->port = pcb->protocol;
len = q->tot_len;
if (sys_mbox_trypost(conn->recvmbox, buf) != ERR_OK) {
netbuf_delete(buf);
return 0;
} else {
SYS_ARCH_INC(conn->recv_avail, q->tot_len);
SYS_ARCH_INC(conn->recv_avail, len);
/* Register event with callback */
API_EVENT(conn, NETCONN_EVT_RCVPLUS, q->tot_len);
API_EVENT(conn, NETCONN_EVT_RCVPLUS, len);
}
}
}
@@ -138,6 +140,7 @@ recv_udp(void *arg, struct udp_pcb *pcb, struct pbuf *p,
{
struct netbuf *buf;
struct netconn *conn;
u16_t len;
#if LWIP_SO_RCVBUF
int recv_avail;
#endif /* LWIP_SO_RCVBUF */
@@ -179,13 +182,14 @@ recv_udp(void *arg, struct udp_pcb *pcb, struct pbuf *p,
#endif /* LWIP_NETBUF_RECVINFO */
}
len = p->tot_len;
if (sys_mbox_trypost(conn->recvmbox, buf) != ERR_OK) {
netbuf_delete(buf);
return;
} else {
SYS_ARCH_INC(conn->recv_avail, p->tot_len);
SYS_ARCH_INC(conn->recv_avail, len);
/* Register event with callback */
API_EVENT(conn, NETCONN_EVT_RCVPLUS, p->tot_len);
API_EVENT(conn, NETCONN_EVT_RCVPLUS, len);
}
}
#endif /* LWIP_UDP */
@@ -220,20 +224,24 @@ recv_tcp(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
}
return ERR_OK;
}
/* Unlike for UDP or RAW pcbs, don't check for available space
using recv_avail since that could break the connection
(data is already ACKed) */
/* don't overwrite fatal errors! */
NETCONN_SET_SAFE_ERR(conn, err);
if (p != NULL) {
len = p->tot_len;
SYS_ARCH_INC(conn->recv_avail, len);
} else {
len = 0;
}
if (sys_mbox_trypost(conn->recvmbox, p) != ERR_OK) {
/* don't deallocate p: it is presented to us later again from tcp_fasttmr! */
return ERR_MEM;
} else {
SYS_ARCH_INC(conn->recv_avail, len);
/* Register event with callback */
API_EVENT(conn, NETCONN_EVT_RCVPLUS, len);
}
@@ -265,6 +273,7 @@ poll_tcp(void *arg, struct tcp_pcb *pcb)
} else if (conn->state == NETCONN_CLOSE) {
do_close_internal(conn);
}
/* @todo: implement connect timeout here? */
return ERR_OK;
}
@@ -285,7 +294,6 @@ sent_tcp(void *arg, struct tcp_pcb *pcb, u16_t len)
LWIP_ASSERT("conn != NULL", (conn != NULL));
if (conn->state == NETCONN_WRITE) {
LWIP_ASSERT("conn->pcb.tcp != NULL", conn->pcb.tcp != NULL);
do_writemore(conn);
} else if (conn->state == NETCONN_CLOSE) {
do_close_internal(conn);
@@ -327,7 +335,7 @@ err_tcp(void *arg, err_t err)
conn->last_err = err;
SYS_ARCH_UNPROTECT(lev);
/* API_EVENT might call tcp_tmr, so reset conn->state now */
/* reset conn->state now before waking up other threads */
old_state = conn->state;
conn->state = NETCONN_NONE;
@@ -347,13 +355,17 @@ err_tcp(void *arg, err_t err)
(old_state == NETCONN_CONNECT)) {
/* calling do_writemore/do_close_internal is not necessary
since the pcb has already been deleted! */
int was_nonblocking_connect = conn->in_non_blocking_connect;
conn->in_non_blocking_connect = 0;
/* set error return code */
LWIP_ASSERT("conn->current_msg != NULL", conn->current_msg != NULL);
conn->current_msg->err = err;
conn->current_msg = NULL;
/* wake up the waiting task */
sys_sem_signal(conn->op_completed);
if (!was_nonblocking_connect) {
/* set error return code */
LWIP_ASSERT("conn->current_msg != NULL", conn->current_msg != NULL);
conn->current_msg->err = err;
conn->current_msg = NULL;
/* wake up the waiting task */
sys_sem_signal(conn->op_completed);
}
} else {
LWIP_ASSERT("conn->current_msg == NULL", conn->current_msg == NULL);
}
@@ -415,7 +427,7 @@ accept_function(void *arg, struct tcp_pcb *newpcb, err_t err)
newconn->last_err = err;
if (sys_mbox_trypost(conn->acceptmbox, newconn) != ERR_OK) {
/* When returning != ERR_OK, the connection is aborted in tcp_process(),
/* When returning != ERR_OK, the pcb is aborted in tcp_process(),
so do nothing here! */
newconn->pcb.tcp = NULL;
/* no need to drain since we know the recvmbox is empty. */
@@ -591,6 +603,8 @@ netconn_alloc(enum netconn_type t, netconn_callback callback)
#if LWIP_SO_RCVBUF
conn->recv_bufsize = RECV_BUFSIZE_DEFAULT;
#endif /* LWIP_SO_RCVBUF */
conn->non_blocking = 0;
conn->in_non_blocking_connect = 0;
return conn;
}
@@ -707,8 +721,8 @@ do_close_internal(struct netconn *conn)
conn->state = NETCONN_NONE;
/* Set back some callback pointers as conn is going away */
conn->pcb.tcp = NULL;
/* Trigger select() in socket layer. This send should something else so the
errorfd is set, not the read and write fd! */
/* @todo: this lets select make the socket readable and writable,
which is wrong! errfd instead? */
API_EVENT(conn, NETCONN_EVT_RCVPLUS, 0);
API_EVENT(conn, NETCONN_EVT_SENDPLUS, 0);
/* wake up the application task */
@@ -736,45 +750,52 @@ do_close_internal(struct netconn *conn)
void
do_delconn(struct api_msg_msg *msg)
{
/* Drain and delete mboxes */
netconn_drain(msg->conn);
/* @todo TCP: abort running write/connect? */
if ((msg->conn->state != NETCONN_NONE) && (msg->conn->state != NETCONN_LISTEN)) {
/* this only happens for TCP netconns */
LWIP_ASSERT("msg->conn->type == NETCONN_TCP", msg->conn->type == NETCONN_TCP);
msg->err = ERR_INPROGRESS;
} else {
/* Drain and delete mboxes */
netconn_drain(msg->conn);
if (msg->conn->pcb.tcp != NULL) {
if (msg->conn->pcb.tcp != NULL) {
switch (NETCONNTYPE_GROUP(msg->conn->type)) {
switch (NETCONNTYPE_GROUP(msg->conn->type)) {
#if LWIP_RAW
case NETCONN_RAW:
raw_remove(msg->conn->pcb.raw);
break;
case NETCONN_RAW:
raw_remove(msg->conn->pcb.raw);
break;
#endif /* LWIP_RAW */
#if LWIP_UDP
case NETCONN_UDP:
msg->conn->pcb.udp->recv_arg = NULL;
udp_remove(msg->conn->pcb.udp);
break;
case NETCONN_UDP:
msg->conn->pcb.udp->recv_arg = NULL;
udp_remove(msg->conn->pcb.udp);
break;
#endif /* LWIP_UDP */
#if LWIP_TCP
case NETCONN_TCP:
LWIP_ASSERT("already writing or closing", msg->conn->current_msg == NULL &&
msg->conn->write_offset == 0);
msg->conn->state = NETCONN_CLOSE;
msg->conn->current_msg = msg;
do_close_internal(msg->conn);
/* API_EVENT is called inside do_close_internal, before releasing
the application thread, so we can return at this point! */
return;
case NETCONN_TCP:
LWIP_ASSERT("already writing or closing", msg->conn->current_msg == NULL &&
msg->conn->write_offset == 0);
msg->conn->state = NETCONN_CLOSE;
msg->conn->current_msg = msg;
do_close_internal(msg->conn);
/* API_EVENT is called inside do_close_internal, before releasing
the application thread, so we can return at this point! */
return;
#endif /* LWIP_TCP */
default:
break;
default:
break;
}
msg->conn->pcb.tcp = NULL;
}
/* tcp netconns don't come here! */
/* @todo: this lets select make the socket readable and writable,
which is wrong! errfd instead? */
API_EVENT(msg->conn, NETCONN_EVT_RCVPLUS, 0);
API_EVENT(msg->conn, NETCONN_EVT_SENDPLUS, 0);
}
/* tcp netconns don't come here! */
/* Trigger select() in socket layer. This send should something else so the
errorfd is set, not the read and write fd! */
API_EVENT(msg->conn, NETCONN_EVT_RCVPLUS, 0);
API_EVENT(msg->conn, NETCONN_EVT_SENDPLUS, 0);
if (msg->conn->op_completed != SYS_SEM_NULL) {
sys_sem_signal(msg->conn->op_completed);
}
@@ -830,6 +851,7 @@ static err_t
do_connected(void *arg, struct tcp_pcb *pcb, err_t err)
{
struct netconn *conn;
int was_blocking;
LWIP_UNUSED_ARG(pcb);
@@ -840,15 +862,32 @@ do_connected(void *arg, struct tcp_pcb *pcb, err_t err)
}
LWIP_ASSERT("conn->state == NETCONN_CONNECT", conn->state == NETCONN_CONNECT);
LWIP_ASSERT("conn->current_msg != NULL", conn->current_msg != NULL);
LWIP_ASSERT("(conn->current_msg != NULL) || conn->in_non_blocking_connect",
(conn->current_msg != NULL) || conn->in_non_blocking_connect);
conn->current_msg->err = err;
if (conn->current_msg != NULL) {
conn->current_msg->err = err;
}
if ((conn->type == NETCONN_TCP) && (err == ERR_OK)) {
setup_tcp(conn);
}
was_blocking = !conn->in_non_blocking_connect;
conn->in_non_blocking_connect = 0;
conn->current_msg = NULL;
conn->state = NETCONN_NONE;
sys_sem_signal(conn->op_completed);
if (!was_blocking) {
SYS_ARCH_DECL_PROTECT(lev);
SYS_ARCH_PROTECT(lev);
if (conn->last_err == ERR_INPROGRESS) {
conn->last_err = ERR_OK;
}
SYS_ARCH_UNPROTECT(lev);
}
API_EVENT(conn, NETCONN_EVT_SENDPLUS, 0);
if (was_blocking) {
sys_sem_signal(conn->op_completed);
}
return ERR_OK;
}
#endif /* LWIP_TCP */
@@ -866,44 +905,48 @@ do_connect(struct api_msg_msg *msg)
if (msg->conn->pcb.tcp == NULL) {
/* This may happen when calling netconn_connect() a second time */
msg->err = ERR_CLSD;
sys_sem_signal(msg->conn->op_completed);
return;
}
switch (NETCONNTYPE_GROUP(msg->conn->type)) {
} else {
switch (NETCONNTYPE_GROUP(msg->conn->type)) {
#if LWIP_RAW
case NETCONN_RAW:
msg->err = raw_connect(msg->conn->pcb.raw, msg->msg.bc.ipaddr);
sys_sem_signal(msg->conn->op_completed);
break;
#endif /* LWIP_RAW */
#if LWIP_UDP
case NETCONN_UDP:
msg->err = udp_connect(msg->conn->pcb.udp, msg->msg.bc.ipaddr, msg->msg.bc.port);
sys_sem_signal(msg->conn->op_completed);
break;
#endif /* LWIP_UDP */
#if LWIP_TCP
case NETCONN_TCP:
setup_tcp(msg->conn);
msg->err = tcp_connect(msg->conn->pcb.tcp, msg->msg.bc.ipaddr, msg->msg.bc.port,
do_connected);
/* sys_sem_signal() is called from do_connected (or err_tcp()),
* when the connection is established! */
if (msg->err == ERR_OK) {
msg->conn->state = NETCONN_CONNECT;
msg->conn->current_msg = msg;
/* Prevent connect while doing any other action. */
if (msg->conn->state != NETCONN_NONE) {
msg->err = ERR_ISCONN;
} else {
/* tcp_connect failed, so do_connected will not be called: return now */
sys_sem_signal(msg->conn->op_completed);
setup_tcp(msg->conn);
msg->err = tcp_connect(msg->conn->pcb.tcp, msg->msg.bc.ipaddr,
msg->msg.bc.port, do_connected);
if (msg->err == ERR_OK) {
msg->conn->state = NETCONN_CONNECT;
msg->conn->in_non_blocking_connect = msg->conn->non_blocking;
if (msg->conn->non_blocking) {
msg->err = ERR_INPROGRESS;
} else {
msg->conn->current_msg = msg;
/* sys_sem_signal() is called from do_connected (or err_tcp()),
* when the connection is established! */
return;
}
}
}
break;
#endif /* LWIP_TCP */
default:
LWIP_ERROR("Invalid netconn type", 0, do{ msg->err = ERR_VAL;
sys_sem_signal(msg->conn->op_completed); }while(0));
LWIP_ERROR("Invalid netconn type", 0, do{ msg->err = ERR_VAL; }while(0));
break;
}
}
sys_sem_signal(msg->conn->op_completed);
}
/**
@@ -944,13 +987,14 @@ do_listen(struct api_msg_msg *msg)
msg->err = ERR_CONN;
if (msg->conn->pcb.tcp != NULL) {
if (msg->conn->type == NETCONN_TCP) {
if (msg->conn->pcb.tcp->state == CLOSED) {
if (msg->conn->state == NETCONN_NONE) {
#if TCP_LISTEN_BACKLOG
struct tcp_pcb* lpcb = tcp_listen_with_backlog(msg->conn->pcb.tcp, msg->msg.lb.backlog);
#else /* TCP_LISTEN_BACKLOG */
struct tcp_pcb* lpcb = tcp_listen(msg->conn->pcb.tcp);
#endif /* TCP_LISTEN_BACKLOG */
if (lpcb == NULL) {
/* in this case, the old pcb is still allocated */
msg->err = ERR_MEM;
} else {
/* delete the recvmbox and allocate the acceptmbox */
@@ -970,6 +1014,10 @@ do_listen(struct api_msg_msg *msg)
msg->conn->pcb.tcp = lpcb;
tcp_arg(msg->conn->pcb.tcp, msg->conn);
tcp_accept(msg->conn->pcb.tcp, accept_function);
} else {
/* since the old pcb is already deallocated, free lpcb now */
tcp_close(lpcb);
msg->conn->pcb.tcp = NULL;
}
}
}
@@ -1072,6 +1120,9 @@ do_writemore(struct netconn *conn)
LWIP_ASSERT("conn != NULL", conn != NULL);
LWIP_ASSERT("conn->state == NETCONN_WRITE", (conn->state == NETCONN_WRITE));
LWIP_ASSERT("conn->current_msg != NULL", conn->current_msg != NULL);
LWIP_ASSERT("conn->pcb.tcp != NULL", conn->pcb.tcp != NULL);
LWIP_ASSERT("conn->write_offset < conn->current_msg->msg.w.len",
conn->write_offset < conn->current_msg->msg.w.len);
dataptr = (u8_t*)conn->current_msg->msg.w.dataptr + conn->write_offset;
diff = conn->current_msg->msg.w.len - conn->write_offset;
@@ -1094,29 +1145,31 @@ do_writemore(struct netconn *conn)
err = tcp_write(conn->pcb.tcp, dataptr, len, conn->current_msg->msg.w.apiflags);
LWIP_ASSERT("do_writemore: invalid length!", ((conn->write_offset + len) <= conn->current_msg->msg.w.len));
/* if OK or memory error, check available space */
if (((err == ERR_OK) || (err == ERR_MEM)) &&
((tcp_sndbuf(conn->pcb.tcp) <= TCP_SNDLOWAT) ||
(tcp_sndqueuelen(conn->pcb.tcp) >= TCP_SNDQUEUELOWAT))) {
/* The queued byte- or pbuf-count exceeds the configured low-water limit,
let select mark this pcb as non-writable. */
API_EVENT(conn, NETCONN_EVT_SENDMINUS, len);
}
if (err == ERR_OK) {
conn->write_offset += len;
if (conn->write_offset == conn->current_msg->msg.w.len) {
/* everything was written */
write_finished = 1;
conn->write_offset = 0;
/* API_EVENT might call tcp_tmr, so reset conn->state now */
conn->state = NETCONN_NONE;
}
err = tcp_output_nagle(conn->pcb.tcp);
/* If the queued byte- or pbuf-count exceeds the configured low-water limit,
let select mark this pcb as non-writable. */
if ((err == ERR_OK) && (tcp_sndbuf(conn->pcb.tcp) <= TCP_SNDLOWAT) ||
(tcp_sndqueuelen(conn->pcb.tcp) >= TCP_SNDQUEUELOWAT)) {
API_EVENT(conn, NETCONN_EVT_SENDMINUS, len);
}
tcp_output_nagle(conn->pcb.tcp);
} else if (err == ERR_MEM) {
/* If ERR_MEM, we wait for sent_tcp or poll_tcp to be called
we do NOT return to the application thread, since ERR_MEM is
only a temporary error! */
/* tcp_enqueue returned ERR_MEM, try tcp_output anyway */
err = tcp_output(conn->pcb.tcp);
tcp_output(conn->pcb.tcp);
#if LWIP_TCPIP_CORE_LOCKING
conn->write_delayed = 1;
@@ -1162,11 +1215,14 @@ do_write(struct api_msg_msg *msg)
} else {
if (msg->conn->type == NETCONN_TCP) {
#if LWIP_TCP
if (msg->conn->pcb.tcp != NULL) {
if (msg->conn->state != NETCONN_NONE) {
msg->err = ERR_INPROGRESS;
} else if (msg->conn->pcb.tcp != NULL) {
msg->conn->state = NETCONN_WRITE;
/* set all the variables used by do_writemore */
LWIP_ASSERT("already writing or closing", msg->conn->current_msg == NULL &&
msg->conn->write_offset == 0);
LWIP_ASSERT("msg->msg.w.len != 0", msg->msg.w.len != 0);
msg->conn->current_msg = msg;
msg->conn->write_offset = 0;
#if LWIP_TCPIP_CORE_LOCKING
@@ -1258,7 +1314,13 @@ void
do_close(struct api_msg_msg *msg)
{
#if LWIP_TCP
if ((msg->conn->pcb.tcp != NULL) && (msg->conn->type == NETCONN_TCP)) {
/* @todo: abort running write/connect? */
if ((msg->conn->state != NETCONN_NONE) && (msg->conn->state != NETCONN_LISTEN)) {
/* this only happens for TCP netconns */
LWIP_ASSERT("msg->conn->type == NETCONN_TCP", msg->conn->type == NETCONN_TCP);
msg->err = ERR_INPROGRESS;
} else if ((msg->conn->pcb.tcp != NULL) && (msg->conn->type == NETCONN_TCP)) {
/* Drain and delete mboxes */
netconn_drain(msg->conn);
LWIP_ASSERT("already writing or closing", msg->conn->current_msg == NULL &&
msg->conn->write_offset == 0);
@@ -1266,12 +1328,13 @@ do_close(struct api_msg_msg *msg)
msg->conn->current_msg = msg;
do_close_internal(msg->conn);
/* for tcp netconns, do_close_internal ACKs the message */
return;
} else
#endif /* LWIP_TCP */
{
msg->err = ERR_VAL;
sys_sem_signal(msg->conn->op_completed);
}
sys_sem_signal(msg->conn->op_completed);
}
#if LWIP_IGMP