Skip to content

Commit

Permalink
minor refactoring to avoid repeated code
Browse files Browse the repository at this point in the history
  • Loading branch information
xant committed Dec 16, 2014
1 parent a5e5f8a commit 33f389a
Showing 1 changed file with 133 additions and 202 deletions.
335 changes: 133 additions & 202 deletions src/iomux.c
Original file line number Diff line number Diff line change
Expand Up @@ -1012,93 +1012,114 @@ iomux_unset_output_callback(iomux_t *iomux, int fd)
return prev ? 1 : 0;
}

static inline int
iomux_poll_connection(iomux_t *iomux, iomux_connection_t *connection, struct timeval *expire_min)
{
struct timeval now;
int fd = connection->fd;
int len = connection->inlen;

gettimeofday(&now, NULL);

if (len && connection->cbs.mux_input) {
int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv);
if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen)
{
if (mb == connection->inlen) {
connection->inlen = 0;
} else if (mb) {
memmove(connection->inbuf, connection->inbuf + mb, len - mb);
connection->inlen -= mb;
}
}
}
if (connection->expire_time.tv_sec) {
if (timercmp(&now, &connection->expire_time, <)) {
memset(&connection->expire_time, 0, sizeof(connection->expire_time));
if (connection->cbs.mux_timeout) {
connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv);
// a timeout routine can remove an fd from the mux, so we need to check for its existance again
if (!iomux->connections[fd])
return -1;
}
} else {
struct timeval expire_time;
timersub(&connection->expire_time, &now, &expire_time);
if (!expire_min->tv_sec || timercmp(expire_min, &expire_time, >))
memcpy(expire_min, &expire_time, sizeof(struct timeval));
}
}

iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);
if (!chunk && connection->cbs.mux_output) {
int len = 0;
unsigned char *data = NULL;
int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv);

// NOTE: the output callback might have removed the fd from the mux
if (!iomux->connections[fd]) {
free(data);
return -1;
}

if (data) {
chunk = calloc(1, sizeof(iomux_output_chunk_t));
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, data, len);
} else {
chunk->data = data;
}
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
chunk->len = len;
TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next);
}
}

if (chunk)
return 1;

return 0;
}

#if defined(HAVE_KQUEUE)
void
iomux_run(iomux_t *iomux, struct timeval *tv_default)
{
int i;
struct timespec ts;
struct timeval expire_min = { 0, 0 };
struct timeval now;

MUTEX_LOCK(iomux);

gettimeofday(&now, NULL);
int n = 0;
iomux_connection_t *connection = NULL;
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
int len = connection->inlen;
if (len && connection->cbs.mux_input) {
int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv);
if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen)
{
if (mb == connection->inlen) {
connection->inlen = 0;
} else if (mb) {
memmove(connection->inbuf, connection->inbuf + mb, len - mb);
connection->inlen -= mb;
}
}
}
if (connection->expire_time.tv_sec) {
if (timercmp(&now, &connection->expire_time, <)) {
memset(&connection->expire_time, 0, sizeof(connection->expire_time));
if (connection->cbs.mux_timeout) {
connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv);
// a timeout routine can remove an fd from the mux, so we need to check for its existance again
if (!iomux->connections[fd])
continue;
}
} else {
struct timeval expire_time;
timersub(&connection->expire_time, &now, &expire_time);
if (!expire_min.tv_sec || timercmp(&expire_min, &expire_time, >))
memcpy(&expire_min, &expire_time, sizeof(struct timeval));

if (!tv_default || (tv_default != &expire_min && timercmp(tv_default, &expire_min, >)))
tv_default = &expire_min;
}
}
int prc = iomux_poll_connection(iomux, connection, &expire_min);

iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);
if (chunk) {
memcpy(&iomux->events[n], &connection->event, 2 * sizeof(struct kevent));
n += 2;
} else if (connection->cbs.mux_output) {
int len = 0;
unsigned char *data = NULL;
int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv);

if (!iomux->connections[fd]) {
free(data);
switch(prc) {
case -1:
continue;
}

if (data) {
iomux_output_chunk_t *chunk = calloc(1, sizeof(iomux_output_chunk_t));
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, data, len);
} else {
chunk->data = data;
}
chunk->len = len;
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next);
memcpy(&iomux->events[n], connection->event, 2 * sizeof(struct kevent));
case 1:
memcpy(&iomux->events[n], &connection->event, 2 * sizeof(struct kevent));
n += 2;
} else {
break;
case 0:
default:
memcpy(&iomux->events[n], &connection->event, sizeof(struct kevent));
n++;
}
} else {
memcpy(&iomux->events[n], &connection->event, sizeof(struct kevent));
n++;
break;
}
}


if ((!tv_default && (expire_min.tv_sec || expire_min.tv_usec)) ||
(tv_default != &expire_min && timercmp(tv_default, &expire_min, >)))
{
tv_default = &expire_min;
}

struct timeval *tv = iomux_adjust_timeout(iomux, tv_default);
if (tv) {
ts.tv_sec = tv->tv_sec;
Expand Down Expand Up @@ -1178,85 +1199,41 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
int len = connection->inlen;
int prc = iomux_poll_connection(iomux, connection, &expire_min);

if (len && connection->cbs.mux_input) {
int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv);
if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen)
{
if (mb == connection->inlen) {
connection->inlen = 0;
} else if (mb) {
memmove(connection->inbuf, connection->inbuf + mb, len - mb);
connection->inlen -= mb;
}
}
}

if (connection->expire_time.tv_sec) {
if (timercmp(&now, &connection->expire_time, <)) {
memset(&connection->expire_time, 0, sizeof(connection->expire_time));
if (connection->cbs.mux_timeout) {
connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv);
// a timeout routine can remove an fd from the mux, so we need to check for its existance again
if (!iomux->connections[fd])
continue;
}
} else {
struct timeval expire_time;
timersub(&connection->expire_time, &now, &expire_time);
if (!expire_min.tv_sec || timercmp(&expire_min, &expire_time, >))
memcpy(&expire_min, &expire_time, sizeof(struct timeval));

if (!tv_default || (tv_default != &expire_min && timercmp(tv_default, &expire_min, >)))
tv_default = &expire_min;
}
}

iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);

if (!chunk && connection->cbs.mux_output) {
int len = 0;
unsigned char *data = NULL;
int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv);

// NOTE: the output callback might have removed the fd from the mux
if (!iomux->connections[fd]) {
free(data);
switch(prc) {
case -1:
continue;
}

if (data) {
chunk = calloc(1, sizeof(iomux_output_chunk_t));
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, data, len);
} else {
chunk->data = data;
case 1:
{
struct epoll_event event;
bzero(&event, sizeof(event));
event.data.fd = fd;
event.events = EPOLLIN | EPOLLOUT;

int rc = epoll_ctl(iomux->efd, EPOLL_CTL_MOD, fd, &event);
if (rc == -1) {
fprintf(stderr, "Errors modifying fd %d to epoll instance %d : %s\n",
fd, iomux->efd, strerror(errno));
}
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
chunk->len = len;
TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next);
}
}
if (chunk) {
struct epoll_event event;
bzero(&event, sizeof(event));
event.data.fd = fd;
event.events = EPOLLIN | EPOLLOUT;

int rc = epoll_ctl(iomux->efd, EPOLL_CTL_MOD, fd, &event);
if (rc == -1) {
fprintf(stderr, "Errors modifying fd %d to epoll instance %d : %s\n",
fd, iomux->efd, strerror(errno));
break;
}
case 0:
default:
break;
}
}

int num_fds = iomux->num_fds;

MUTEX_UNLOCK(iomux);

if ((!tv_default && (expire_min.tv_sec || expire_min.tv_usec)) ||
(tv_default != &expire_min && timercmp(tv_default, &expire_min, >)))
{
tv_default = &expire_min;
}

// shrink the timeout if we have timers expiring earlier
struct timeval *tv = iomux_adjust_timeout(iomux, tv_default);
int epoll_waiting_time = tv ? ((tv->tv_sec * 1000) + (tv->tv_usec / 1000)) : -1;
Expand Down Expand Up @@ -1351,80 +1328,34 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
int len = connection->inlen;
int prc = iomux_poll_connection(iomux, connection, &expire_min);

if (len && connection->cbs.mux_input) {
int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv);
if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen)
{
if (mb == connection->inlen) {
connection->inlen = 0;
} else if (mb) {
memmove(connection->inbuf, connection->inbuf + mb, len - mb);
connection->inlen -= mb;
}
}
}

if (connection->expire_time.tv_sec) {
if (timercmp(&now, &connection->expire_time, <)) {
memset(&connection->expire_time, 0, sizeof(connection->expire_time));
if (connection->cbs.mux_timeout) {
connection->cbs.mux_timeout(iomux, fd, connection->cbs.priv);
// a timeout routine can remove an fd from the mux, so we need to check for its existance again
if (!iomux->connections[fd])
continue;
}
} else {
struct timeval expire_time;
timersub(&connection->expire_time, &now, &expire_time);
if (!expire_min.tv_sec || timercmp(&expire_min, &expire_time, >))
memcpy(&expire_min, &expire_time, sizeof(struct timeval));

if (!tv_default || (tv_default != &expire_min && timercmp(tv_default, &expire_min, >)))
tv_default = &expire_min;
}
}

iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);

// always register managed fds for reading (even if
// no mux_input callbacks is present) to detect EOF.
if (!chunk && connection->cbs.mux_output) {
int len = 0;
unsigned char *data = NULL;
int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv);

// NOTE: the output callback might have removed the fd from the mux
if (!iomux->connections[fd]) {
free(data);
switch(prc) {
case -1:
continue;
}

if (data) {
chunk = calloc(1, sizeof(iomux_output_chunk_t));
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, data, len);
} else {
chunk->data = data;
}
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
chunk->len = len;
TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next);
}
case 1:
// output pending data
FD_SET(fd, &rout[0]);
if (fd > maxfd)
maxfd = fd;
// NOTE: no break statement here
// since we still want to
// register the fd for input
case 0:
default:
FD_SET(fd, &rin[0]);
if (fd > maxfd)
maxfd = fd;
break;
}

FD_SET(fd, &rin[0]);
if (fd > maxfd)
maxfd = fd;

if (chunk) {
// output pending data
FD_SET(fd, &rout[0]);
if (fd > maxfd)
maxfd = fd;
}
}

if ((!tv_default && (expire_min.tv_sec || expire_min.tv_usec)) ||
(tv_default != &expire_min && timercmp(tv_default, &expire_min, >)))
{
tv_default = &expire_min;
}

// NOTE: some select() implementations update the timeout with
Expand Down

0 comments on commit 33f389a

Please sign in to comment.