Skip to content

Commit

Permalink
fix: Add mutex. Fixes #588 (#589)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Apr 20, 2022
1 parent 039d6b9 commit 99e0df6
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type kafkaSource struct {
consumer *kafka.Consumer
topic string
wg *sync.WaitGroup
channels map[int32]chan *kafka.Message
channels *sync.Map // map[int32]chan *kafka.Message
process source.Process
totalLag int64
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, cluster, n
sourceURN: sourceURN,
consumer: consumer,
topic: x.Topic,
channels: map[int32]chan *kafka.Message{}, // partition -> messages
channels: new(sync.Map), // partition -> messages
wg: &sync.WaitGroup{},
process: process,
totalLag: pendingUnavailable,
Expand Down Expand Up @@ -113,9 +113,9 @@ func (s *kafkaSource) processMessage(ctx context.Context, msg *kafka.Message) er

func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) {
logger := s.logger.WithValues("partition", partition)
if _, ok := s.channels[partition]; !ok {
if _, ok := s.channels.Load(partition); !ok {
logger.Info("assigned partition")
s.channels[partition] = make(chan *kafka.Message, 256)
s.channels.Store(partition, make(chan *kafka.Message, 256))
go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
s.consumePartition(ctx, partition)
}, 3*time.Second, 1.2, true)
Expand All @@ -142,7 +142,8 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) {
s.logger.Info("recovered from panic while queuing message", "recover", fmt.Sprint(r))
}
}()
s.channels[e.TopicPartition.Partition] <- e
v, _ := s.channels.Load(e.TopicPartition.Partition)
v.(chan *kafka.Message) <- e
}()
case *kafka.Stats:
// https://github.com/edenhill/librdkafka/wiki/Consumer-lag-monitoring
Expand All @@ -166,9 +167,10 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) {

func (s *kafkaSource) Close() error {
s.logger.Info("closing partition channels")
for _, ch := range s.channels {
close(ch)
}
s.channels.Range(func(_, v interface{}) bool {
close(v.(chan *kafka.Message))
return true
})
s.logger.Info("waiting for partition consumers to finish")
s.wg.Wait()
s.logger.Info("closing consumer")
Expand Down Expand Up @@ -218,10 +220,11 @@ func (s *kafkaSource) consumePartition(ctx context.Context, partition int32) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
v, _ := s.channels.Load(partition)
select {
case <-ticker.C:
commitLastUncommitted()
case msg, ok := <-s.channels[partition]:
case msg, ok := <-v.(chan *kafka.Message):
if !ok {
return
}
Expand Down

0 comments on commit 99e0df6

Please sign in to comment.