Skip to content

Commit

Permalink
Use link credit properly (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzigold authored Sep 6, 2021
1 parent 5de1db5 commit 3aa4310
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
11 changes: 10 additions & 1 deletion amqp_rcv_th.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,16 @@ static void handle_receive(app_data_t *app, pn_event_t *event,
int link_credit = pn_link_credit(l);
app->link_credit += link_credit;
int free = rb_free_size(app->rbin);
int credit = free - link_credit + 1;
if (free == 0 && app->amqp_block) {
pthread_mutex_lock(&app->rbin->rb_mutex);
pthread_cond_wait(&app->rbin->rb_free, &app->rbin->rb_mutex);
pthread_mutex_unlock(&app->rbin->rb_mutex);
free = rb_free_size(app->rbin);
}
if (!app->amqp_block) {
free++;
}
int credit = free - link_credit;
if (credit > 0) {
pn_link_flow(l, credit);
}
Expand Down
12 changes: 11 additions & 1 deletion bridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum program_args {
ARG_RING_BUFFER_COUNT,
ARG_RING_BUFFER_SIZE,
ARG_VERBOSE,
ARG_AMQP_BLOCK,
ARG_HELP
};

Expand Down Expand Up @@ -101,6 +102,10 @@ struct option_info option_info[] = {
"",
"Print extra info, multiple instance increase verbosity.",
""},
{{"amqp_block", no_argument, 0, ARG_AMQP_BLOCK},
"",
"Stop reading incoming messages if the buffer is full (%s)",
DEFAULT_AMQP_BLOCK},
{{"help", no_argument, 0, ARG_HELP}, "", "Print help.", ""}};

static void usage(char *program) {
Expand Down Expand Up @@ -191,6 +196,7 @@ int main(int argc, char **argv) {
app.peer_port = DEFAULT_INET_PORT;
app.ring_buffer_size = atoi(DEFAULT_RING_BUFFER_SIZE);
app.ring_buffer_count = atoi(DEFAULT_RING_BUFFER_COUNT);
app.amqp_block = false; /* disabled */

int num_args = sizeof(option_info) / sizeof(struct option_info);
struct option *longopts = malloc(sizeof(struct option) * num_args);
Expand Down Expand Up @@ -251,6 +257,9 @@ int main(int argc, char **argv) {
case 'v':
app.verbose++;
break;
case ARG_AMQP_BLOCK:
app.amqp_block = true;
break;
case 'h':
case ARG_HELP:
usage(argv[0]);
Expand Down Expand Up @@ -285,7 +294,8 @@ int main(int argc, char **argv) {
printf("Standalone mode\n");
}

app.rbin = rb_alloc(app.ring_buffer_count, app.ring_buffer_size);
app.rbin =
rb_alloc(app.ring_buffer_count, app.ring_buffer_size, app.amqp_block);

app.amqp_rcv_th_running = true;
pthread_create(&app.amqp_rcv_th, NULL, amqp_rcv_th, (void *)&app);
Expand Down
2 changes: 2 additions & 0 deletions bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define DEFAULT_STOP_COUNT "0"
#define DEFAULT_RING_BUFFER_COUNT "5000"
#define DEFAULT_RING_BUFFER_SIZE "2048"
#define DEFAULT_AMQP_BLOCK "false"

#define AMQP_URL_REGEX \
"^(amqps*)://(([a-z]+)(:([a-z]+))*@)*([a-zA-Z_0-9.-]+)(:([0-9]+))*(.+)$"
Expand Down Expand Up @@ -71,6 +72,7 @@ typedef struct {
volatile long amqp_partial;
volatile long amqp_total_batches;
volatile long amqp_link_credit;
volatile bool amqp_block;

/* Ring buffer stats */
volatile long link_credit;
Expand Down
17 changes: 14 additions & 3 deletions rb.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
#include "rb.h"
#include "utils.h"

rb_rwbytes_t *rb_alloc(int count, int buf_size) {
rb_rwbytes_t *rb_alloc(int count, int buf_size, bool wake_producer) {
rb_rwbytes_t *rb = malloc(sizeof(rb_rwbytes_t));

rb->count = count;
rb->buf_size = buf_size;
rb->wake_producer = wake_producer;

if ((rb->ring_buffer = malloc(count * sizeof(pn_rwbytes_t))) == NULL) {
free(rb);
Expand Down Expand Up @@ -53,6 +54,10 @@ rb_rwbytes_t *rb_alloc(int count, int buf_size) {
pthread_cond_init(&rb->rb_ready, NULL);
pthread_mutex_init(&rb->rb_mutex, NULL);

if (rb->wake_producer) {
pthread_cond_init(&rb->rb_free, NULL);
}

return rb;
}

Expand Down Expand Up @@ -128,6 +133,12 @@ pn_rwbytes_t *rb_get(rb_rwbytes_t *rb) {

rb->tail = next;

if (rb->wake_producer && rb_free_size(rb) == 1) {
pthread_mutex_lock(&rb->rb_mutex);
pthread_cond_broadcast(&rb->rb_free);
pthread_mutex_unlock(&rb->rb_mutex);
}

rb->processed++;

return &rb->ring_buffer[rb->tail];
Expand All @@ -141,7 +152,7 @@ int rb_free_size(rb_rwbytes_t *rb) {
int head = rb->head;
int tail = rb->tail;

return head > tail ? rb->count - (head - tail) : tail - head;
return head > tail ? rb->count - (head - tail) - 1 : tail - head - 1;
}

int rb_size(rb_rwbytes_t *rb) { return rb->count; }
Expand All @@ -150,4 +161,4 @@ long rb_get_overruns(rb_rwbytes_t *rb) { return rb->overruns; }

long rb_get_processed(rb_rwbytes_t *rb) { return rb->processed; }

long rb_get_queue_block(rb_rwbytes_t *rb) { return rb->queue_block; }
long rb_get_queue_block(rb_rwbytes_t *rb) { return rb->queue_block; }
6 changes: 4 additions & 2 deletions rb.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ typedef struct {

int count;
int buf_size;
bool wake_producer;

volatile int head;
volatile int tail;

pthread_mutex_t rb_mutex;
pthread_cond_t rb_ready;
pthread_cond_t rb_free;

// stats
//
Expand All @@ -32,7 +34,7 @@ typedef struct {

} rb_rwbytes_t;

extern rb_rwbytes_t *rb_alloc(int count, int buf_size);
extern rb_rwbytes_t *rb_alloc(int count, int buf_size, bool wake_producer);

extern pn_rwbytes_t *rb_get_head(rb_rwbytes_t *rb);

Expand All @@ -52,4 +54,4 @@ extern int rb_size(rb_rwbytes_t *rb);

extern long rb_get_queue_block(rb_rwbytes_t *rb);

#endif
#endif

0 comments on commit 3aa4310

Please sign in to comment.