Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
remove limit on one consumer per partition per group. fixes #527
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed May 24, 2016
1 parent 71497ac commit a3f9adf
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
10 changes: 1 addition & 9 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,15 +535,7 @@ def _set_watches(self):
self._setting_watches = False

def _add_self(self):
"""Register this consumer in zookeeper.
This method ensures that the number of participants is at most the
number of partitions.
"""
participants = self._get_participants()
if len(self._topic.partitions) <= len(participants):
raise KafkaException("Cannot add consumer: more consumers than partitions")

"""Register this consumer in zookeeper."""
self._zookeeper.create(
self._path_self, self._topic.name, ephemeral=True, makepath=True)

Expand Down
5 changes: 2 additions & 3 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ def _raise_worker_exceptions(self):
raise ex

def _update(self):
"""Update the consumer and cluster after an ERROR_CODE
"""
# only allow one thread to be updating the producer at a time
"""Update the consumer and cluster after an ERROR_CODE"""
# only allow one thread to be updating the consumer at a time
with self._update_lock:
self._cluster.update()
self._setup_partitions_by_leader()
Expand Down
36 changes: 36 additions & 0 deletions tests/pykafka/test_balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,42 @@ def get_balanced_consumer(self, consumer_group, **kwargs):
**kwargs
)

def test_extra_consumer(self):
"""Ensure proper operation of "extra" consumers in a group
An "extra" consumer is the N+1th member of a consumer group consuming a topic
of N partitions, and any consumer beyond the N+1th.
"""
group = "test_extra_consumer"
extras = 1

def verify_extras(consumers, extras_count):
messages = [c.consume() for c in consumers]
nones = [a for a in messages if a is None]
successes = [a for a in messages if a is not None]
self.assertEqual(len(nones), extras_count)
self.assertEqual(len(successes), self.n_partitions)

consumers = [self.get_balanced_consumer(group,
consumer_timeout_ms=5000)
for i in range(self.n_partitions + extras)]
verify_extras(consumers, extras)

# when one consumer stops, the extra should pick up its partitions
for i in range(extras):
removed = consumers[i]
removed.stop()
consumers = [a for a in consumers if a is not removed]
self.wait_for_rebalancing(*consumers)
self.assertEqual(len(consumers), self.n_partitions)
verify_extras(consumers, 0)

# added extra consumers should idle
for i in range(extras):
consumers.append(self.get_balanced_consumer(group, consumer_timeout_ms=5000))
self.wait_for_rebalancing(*consumers)
verify_extras(consumers, extras)

def test_rebalance_callbacks(self):
def on_rebalance(cns, old_partition_offsets, new_partition_offsets):
self.assertTrue(len(new_partition_offsets) > 0)
Expand Down

0 comments on commit a3f9adf

Please sign in to comment.