From b530ed6e7f3744788967670ff0aa3c6abaa18461 Mon Sep 17 00:00:00 2001 From: "L. Pereira" Date: Wed, 8 May 2024 18:36:41 -0700 Subject: [PATCH] Allow async/await on multiple file descriptors Provide two new public APIs: lwan_request_awaitv_any() and lwan_request_awaitv_all(), which, respectively, will await for an operation on at least one of the awaited file descriptors, returning the one that unblocked the coroutine, and for all the awaited file descriptors. The APIs are experimental but you can already see how much it improves the chat implementation of the websockets sample: now, instead of having to poll both the websocket and the pub/sub subscription, and wait a few milliseconds, it now instantaneously wakes up when there's data in either one of them, and processes only what has data. The chat now feels like a proper chat app (well, within reason for that crude app, but you get the idea). (As a side effect: we now send websocket pings periodically.) There's a lot to clean up here, but I'm tired and this will be done eventually. --- src/lib/liblwan.sym | 1 + src/lib/lwan-private.h | 2 + src/lib/lwan-request.c | 32 ------ src/lib/lwan-strbuf.c | 1 + src/lib/lwan-thread.c | 190 ++++++++++++++++++++++++++++++++--- src/lib/lwan-tq.c | 9 +- src/lib/lwan-websocket.c | 115 +++++++++++++++++---- src/lib/lwan.h | 15 ++- src/samples/websocket/main.c | 49 ++++----- 9 files changed, 323 insertions(+), 91 deletions(-) diff --git a/src/lib/liblwan.sym b/src/lib/liblwan.sym index 5b5227d6d..d0f59d554 100644 --- a/src/lib/liblwan.sym +++ b/src/lib/liblwan.sym @@ -73,6 +73,7 @@ global: lwan_handler_info_*; lwan_request_await_*; + lwan_request_awaitv_*; lwan_request_async_*; lwan_straitjacket_enforce*; diff --git a/src/lib/lwan-private.h b/src/lib/lwan-private.h index 8486889f3..ed87c88a8 100644 --- a/src/lib/lwan-private.h +++ b/src/lib/lwan-private.h @@ -283,3 +283,5 @@ void lwan_request_foreach_header_for_cgi(struct lwan_request *request, size_t value_len, void *user_data), void *user_data); + +bool lwan_send_websocket_ping_for_tq(struct lwan_connection *conn); diff --git a/src/lib/lwan-request.c b/src/lib/lwan-request.c index b5ad925b0..424c1c9f6 100644 --- a/src/lib/lwan-request.c +++ b/src/lib/lwan-request.c @@ -2083,38 +2083,6 @@ __attribute__((used)) int fuzz_parse_http_request(const uint8_t *data, } #endif -static inline int64_t -make_async_yield_value(int fd, enum lwan_connection_coro_yield event) -{ - return (int64_t)(((uint64_t)fd << 32 | event)); -} - -static inline struct lwan_connection *async_await_fd( - struct coro *coro, int fd, enum lwan_connection_coro_yield events) -{ - assert(events >= CONN_CORO_ASYNC_AWAIT_READ && - events <= CONN_CORO_ASYNC_AWAIT_READ_WRITE); - - int64_t from_coro = coro_yield(coro, make_async_yield_value(fd, events)); - return (struct lwan_connection *)(intptr_t)from_coro; -} - -struct lwan_connection *lwan_request_await_read(struct lwan_request *r, int fd) -{ - return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ); -} - -struct lwan_connection *lwan_request_await_write(struct lwan_request *r, int fd) -{ - return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_WRITE); -} - -struct lwan_connection *lwan_request_await_read_write(struct lwan_request *r, - int fd) -{ - return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE); -} - ssize_t lwan_request_async_read_flags( struct lwan_request *request, int fd, void *buf, size_t len, int flags) { diff --git a/src/lib/lwan-strbuf.c b/src/lib/lwan-strbuf.c index 29772969a..3678ff61a 100644 --- a/src/lib/lwan-strbuf.c +++ b/src/lib/lwan-strbuf.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "lwan-private.h" diff --git a/src/lib/lwan-thread.c b/src/lib/lwan-thread.c index 42078c3aa..6525e2c6f 100644 --- a/src/lib/lwan-thread.c +++ b/src/lib/lwan-thread.c @@ -559,7 +559,7 @@ conn_flags_to_epoll_events(enum lwan_connection_flags flags) return EPOLL_EVENTS(flags); } -static void update_epoll_flags(const struct timeout_queue *tq, +static void update_epoll_flags(const struct lwan *lwan, struct lwan_connection *conn, int epoll_fd, enum lwan_connection_coro_yield yield_result) @@ -609,7 +609,7 @@ static void update_epoll_flags(const struct timeout_queue *tq, struct epoll_event event = {.events = conn_flags_to_epoll_events(conn->flags), .data.ptr = conn}; - int fd = lwan_connection_get_fd(tq->lwan, conn); + int fd = lwan_connection_get_fd(lwan, conn); if (UNLIKELY(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0)) lwan_status_perror("epoll_ctl"); @@ -619,7 +619,11 @@ static void unasync_await_conn(void *data1, void *data2) { struct lwan_connection *async_fd_conn = data1; - async_fd_conn->flags &= ~(CONN_ASYNC_AWAIT | CONN_HUNG_UP); + async_fd_conn->flags &= + ~(CONN_ASYNC_AWAIT | CONN_HUNG_UP | CONN_ASYNC_AWAIT_MULTIPLE); + assert(async_fd_conn->parent); + async_fd_conn->parent->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE; + async_fd_conn->thread = data2; /* If this file descriptor number is used again in the future as an HTTP @@ -635,9 +639,9 @@ static void unasync_await_conn(void *data1, void *data2) } static enum lwan_connection_coro_yield -resume_async(const struct timeout_queue *tq, +resume_async(const struct lwan *l, enum lwan_connection_coro_yield yield_result, - int64_t from_coro, + int await_fd, struct lwan_connection *conn, int epoll_fd) { @@ -646,7 +650,6 @@ resume_async(const struct timeout_queue *tq, [CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_EVENTS_WRITE, [CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_EVENTS_READ_WRITE, }; - int await_fd = (int)((uint64_t)from_coro >> 32); enum lwan_connection_flags flags; int op; @@ -656,7 +659,7 @@ resume_async(const struct timeout_queue *tq, flags = to_connection_flags[yield_result]; - struct lwan_connection *await_fd_conn = &tq->lwan->conns[await_fd]; + struct lwan_connection *await_fd_conn = &l->conns[await_fd]; if (LIKELY(await_fd_conn->flags & CONN_ASYNC_AWAIT)) { if (LIKELY((await_fd_conn->flags & CONN_EVENTS_MASK) == flags)) return CONN_CORO_SUSPEND; @@ -697,6 +700,168 @@ resume_async(const struct timeout_queue *tq, return CONN_CORO_ABORT; } +struct flag_update { + unsigned int num_awaiting; + enum lwan_connection_coro_yield request_conn_yield; +}; + +static struct flag_update +update_flags_for_async_awaitv(struct lwan_request *r, struct lwan *l, va_list ap) +{ + int epoll_fd = r->conn->thread->epoll_fd; + struct flag_update update = {.num_awaiting = 0, + .request_conn_yield = CONN_CORO_YIELD}; + + while (true) { + int await_fd = va_arg(ap, int); + if (await_fd < 0) { + return update; + } + + enum lwan_connection_coro_yield events = + va_arg(ap, enum lwan_connection_coro_yield); + if (UNLIKELY(events < CONN_CORO_ASYNC_AWAIT_READ || + events > CONN_CORO_ASYNC_AWAIT_READ_WRITE)) { + lwan_status_error("awaitv() called with invalid events"); + coro_yield(r->conn->coro, CONN_CORO_ABORT); + __builtin_unreachable(); + } + + struct lwan_connection *conn = &l->conns[await_fd]; + + if (UNLIKELY(conn->flags & CONN_ASYNC_AWAIT_MULTIPLE)) { + lwan_status_debug("ignoring second awaitv call on same fd: %d", + await_fd); + continue; + } + + conn->flags |= CONN_ASYNC_AWAIT_MULTIPLE; + update.num_awaiting++; + + if (await_fd == r->fd) { + static const enum lwan_connection_coro_yield to_request_yield[] = { + [CONN_CORO_ASYNC_AWAIT_READ] = CONN_CORO_WANT_READ, + [CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_CORO_WANT_WRITE, + [CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_CORO_WANT_READ_WRITE, + }; + + update.request_conn_yield = to_request_yield[events]; + continue; + } + + events = resume_async(l, events, await_fd, r->conn, epoll_fd); + if (UNLIKELY(events == CONN_CORO_ABORT)) { + lwan_status_error("could not register fd for async operation"); + coro_yield(r->conn->coro, CONN_CORO_ABORT); + __builtin_unreachable(); + } + } +} + +static void reset_conn_async_await_multiple_flag(struct lwan_connection *conns, + va_list ap) +{ + while (true) { + int await_fd = va_arg(ap, int); + if (await_fd < 0) + return; + + struct lwan_connection *conn = &conns[await_fd]; + conn->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE; + + LWAN_NO_DISCARD(va_arg(ap, enum lwan_connection_coro_yield)); + } +} + +int lwan_request_awaitv_any(struct lwan_request *r, ...) +{ + struct lwan *l = r->conn->thread->lwan; + va_list ap; + + va_start(ap, r); + reset_conn_async_await_multiple_flag(l->conns, ap); + va_end(ap); + + va_start(ap, r); + struct flag_update update = update_flags_for_async_awaitv(r, l, ap); + va_end(ap); + + while (true) { + int64_t v = coro_yield(r->conn->coro, update.request_conn_yield); + struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v; + + if (conn->flags & CONN_ASYNC_AWAIT_MULTIPLE) { + va_start(ap, r); + reset_conn_async_await_multiple_flag(l->conns, ap); + va_end(ap); + + return lwan_connection_get_fd(l, conn); + } + } +} + +void lwan_request_awaitv_all(struct lwan_request *r, ...) +{ + struct lwan *l = r->conn->thread->lwan; + va_list ap; + + va_start(ap, r); + reset_conn_async_await_multiple_flag(l->conns, ap); + va_end(ap); + + va_start(ap, r); + struct flag_update update = update_flags_for_async_awaitv(r, l, ap); + va_end(ap); + + while (update.num_awaiting) { + int64_t v = coro_yield(r->conn->coro, update.request_conn_yield); + struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v; + + if (conn->flags & CONN_ASYNC_AWAIT_MULTIPLE) { + conn->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE; + update.num_awaiting--; + } + } +} + +static inline int64_t +make_async_yield_value(int fd, enum lwan_connection_coro_yield event) +{ + assert(event >= CONN_CORO_ASYNC_AWAIT_READ && + event <= CONN_CORO_ASYNC_AWAIT_READ_WRITE); + + return (int64_t)(((uint64_t)fd << 32 | event)); +} + +static inline int async_await_fd(struct lwan_connection *conn, + int fd, + enum lwan_connection_coro_yield events) +{ + int64_t yield_value = make_async_yield_value(fd, events); + int64_t from_coro = coro_yield(conn->coro, yield_value); + struct lwan_connection *conn_from_coro = + (struct lwan_connection *)(intptr_t)from_coro; + + assert(conn_from_coro->flags & CONN_ASYNC_AWAIT); + + return lwan_connection_get_fd(conn->thread->lwan, conn_from_coro); +} + +inline int lwan_request_await_read(struct lwan_request *r, int fd) +{ + return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ); +} + +inline int lwan_request_await_write(struct lwan_request *r, int fd) +{ + return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_WRITE); +} + +inline int lwan_request_await_read_write(struct lwan_request *r, int fd) +{ + return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE); +} + static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq, struct lwan_connection *conn_to_resume, struct lwan_connection *conn_to_yield, @@ -710,14 +875,15 @@ static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq, enum lwan_connection_coro_yield yield_result = from_coro & 0xffffffff; if (UNLIKELY(yield_result >= CONN_CORO_ASYNC)) { - yield_result = - resume_async(tq, yield_result, from_coro, conn_to_resume, epoll_fd); + int await_fd = (int)((uint64_t)from_coro >> 32); + yield_result = resume_async(tq->lwan, yield_result, await_fd, + conn_to_resume, epoll_fd); } if (UNLIKELY(yield_result == CONN_CORO_ABORT)) { timeout_queue_expire(tq, conn_to_resume); } else { - update_epoll_flags(tq, conn_to_resume, epoll_fd, yield_result); + update_epoll_flags(tq->lwan, conn_to_resume, epoll_fd, yield_result); timeout_queue_move_to_last(tq, conn_to_resume); } } @@ -787,7 +953,7 @@ static bool process_pending_timers(struct timeout_queue *tq, } request = container_of(timeout, struct lwan_request, timeout); - update_epoll_flags(tq, request->conn, epoll_fd, CONN_CORO_RESUME); + update_epoll_flags(tq->lwan, request->conn, epoll_fd, CONN_CORO_RESUME); } if (should_expire_timers) { @@ -1452,7 +1618,7 @@ void lwan_thread_init(struct lwan *l) for (unsigned int i = 0; i < l->thread.count; i++) { struct lwan_thread *thread; - + if (schedtbl) { /* For SO_ATTACH_REUSEPORT_CBPF to work with the program * we provide the kernel, sockets have to be added to the diff --git a/src/lib/lwan-tq.c b/src/lib/lwan-tq.c index 4f085049b..ce0542c0e 100644 --- a/src/lib/lwan-tq.c +++ b/src/lib/lwan-tq.c @@ -112,7 +112,14 @@ void timeout_queue_expire_waiting(struct timeout_queue *tq) if (conn->time_to_expire > tq->current_time) return; - timeout_queue_expire(tq, conn); + if (LIKELY(!(conn->flags & CONN_IS_WEBSOCKET))) { + timeout_queue_expire(tq, conn); + } else { + if (LIKELY(lwan_send_websocket_ping_for_tq(conn))) + timeout_queue_move_to_last(tq, conn); + else + timeout_queue_expire(tq, conn); + } } /* Timeout queue exhausted: reset epoch */ diff --git a/src/lib/lwan-websocket.c b/src/lib/lwan-websocket.c index 4f5b6d022..e6b598063 100644 --- a/src/lib/lwan-websocket.c +++ b/src/lib/lwan-websocket.c @@ -55,12 +55,16 @@ enum ws_opcode { WS_OPCODE_INVALID = 16, }; -static void write_websocket_frame(struct lwan_request *request, - unsigned char header_byte, - char *msg, - size_t len) +#define WS_MASKED 0x80 + +static ALWAYS_INLINE bool +write_websocket_frame_full(struct lwan_request *request, + unsigned char header_byte, + char *msg, + size_t len, + bool use_coro) { - uint8_t frame[10] = { header_byte }; + uint8_t frame[10] = {header_byte}; size_t frame_len; if (len <= 125) { @@ -82,14 +86,51 @@ static void write_websocket_frame(struct lwan_request *request, {.iov_base = msg, .iov_len = len}, }; - lwan_writev(request, vec, N_ELEMENTS(vec)); + if (LIKELY(use_coro)) { + lwan_writev(request, vec, N_ELEMENTS(vec)); + return true; + } + + size_t total_written = 0; + int curr_iov = 0; + for (int try = 0; try < 10; try++) { + ssize_t written = writev(request->fd, &vec[curr_iov], + (int)N_ELEMENTS(vec) - curr_iov); + if (written < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + return false; + } + + total_written += (size_t)written; + while (curr_iov < (int)N_ELEMENTS(vec) && + written >= (ssize_t)vec[curr_iov].iov_len) { + written -= (ssize_t)vec[curr_iov].iov_len; + curr_iov++; + } + if (curr_iov == (int)N_ELEMENTS(vec)) + return true; + + vec[curr_iov].iov_base = (char *)vec[curr_iov].iov_base + written; + vec[curr_iov].iov_len -= (size_t)written; + } + + return false; +} + +static bool write_websocket_frame(struct lwan_request *request, + unsigned char header_byte, + char *msg, + size_t len) +{ + return write_websocket_frame_full(request, header_byte, msg, len, true); } static inline void lwan_response_websocket_write(struct lwan_request *request, unsigned char op) { size_t len = lwan_strbuf_get_length(request->response.buffer); char *msg = lwan_strbuf_get_buffer(request->response.buffer); - unsigned char header = 0x80 | op; + unsigned char header = WS_MASKED | op; if (!(request->conn->flags & CONN_IS_WEBSOCKET)) return; @@ -233,18 +274,20 @@ static void unmask(char *msg, size_t msg_len, char mask[static 4]) } } -static void send_websocket_pong(struct lwan_request *request, uint16_t header) +static void +ping_pong(struct lwan_request *request, uint16_t header, enum ws_opcode opcode) { const size_t len = header & 0x7f; char msg[128]; char mask[4]; - assert(header & 0x80); + assert(header & WS_MASKED); + assert(opcode == WS_OPCODE_PING || opcode == WS_OPCODE_PONG); if (UNLIKELY(len > 125)) { - lwan_status_debug("Received PING opcode with length %zu." + lwan_status_debug("Received %s frame with length %zu." "Max is 125. Aborting connection.", - len); + opcode == WS_OPCODE_PING ? "PING" : "PONG", len); coro_yield(request->conn->coro, CONN_CORO_ABORT); __builtin_unreachable(); } @@ -254,10 +297,48 @@ static void send_websocket_pong(struct lwan_request *request, uint16_t header) {.iov_base = msg, .iov_len = len}, }; - lwan_readv(request, vec, N_ELEMENTS(vec)); - unmask(msg, len, mask); + if (opcode == WS_OPCODE_PING) { + lwan_readv(request, vec, N_ELEMENTS(vec)); + unmask(msg, len, mask); + write_websocket_frame(request, WS_MASKED | WS_OPCODE_PONG, msg, len); + } else { + /* From MDN: "You might also get a pong without ever sending a ping; + * ignore this if it happens." */ + + /* FIXME: should we care about the contents of PONG packets? */ + /* FIXME: should we have a lwan_recvmsg() too that takes an iovec? */ + const size_t total_len = vec[0].iov_len + vec[1].iov_len; + if (LIKELY(total_len < sizeof(msg))) { + lwan_recv(request, msg, total_len, MSG_TRUNC); + } else { + lwan_recv(request, vec[0].iov_base, vec[0].iov_len, MSG_TRUNC); + lwan_recv(request, vec[1].iov_base, vec[1].iov_len, MSG_TRUNC); + } + } +} - write_websocket_frame(request, 0x80 | WS_OPCODE_PONG, msg, len); +bool lwan_send_websocket_ping_for_tq(struct lwan_connection *conn) +{ + uint32_t mask32 = (uint32_t)lwan_random_uint64(); + char mask[sizeof(mask32)]; + struct timespec payload; + + memcpy(mask, &mask32, sizeof(mask32)); + + if (UNLIKELY(clock_gettime(monotonic_clock_id, &payload) < 0)) + return false; + + unmask((char *)&payload, sizeof(payload), mask); + + /* use_coro is set to false here because this function is called outside + * a connection coroutine and the I/O wrappers might yield, which of course + * wouldn't work */ + struct lwan_request req = { + .conn = conn, + .fd = lwan_connection_get_fd(conn->thread->lwan, conn), + }; + return write_websocket_frame_full(&req, WS_MASKED | WS_OPCODE_PING, + (char *)&payload, sizeof(payload), false); } int lwan_response_websocket_read_hint(struct lwan_request *request, size_t size_hint) @@ -286,7 +367,7 @@ int lwan_response_websocket_read_hint(struct lwan_request *request, size_t size_ coro_yield(request->conn->coro, CONN_CORO_ABORT); __builtin_unreachable(); } - if (UNLIKELY(!(header & 0x80))) { + if (UNLIKELY(!(header & WS_MASKED))) { lwan_status_debug("Client sent an unmasked WebSockets frame, aborting"); coro_yield(request->conn->coro, CONN_CORO_ABORT); __builtin_unreachable(); @@ -312,11 +393,11 @@ int lwan_response_websocket_read_hint(struct lwan_request *request, size_t size_ request->conn->flags &= ~CONN_IS_WEBSOCKET; break; + case WS_OPCODE_PONG: case WS_OPCODE_PING: - send_websocket_pong(request, header); + ping_pong(request, header, opcode); goto next_frame; - case WS_OPCODE_PONG: case WS_OPCODE_RSVD_1 ... WS_OPCODE_RSVD_5: case WS_OPCODE_RSVD_CONTROL_1 ... WS_OPCODE_RSVD_CONTROL_5: case WS_OPCODE_INVALID: diff --git a/src/lib/lwan.h b/src/lib/lwan.h index fb5ccb0a5..d20a7690a 100644 --- a/src/lib/lwan.h +++ b/src/lib/lwan.h @@ -300,6 +300,11 @@ enum lwan_connection_flags { * can deal with this fact. */ CONN_HUNG_UP = 1 << 12, + /* Used to both implement lwan_request_awaitv_all() correctly, and to + * ensure that spurious resumes from fds that weren't in the multiple + * await call won't return to the request handler. */ + CONN_ASYNC_AWAIT_MULTIPLE = 1 << 13, + CONN_FLAG_LAST = CONN_HUNG_UP, }; @@ -666,11 +671,11 @@ void lwan_response_websocket_write_binary(struct lwan_request *request); int lwan_response_websocket_read(struct lwan_request *request); int lwan_response_websocket_read_hint(struct lwan_request *request, size_t size_hint); -struct lwan_connection *lwan_request_await_read(struct lwan_request *r, int fd); -struct lwan_connection *lwan_request_await_write(struct lwan_request *r, - int fd); -struct lwan_connection *lwan_request_await_read_write(struct lwan_request *r, - int fd); +int lwan_request_await_read(struct lwan_request *r, int fd); +int lwan_request_await_write(struct lwan_request *r, int fd); +int lwan_request_await_read_write(struct lwan_request *r, int fd); +int lwan_request_awaitv_any(struct lwan_request *r, ...); +void lwan_request_awaitv_all(struct lwan_request *r, ...); ssize_t lwan_request_async_read(struct lwan_request *r, int fd, void *buf, size_t len); ssize_t lwan_request_async_read_flags(struct lwan_request *request, int fd, void *buf, size_t len, int flags); ssize_t lwan_request_async_write(struct lwan_request *r, int fd, const void *buf, size_t len); diff --git a/src/samples/websocket/main.c b/src/samples/websocket/main.c index d8b84930c..64ccdd644 100644 --- a/src/samples/websocket/main.c +++ b/src/samples/websocket/main.c @@ -123,12 +123,12 @@ static void pub_depart_message(void *data1, void *data2) LWAN_HANDLER_ROUTE(ws_chat, "/ws-chat") { + struct lwan *lwan = request->conn->thread->lwan; struct lwan_pubsub_subscriber *sub; struct lwan_pubsub_msg *msg; enum lwan_http_status status; static int total_user_count; int user_id; - uint64_t sleep_time = 1000; sub = lwan_pubsub_subscribe(chat); if (!sub) @@ -149,13 +149,17 @@ LWAN_HANDLER_ROUTE(ws_chat, "/ws-chat") (void *)(intptr_t)user_id); lwan_pubsub_publishf(chat, "*** User%d has joined the chat!\n", user_id); + const int websocket_fd = request->fd; + const int sub_fd = lwan_pubsub_get_notification_fd(sub); while (true) { - switch (lwan_response_websocket_read(request)) { - case ENOTCONN: /* read() called before connection is websocket */ - case ECONNRESET: /* Client closed the connection */ + int resumed_fd = lwan_request_awaitv_any( + request, websocket_fd, CONN_CORO_ASYNC_AWAIT_READ, sub_fd, + CONN_CORO_ASYNC_AWAIT_READ, -1); + + if (lwan->conns[resumed_fd].flags & CONN_HUNG_UP) goto out; - case EAGAIN: /* Nothing is available from other clients */ + if (resumed_fd == sub_fd) { while ((msg = lwan_pubsub_consume(sub))) { const struct lwan_value *value = lwan_pubsub_msg_value(msg); @@ -167,26 +171,23 @@ LWAN_HANDLER_ROUTE(ws_chat, "/ws-chat") lwan_pubsub_msg_done(msg); lwan_response_websocket_write_text(request); - sleep_time = 500; } - - lwan_request_sleep(request, sleep_time); - - /* We're receiving a lot of messages, wait up to 1s (500ms in the loop - * above, and 500ms in the increment below). Otherwise, wait 500ms every - * time we return from lwan_request_sleep() until we reach 8s. This way, - * if a chat is pretty busy, we'll have a lag of at least 1s -- which is - * probably fine; if it's not busy, we can sleep a bit more and conserve - * some resources. */ - if (sleep_time <= 8000) - sleep_time += 500; - break; - - case 0: /* We got something! Copy it to echo it back */ - lwan_pubsub_publishf(chat, "User%d: %.*s\n", user_id, - (int)lwan_strbuf_get_length(response->buffer), - lwan_strbuf_get_buffer(response->buffer)); - break; + } else if (resumed_fd == websocket_fd) { + switch (lwan_response_websocket_read(request)) { + case ENOTCONN: /* read() called before connection is websocket */ + case ECONNRESET: /* Client closed the connection */ + goto out; + + case 0: /* We got something! Copy it to echo it back */ + lwan_pubsub_publishf( + chat, "User%d: %.*s\n", user_id, + (int)lwan_strbuf_get_length(response->buffer), + lwan_strbuf_get_buffer(response->buffer)); + } + } else { + lwan_status_error( + "lwan_request_awaitv_any() returned %d, but waiting on it", + resumed_fd); } }