diff --git a/ldms/src/store/avro_kafka/store_avro_kafka.c b/ldms/src/store/avro_kafka/store_avro_kafka.c index d10a18a9a..809f9cc55 100644 --- a/ldms/src/store/avro_kafka/store_avro_kafka.c +++ b/ldms/src/store/avro_kafka/store_avro_kafka.c @@ -915,6 +915,39 @@ static char *get_topic_name(aks_handle_t sh, ldms_set_t set, ldmsd_row_t row) return topic; } + +static int row_to_avro_payload(aks_handle_t sh, ldmsd_row_t row, + void **payload, size_t *sizep) +{ + avro_value_t avro_value; + serdes_schema_t *serdes_schema; + char errstr[512]; + int rc = 0; + + serdes_schema = serdes_schema_find(sh, (char *)row->schema_name, NULL, row); + if (!serdes_schema) { + LOG_ERROR("A serdes schema for '%s' could not be " + "constructed.\n", row->schema_name); + return 1; + } + /* Encode ldmsd_row_s as an Avro value */ + rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_value); + if (rc) { + LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc); + return rc; + } + /* Serialize an Avro value into a buffer */ + if (serdes_schema_serialize_avro(serdes_schema, &avro_value, + payload, sizep, + errstr, sizeof(errstr))) { + LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr); + rc = 1; + } + avro_value_decref(&avro_value); + + return rc; +} + /* protected by strgp->lock */ static int commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, @@ -923,9 +956,7 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, aks_handle_t sh; rd_kafka_topic_t *rkt; ldmsd_row_t row; - avro_value_t avro_row; int rc; - char errstr[512]; sh = strgp->store_handle; if (!sh) @@ -940,8 +971,7 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, { void *ser_buf = NULL; size_t ser_buf_size; - int ser_size; - serdes_schema_t *serdes_schema; + int ser_size; char *topic_name = get_topic_name(sh, set, row); if (!topic_name) { @@ -959,25 +989,11 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, } switch (sh->encoding) { case AKS_ENCODING_AVRO: - serdes_schema = serdes_schema_find(sh, (char *)row->schema_name, NULL, row); - if (!serdes_schema) { - LOG_ERROR("A serdes schema for '%s' could not be " - "constructed.\n", row->schema_name); - goto skip_row_1; - } - /* Encode ldmsd_row_s as an Avro value */ - rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_row); + rc = row_to_avro_payload(sh, row, &ser_buf, &ser_buf_size); if (rc) { - LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc); + LOG_ERROR("Failed to serialize row as AVRO object, error: %d", rc); goto skip_row_1; } - /* Serialize an Avro value into a buffer */ - if (serdes_schema_serialize_avro(serdes_schema, &avro_row, - &ser_buf, &ser_buf_size, - errstr, sizeof(errstr))) { - LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr); - goto skip_row_2; - } break; case AKS_ENCODING_JSON: /* Encode row as a JSON text object */ @@ -986,7 +1002,7 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, LOG_ERROR("Failed to serialize row as JSON object, error: %d", rc); goto skip_row_1; } - ser_buf_size = ser_size; + ser_buf_size = (size_t)ser_size; break; default: assert(0 == "Invalid/unsupported serialization encoding"); @@ -1002,9 +1018,6 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, rd_kafka_err2str(rd_kafka_last_error())); free(ser_buf); } - skip_row_2: - if (sh->encoding == AKS_ENCODING_AVRO) - avro_value_decref(&avro_row); skip_row_1: rd_kafka_topic_destroy(rkt); skip_row_0: