diff --git a/ngx_http_kafka_module.c b/ngx_http_kafka_module.c index a63e968..10625a1 100644 --- a/ngx_http_kafka_module.c +++ b/ngx_http_kafka_module.c @@ -49,6 +49,7 @@ typedef struct { rd_kafka_t *rk; rd_kafka_conf_t *rkc; ngx_array_t *broker_list; + size_t message_max_bytes; /* for rd_kafka_conf_t: default 1000000 */ } ngx_http_kafka_main_conf_t; static char *ngx_http_kafka_main_conf_broker_add(ngx_http_kafka_main_conf_t *cf, @@ -81,6 +82,13 @@ static ngx_command_t ngx_http_kafka_commands[] = { NGX_HTTP_MAIN_CONF_OFFSET, 0, NULL }, + { + ngx_string("kafka_message_max_bytes"), + NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_MAIN_CONF_OFFSET, + offsetof(ngx_http_kafka_main_conf_t, message_max_bytes), + NULL }, { ngx_string("kafka_topic"), NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, @@ -172,6 +180,8 @@ void *ngx_http_kafka_create_main_conf(ngx_conf_t *cf) return NULL; } + conf->message_max_bytes = NGX_CONF_UNSET_SIZE; + return conf; } @@ -486,6 +496,39 @@ static void ngx_http_kafka_post_callback_handler(ngx_http_request_t *r) } } +static void do_rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, + const char *value, ngx_log_t *log) +{ + char errstr[512]; + rd_kafka_conf_res_t ret; + + ret = rd_kafka_conf_set(conf, name, value, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) { + ngx_log_error(NGX_LOG_ERR, log, 0, (const char *)errstr); + /* ignore error */ + } +} + +static void ngx_http_kafka_conf_set_message_max_bytes(ngx_cycle_t *cycle, + ngx_http_kafka_main_conf_t *main_conf) +{ + unsigned char buf[32]; + + if (main_conf->message_max_bytes == NGX_CONF_UNSET_SIZE) + return; /* no option */ + + ngx_snprintf(buf, sizeof(buf), "%lu%Z", main_conf->message_max_bytes); + do_rd_kafka_conf_set(main_conf->rkc, + "message.max.bytes", + (const char *)buf, + cycle->log); +} + +static void ngx_http_kafka_conf_set(ngx_cycle_t *cycle, + ngx_http_kafka_main_conf_t *main_conf) +{ + ngx_http_kafka_conf_set_message_max_bytes(cycle, main_conf); +} ngx_int_t ngx_http_kafka_init_worker(ngx_cycle_t *cycle) { @@ -497,6 +540,7 @@ ngx_int_t ngx_http_kafka_init_worker(ngx_cycle_t *cycle) ngx_http_kafka_module); main_conf->rkc = rd_kafka_conf_new(); rd_kafka_conf_set_dr_cb(main_conf->rkc, kafka_callback_handler); + ngx_http_kafka_conf_set(cycle, main_conf); main_conf->rk = rd_kafka_new(RD_KAFKA_PRODUCER, main_conf->rkc, NULL, 0); broker_list = main_conf->broker_list->elts;