diff --git a/consumergroup/consumer_group.go b/consumergroup/consumer_group.go index 10add92..0ddc91a 100644 --- a/consumergroup/consumer_group.go +++ b/consumergroup/consumer_group.go @@ -264,10 +264,11 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { cg.Logf("Currently registered consumers: %d\n", len(cg.consumers)) stopper := make(chan struct{}) + claimPartitionFailed := make(chan struct{}) for _, topic := range topics { cg.wg.Add(1) - go cg.topicConsumer(topic, cg.messages, cg.errors, stopper) + go cg.topicConsumer(topic, cg.messages, cg.errors, stopper, claimPartitionFailed) } select { @@ -291,11 +292,16 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { cg.Logf("Triggering rebalance due to consumer list change\n") close(stopper) cg.wg.Wait() + + case <-claimPartitionFailed: + cg.Logf("Triggering rebalance due to claim partition failed\n") + close(stopper) + cg.wg.Wait() } } } -func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, stopper <-chan struct{}) { +func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, stopper <-chan struct{}, claimPartitionFailed chan<- struct{}) { defer cg.wg.Done() select { @@ -338,7 +344,7 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con for _, pid := range myPartitions { wg.Add(1) - go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper) + go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper, claimPartitionFailed) } wg.Wait() @@ -370,7 +376,7 @@ func (cg *ConsumerGroup) consumePartition(topic string, partition int32, nextOff } // Consumes a partition -func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, wg *sync.WaitGroup, stopper <-chan struct{}) { +func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- *sarama.ConsumerError, wg *sync.WaitGroup, stopper <-chan struct{}, claimPartitionFailed chan<- struct{}) { defer wg.Done() select { @@ -386,6 +392,10 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag time.Sleep(1 * time.Second) } else { cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err) + select { + case claimPartitionFailed <- struct{}{}: + default: + } return } }