From 3aa4310c7d690e82315a2d947e8ec75145184c19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jarom=C3=ADr=20Wysoglad?= Date: Mon, 6 Sep 2021 15:46:07 +0200 Subject: [PATCH] Use link credit properly (#21) --- amqp_rcv_th.c | 11 ++++++++++- bridge.c | 12 +++++++++++- bridge.h | 2 ++ rb.c | 17 ++++++++++++++--- rb.h | 6 ++++-- 5 files changed, 41 insertions(+), 7 deletions(-) diff --git a/amqp_rcv_th.c b/amqp_rcv_th.c index aecd90f..d6d3357 100644 --- a/amqp_rcv_th.c +++ b/amqp_rcv_th.c @@ -86,7 +86,16 @@ static void handle_receive(app_data_t *app, pn_event_t *event, int link_credit = pn_link_credit(l); app->link_credit += link_credit; int free = rb_free_size(app->rbin); - int credit = free - link_credit + 1; + if (free == 0 && app->amqp_block) { + pthread_mutex_lock(&app->rbin->rb_mutex); + pthread_cond_wait(&app->rbin->rb_free, &app->rbin->rb_mutex); + pthread_mutex_unlock(&app->rbin->rb_mutex); + free = rb_free_size(app->rbin); + } + if (!app->amqp_block) { + free++; + } + int credit = free - link_credit; if (credit > 0) { pn_link_flow(l, credit); } diff --git a/bridge.c b/bridge.c index c5917f0..79b65cc 100644 --- a/bridge.c +++ b/bridge.c @@ -50,6 +50,7 @@ enum program_args { ARG_RING_BUFFER_COUNT, ARG_RING_BUFFER_SIZE, ARG_VERBOSE, + ARG_AMQP_BLOCK, ARG_HELP }; @@ -101,6 +102,10 @@ struct option_info option_info[] = { "", "Print extra info, multiple instance increase verbosity.", ""}, + {{"amqp_block", no_argument, 0, ARG_AMQP_BLOCK}, + "", + "Stop reading incoming messages if the buffer is full (%s)", + DEFAULT_AMQP_BLOCK}, {{"help", no_argument, 0, ARG_HELP}, "", "Print help.", ""}}; static void usage(char *program) { @@ -191,6 +196,7 @@ int main(int argc, char **argv) { app.peer_port = DEFAULT_INET_PORT; app.ring_buffer_size = atoi(DEFAULT_RING_BUFFER_SIZE); app.ring_buffer_count = atoi(DEFAULT_RING_BUFFER_COUNT); + app.amqp_block = false; /* disabled */ int num_args = sizeof(option_info) / sizeof(struct option_info); struct option *longopts = malloc(sizeof(struct option) * num_args); @@ -251,6 +257,9 @@ int main(int argc, char **argv) { case 'v': app.verbose++; break; + case ARG_AMQP_BLOCK: + app.amqp_block = true; + break; case 'h': case ARG_HELP: usage(argv[0]); @@ -285,7 +294,8 @@ int main(int argc, char **argv) { printf("Standalone mode\n"); } - app.rbin = rb_alloc(app.ring_buffer_count, app.ring_buffer_size); + app.rbin = + rb_alloc(app.ring_buffer_count, app.ring_buffer_size, app.amqp_block); app.amqp_rcv_th_running = true; pthread_create(&app.amqp_rcv_th, NULL, amqp_rcv_th, (void *)&app); diff --git a/bridge.h b/bridge.h index 6b6c533..04077cb 100644 --- a/bridge.h +++ b/bridge.h @@ -23,6 +23,7 @@ #define DEFAULT_STOP_COUNT "0" #define DEFAULT_RING_BUFFER_COUNT "5000" #define DEFAULT_RING_BUFFER_SIZE "2048" +#define DEFAULT_AMQP_BLOCK "false" #define AMQP_URL_REGEX \ "^(amqps*)://(([a-z]+)(:([a-z]+))*@)*([a-zA-Z_0-9.-]+)(:([0-9]+))*(.+)$" @@ -71,6 +72,7 @@ typedef struct { volatile long amqp_partial; volatile long amqp_total_batches; volatile long amqp_link_credit; + volatile bool amqp_block; /* Ring buffer stats */ volatile long link_credit; diff --git a/rb.c b/rb.c index 84de234..66d7aff 100644 --- a/rb.c +++ b/rb.c @@ -11,11 +11,12 @@ #include "rb.h" #include "utils.h" -rb_rwbytes_t *rb_alloc(int count, int buf_size) { +rb_rwbytes_t *rb_alloc(int count, int buf_size, bool wake_producer) { rb_rwbytes_t *rb = malloc(sizeof(rb_rwbytes_t)); rb->count = count; rb->buf_size = buf_size; + rb->wake_producer = wake_producer; if ((rb->ring_buffer = malloc(count * sizeof(pn_rwbytes_t))) == NULL) { free(rb); @@ -53,6 +54,10 @@ rb_rwbytes_t *rb_alloc(int count, int buf_size) { pthread_cond_init(&rb->rb_ready, NULL); pthread_mutex_init(&rb->rb_mutex, NULL); + if (rb->wake_producer) { + pthread_cond_init(&rb->rb_free, NULL); + } + return rb; } @@ -128,6 +133,12 @@ pn_rwbytes_t *rb_get(rb_rwbytes_t *rb) { rb->tail = next; + if (rb->wake_producer && rb_free_size(rb) == 1) { + pthread_mutex_lock(&rb->rb_mutex); + pthread_cond_broadcast(&rb->rb_free); + pthread_mutex_unlock(&rb->rb_mutex); + } + rb->processed++; return &rb->ring_buffer[rb->tail]; @@ -141,7 +152,7 @@ int rb_free_size(rb_rwbytes_t *rb) { int head = rb->head; int tail = rb->tail; - return head > tail ? rb->count - (head - tail) : tail - head; + return head > tail ? rb->count - (head - tail) - 1 : tail - head - 1; } int rb_size(rb_rwbytes_t *rb) { return rb->count; } @@ -150,4 +161,4 @@ long rb_get_overruns(rb_rwbytes_t *rb) { return rb->overruns; } long rb_get_processed(rb_rwbytes_t *rb) { return rb->processed; } -long rb_get_queue_block(rb_rwbytes_t *rb) { return rb->queue_block; } \ No newline at end of file +long rb_get_queue_block(rb_rwbytes_t *rb) { return rb->queue_block; } diff --git a/rb.h b/rb.h index 1336348..7865516 100644 --- a/rb.h +++ b/rb.h @@ -12,12 +12,14 @@ typedef struct { int count; int buf_size; + bool wake_producer; volatile int head; volatile int tail; pthread_mutex_t rb_mutex; pthread_cond_t rb_ready; + pthread_cond_t rb_free; // stats // @@ -32,7 +34,7 @@ typedef struct { } rb_rwbytes_t; -extern rb_rwbytes_t *rb_alloc(int count, int buf_size); +extern rb_rwbytes_t *rb_alloc(int count, int buf_size, bool wake_producer); extern pn_rwbytes_t *rb_get_head(rb_rwbytes_t *rb); @@ -52,4 +54,4 @@ extern int rb_size(rb_rwbytes_t *rb); extern long rb_get_queue_block(rb_rwbytes_t *rb); -#endif \ No newline at end of file +#endif