From e8361cfbb0281a7f3194e3ccc755a77f3605351c Mon Sep 17 00:00:00 2001 From: Tom Tucker Date: Mon, 28 Aug 2023 11:24:28 -0500 Subject: [PATCH] Fix error path memory leaks in avro_kafka --- ldms/src/store/avro_kafka/store_avro_kafka.c | 29 ++++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/ldms/src/store/avro_kafka/store_avro_kafka.c b/ldms/src/store/avro_kafka/store_avro_kafka.c index 37216570a..d10a18a9a 100644 --- a/ldms/src/store/avro_kafka/store_avro_kafka.c +++ b/ldms/src/store/avro_kafka/store_avro_kafka.c @@ -41,6 +41,7 @@ static ovis_log_t aks_log = NULL; #define LOG_ERROR(FMT, ...) LOG(OVIS_LERROR, FMT, ##__VA_ARGS__) #define LOG_INFO(FMT, ...) LOG(OVIS_LINFO, FMT, ##__VA_ARGS__) #define LOG_WARN(FMT, ...) LOG(OVIS_LWARNING, FMT, ##__VA_ARGS__) +#define LOG_DEBUG(FMT, ...) LOG(OVIS_LDEBUG, FMT, ##__VA_ARGS__) typedef struct aks_handle_s { @@ -440,6 +441,8 @@ static aks_handle_t __handle_new(ldmsd_strgp_t strgp) sh->encoding = g_serdes_encoding; sh->topic_fmt = strdup(g_topic_fmt); + if (!sh->topic_fmt) + goto err_1; sh->rd_conf = rd_kafka_conf_dup(g_rd_conf); if (!sh->rd_conf) @@ -485,6 +488,7 @@ static aks_handle_t __handle_new(ldmsd_strgp_t strgp) err_2: rd_kafka_conf_destroy(sh->rd_conf); err_1: + free(sh->topic_fmt); free(sh); err_0: return NULL; @@ -940,14 +944,18 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, serdes_schema_t *serdes_schema; char *topic_name = get_topic_name(sh, set, row); - LOG_INFO("topic name %s\n", topic_name); + if (!topic_name) { + LOG_ERROR("get_topic_name failed for schema '%s'\n", row->schema_name); + continue; + } + LOG_DEBUG("topic name %s\n", topic_name); rkt = rd_kafka_topic_new(sh->rd, topic_name, NULL); if (!rkt) { LOG_ERROR("rd_kafka_topic_new(\"%s\") failed, " "errno: %d\n", - row->schema_name, errno); - continue; + topic_name, errno); + goto skip_row_0; } switch (sh->encoding) { case AKS_ENCODING_AVRO: @@ -955,21 +963,20 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, if (!serdes_schema) { LOG_ERROR("A serdes schema for '%s' could not be " "constructed.\n", row->schema_name); - continue; + goto skip_row_1; } /* Encode ldmsd_row_s as an Avro value */ rc = serialize_row_as_avro(sh->serdes, serdes_schema, row, &avro_row); if (rc) { LOG_ERROR("Failed to format row as Avro value, error: %d\n", rc); - continue; + goto skip_row_1; } /* Serialize an Avro value into a buffer */ if (serdes_schema_serialize_avro(serdes_schema, &avro_row, &ser_buf, &ser_buf_size, errstr, sizeof(errstr))) { LOG_ERROR("Failed to serialize Avro row: '%s'\n", errstr); - avro_value_decref(&avro_row); - continue; + goto skip_row_2; } break; case AKS_ENCODING_JSON: @@ -977,7 +984,7 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, rc = ldmsd_row_to_json_object(row, (char **)&ser_buf, &ser_size); if (rc) { LOG_ERROR("Failed to serialize row as JSON object, error: %d", rc); - continue; + goto skip_row_1; } ser_buf_size = ser_size; break; @@ -993,10 +1000,14 @@ commit_rows(ldmsd_strgp_t strgp, ldms_set_t set, ldmsd_row_list_t row_list, LOG_ERROR("rd_kafka_produce(\"%s\") failed, " "\"%s\"\n", topic_name, rd_kafka_err2str(rd_kafka_last_error())); + free(ser_buf); } - rd_kafka_topic_destroy(rkt); + skip_row_2: if (sh->encoding == AKS_ENCODING_AVRO) avro_value_decref(&avro_row); + skip_row_1: + rd_kafka_topic_destroy(rkt); + skip_row_0: free(topic_name); }