Skip to content

Commit

Permalink
use a (macro-based) linklist to keep a list of active filedescriptor
Browse files Browse the repository at this point in the history
to avoid iterating over the main array where the connections are stored
since it contains many holes and wastes cpu cycles (because of cache misses and such)
when handling a big amount of connections
  • Loading branch information
Andrea Guzzo committed Apr 19, 2014
1 parent bc0c52f commit 6d56048
Showing 1 changed file with 65 additions and 59 deletions.
124 changes: 65 additions & 59 deletions src/iomux.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ void iomux_run(iomux_t *iomux, struct timeval *tv_default);
int iomux_hangup = 0;

//! \brief iomux connection strucure
typedef struct __iomux_connection {
typedef struct __iomux_connection_s {
int fd;
uint32_t flags;
iomux_callbacks_t cbs;
unsigned char *inbuf;
Expand All @@ -63,6 +64,7 @@ typedef struct __iomux_connection {
int inlen;
int outlen;
iomux_timeout_id_t timeout_id;
TAILQ_ENTRY(__iomux_connection_s) next;
#if defined(HAVE_KQUEUE)
int16_t kfilters[2];
struct kevent event[2];
Expand All @@ -73,14 +75,14 @@ typedef struct __iomux_connection {
typedef struct __iomux_timeout {
iomux_timeout_id_t id;
struct timeval expire_time;
TAILQ_ENTRY(__iomux_timeout) timeout_list;
void (*cb)(iomux_t *iomux, void *priv);
void *priv;
} iomux_timeout_t;

//! \brief IOMUX base structure
struct __iomux {
iomux_connection_t **connections;
TAILQ_HEAD(, __iomux_connection_s) connections_list;
int maxfd;
int minfd;
int bufsize;
Expand Down Expand Up @@ -167,6 +169,7 @@ iomux_create(int bufsize, int threadsafe)
#endif

iomux->connections = calloc(1, sizeof(iomux_connection_t *) * iomux->maxconnections);
TAILQ_INIT(&iomux->connections_list);

iomux->timeouts = bh_create();

Expand Down Expand Up @@ -258,12 +261,14 @@ iomux_add(iomux_t *iomux, int fd, iomux_callbacks_t *cbs)
connection->inbuf = malloc(iomux->bufsize);
connection->outbuf = malloc(iomux->bufsize);
connection->bufsize = iomux->bufsize;
connection->fd = fd;

iomux->connections[fd] = connection;
iomux->num_fds++;
while (!iomux->connections[iomux->minfd] && iomux->minfd != iomux->maxfd)
iomux->minfd++;

TAILQ_INSERT_TAIL(&iomux->connections_list, connection, next);
// if we have no emfile_fd saved, let's open one now
// it could have been previously closed because we
// reached the EMFILE condition but we were not able
Expand Down Expand Up @@ -307,6 +312,7 @@ iomux_remove(iomux_t *iomux, int fd)
EV_SET(&iomux->connections[fd]->event[i], fd, iomux->connections[fd]->kfilters[i], EV_DELETE | EV_ONESHOT, 0, 0, 0);
}
#endif
TAILQ_REMOVE(&iomux->connections_list, iomux->connections[fd], next);
free(iomux->connections[fd]->inbuf);
free(iomux->connections[fd]->outbuf);
free(iomux->connections[fd]);
Expand Down Expand Up @@ -910,33 +916,34 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
MUTEX_LOCK(iomux);

int n = 0;
for (i = iomux->minfd; i <= iomux->maxfd; i++) {
if (!iomux->connections[i])
continue;
if (iomux->connections[i]->cbs.mux_output) {
int maxlen = iomux->connections[i]->bufsize - iomux->connections[i]->outlen;
iomux_connection_t *connection = NULL;
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
if (connection->cbs.mux_output) {
int maxlen = connection->bufsize - connection->outlen;
unsigned char data[maxlen];
int len = maxlen;
iomux->connections[i]->cbs.mux_output(iomux, i, data, &len,
iomux->connections[i]->cbs.priv);
connection->cbs.mux_output(iomux, fd, data, &len,
connection->cbs.priv);

if (!iomux->connections[i])
if (!iomux->connections[fd])
continue;

if (len) {
memcpy(iomux->connections[i]->outbuf + iomux->connections[i]->outlen, data, len);
iomux->connections[i]->outlen += len;
memcpy(&iomux->events[n], &iomux->connections[i]->event, 2 * sizeof(struct kevent));
memcpy(connection->outbuf + connection->outlen, data, len);
connection->outlen += len;
memcpy(&iomux->events[n], connection->event, 2 * sizeof(struct kevent));
n += 2;
} else {
memcpy(&iomux->events[n], &iomux->connections[i]->event, sizeof(struct kevent));
memcpy(&iomux->events[n], &connection->event, sizeof(struct kevent));
n++;
}
} else if (iomux->connections[i]->outlen) {
memcpy(&iomux->events[n], &iomux->connections[i]->event, 2 * sizeof(struct kevent));
} else if (connection->outlen) {
memcpy(&iomux->events[n], &connection->event, 2 * sizeof(struct kevent));
n += 2;
} else {
memcpy(&iomux->events[n], &iomux->connections[i]->event, sizeof(struct kevent));
memcpy(&iomux->events[n], &connection->event, sizeof(struct kevent));
n++;
}
}
Expand Down Expand Up @@ -1011,42 +1018,40 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)

MUTEX_LOCK(iomux);

int i;
for (i = iomux->minfd; i <= iomux->maxfd; i++) {
if (!iomux->connections[i])
continue;
if (iomux->connections[i]->cbs.mux_output) {
int maxlen = iomux->connections[i]->bufsize - iomux->connections[i]->outlen;
iomux_connection_t *connection = NULL;
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
if (connection->cbs.mux_output) {
int maxlen = connection->bufsize - connection->outlen;
unsigned char data[maxlen];
int len = maxlen;
iomux->connections[i]->cbs.mux_output(iomux, i, data, &len,
iomux->connections[i]->cbs.priv);
connection->cbs.mux_output(iomux, fd, data, &len,
connection->cbs.priv);

// NOTE: the output callback might have removed the fd from the mux
if (!iomux->connections[i])
if (!iomux->connections[fd])
continue;

if (len) {
memcpy(iomux->connections[i]->outbuf + iomux->connections[i]->outlen, data, len);
iomux->connections[i]->outlen += len;
memcpy(connection->outbuf + connection->outlen, data, len);
connection->outlen += len;
}
}
if (iomux->connections[i]->outlen) {
if (connection->outlen) {
struct epoll_event event;
bzero(&event, sizeof(event));
event.data.fd = i;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLOUT;

int rc = epoll_ctl(iomux->efd, EPOLL_CTL_MOD, i, &event);
int rc = epoll_ctl(iomux->efd, EPOLL_CTL_MOD, fd, &event);
if (rc == -1) {
fprintf(stderr, "Errors modifying fd %d to epoll instance %d : %s\n",
i, iomux->efd, strerror(errno));
fd, iomux->efd, strerror(errno));
}
}
}



MUTEX_UNLOCK(iomux);

// shrink the timeout if we have timers expiring earlier
Expand All @@ -1067,6 +1072,7 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)

MUTEX_LOCK(iomux);

int i;
for (i = 0; i < n; i++) {
if ((iomux->events[i].events & EPOLLHUP))
{
Expand Down Expand Up @@ -1124,36 +1130,36 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)

MUTEX_LOCK(iomux);

for (fd = iomux->minfd; fd <= iomux->maxfd; fd++) {
if (iomux->connections[fd]) {
iomux_connection_t *conn = iomux->connections[fd];
// always register managed fds for reading (even if
// no mux_input callbacks is present) to detect EOF.
if (conn->cbs.mux_output) {
int maxsize = iomux->connections[fd]->bufsize - iomux->connections[fd]->outlen;
unsigned char data[maxsize];
int len = maxsize;
iomux->connections[fd]->cbs.mux_output(iomux, fd, data, &len,
iomux->connections[fd]->cbs.priv);
if (!iomux->connections[fd])
continue;
iomux_connection_t *connection = NULL;
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
// always register managed fds for reading (even if
// no mux_input callbacks is present) to detect EOF.
if (connection->cbs.mux_output) {
int maxsize = connection->bufsize - connection->outlen;
unsigned char data[maxsize];
int len = maxsize;
connection->cbs.mux_output(iomux, fd, data, &len,
connection->cbs.priv);
if (!iomux->connections[fd])
continue;

if (len) {
memcpy(iomux->connections[fd]->outbuf + iomux->connections[fd]->outlen, data, len);
iomux->connections[fd]->outlen += len;
}
if (len) {
memcpy(connection->outbuf + connection->outlen, data, len);
connection->outlen += len;
}
}

FD_SET(fd, &rin[0]);
if (fd > maxfd)
maxfd = fd;

FD_SET(fd, &rin[0]);
if (connection->outlen) {
// output pending data
FD_SET(fd, &rout[0]);
if (fd > maxfd)
maxfd = fd;

if (conn->outlen) {
// output pending data
FD_SET(fd, &rout[0]);
if (fd > maxfd)
maxfd = fd;
}
}
}

Expand Down

0 comments on commit 6d56048

Please sign in to comment.