fix: try to fix memory leak in Kafka sink
Signed-off-by: Alex Collins <[email protected]>
alexec committed Oct 7, 2021
1 parent 1bbd8d9 commit 0b2455f
Showing 2 changed files with 41 additions and 38 deletions.
73 changes: 38 additions & 35 deletions runner/sidecar/sink/kafka/kafka.go
@@ -57,35 +57,33 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInterface
 		}
 	}, 3*time.Second, 1.2, true)
 
-	if x.Async {
-		// track async success and errors
-		kafkaMessagesProducedSuccess := promauto.NewCounterVec(prometheus.CounterOpts{
-			Subsystem: "sinks",
-			Name:      "kafka_produced_successes",
-			Help:      "Number of messages successfully produced to Kafka",
-		}, []string{"sinkName"})
-		kafkaMessagesProducedErr := promauto.NewCounterVec(prometheus.CounterOpts{
-			Subsystem: "sinks",
-			Name:      "kafka_produce_errors",
-			Help:      "Number of errors while producing messages to Kafka",
-		}, []string{"sinkName"})
-
-		// read from Success Channel
-		go wait.JitterUntilWithContext(ctx, func(context.Context) {
-			logger.Info("starting producer event consuming loop")
-			for e := range producer.Events() {
-				switch ev := e.(type) {
-				case *kafka.Message:
-					if err := ev.TopicPartition.Error; err != nil {
-						logger.Error(err, "Async to Kafka failed", "topic", x.Topic)
-						kafkaMessagesProducedErr.WithLabelValues(sinkName).Inc()
-					} else {
-						kafkaMessagesProducedSuccess.WithLabelValues(sinkName).Inc()
-					}
-				}
-			}
-		}, time.Second, 1.2, true)
-	}
+	// track async success and errors
+	kafkaMessagesProducedSuccess := promauto.NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "sinks",
+		Name:      "kafka_produced_successes",
+		Help:      "Number of messages successfully produced to Kafka",
+	}, []string{"sinkName"})
+	kafkaMessagesProducedErr := promauto.NewCounterVec(prometheus.CounterOpts{
+		Subsystem: "sinks",
+		Name:      "kafka_produce_errors",
+		Help:      "Number of errors while producing messages to Kafka",
+	}, []string{"sinkName"})
+
+	// read from Success Channel
+	go wait.JitterUntilWithContext(ctx, func(context.Context) {
+		logger.Info("starting producer event consuming loop")
+		for e := range producer.Events() {
+			switch ev := e.(type) {
+			case *kafka.Message:
+				if err := ev.TopicPartition.Error; err != nil {
+					logger.Error(err, "Async to Kafka failed", "topic", x.Topic)
+					kafkaMessagesProducedErr.WithLabelValues(sinkName).Inc()
+				} else {
+					kafkaMessagesProducedSuccess.WithLabelValues(sinkName).Inc()
+				}
+			}
+		}
+	}, time.Second, 1.2, true)
 
 	return &kafkaSink{sinkName, producer, x.Topic, x.Async}, nil
 }
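
The substance of this hunk: the Prometheus counters and the Events()-draining goroutine now run unconditionally, instead of only when x.Async is set, so producer events are always consumed. Below is a minimal, self-contained sketch of that always-drain pattern; the broker address and topic are placeholders, not the project's actual wiring, which is built from secrets.

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Placeholder broker address; the real sink assembles its config elsewhere.
	producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Drain Events() unconditionally. Delivery reports and errors queue up
	// on this channel; if nothing reads it, the backlog (and the CGO-backed
	// messages it references) piles up.
	go func() {
		for e := range producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					log.Printf("delivery failed: %v", ev.TopicPartition.Error)
				}
			case kafka.Error:
				log.Printf("producer error: %v", ev)
			}
		}
	}()

	topic := "example-topic" // placeholder topic
	_ = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}, nil)
	producer.Flush(5 * 1000) // wait up to 5s for the delivery report
}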

@@ -96,21 +94,22 @@ func (h *kafkaSink) Sink(ctx context.Context, msg []byte) error {
 	if err != nil {
 		return err
 	}
-	message := kafka.Message{
+	var deliveryChan chan kafka.Event
+	if !h.async {
+		deliveryChan = make(chan kafka.Event)
+		defer close(deliveryChan)
+	}
+	if err := h.producer.Produce(&kafka.Message{
 		TopicPartition: kafka.TopicPartition{Topic: &h.topic, Partition: kafka.PartitionAny},
 		Headers: []kafka.Header{
 			{Key: "source", Value: []byte(m.Source)},
 			{Key: "id", Value: []byte(m.ID)},
 		},
 		Value: msg,
-	}
-	if h.async {
-		return h.producer.Produce(&message, nil)
-	} else {
-		deliveryChan := make(chan kafka.Event, 1)
-		if err := h.producer.Produce(&message, deliveryChan); err != nil {
-			return err
-		}
+	}, deliveryChan); err != nil {
+		return err
+	}
+	if deliveryChan != nil {
 		select {
 		case <-ctx.Done():
 			return fmt.Errorf("failed to get delivery: %w", ctx.Err())

Comment from alexec (Author, Oct 7, 2021) on the removed line `return h.producer.Produce(&message, nil)`:

@sarabala1979 @whynowy one downside of Confluent Kafka Go is that it uses CGO and can have memory leaks. I believe this line could have been the memory leak, as the message was scoped to the function but passed through the CGO interface and was never garbage collected. I might be wrong, mind you.
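
To make the comment's hypothesis concrete, here is a hedged sketch of the synchronous-produce shape the commit moves to: the message literal is passed straight into Produce rather than held in a named local that crosses the CGO boundary, and a per-call delivery channel is closed when the call returns. The function name, topic parameter, and error handling are illustrative, not the project's exact code.

package sketch

import (
	"context"
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// produceSync sends one message and waits for its delivery report,
// mirroring the pattern in the rewritten Sink.
func produceSync(ctx context.Context, p *kafka.Producer, topic string, value []byte) error {
	deliveryChan := make(chan kafka.Event)
	defer close(deliveryChan)

	// Build the message inline instead of binding it to a local variable,
	// as the commit does.
	if err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
	}, deliveryChan); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return fmt.Errorf("failed to get delivery: %w", ctx.Err())
	case e := <-deliveryChan:
		if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
			return m.TopicPartition.Error
		}
		return nil
	}
}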
@@ -123,11 +122,15 @@ func (h *kafkaSink) Sink(ctx context.Context, msg []byte) error {
 		}
 	}
+	return nil
 }
 
 func (h *kafkaSink) Close() error {
 	logger.Info("flushing producer")
-	_ = h.producer.Flush(15 * 1000)
+	unflushedMessages := h.producer.Flush(15 * 1000)
+	if unflushedMessages > 0 {
+		logger.Error(fmt.Errorf("unflushed messages %d", unflushedMessages), "failed to flush producer", "sinkName", h.sinkName)
+	}
 	logger.Info("closing producer")
 	h.producer.Close()
 	logger.Info("producer closed")
6 changes: 3 additions & 3 deletions test/kafka-stress/test-results.json
@@ -1,13 +1,13 @@
 {
-    "TestKafkaAsyncSinkStress/.tps": 1300,
-    "TestKafkaSinkStress/.tps": 400,
+    "TestKafkaAsyncSinkStress/.tps": 2300,
+    "TestKafkaSinkStress/.tps": 600,
     "TestKafkaSinkStress/N=10,messageSize=100.tps": 200,
     "TestKafkaSinkStress/N=10,messageSize=1000.tps": 150,
     "TestKafkaSinkStress/N=50000.tps": 750,
     "TestKafkaSinkStress/async=true.tps": 400,
     "TestKafkaSinkStress/messageSize=1000.tps": 300,
     "TestKafkaSinkStress/replicas=2.tps": 400,
-    "TestKafkaSourceStress/.tps": 1550,
+    "TestKafkaSourceStress/.tps": 2200,
     "TestKafkaSourceStress/N=10,messageSize=100.tps": 450,
     "TestKafkaSourceStress/N=10,messageSize=1000.tps": 650,
     "TestKafkaSourceStress/N=50000.tps": 3150,
