Fix error path memory leaks in avro_kafka #1274

Merged — 1 commit, Sep 6, 2023
ldms/src/store/avro_kafka/store_avro_kafka.c — 29 changes: 20 additions & 9 deletions
@@ -41,6 +41,7 @@ static ovis_log_t aks_log = NULL;
 #define LOG_ERROR(FMT, ...) LOG(OVIS_LERROR, FMT, ##__VA_ARGS__)
 #define LOG_INFO(FMT, ...) LOG(OVIS_LINFO, FMT, ##__VA_ARGS__)
 #define LOG_WARN(FMT, ...) LOG(OVIS_LWARNING, FMT, ##__VA_ARGS__)
+#define LOG_DEBUG(FMT, ...) LOG(OVIS_LDEBUG, FMT, ##__VA_ARGS__)
 
 typedef struct aks_handle_s
 {
@@ -440,6 +441,8 @@ static aks_handle_t __handle_new(ldmsd_strgp_t strgp)
 
 	sh->encoding = g_serdes_encoding;
 	sh->topic_fmt = strdup(g_topic_fmt);
+	if (!sh->topic_fmt)
+		goto err_1;
 
 	sh->rd_conf = rd_kafka_conf_dup(g_rd_conf);
 	if (!sh->rd_conf)
@@ -485,6 +488,7 @@ static aks_handle_t __handle_new(ldmsd_strgp_t strgp)
 err_2:
 	rd_kafka_conf_destroy(sh->rd_conf);
 err_1:
+	free(sh->topic_fmt);
 	free(sh);
 err_0:
 	return NULL;
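
Note on the pattern these __handle_new() hunks extend: allocations are unwound through a ladder of error labels, so a failure at any step falls through the labels that free exactly what was acquired before it, in reverse order. The sketch below is a minimal, hypothetical illustration of that idiom — the struct and function names are invented, not taken from the LDMS sources; the only detail carried over from the diff is that err_1 now also frees the strdup'd topic format.

#include <stdlib.h>
#include <string.h>

struct handle {
	char *topic_fmt;
	void *conf;
	void *producer;
};

/* Constructor-style error ladder: each failure jumps to the label that
 * releases everything acquired so far; execution then falls through the
 * remaining labels. */
static struct handle *handle_new(const char *fmt)
{
	struct handle *h = calloc(1, sizeof(*h));
	if (!h)
		goto err_0;

	h->topic_fmt = strdup(fmt);
	if (!h->topic_fmt)
		goto err_1;		/* topic_fmt is NULL here; free(NULL) is a no-op */

	h->conf = malloc(64);		/* stands in for rd_kafka_conf_dup() */
	if (!h->conf)
		goto err_1;		/* without freeing topic_fmt under err_1, this path leaks */

	h->producer = malloc(64);	/* stands in for rd_kafka_new() */
	if (!h->producer)
		goto err_2;

	return h;

err_2:
	free(h->conf);			/* stands in for rd_kafka_conf_destroy() */
err_1:
	free(h->topic_fmt);		/* the line this PR adds in the real code */
	free(h);
err_0:
	return NULL;
}

int main(void)
{
	struct handle *h = handle_new("example-topic-fmt");
	if (h) {
		free(h->producer);
		free(h->conf);
		free(h->topic_fmt);
		free(h);
	}
	return !h;
}
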
@@ -940,44 +944,47 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
 		serdes_schema_t *serdes_schema;
 
 		char *topic_name = get_topic_name(sh, set, row);
-		LOG_INFO("topic name %s\n", topic_name);
+		if (!topic_name) {
+			LOG_ERROR("get_topic_name failed for schema '%s'\n", row->schema_name);
+			continue;
+		}
+		LOG_DEBUG("topic name %s\n", topic_name);
 		rkt = rd_kafka_topic_new(sh->rd, topic_name, NULL);
 		if (!rkt)
 		{
 			LOG_ERROR("rd_kafka_topic_new(\"%s\") failed, "
 				  "errno: %d\n",
-				  row->schema_name, errno);
-			continue;
+				  topic_name, errno);
+			goto skip_row_0;
 		}
 		switch (sh->encoding) {
 		case AKS_ENCODING_AVRO:
 			serdes_schema = serdes_schema_find(sh, (char *)row->schema_name, NULL, row);
 			if (!serdes_schema) {
 				LOG_ERROR("A serdes schema for '%s' could not be "
 					  "constructed.\n", row->schema_name);
-				continue;
+				goto skip_row_1;
 			}
 			/* Encode ldmsd_row_s as an Avro value */
 			rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_row);
 			if (rc) {
 				LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc);
-				continue;
+				goto skip_row_1;
 			}
 			/* Serialize an Avro value into a buffer */
 			if (serdes_schema_serialize_avro(serdes_schema, &avro_row,
 							 &ser_buf, &ser_buf_size,
 							 errstr, sizeof(errstr))) {
 				LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr);
-				avro_value_decref(&avro_row);
-				continue;
+				goto skip_row_2;
 			}
 			break;
 		case AKS_ENCODING_JSON:
 			/* Encode row as a JSON text object */
 			rc = ldmsd_row_to_json_object(row, (char **)&ser_buf, &ser_size);
 			if (rc) {
 				LOG_ERROR("Failed to serialize row as JSON object, error: %d", rc);
-				continue;
+				goto skip_row_1;
 			}
 			ser_buf_size = ser_size;
 			break;
@@ -993,10 +1000,14 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list,
 			LOG_ERROR("rd_kafka_produce(\"%s\") failed, "
 				  "\"%s\"\n", topic_name,
 				  rd_kafka_err2str(rd_kafka_last_error()));
+			free(ser_buf);
 		}
-		rd_kafka_topic_destroy(rkt);
+	skip_row_2:
 		if (sh->encoding == AKS_ENCODING_AVRO)
 			avro_value_decref(&avro_row);
+	skip_row_1:
+		rd_kafka_topic_destroy(rkt);
+	skip_row_0:
 		free(topic_name);
 	}
 
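
The commit_rows() hunks apply the same idea inside the per-row loop: instead of `continue`, each failure now jumps to a skip_row_* label that releases what that iteration has acquired (serialized buffer, Avro value, topic handle, topic name) before moving on to the next row. Below is a minimal, hypothetical sketch of that per-iteration cleanup; file I/O stands in for the librdkafka and serdes calls, and all names are invented.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Per-iteration goto cleanup: a failure at any step jumps to the label
 * that frees what this loop pass has acquired, then falls through the
 * remaining labels, and the loop continues with the next row. */
static void process_rows(const char **rows, int n)
{
	for (int i = 0; i < n; i++) {
		FILE *f = NULL;
		char *buf = NULL;

		char *name = strdup(rows[i]);	/* stands in for get_topic_name() */
		if (!name)
			continue;		/* nothing acquired yet */

		f = fopen("/dev/null", "w");	/* stands in for rd_kafka_topic_new() */
		if (!f)
			goto skip_row_0;

		buf = strdup(name);		/* stands in for the serialized row */
		if (!buf)
			goto skip_row_1;

		fputs(buf, f);			/* stands in for rd_kafka_produce() */
		free(buf);
	skip_row_1:
		fclose(f);			/* stands in for rd_kafka_topic_destroy() */
	skip_row_0:
		free(name);
	}
}

int main(void)
{
	const char *rows[] = { "row-a", "row-b" };
	process_rows(rows, 2);
	return 0;
}
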