diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py index 3fce1fc54..b96938e28 100644 --- a/pykafka/balancedconsumer.py +++ b/pykafka/balancedconsumer.py @@ -535,15 +535,7 @@ def _set_watches(self): self._setting_watches = False def _add_self(self): - """Register this consumer in zookeeper. - - This method ensures that the number of participants is at most the - number of partitions. - """ - participants = self._get_participants() - if len(self._topic.partitions) <= len(participants): - raise KafkaException("Cannot add consumer: more consumers than partitions") - + """Register this consumer in zookeeper.""" self._zookeeper.create( self._path_self, self._topic.name, ephemeral=True, makepath=True) diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 9f787312e..d16f9141d 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -227,9 +227,8 @@ def _raise_worker_exceptions(self): raise ex def _update(self): - """Update the consumer and cluster after an ERROR_CODE - """ - # only allow one thread to be updating the producer at a time + """Update the consumer and cluster after an ERROR_CODE""" + # only allow one thread to be updating the consumer at a time with self._update_lock: self._cluster.update() self._setup_partitions_by_leader() diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index 0ab92b94e..4b4fd3dc3 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -170,6 +170,42 @@ def get_balanced_consumer(self, consumer_group, **kwargs): **kwargs ) + def test_extra_consumer(self): + """Ensure proper operation of "extra" consumers in a group + + An "extra" consumer is the N+1th member of a consumer group consuming a topic + of N partitions, and any consumer beyond the N+1th. + """ + group = "test_extra_consumer" + extras = 1 + + def verify_extras(consumers, extras_count): + messages = [c.consume() for c in consumers] + nones = [a for a in messages if a is None] + successes = [a for a in messages if a is not None] + self.assertEqual(len(nones), extras_count) + self.assertEqual(len(successes), self.n_partitions) + + consumers = [self.get_balanced_consumer(group, + consumer_timeout_ms=5000) + for i in range(self.n_partitions + extras)] + verify_extras(consumers, extras) + + # when one consumer stops, the extra should pick up its partitions + for i in range(extras): + removed = consumers[i] + removed.stop() + consumers = [a for a in consumers if a is not removed] + self.wait_for_rebalancing(*consumers) + self.assertEqual(len(consumers), self.n_partitions) + verify_extras(consumers, 0) + + # added extra consumers should idle + for i in range(extras): + consumers.append(self.get_balanced_consumer(group, consumer_timeout_ms=5000)) + self.wait_for_rebalancing(*consumers) + verify_extras(consumers, extras) + def test_rebalance_callbacks(self): def on_rebalance(cns, old_partition_offsets, new_partition_offsets): self.assertTrue(len(new_partition_offsets) > 0)