Skip to content

Commit

Permalink
Simplify async/await paths
Browse files Browse the repository at this point in the history
This removes quite a bit of complexity from how async/await works:

* The connection_coro_yield enum doesn't have any
  CONN_CORO_ASYNC_AWAIT_* value anymore, because the CONN_CORO_WANT_*
  values are used instead.

* epoll flags are now set using the same function that awaitv uses, and
  that resume_coro() used to use, so there's no need to pack the file
  descriptor and the interest in the coroutine yield value anymore.

* resume_coro() is now simplified, not needing to do any work related
  to async/await anymore, as that's now performed by the
  async_await_fd() auxiliary function: so now you only pay the (cheap!)
  price if you're using this feature.

In addition, signaling if an awaited file descriptor was hung up is
done differently, via the return value of the async/await functions.
  • Loading branch information
lpereira committed May 16, 2024
1 parent 802cf50 commit 3508980
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 153 deletions.
211 changes: 112 additions & 99 deletions src/lib/lwan-thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -639,23 +639,23 @@ static void unasync_await_conn(void *data1, void *data2)
}

static enum lwan_connection_coro_yield
resume_async(const struct lwan *l,
enum lwan_connection_coro_yield yield_result,
int await_fd,
struct lwan_connection *conn,
int epoll_fd)
prepare_await(const struct lwan *l,
enum lwan_connection_coro_yield yield_result,
int await_fd,
struct lwan_connection *conn,
int epoll_fd)
{
static const enum lwan_connection_flags to_connection_flags[] = {
[CONN_CORO_ASYNC_AWAIT_READ] = CONN_EVENTS_READ,
[CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_EVENTS_WRITE,
[CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_EVENTS_READ_WRITE,
[CONN_CORO_WANT_READ] = CONN_EVENTS_READ,
[CONN_CORO_WANT_WRITE] = CONN_EVENTS_WRITE,
[CONN_CORO_WANT_READ_WRITE] = CONN_EVENTS_READ_WRITE,
};
enum lwan_connection_flags flags;
int op;

assert(await_fd >= 0);
assert(yield_result >= CONN_CORO_ASYNC_AWAIT_READ &&
yield_result <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);
assert(yield_result >= CONN_CORO_WANT_READ &&
yield_result <= CONN_CORO_WANT_READ_WRITE);

flags = to_connection_flags[yield_result];

Expand Down Expand Up @@ -700,160 +700,181 @@ resume_async(const struct lwan *l,
return CONN_CORO_ABORT;
}

struct flag_update {
unsigned int num_awaiting;
enum lwan_connection_coro_yield request_conn_yield;
};

static void reset_conn_async_awaitv_flag(struct lwan_connection *conns,
va_list ap_orig)
static void clear_awaitv_flags(struct lwan_connection *conns, va_list ap_orig)
{
va_list ap;

va_copy(ap, ap_orig);

while (true) {
int await_fd = va_arg(ap, int);
if (await_fd < 0) {
va_end(ap);
break;
}

conns[await_fd].flags &= ~CONN_ASYNC_AWAITV;

for (int fd = va_arg(ap, int); fd >= 0; fd = va_arg(ap, int)) {
conns[fd].flags &= ~CONN_ASYNC_AWAITV;
LWAN_NO_DISCARD(va_arg(ap, enum lwan_connection_coro_yield));
}
va_end(ap);
}

static struct flag_update
update_flags_for_async_awaitv(struct lwan_request *r, struct lwan *l, va_list ap)
struct awaitv_state {
unsigned int num_awaiting;
enum lwan_connection_coro_yield request_conn_yield;
};

static int prepare_awaitv(struct lwan_request *r,
struct lwan *l,
va_list ap,
struct awaitv_state *state)
{
int epoll_fd = r->conn->thread->epoll_fd;
struct flag_update update = {.num_awaiting = 0,
.request_conn_yield = CONN_CORO_YIELD};

reset_conn_async_awaitv_flag(l->conns, ap);
*state = (struct awaitv_state){
.num_awaiting = 0,
.request_conn_yield = CONN_CORO_YIELD,
};

while (true) {
int await_fd = va_arg(ap, int);
if (await_fd < 0) {
return update;
}
clear_awaitv_flags(l->conns, ap);

for (int await_fd = va_arg(ap, int); await_fd >= 0;
await_fd = va_arg(ap, int)) {
struct lwan_connection *conn = &l->conns[await_fd];
enum lwan_connection_coro_yield events =
va_arg(ap, enum lwan_connection_coro_yield);
if (UNLIKELY(events < CONN_CORO_ASYNC_AWAIT_READ ||
events > CONN_CORO_ASYNC_AWAIT_READ_WRITE)) {
lwan_status_error("awaitv() called with invalid events");
coro_yield(r->conn->coro, CONN_CORO_ABORT);
__builtin_unreachable();
}

struct lwan_connection *conn = &l->conns[await_fd];

if (UNLIKELY(events < CONN_CORO_WANT_READ ||
events > CONN_CORO_WANT_READ_WRITE)) {
return -EINVAL;
}
if (UNLIKELY(conn->flags & CONN_ASYNC_AWAITV)) {
lwan_status_debug("ignoring second awaitv call on same fd: %d",
await_fd);
continue;
}

conn->flags |= CONN_ASYNC_AWAITV;
update.num_awaiting++;
state->num_awaiting++;

if (await_fd == r->fd) {
static const enum lwan_connection_coro_yield to_request_yield[] = {
[CONN_CORO_ASYNC_AWAIT_READ] = CONN_CORO_WANT_READ,
[CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_CORO_WANT_WRITE,
[CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_CORO_WANT_READ_WRITE,
};

update.request_conn_yield = to_request_yield[events];
state->request_conn_yield = events;
continue;
}

events = resume_async(l, events, await_fd, r->conn, epoll_fd);
if (UNLIKELY(events == CONN_CORO_ABORT)) {
if (UNLIKELY(prepare_await(l, events, await_fd, r->conn, epoll_fd) ==
CONN_CORO_ABORT)) {
lwan_status_error("could not register fd for async operation");
coro_yield(r->conn->coro, CONN_CORO_ABORT);
__builtin_unreachable();
return -EIO;
}
}

return 0;
}

int lwan_request_awaitv_any(struct lwan_request *r, ...)
{
struct lwan *l = r->conn->thread->lwan;
struct awaitv_state state;
va_list ap;

va_start(ap, r);
struct flag_update update = update_flags_for_async_awaitv(r, l, ap);
int ret = prepare_awaitv(r, l, ap, &state);
va_end(ap);

if (UNLIKELY(ret < 0)) {
errno = -ret;
lwan_status_critical_perror("prepare_awaitv()");
coro_yield(r->conn->coro, CONN_CORO_ABORT);
__builtin_unreachable();
}

while (true) {
int64_t v = coro_yield(r->conn->coro, update.request_conn_yield);
int64_t v = coro_yield(r->conn->coro, state.request_conn_yield);
struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v;

if (conn->flags & CONN_ASYNC_AWAITV)
return lwan_connection_get_fd(l, conn);
if (conn->flags & CONN_ASYNC_AWAITV) {
/* Ensure flags are unset in case awaitv_any() is called with
* a different set of file descriptors. */
va_start(ap, r);
clear_awaitv_flags(l->conns, ap);
va_end(ap);

int fd = lwan_connection_get_fd(l, conn);
return UNLIKELY(conn->flags & CONN_HUNG_UP) ? -fd : fd;
}
}
}

void lwan_request_awaitv_all(struct lwan_request *r, ...)
int lwan_request_awaitv_all(struct lwan_request *r, ...)
{
struct lwan *l = r->conn->thread->lwan;
struct awaitv_state state;
va_list ap;

va_start(ap, r);
struct flag_update update = update_flags_for_async_awaitv(r, l, ap);
int ret = prepare_awaitv(r, l, ap, &state);
va_end(ap);

while (update.num_awaiting) {
int64_t v = coro_yield(r->conn->coro, update.request_conn_yield);
if (UNLIKELY(ret < 0)) {
errno = -ret;
lwan_status_critical_perror("prepare_awaitv()");
coro_yield(r->conn->coro, CONN_CORO_ABORT);
__builtin_unreachable();
}

for (ret = 0; state.num_awaiting;) {
int64_t v = coro_yield(r->conn->coro, state.request_conn_yield);
struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v;

if (conn->flags & CONN_ASYNC_AWAITV) {
conn->flags &= ~CONN_ASYNC_AWAITV;
update.num_awaiting--;

if (UNLIKELY(conn->flags & CONN_HUNG_UP)) {
/* Ensure flags are unset in case awaitv_any() is called with
* a different set of file descriptors. */
va_start(ap, r);
clear_awaitv_flags(l->conns, ap);
va_end(ap);

return lwan_connection_get_fd(l, conn);
}

state.num_awaiting--;
}
}
}

static inline int64_t
make_async_yield_value(int fd, enum lwan_connection_coro_yield event)
{
assert(event >= CONN_CORO_ASYNC_AWAIT_READ &&
event <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);

return (int64_t)(((uint64_t)fd << 32 | event));
return -1;
}

static inline int async_await_fd(struct lwan_connection *conn,
int fd,
enum lwan_connection_coro_yield events)
{
int64_t yield_value = make_async_yield_value(fd, events);
int64_t from_coro = coro_yield(conn->coro, yield_value);
struct lwan_connection *conn_from_coro =
(struct lwan_connection *)(intptr_t)from_coro;
struct lwan_thread *thread = conn->thread;
struct lwan *lwan = thread->lwan;
enum lwan_connection_coro_yield yield =
prepare_await(lwan, events, fd, conn, thread->epoll_fd);

assert(conn_from_coro->flags & CONN_ASYNC_AWAIT);
if (LIKELY(yield == CONN_CORO_SUSPEND)) {
int64_t v = coro_yield(conn->coro, yield);

return lwan_connection_get_fd(conn->thread->lwan, conn_from_coro);
fd =
lwan_connection_get_fd(lwan, (struct lwan_connection *)(intptr_t)v);

return UNLIKELY(conn->flags & CONN_HUNG_UP) ? -fd : fd;
}

lwan_status_critical_perror("prepare_await(%d)", fd);
coro_yield(conn->coro, CONN_CORO_ABORT);
__builtin_unreachable();
}

inline int lwan_request_await_read(struct lwan_request *r, int fd)
int lwan_request_await_read(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ);
return async_await_fd(r->conn, fd, CONN_CORO_WANT_READ);
}

inline int lwan_request_await_write(struct lwan_request *r, int fd)
int lwan_request_await_write(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_WRITE);
return async_await_fd(r->conn, fd, CONN_CORO_WANT_WRITE);
}

inline int lwan_request_await_read_write(struct lwan_request *r, int fd)
int lwan_request_await_read_write(struct lwan_request *r, int fd)
{
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE);
return async_await_fd(r->conn, fd, CONN_CORO_WANT_READ_WRITE);
}

static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
Expand All @@ -864,20 +885,12 @@ static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
assert(conn_to_resume->coro);
assert(conn_to_yield->coro);

int64_t from_coro = coro_resume_value(conn_to_resume->coro,
(int64_t)(intptr_t)conn_to_yield);
enum lwan_connection_coro_yield yield_result = from_coro & 0xffffffff;

if (UNLIKELY(yield_result >= CONN_CORO_ASYNC)) {
int await_fd = (int)((uint64_t)from_coro >> 32);
yield_result = resume_async(tq->lwan, yield_result, await_fd,
conn_to_resume, epoll_fd);
}

if (UNLIKELY(yield_result == CONN_CORO_ABORT)) {
enum lwan_connection_coro_yield from_coro = coro_resume_value(
conn_to_resume->coro, (int64_t)(intptr_t)conn_to_yield);
if (UNLIKELY(from_coro == CONN_CORO_ABORT)) {
timeout_queue_expire(tq, conn_to_resume);
} else {
update_epoll_flags(tq->lwan, conn_to_resume, epoll_fd, yield_result);
update_epoll_flags(tq->lwan, conn_to_resume, epoll_fd, from_coro);
timeout_queue_move_to_last(tq, conn_to_resume);
}
}
Expand Down
12 changes: 1 addition & 11 deletions src/lib/lwan.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,7 @@ enum lwan_connection_coro_yield {
CONN_CORO_SUSPEND,
CONN_CORO_RESUME,

/* Group async stuff together to make it easier to check if a connection
* coroutine is yielding because of async reasons. */
CONN_CORO_ASYNC_AWAIT_READ,
CONN_CORO_ASYNC_AWAIT_WRITE,
CONN_CORO_ASYNC_AWAIT_READ_WRITE,

CONN_CORO_MAX,

/* Private API used by the async/await mechanism. Shouldn't be used
* by handlers. */
CONN_CORO_ASYNC = CONN_CORO_ASYNC_AWAIT_READ,
};

struct lwan_key_value {
Expand Down Expand Up @@ -675,7 +665,7 @@ int lwan_request_await_read(struct lwan_request *r, int fd);
int lwan_request_await_write(struct lwan_request *r, int fd);
int lwan_request_await_read_write(struct lwan_request *r, int fd);
int lwan_request_awaitv_any(struct lwan_request *r, ...);
void lwan_request_awaitv_all(struct lwan_request *r, ...);
int lwan_request_awaitv_all(struct lwan_request *r, ...);
ssize_t lwan_request_async_read(struct lwan_request *r, int fd, void *buf, size_t len);
ssize_t lwan_request_async_read_flags(struct lwan_request *request, int fd, void *buf, size_t len, int flags);
ssize_t lwan_request_async_write(struct lwan_request *r, int fd, const void *buf, size_t len);
Expand Down
Loading

0 comments on commit 3508980

Please sign in to comment.