diff --git a/runner/sidecar/source/kafka/kafka.go b/runner/sidecar/source/kafka/kafka.go index ca0c89d4..2cc35f26 100644 --- a/runner/sidecar/source/kafka/kafka.go +++ b/runner/sidecar/source/kafka/kafka.go @@ -17,6 +17,7 @@ import ( "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/go-logr/logr" "github.com/opentracing/opentracing-go" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -28,7 +29,6 @@ type kafkaSource struct { consumer *kafka.Consumer topic string wg *sync.WaitGroup - mu sync.Mutex // for channels channels map[int32]chan *kafka.Message process source.Process totalLag int64 @@ -73,7 +73,6 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit sourceURN: sourceURN, consumer: consumer, topic: x.Topic, - mu: sync.Mutex{}, channels: map[int32]chan *kafka.Message{}, // partition -> messages wg: &sync.WaitGroup{}, process: process, @@ -109,20 +108,17 @@ func (s *kafkaSource) processMessage(ctx context.Context, msg *kafka.Message) er func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) { logger := s.logger.WithValues("partition", partition) - s.mu.Lock() - defer s.mu.Unlock() if _, ok := s.channels[partition]; !ok { logger.Info("assigned partition") s.channels[partition] = make(chan *kafka.Message, 256) - go wait.JitterUntilWithContext(ctx, func(ctx context.Context) { + go func() { + defer runtime.HandleCrash() s.consumePartition(ctx, partition) - }, 3*time.Second, 1.2, true) + }() } } func (s *kafkaSource) revokedPartition(partition int32) { - s.mu.Lock() - defer s.mu.Unlock() if _, ok := s.channels[partition]; ok { s.logger.Info("revoked partition", "partition", partition) close(s.channels[partition]) @@ -174,12 +170,10 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) { func (s *kafkaSource) Close() error { s.logger.Info("closing partition channels") - s.mu.Lock() for key, ch := range s.channels { delete(s.channels, key) close(ch) } - s.mu.Unlock() s.logger.Info("waiting for partition consumers to finish") s.wg.Wait() s.logger.Info("closing consumer")