Skip to content

Commit

Permalink
ensure flushing any pending input data if any
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrea Guzzo committed May 23, 2014
1 parent f12cca8 commit bfa9ce4
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions src/iomux.c
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,19 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
int len = connection->inlen;
if (len && connection->cbs.mux_input) {
int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv);
if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen)
{
if (mb == connection->inlen) {
connection->inlen = 0;
} else if (mb) {
memmove(connection->inbuf, connection->inbuf + mb, len - mb);
connection->inlen -= mb;
}
}
}
iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);
if (chunk) {
memcpy(&iomux->events[n], &connection->event, 2 * sizeof(struct kevent));
Expand Down Expand Up @@ -1088,6 +1101,20 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
int len = connection->inlen;
if (len && connection->cbs.mux_input) {
int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv);
if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen)
{
if (mb == connection->inlen) {
connection->inlen = 0;
} else if (mb) {
memmove(connection->inbuf, connection->inbuf + mb, len - mb);
connection->inlen -= mb;
}
}
}

iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);

if (!chunk && connection->cbs.mux_output) {
Expand Down Expand Up @@ -1211,6 +1238,19 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
iomux_connection_t *tmp;
TAILQ_FOREACH_SAFE(connection, &iomux->connections_list, next, tmp) {
int fd = connection->fd;
int len = connection->inlen;
if (len && connection->cbs.mux_input) {
int mb = connection->cbs.mux_input(iomux, fd, connection->inbuf, len, connection->cbs.priv);
if (iomux->connections[fd] == connection && iomux->connections[fd]->inlen == connection->inlen)
{
if (mb == connection->inlen) {
connection->inlen = 0;
} else if (mb) {
memmove(connection->inbuf, connection->inbuf + mb, len - mb);
connection->inlen -= mb;
}
}
}
iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);

// always register managed fds for reading (even if
Expand Down

0 comments on commit bfa9ce4

Please sign in to comment.