From 1f6a92e6a2a37e0dda32574263c873b08ff65ed6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20N=C3=A4gele?= Date: Thu, 12 Jan 2023 10:39:57 +0100 Subject: [PATCH] Kafkaconsumer: handle multiple rebalances correctly (#63) * snmp: optimize memory usage on startup until cache is full * kafkaconsumer: handle multiple rebalances correctly --- segments/input/kafkaconsumer/handler.go | 33 +++++++++---------- segments/input/kafkaconsumer/kafkaconsumer.go | 9 ++--- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/segments/input/kafkaconsumer/handler.go b/segments/input/kafkaconsumer/handler.go index 8a89d44..cc26337 100644 --- a/segments/input/kafkaconsumer/handler.go +++ b/segments/input/kafkaconsumer/handler.go @@ -17,7 +17,10 @@ type Handler struct { } // Setup is run at the beginning of a new session, before ConsumeClaim -func (h *Handler) Setup(sarama.ConsumerGroupSession) error { +func (h *Handler) Setup(session sarama.ConsumerGroupSession) error { + log.Println("[info] KafkaConsumer: Received new partition set to claim:", session.Claims()) // TODO: print those + // reopen flows channel + h.flows = make(chan *pb.EnrichedFlow) // Mark the consumer as ready close(h.ready) return nil @@ -29,24 +32,20 @@ func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error { return nil } -func (h *Handler) Close() { - h.cancel() -} - // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - // NOTE: - // Do not move the code below to a goroutine. - // The `ConsumeClaim` itself is called within a goroutine, see: - // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 - for message := range claim.Messages() { - session.MarkMessage(message, "") - flowMsg := new(pb.EnrichedFlow) - if err := proto.Unmarshal(message.Value, flowMsg); err == nil { - h.flows <- flowMsg - } else { - log.Printf("[warning] KafkaConsumer: Error decoding flow, this might be due to the use of Goflow custom fields. Original error:\n %s", err) + for { + select { + case message := <-claim.Messages(): + session.MarkMessage(message, "") + flowMsg := new(pb.EnrichedFlow) + if err := proto.Unmarshal(message.Value, flowMsg); err == nil { + h.flows <- flowMsg + } else { + log.Printf("[warning] KafkaConsumer: Error decoding flow, this might be due to the use of Goflow custom fields. Original error:\n %s", err) + } + case <-session.Context().Done(): + return nil } } - return nil } diff --git a/segments/input/kafkaconsumer/kafkaconsumer.go b/segments/input/kafkaconsumer/kafkaconsumer.go index 30fca17..0543a02 100644 --- a/segments/input/kafkaconsumer/kafkaconsumer.go +++ b/segments/input/kafkaconsumer/kafkaconsumer.go @@ -49,7 +49,8 @@ func (segment KafkaConsumer) New(config map[string]string) segments.Segment { } // set some unconfigurable defaults - newsegment.saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky + // newsegment.saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky + newsegment.saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky} // TODO: parse and set kafka version newsegment.saramaConfig.Version, err = sarama.ParseKafkaVersion("2.4.0") @@ -191,14 +192,14 @@ func (segment *KafkaConsumer) Run(wg *sync.WaitGroup) { select { case msg, ok := <-handler.flows: if !ok { - // This will occur when the handler calls its Cleanup method - handlerCancel() // This is in case the channel was closed somehow else, which shouldn't happen - return + // This will occur during a rebalance when the handler calls its Cleanup method + continue } segment.Out <- msg case msg, ok := <-segment.In: if !ok { handlerCancel() // Trigger handler shutdown and cleanup + return } else { segment.Out <- msg }