Skip to content

Commit

Permalink
store_avro_kafka: refactor to improve symmetry
Browse files Browse the repository at this point in the history
  • Loading branch information
morrone committed Sep 20, 2023
1 parent 94b76db commit ac56104
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions ldms/src/store/avro_kafka/store_avro_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,39 @@ static char *get_topic_name(aks_handle_t sh, ldms_set_t set, ldmsd_row_t row)
return topic;
}


static int row_to_avro_payload(aks_handle_t sh, ldmsd_row_t row,
void **payload, size_t *sizep)
{
avro_value_t avro_value;
serdes_schema_t *serdes_schema;
char errstr[512];
int rc = 0;

serdes_schema = serdes_schema_find(sh, (char *)row->schema_name, NULL, row);
if (!serdes_schema) {
LOG_ERROR("A serdes schema for '%s' could not be "
"constructed.\n", row->schema_name);
return 1;
}
/* Encode ldmsd_row_s as an Avro value */
rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_value);
if (rc) {
LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc);
return rc;
}
/* Serialize an Avro value into a buffer */
if (serdes_schema_serialize_avro(serdes_schema, &avro_value,
payload, sizep,
errstr, sizeof(errstr))) {
LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr);
rc = 1;
}
avro_value_decref(&avro_value);

return rc;
}

/* protected by strgp->lock */
static int
commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
Expand All @@ -923,9 +956,7 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
aks_handle_t sh;
rd_kafka_topic_t *rkt;
ldmsd_row_t row;
avro_value_t avro_row;
int rc;
char errstr[512];

sh = strgp->store_handle;
if (!sh)
Expand All @@ -940,8 +971,7 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
{
void *ser_buf = NULL;
size_t ser_buf_size;
int ser_size;
serdes_schema_t *serdes_schema;
int ser_size;

char *topic_name = get_topic_name(sh, set, row);
if (!topic_name) {
Expand All @@ -959,25 +989,11 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
}
switch (sh->encoding) {
case AKS_ENCODING_AVRO:
serdes_schema = serdes_schema_find(sh, (char *)row->schema_name, NULL, row);
if (!serdes_schema) {
LOG_ERROR("A serdes schema for '%s' could not be "
"constructed.\n", row->schema_name);
goto skip_row_1;
}
/* Encode ldmsd_row_s as an Avro value */
rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_row);
rc = row_to_avro_payload(sh, row, &ser_buf, &ser_buf_size);
if (rc) {
LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc);
LOG_ERROR("Failed to serialize row as AVRO object, error: %d", rc);
goto skip_row_1;
}
/* Serialize an Avro value into a buffer */
if (serdes_schema_serialize_avro(serdes_schema, &avro_row,
&ser_buf, &ser_buf_size,
errstr, sizeof(errstr))) {
LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr);
goto skip_row_2;
}
break;
case AKS_ENCODING_JSON:
/* Encode row as a JSON text object */
Expand All @@ -986,7 +1002,7 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
LOG_ERROR("Failed to serialize row as JSON object, error: %d", rc);
goto skip_row_1;
}
ser_buf_size = ser_size;
ser_buf_size = (size_t)ser_size;
break;
default:
assert(0 == "Invalid/unsupported serialization encoding");
Expand All @@ -1002,9 +1018,6 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
rd_kafka_err2str(rd_kafka_last_error()));
free(ser_buf);
}
skip_row_2:
if (sh->encoding == AKS_ENCODING_AVRO)
avro_value_decref(&avro_row);
skip_row_1:
rd_kafka_topic_destroy(rkt);
skip_row_0:
Expand Down

0 comments on commit ac56104

Please sign in to comment.