diff --git a/ldms/src/store/avro_kafka/store_avro_kafka.c b/ldms/src/store/avro_kafka/store_avro_kafka.c index 809f9cc55..e78c25d81 100644 --- a/ldms/src/store/avro_kafka/store_avro_kafka.c +++ b/ldms/src/store/avro_kafka/store_avro_kafka.c @@ -690,40 +690,33 @@ static int set_avro_value_from_col(avro_value_t *col_value, return rc; } -static int serialize_row_as_avro(serdes_t *serdes, - serdes_schema_t *serdes_schema, - ldmsd_row_t row, avro_value_t *avro) +static int serialize_columns_of_row(avro_schema_t schema, + ldmsd_row_t row, avro_value_t *avro_row) { int rc, i; ldmsd_col_t col; - avro_schema_t schema = serdes_schema_avro(serdes_schema); - avro_value_iface_t *class = - avro_generic_class_from_schema(schema); - - avro_value_t avro_row, avro_col; - avro_generic_value_new(class, &avro_row); + avro_value_t avro_col; for (i = 0; i < row->col_count; i++) { char *avro_name; col = &row->cols[i]; avro_name = ldmsd_avro_name_get(col->name); - rc = avro_value_get_by_name(&avro_row, avro_name, + rc = avro_value_get_by_name(avro_row, avro_name, &avro_col, NULL); free(avro_name); if (rc) { - LOG_ERROR("Error %d retrieving '%s' " - "from '%s' schema\n", rc, col->name, avro_schema_name(schema)); + LOG_ERROR("Error %d retrieving '%s' from '%s' schema\n", + rc, col->name, avro_schema_name(schema)); continue; } rc = set_avro_value_from_col(&avro_col, col); } #ifdef AVRO_KAFKA_DEBUG char *json_buf; - avro_value_to_json(&avro_row, 0, &json_buf); + avro_value_to_json(avro_row, 0, &json_buf); fprintf(stderr, "%s\n", json_buf); free(json_buf); #endif - *avro = avro_row; return 0; } @@ -919,8 +912,11 @@ static char *get_topic_name(aks_handle_t sh, ldms_set_t set, ldmsd_row_t row) static int row_to_avro_payload(aks_handle_t sh, ldmsd_row_t row, void **payload, size_t *sizep) { - avro_value_t avro_value; serdes_schema_t *serdes_schema; + avro_schema_t schema; + avro_value_iface_t *class; + avro_value_t avro_row; + char errstr[512]; int rc = 0; @@ -928,23 +924,31 @@ static int row_to_avro_payload(aks_handle_t sh, ldmsd_row_t row, if (!serdes_schema) { LOG_ERROR("A serdes schema for '%s' could not be " "constructed.\n", row->schema_name); - return 1; + rc = 1; + goto out1; } + schema = serdes_schema_avro(serdes_schema); + class = avro_generic_class_from_schema(schema); + avro_generic_value_new(class, &avro_row); + /* Encode ldmsd_row_s as an Avro value */ - rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_value); + rc = serialize_columns_of_row(schema, row, &avro_row); if (rc) { LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc); - return rc; + goto out2; } /* Serialize an Avro value into a buffer */ - if (serdes_schema_serialize_avro(serdes_schema, &avro_value, + if (serdes_schema_serialize_avro(serdes_schema, &avro_row, payload, sizep, errstr, sizeof(errstr))) { LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr); rc = 1; + goto out2; } - avro_value_decref(&avro_value); - +out2: + avro_value_decref(&avro_row); + avro_value_iface_decref(class); +out1: return rc; }