diff --git a/runner/sidecar/source/kafka/kafka.go b/runner/sidecar/source/kafka/kafka.go index 29b15652..269d7857 100644 --- a/runner/sidecar/source/kafka/kafka.go +++ b/runner/sidecar/source/kafka/kafka.go @@ -27,7 +27,7 @@ type kafkaSource struct { consumer *kafka.Consumer topic string wg *sync.WaitGroup - channels map[int32]chan *kafka.Message + channels *sync.Map // map[int32]chan *kafka.Message process source.Process totalLag int64 } @@ -78,7 +78,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, cluster, n sourceURN: sourceURN, consumer: consumer, topic: x.Topic, - channels: map[int32]chan *kafka.Message{}, // partition -> messages + channels: new(sync.Map), // partition -> messages wg: &sync.WaitGroup{}, process: process, totalLag: pendingUnavailable, @@ -113,9 +113,9 @@ func (s *kafkaSource) processMessage(ctx context.Context, msg *kafka.Message) er func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) { logger := s.logger.WithValues("partition", partition) - if _, ok := s.channels[partition]; !ok { + if _, ok := s.channels.Load(partition); !ok { logger.Info("assigned partition") - s.channels[partition] = make(chan *kafka.Message, 256) + s.channels.Store(partition, make(chan *kafka.Message, 256)) go wait.JitterUntilWithContext(ctx, func(ctx context.Context) { s.consumePartition(ctx, partition) }, 3*time.Second, 1.2, true) @@ -142,7 +142,8 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) { s.logger.Info("recovered from panic while queuing message", "recover", fmt.Sprint(r)) } }() - s.channels[e.TopicPartition.Partition] <- e + v, _ := s.channels.Load(e.TopicPartition.Partition) + v.(chan *kafka.Message) <- e }() case *kafka.Stats: // https://github.com/edenhill/librdkafka/wiki/Consumer-lag-monitoring @@ -166,9 +167,10 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) { func (s *kafkaSource) Close() error { s.logger.Info("closing partition channels") - for _, ch := range s.channels { - close(ch) - } + s.channels.Range(func(_, v interface{}) bool { + close(v.(chan *kafka.Message)) + return true + }) s.logger.Info("waiting for partition consumers to finish") s.wg.Wait() s.logger.Info("closing consumer") @@ -218,10 +220,11 @@ func (s *kafkaSource) consumePartition(ctx context.Context, partition int32) { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { + v, _ := s.channels.Load(partition) select { case <-ticker.C: commitLastUncommitted() - case msg, ok := <-s.channels[partition]: + case msg, ok := <-v.(chan *kafka.Message): if !ok { return }