@@ -1016,7 +1016,10 @@ static int fw_process_message_mode_entry(
10161016 msgpack_object options ;
10171017 int result ;
10181018 msgpack_object chunk ;
1019+ struct flb_in_fw_config * ctx ;
10191020
1021+ /* Save ctx pointer before any operation that might delete the connection */
1022+ ctx = conn -> ctx ;
10201023 metadata = NULL ;
10211024
10221025 if (chunk_id != -1 || metadata_id != -1 ) {
@@ -1030,39 +1033,47 @@ static int fw_process_message_mode_entry(
10301033 result = flb_log_event_decoder_decode_timestamp (ts , & timestamp );
10311034
10321035 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
1033- result = flb_log_event_encoder_begin_record (conn -> ctx -> log_encoder );
1036+ result = flb_log_event_encoder_begin_record (ctx -> log_encoder );
10341037 }
10351038
10361039 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
1037- result = flb_log_event_encoder_set_timestamp (conn -> ctx -> log_encoder ,
1040+ result = flb_log_event_encoder_set_timestamp (ctx -> log_encoder ,
10381041 & timestamp );
10391042 }
10401043
10411044 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
10421045 if (metadata != NULL ) {
10431046 result = flb_log_event_encoder_set_metadata_from_msgpack_object (
1044- conn -> ctx -> log_encoder ,
1047+ ctx -> log_encoder ,
10451048 metadata );
10461049 }
10471050 }
10481051
10491052 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
10501053 result = flb_log_event_encoder_set_body_from_msgpack_object (
1051- conn -> ctx -> log_encoder ,
1054+ ctx -> log_encoder ,
10521055 body );
10531056 }
10541057
10551058 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
1056- result = flb_log_event_encoder_commit_record (conn -> ctx -> log_encoder );
1059+ result = flb_log_event_encoder_commit_record (ctx -> log_encoder );
10571060 }
10581061
10591062 if (result == FLB_EVENT_ENCODER_SUCCESS ) {
10601063 flb_input_log_append (in , tag , tag_len ,
1061- conn -> ctx -> log_encoder -> output_buffer ,
1062- conn -> ctx -> log_encoder -> output_length );
1064+ ctx -> log_encoder -> output_buffer ,
1065+ ctx -> log_encoder -> output_length );
10631066 }
10641067
1065- flb_log_event_encoder_reset (conn -> ctx -> log_encoder );
1068+ flb_log_event_encoder_reset (ctx -> log_encoder );
1069+
1070+ /* Check if plugin was paused during log append (connection may have been deleted) */
1071+ pthread_mutex_lock (& ctx -> conn_mutex );
1072+ if (ctx -> is_paused ) {
1073+ pthread_mutex_unlock (& ctx -> conn_mutex );
1074+ return -1 ;
1075+ }
1076+ pthread_mutex_unlock (& ctx -> conn_mutex );
10661077
10671078 if (chunk_id != -1 ) {
10681079 chunk = options .via .map .ptr [chunk_id ].val ;
@@ -1468,6 +1479,14 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
14681479 out_tag , flb_sds_len (out_tag ),
14691480 & entry .via .array .ptr [index ],
14701481 chunk_id );
1482+
1483+ /* Check if connection was deleted during processing */
1484+ if (conn -> being_deleted ) {
1485+ msgpack_unpacked_destroy (& result );
1486+ msgpack_unpacker_free (unp );
1487+ flb_sds_destroy (out_tag );
1488+ return 0 ;
1489+ }
14711490 }
14721491
14731492 if (chunk_id != -1 ) {
@@ -1641,6 +1660,16 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
16411660
16421661 goto cleanup_decompress ;
16431662 }
1663+
1664+ /* Check if connection was deleted during append */
1665+ if (conn -> being_deleted ) {
1666+ flb_free (decomp_buf );
1667+ msgpack_unpacked_destroy (& result );
1668+ msgpack_unpacker_free (unp );
1669+ flb_sds_destroy (out_tag );
1670+ flb_decompression_context_destroy (conn -> d_ctx );
1671+ return 0 ;
1672+ }
16441673 }
16451674 } while (decomp_len > 0 );
16461675
0 commit comments