Skip to content

Commit

Permalink
fix: remove lock, and fix loop. Fixes #404 (#407)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Oct 4, 2021
1 parent 7213ea5 commit a46256f
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-logr/logr"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
Expand All @@ -28,7 +29,6 @@ type kafkaSource struct {
consumer *kafka.Consumer
topic string
wg *sync.WaitGroup
mu sync.Mutex // for channels
channels map[int32]chan *kafka.Message
process source.Process
totalLag int64
Expand Down Expand Up @@ -73,7 +73,6 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit
sourceURN: sourceURN,
consumer: consumer,
topic: x.Topic,
mu: sync.Mutex{},
channels: map[int32]chan *kafka.Message{}, // partition -> messages
wg: &sync.WaitGroup{},
process: process,
Expand Down Expand Up @@ -109,20 +108,17 @@ func (s *kafkaSource) processMessage(ctx context.Context, msg *kafka.Message) er

func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) {
logger := s.logger.WithValues("partition", partition)
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.channels[partition]; !ok {
logger.Info("assigned partition")
s.channels[partition] = make(chan *kafka.Message, 256)
go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
go func() {
defer runtime.HandleCrash()
s.consumePartition(ctx, partition)
}, 3*time.Second, 1.2, true)
}()
}
}

func (s *kafkaSource) revokedPartition(partition int32) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.channels[partition]; ok {
s.logger.Info("revoked partition", "partition", partition)
close(s.channels[partition])
Expand Down Expand Up @@ -174,12 +170,10 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) {

func (s *kafkaSource) Close() error {
s.logger.Info("closing partition channels")
s.mu.Lock()
for key, ch := range s.channels {
delete(s.channels, key)
close(ch)
}
s.mu.Unlock()
s.logger.Info("waiting for partition consumers to finish")
s.wg.Wait()
s.logger.Info("closing consumer")
Expand Down

0 comments on commit a46256f

Please sign in to comment.