Skip to content

Commit

Permalink
store_avro_kafka: fix memory leak
Browse files Browse the repository at this point in the history
Refactor the code a bit more to allow a missing call to
avro_value_iface_decref(). This should plug a memory leak.
  • Loading branch information
morrone committed Sep 20, 2023
1 parent ac56104 commit 4a03643
Showing 1 changed file with 25 additions and 21 deletions.
46 changes: 25 additions & 21 deletions ldms/src/store/avro_kafka/store_avro_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -690,40 +690,33 @@ static int set_avro_value_from_col(avro_value_t *col_value,
return rc;
}

static int serialize_row_as_avro(serdes_t *serdes,
serdes_schema_t *serdes_schema,
ldmsd_row_t row, avro_value_t *avro)
static int serialize_columns_of_row(avro_schema_t schema,
ldmsd_row_t row, avro_value_t *avro_row)
{
int rc, i;
ldmsd_col_t col;
avro_schema_t schema = serdes_schema_avro(serdes_schema);
avro_value_iface_t *class =
avro_generic_class_from_schema(schema);

avro_value_t avro_row, avro_col;
avro_generic_value_new(class, &avro_row);
avro_value_t avro_col;

for (i = 0; i < row->col_count; i++) {
char *avro_name;
col = &row->cols[i];
avro_name = ldmsd_avro_name_get(col->name);
rc = avro_value_get_by_name(&avro_row, avro_name,
rc = avro_value_get_by_name(avro_row, avro_name,
&avro_col, NULL);
free(avro_name);
if (rc) {
LOG_ERROR("Error %d retrieving '%s' "
"from '%s' schema\n", rc, col->name, avro_schema_name(schema));
LOG_ERROR("Error %d retrieving '%s' from '%s' schema\n",
rc, col->name, avro_schema_name(schema));
continue;
}
rc = set_avro_value_from_col(&avro_col, col);
}
#ifdef AVRO_KAFKA_DEBUG
char *json_buf;
avro_value_to_json(&avro_row, 0, &json_buf);
avro_value_to_json(avro_row, 0, &json_buf);
fprintf(stderr, "%s\n", json_buf);
free(json_buf);
#endif
*avro = avro_row;
return 0;
}

Expand Down Expand Up @@ -919,32 +912,43 @@ static char *get_topic_name(aks_handle_t sh, ldms_set_t set, ldmsd_row_t row)
static int row_to_avro_payload(aks_handle_t sh, ldmsd_row_t row,
void **payload, size_t *sizep)
{
avro_value_t avro_value;
serdes_schema_t *serdes_schema;
avro_schema_t schema;
avro_value_iface_t *class;
avro_value_t avro_row;

char errstr[512];
int rc = 0;

serdes_schema = serdes_schema_find(sh, (char *)row->schema_name, NULL, row);
if (!serdes_schema) {
LOG_ERROR("A serdes schema for '%s' could not be "
"constructed.\n", row->schema_name);
return 1;
rc = 1;
goto out1;
}
schema = serdes_schema_avro(serdes_schema);
class = avro_generic_class_from_schema(schema);
avro_generic_value_new(class, &avro_row);

/* Encode ldmsd_row_s as an Avro value */
rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_value);
rc = serialize_columns_of_row(schema, row, &avro_row);
if (rc) {
LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc);
return rc;
goto out2;
}
/* Serialize an Avro value into a buffer */
if (serdes_schema_serialize_avro(serdes_schema, &avro_value,
if (serdes_schema_serialize_avro(serdes_schema, &avro_row,
payload, sizep,
errstr, sizeof(errstr))) {
LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr);
rc = 1;
goto out2;
}
avro_value_decref(&avro_value);

out2:
avro_value_decref(&avro_row);
avro_value_iface_decref(class);
out1:
return rc;
}

Expand Down

0 comments on commit 4a03643

Please sign in to comment.