Skip to content

Commit

Permalink
changed the output API to allow avoiding copies and not being limited…
Browse files Browse the repository at this point in the history
… by the internal output buffer

A caller would anyway keep the whole data in memory while it is being flushed through the mux.
In such cases it's just easier and faster to pass the data to the mux so it will take care of flushing,
drastically reducing copies. Note that the mux might still impose an upper limit to queued data
to avoid allowing to queueing more data than it's possible to send out
  • Loading branch information
xant committed May 22, 2014
1 parent 3937fc4 commit 93e3661
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 40 deletions.
16 changes: 10 additions & 6 deletions src/iomtee.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ iomtee_read_buffer(iomtee_t *tee, iomtee_fd_t *tfd, void *out, int len)
return read_len;
}

static void
static int
iomtee_output(iomux_t *iomux,
int fd,
unsigned char *out,
unsigned char **out,
int *len,
void *priv)
{
Expand All @@ -92,11 +92,15 @@ iomtee_output(iomux_t *iomux,
if (!tfd) {
// TODO - Error Messages
*len = 0;
return;
return IOMUX_OUTPUT_MODE_NONE;
}

int rb = iomtee_read_buffer(tee, tfd, out, *len);
*len = rb;
*len = abs(tee->wofx - tfd->rofx);
*out = malloc(*len);
int rb = iomtee_read_buffer(tee, tfd, *out, *len);
if (rb != *len)
*len = rb;
return IOMUX_OUTPUT_MODE_FREE;
}

static int
Expand All @@ -108,7 +112,7 @@ iomtee_input(iomux_t *iomux, int fd, unsigned char *data, int len, void *priv)
TAILQ_FOREACH(tee_fd, &tee->fds, next) {
if (tee_fd->fd == -1)
continue; // skip closed receivers
int wb = iomux_write(iomux, tee_fd->fd, data, len, -1);
int wb = iomux_write(iomux, tee_fd->fd, data, len, IOMUX_OUTPUT_MODE_COPY);
if (wb < len) {
if (wb < min_write)
min_write = wb;
Expand Down
63 changes: 36 additions & 27 deletions src/iomux.c
Original file line number Diff line number Diff line change
Expand Up @@ -758,11 +758,12 @@ int
iomux_write(iomux_t *iomux, int fd, unsigned char *buf, int len, int mode)
{
iomux_output_chunk_t *chunk = calloc(1, sizeof(iomux_output_chunk_t));
chunk->free = (mode != 0);
if (mode == -1) {
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, buf, len);
} else {
// TODO - check for unknown output modes
chunk->data = buf;
}
chunk->len = len;
Expand Down Expand Up @@ -987,27 +988,29 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
memcpy(&iomux->events[n], &connection->event, 2 * sizeof(struct kevent));
n += 2;
} else if (connection->cbs.mux_output) {
int len = IOMUX_CONNECTION_BUFSIZE_DEFAULT;
unsigned char *data = malloc(len);
connection->cbs.mux_output(iomux, fd, data, &len,
connection->cbs.priv);
int len = 0;
unsigned char *data = NULL;
int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv);

if (!iomux->connections[fd]) {
free(data);
continue;
}

if (len) {
data = realloc(data, len);
if (data) {
iomux_output_chunk_t *chunk = calloc(1, sizeof(iomux_output_chunk_t));
chunk->data = data;
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, data, len);
} else {
chunk->data = data;
}
chunk->len = len;
chunk->free = 1;
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next);
memcpy(&iomux->events[n], connection->event, 2 * sizeof(struct kevent));
n += 2;
} else {
free(data);
memcpy(&iomux->events[n], &connection->event, sizeof(struct kevent));
n++;
}
Expand Down Expand Up @@ -1094,22 +1097,25 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
iomux_output_chunk_t *chunk = TAILQ_FIRST(&connection->output_queue);

if (!chunk && connection->cbs.mux_output) {
int len = IOMUX_CONNECTION_BUFSIZE_DEFAULT;
unsigned char *data = malloc(len);
connection->cbs.mux_output(iomux, fd, data, &len,
connection->cbs.priv);
int len = 0;
unsigned char *data = NULL;
int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv);

// NOTE: the output callback might have removed the fd from the mux
if (!iomux->connections[fd]) {
free(data);
continue;
}

if (len) {
data = realloc(data, len);
if (data) {
chunk = calloc(1, sizeof(iomux_output_chunk_t));
chunk->free = 1;
chunk->data = data;
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, data, len);
} else {
chunk->data = data;
}
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
chunk->len = len;
TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next);
}
Expand Down Expand Up @@ -1216,22 +1222,25 @@ iomux_run(iomux_t *iomux, struct timeval *tv_default)
// always register managed fds for reading (even if
// no mux_input callbacks is present) to detect EOF.
if (!chunk && connection->cbs.mux_output) {
int len = IOMUX_CONNECTION_BUFSIZE_DEFAULT;
unsigned char *data = malloc(len);
connection->cbs.mux_output(iomux, fd, data, &len,
connection->cbs.priv);
int len = 0;
unsigned char *data = NULL;
int mode = connection->cbs.mux_output(iomux, fd, &data, &len, connection->cbs.priv);

// NOTE: the output callback might have removed the fd from the mux
if (!iomux->connections[fd]) {
free(data);
continue;
}

if (len) {
data = realloc(data, len);
if (data) {
chunk = calloc(1, sizeof(iomux_output_chunk_t));
chunk->free = 1;
chunk->data = data;
if (mode == IOMUX_OUTPUT_MODE_COPY) {
chunk->data = malloc(len);
memcpy(chunk->data, data, len);
} else {
chunk->data = data;
}
chunk->free = (mode != IOMUX_OUTPUT_MODE_NONE);
chunk->len = len;
TAILQ_INSERT_TAIL(&connection->output_queue, chunk, next);
}
Expand Down
21 changes: 15 additions & 6 deletions src/iomux.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ typedef void (*iomux_cb_t)(iomux_t *iomux, void *priv);

typedef uint64_t iomux_timeout_id_t;

typedef enum {
IOMUX_OUTPUT_MODE_COPY = -1,
IOMUX_OUTPUT_MODE_FREE = 1,
IOMUX_OUTPUT_MODE_NONE = 0
} iomux_output_mode_t;

/**
* @brief Handle input coming from a managed filedescriptor
* @param iomux The iomux handle
Expand All @@ -41,17 +47,18 @@ typedef int (*iomux_input_callback_t)(iomux_t *iomux, int fd, unsigned char *dat
* @brief Callback called to poll for data to be written to a managed filedescriptor
* @param iomux The iomux handle
* @param fd The fd the timer relates to
* @param data A pointer to where to store data which is available for writing
* @param len On input it holds the size of the memory pointed by data;
* On output it MUST be set to the actual size of the data
* copied to the out pointer
* @param data A reference to the pointer to where the data is stored
* @param len A pointer to where to store length of the data
* @param priv the private pointer registered with the callbacks
* @return the iomux_output_mode which determines if the data has to be copied,
* freed or ignored (in which case the caller needs to take care of releasing the underlying memory)
*
* @note If no data is available for writing and hence has been copied
* to the data pointer, the callback MUST ensure setting *len to zero
* so that the iomux doesn't try sending garbage data
*/
typedef void (*iomux_output_callback_t)(iomux_t *iomux, int fd, unsigned char *data, int *len, void *priv);
typedef iomux_output_mode_t (*iomux_output_callback_t)(iomux_t *iomux, int fd, unsigned char **data, int *len, void *priv);


/*
* @brief Callback called when a timeout registered using iomux_set_timeout() expires
Expand Down Expand Up @@ -264,9 +271,11 @@ void iomux_run(iomux_t *iomux, struct timeval *timeout);
* @param fd The fd we want to write to
* @param buf The buffer to write
* @param len The length of the buffer
* @param mode the iomux_output_mode which determines if the data has to be copied,
* freed or ignored (in which case the caller needs to take care of releasing the underlying memory)
* @returns The number of written bytes
*/
int iomux_write(iomux_t *iomux, int fd, unsigned char *data, int len, int mode);
int iomux_write(iomux_t *iomux, int fd, unsigned char *data, int len, iomux_output_mode_t mode);

int iomux_set_output_callback(iomux_t *iomux, int fd, iomux_output_callback_t cb);
int iomux_unset_output_callback(iomux_t *iomux, int fd);
Expand Down
2 changes: 1 addition & 1 deletion test/iomux_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ main(int argc, char **argv)
ut_validate_int(iomux_add(mux, client, &callbacks), 1);

ut_testing("iomux_write(mux, client, %s, %d)", TEST_STRING, strlen(TEST_STRING));
ut_validate_int(iomux_write(mux, client, TEST_STRING, strlen(TEST_STRING), 0), strlen(TEST_STRING));
ut_validate_int(iomux_write(mux, client, TEST_STRING, strlen(TEST_STRING), IOMUX_OUTPUT_MODE_NONE), strlen(TEST_STRING));

ut_testing("iomux_input_callback() callback");
iomux_loop(mux, NULL);
Expand Down

0 comments on commit 93e3661

Please sign in to comment.