Skip to content

Commit

Permalink
Kafkaconsumer: handle multiple rebalances correctly (#63)
Browse files Browse the repository at this point in the history
* snmp: optimize memory usage on startup until cache is full

* kafkaconsumer: handle multiple rebalances correctly
  • Loading branch information
debugloop authored Jan 12, 2023
1 parent 31c3d5d commit 1f6a92e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
33 changes: 16 additions & 17 deletions segments/input/kafkaconsumer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ type Handler struct {
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (h *Handler) Setup(sarama.ConsumerGroupSession) error {
func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
log.Println("[info] KafkaConsumer: Received new partition set to claim:", session.Claims()) // TODO: print those
// reopen flows channel
h.flows = make(chan *pb.EnrichedFlow)
// Mark the consumer as ready
close(h.ready)
return nil
Expand All @@ -29,24 +32,20 @@ func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

func (h *Handler) Close() {
h.cancel()
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
session.MarkMessage(message, "")
flowMsg := new(pb.EnrichedFlow)
if err := proto.Unmarshal(message.Value, flowMsg); err == nil {
h.flows <- flowMsg
} else {
log.Printf("[warning] KafkaConsumer: Error decoding flow, this might be due to the use of Goflow custom fields. Original error:\n %s", err)
for {
select {
case message := <-claim.Messages():
session.MarkMessage(message, "")
flowMsg := new(pb.EnrichedFlow)
if err := proto.Unmarshal(message.Value, flowMsg); err == nil {
h.flows <- flowMsg
} else {
log.Printf("[warning] KafkaConsumer: Error decoding flow, this might be due to the use of Goflow custom fields. Original error:\n %s", err)
}
case <-session.Context().Done():
return nil
}
}
return nil
}
9 changes: 5 additions & 4 deletions segments/input/kafkaconsumer/kafkaconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (segment KafkaConsumer) New(config map[string]string) segments.Segment {
}

// set some unconfigurable defaults
newsegment.saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
// newsegment.saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
newsegment.saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}

// TODO: parse and set kafka version
newsegment.saramaConfig.Version, err = sarama.ParseKafkaVersion("2.4.0")
Expand Down Expand Up @@ -191,14 +192,14 @@ func (segment *KafkaConsumer) Run(wg *sync.WaitGroup) {
select {
case msg, ok := <-handler.flows:
if !ok {
// This will occur when the handler calls its Cleanup method
handlerCancel() // This is in case the channel was closed somehow else, which shouldn't happen
return
// This will occur during a rebalance when the handler calls its Cleanup method
continue
}
segment.Out <- msg
case msg, ok := <-segment.In:
if !ok {
handlerCancel() // Trigger handler shutdown and cleanup
return
} else {
segment.Out <- msg
}
Expand Down

0 comments on commit 1f6a92e

Please sign in to comment.