diff --git a/src/iomux.c b/src/iomux.c index 468ffb2..9227140 100644 --- a/src/iomux.c +++ b/src/iomux.c @@ -1012,6 +1012,76 @@ iomux_unset_output_callback(iomux_t *iomux, int fd) return prev ? 1 : 0; } +static inline int +iomux_poll_connection(iomux_t *iomux, iomux_connection_t *connection, struct timeval *expire_min) +{ + struct timeval now; + int fd = connection->fd; + int len = connection->inlen; + + gettimeofday(&now, NULL); + + if (len && connection->cbs.mux_input) { + int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv); + if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen) + { + if (mb == connection->inlen) { + connection->inlen = 0; + } else if (mb) { + memmove(connection->inbuf, connection->inbuf + mb, len - mb); + connection->inlen -= mb; + } + } + } + if (connection->expire_time.tv_sec) { + if (timercmp(&now, &connection->expire_time, <)) { + memset(&connection->expire_time, 0, sizeof(connection->expire_time)); + if (connection->cbs.mux_timeout) { + connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv); + // a timeout routine can remove an fd from the mux, so we need to check for its existance again + if (!iomux->connections[fd]) + return -1; + } + } else { + struct timeval expire_time; + timersub(&connection->expire_time, &now, &expire_time); + if (!expire_min->tv_sec || timercmp(expire_min, &expire_time, >)) + memcpy(expire_min, &expire_time, sizeof(struct timeval)); + } + } + + iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue); + if (!chunk && connection->cbs.mux_output) { + int len = 0; + unsigned char *data = NULL; + int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv); + + // NOTE: the output callback might have removed the fd from the mux + if (!iomux->connections[fd]) { + free(data); + return -1; + } + + if (data) { + chunk = calloc(1, sizeof(iomux_output_chunk_t)); + if (mode == IOMUX_OUTPUT_MODE_COPY) { + chunk->data = malloc(len); + memcpy(chunk->data, data, len); + } else { + chunk->data = data; + } + chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE); + chunk->len = len; + TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next); + } + } + + if (chunk) + return 1; + + return 0; +} + #if defined(HAVE_KQUEUE) void iomux_run(iomux_t *iomux, struct timeval *tv_default) @@ -1019,86 +1089,37 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default) int i; struct timespec ts; struct timeval expire_min = { 0, 0 }; - struct timeval now; MUTEX_LOCK(iomux); - gettimeofday(&now, NULL); int n = 0; iomux_connection_t *connection = NULL; iomux_connection_t *tmp; TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) { - int fd = connection->fd; - int len = connection->inlen; - if (len && connection->cbs.mux_input) { - int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv); - if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen) - { - if (mb == connection->inlen) { - connection->inlen = 0; - } else if (mb) { - memmove(connection->inbuf, connection->inbuf + mb, len - mb); - connection->inlen -= mb; - } - } - } - if (connection->expire_time.tv_sec) { - if (timercmp(&now, &connection->expire_time, <)) { - memset(&connection->expire_time, 0, sizeof(connection->expire_time)); - if (connection->cbs.mux_timeout) { - connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv); - // a timeout routine can remove an fd from the mux, so we need to check for its existance again - if (!iomux->connections[fd]) - continue; - } - } else { - struct timeval expire_time; - timersub(&connection->expire_time, &now, &expire_time); - if (!expire_min.tv_sec || timercmp(&expire_min, &expire_time, >)) - memcpy(&expire_min, &expire_time, sizeof(struct timeval)); - - if (!tv_default || (tv_default != &expire_min && timercmp(tv_default, &expire_min, >))) - tv_default = &expire_min; - } - } + int prc = iomux_poll_connection(iomux, connection, &expire_min); - iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue); - if (chunk) { - memcpy(&iomux->events[n], &connection->event, 2 * sizeof(struct kevent)); - n += 2; - } else if (connection->cbs.mux_output) { - int len = 0; - unsigned char *data = NULL; - int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv); - - if (!iomux->connections[fd]) { - free(data); + switch(prc) { + case -1: continue; - } - - if (data) { - iomux_output_chunk_t *chunk = calloc(1, sizeof(iomux_output_chunk_t)); - if (mode == IOMUX_OUTPUT_MODE_COPY) { - chunk->data = malloc(len); - memcpy(chunk->data, data, len); - } else { - chunk->data = data; - } - chunk->len = len; - chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE); - TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next); - memcpy(&iomux->events[n], connection->event, 2 * sizeof(struct kevent)); + case 1: + memcpy(&iomux->events[n], &connection->event, 2 * sizeof(struct kevent)); n += 2; - } else { + break; + case 0: + default: memcpy(&iomux->events[n], &connection->event, sizeof(struct kevent)); n++; - } - } else { - memcpy(&iomux->events[n], &connection->event, sizeof(struct kevent)); - n++; + break; } } + + if ((!tv_default && (expire_min.tv_sec || expire_min.tv_usec)) || + (tv_default != &expire_min && timercmp(tv_default, &expire_min, >))) + { + tv_default = &expire_min; + } + struct timeval *tv = iomux_adjust_timeout(iomux, tv_default); if (tv) { ts.tv_sec = tv->tv_sec; @@ -1178,78 +1199,28 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default) iomux_connection_t *tmp; TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) { int fd = connection->fd; - int len = connection->inlen; + int prc = iomux_poll_connection(iomux, connection, &expire_min); - if (len && connection->cbs.mux_input) { - int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv); - if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen) - { - if (mb == connection->inlen) { - connection->inlen = 0; - } else if (mb) { - memmove(connection->inbuf, connection->inbuf + mb, len - mb); - connection->inlen -= mb; - } - } - } - - if (connection->expire_time.tv_sec) { - if (timercmp(&now, &connection->expire_time, <)) { - memset(&connection->expire_time, 0, sizeof(connection->expire_time)); - if (connection->cbs.mux_timeout) { - connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv); - // a timeout routine can remove an fd from the mux, so we need to check for its existance again - if (!iomux->connections[fd]) - continue; - } - } else { - struct timeval expire_time; - timersub(&connection->expire_time, &now, &expire_time); - if (!expire_min.tv_sec || timercmp(&expire_min, &expire_time, >)) - memcpy(&expire_min, &expire_time, sizeof(struct timeval)); - - if (!tv_default || (tv_default != &expire_min && timercmp(tv_default, &expire_min, >))) - tv_default = &expire_min; - } - } - - iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue); - - if (!chunk && connection->cbs.mux_output) { - int len = 0; - unsigned char *data = NULL; - int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv); - - // NOTE: the output callback might have removed the fd from the mux - if (!iomux->connections[fd]) { - free(data); + switch(prc) { + case -1: continue; - } - - if (data) { - chunk = calloc(1, sizeof(iomux_output_chunk_t)); - if (mode == IOMUX_OUTPUT_MODE_COPY) { - chunk->data = malloc(len); - memcpy(chunk->data, data, len); - } else { - chunk->data = data; + case 1: + { + struct epoll_event event; + bzero(&event, sizeof(event)); + event.data.fd = fd; + event.events = EPOLLIN | EPOLLOUT; + + int rc = epoll_ctl(iomux->efd, EPOLL_CTL_MOD, fd, &event); + if (rc == -1) { + fprintf(stderr, "Errors modifying fd %d to epoll instance %d : %s\n", + fd, iomux->efd, strerror(errno)); } - chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE); - chunk->len = len; - TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next); - } - } - if (chunk) { - struct epoll_event event; - bzero(&event, sizeof(event)); - event.data.fd = fd; - event.events = EPOLLIN | EPOLLOUT; - - int rc = epoll_ctl(iomux->efd, EPOLL_CTL_MOD, fd, &event); - if (rc == -1) { - fprintf(stderr, "Errors modifying fd %d to epoll instance %d : %s\n", - fd, iomux->efd, strerror(errno)); + break; } + case 0: + default: + break; } } @@ -1257,6 +1228,12 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default) MUTEX_UNLOCK(iomux); + if ((!tv_default && (expire_min.tv_sec || expire_min.tv_usec)) || + (tv_default != &expire_min && timercmp(tv_default, &expire_min, >))) + { + tv_default = &expire_min; + } + // shrink the timeout if we have timers expiring earlier struct timeval *tv = iomux_adjust_timeout(iomux, tv_default); int epoll_waiting_time = tv ? ((tv->tv_sec * 1000) + (tv->tv_usec / 1000)) : -1; @@ -1351,80 +1328,34 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default) iomux_connection_t *tmp; TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) { int fd = connection->fd; - int len = connection->inlen; + int prc = iomux_poll_connection(iomux, connection, &expire_min); - if (len && connection->cbs.mux_input) { - int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv); - if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen) - { - if (mb == connection->inlen) { - connection->inlen = 0; - } else if (mb) { - memmove(connection->inbuf, connection->inbuf + mb, len - mb); - connection->inlen -= mb; - } - } - } - - if (connection->expire_time.tv_sec) { - if (timercmp(&now, &connection->expire_time, <)) { - memset(&connection->expire_time, 0, sizeof(connection->expire_time)); - if (connection->cbs.mux_timeout) { - connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv); - // a timeout routine can remove an fd from the mux, so we need to check for its existance again - if (!iomux->connections[fd]) - continue; - } - } else { - struct timeval expire_time; - timersub(&connection->expire_time, &now, &expire_time); - if (!expire_min.tv_sec || timercmp(&expire_min, &expire_time, >)) - memcpy(&expire_min, &expire_time, sizeof(struct timeval)); - - if (!tv_default || (tv_default != &expire_min && timercmp(tv_default, &expire_min, >))) - tv_default = &expire_min; - } - } - - iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue); - - // always register managed fds for reading (even if - // no mux_input callbacks is present) to detect EOF. - if (!chunk && connection->cbs.mux_output) { - int len = 0; - unsigned char *data = NULL; - int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv); - - // NOTE: the output callback might have removed the fd from the mux - if (!iomux->connections[fd]) { - free(data); + switch(prc) { + case -1: continue; - } - - if (data) { - chunk = calloc(1, sizeof(iomux_output_chunk_t)); - if (mode == IOMUX_OUTPUT_MODE_COPY) { - chunk->data = malloc(len); - memcpy(chunk->data, data, len); - } else { - chunk->data = data; - } - chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE); - chunk->len = len; - TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next); - } + case 1: + // output pending data + FD_SET(fd, &rout[0]); + if (fd > maxfd) + maxfd = fd; + // NOTE: no break statement here + // since we still want to + // register the fd for input + case 0: + default: + FD_SET(fd, &rin[0]); + if (fd > maxfd) + maxfd = fd; + break; } - FD_SET(fd, &rin[0]); - if (fd > maxfd) - maxfd = fd; - if (chunk) { - // output pending data - FD_SET(fd, &rout[0]); - if (fd > maxfd) - maxfd = fd; - } + } + + if ((!tv_default && (expire_min.tv_sec || expire_min.tv_usec)) || + (tv_default != &expire_min && timercmp(tv_default, &expire_min, >))) + { + tv_default = &expire_min; } // NOTE: some select() implementations update the timeout with