diff --git a/consumergroup/consumer_group.go b/consumergroup/consumer_group.go old mode 100644 new mode 100755 index a4e0d5c..dba378c --- a/consumergroup/consumer_group.go +++ b/consumergroup/consumer_group.go @@ -333,7 +333,18 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con return } - dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders) + topicConsumers := []*kazoo.ConsumergroupInstance{} + for _, consumer := range cg.consumers{ + reg, _ := consumer.Registration() + for sub, _ := range reg.Subscription{ + if sub == topic{ + topicConsumers = append(topicConsumers, consumer) + break + } + } + } + + dividedPartitions := dividePartitionsBetweenConsumers(topicConsumers, partitionLeaders) myPartitions := dividedPartitions[cg.instance.ID] cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitionLeaders))