Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #555 from Parsely/enhancement/extra_consumers
Browse files Browse the repository at this point in the history
extra consumers
  • Loading branch information
emmettbutler committed May 25, 2016
2 parents 71497ac + 4f7eeb1 commit 57d2ab7
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 14 deletions.
10 changes: 1 addition & 9 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,15 +535,7 @@ def _set_watches(self):
self._setting_watches = False

def _add_self(self):
"""Register this consumer in zookeeper.
This method ensures that the number of participants is at most the
number of partitions.
"""
participants = self._get_participants()
if len(self._topic.partitions) <= len(participants):
raise KafkaException("Cannot add consumer: more consumers than partitions")

"""Register this consumer in zookeeper."""
self._zookeeper.create(
self._path_self, self._topic.name, ephemeral=True, makepath=True)

Expand Down
5 changes: 2 additions & 3 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ def _raise_worker_exceptions(self):
raise ex

def _update(self):
"""Update the consumer and cluster after an ERROR_CODE
"""
# only allow one thread to be updating the producer at a time
"""Update the consumer and cluster after an ERROR_CODE"""
# only allow one thread to be updating the consumer at a time
with self._update_lock:
self._cluster.update()
self._setup_partitions_by_leader()
Expand Down
3 changes: 1 addition & 2 deletions pykafka/utils/error_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ def build_parts_by_error(response, partitions_by_id):
parts_by_error = defaultdict(list)
for topic_name in response.topics.keys():
for partition_id, pres in iteritems(response.topics[topic_name]):
owned_partition = None
if partitions_by_id is not None and partition_id in partitions_by_id:
owned_partition = partitions_by_id[partition_id]
parts_by_error[pres.err].append((owned_partition, pres))
parts_by_error[pres.err].append((owned_partition, pres))
return parts_by_error


Expand Down
49 changes: 49 additions & 0 deletions tests/pykafka/test_balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,55 @@ def get_balanced_consumer(self, consumer_group, **kwargs):
**kwargs
)

def test_extra_consumer(self):
    """Ensure proper operation of "extra" consumers in a group

    An "extra" consumer is the N+1th member of a consumer group consuming a
    topic of N partitions, and any consumer beyond the N+1th.

    The test runs three phases against the same group:

    1. start N + ``extras`` consumers and check that exactly ``extras`` of
       them get nothing back from ``consume()``;
    2. stop ``extras`` consumers and check that the remaining N all
       receive a message;
    3. add ``extras`` new consumers and check that ``extras`` consumers
       get nothing back again.
    """
    group = b"test_extra_consumer"
    extras = 1
    # one initial poll plus up to five retries
    max_polls = 6

    def verify_extras(consumers, extras_count):
        """Poll every consumer and assert how many came back empty.

        Polling is repeated (up to ``max_polls`` rounds in total) until the
        number of ``None`` results equals ``extras_count``; the assertions
        are then made against the last round of results.
        """
        for _ in range(max_polls):
            messages = [c.consume() for c in consumers]
            none_count = sum(1 for m in messages if m is None)
            if none_count == extras_count:
                break
        success_count = len(messages) - none_count
        self.assertEqual(none_count, extras_count)
        self.assertEqual(success_count, self.n_partitions)

    # Bind the list before entering the try block and grow it one consumer
    # at a time: if creating a consumer raises, the cleanup below still sees
    # (and stops) every consumer created so far instead of failing with a
    # NameError that would hide the original exception.
    consumers = []
    try:
        for _ in range(self.n_partitions + extras):
            consumers.append(
                self.get_balanced_consumer(group, consumer_timeout_ms=5000))
        verify_extras(consumers, extras)

        # when one consumer stops, the extra should pick up its partitions
        removed = consumers[:extras]
        for consumer in removed:
            consumer.stop()
        consumers = [c for c in consumers if c not in removed]
        self.wait_for_rebalancing(*consumers)
        self.assertEqual(len(consumers), self.n_partitions)
        verify_extras(consumers, 0)

        # added "extra" consumers should idle
        for _ in range(extras):
            consumers.append(
                self.get_balanced_consumer(group, consumer_timeout_ms=5000))
        self.wait_for_rebalancing(*consumers)
        verify_extras(consumers, extras)
    finally:
        for consumer in consumers:
            try:
                consumer.stop()
            except Exception:
                # Best-effort cleanup: a consumer that fails to stop must
                # not prevent the others from being stopped.  Only
                # Exception is caught so that KeyboardInterrupt and
                # SystemExit still propagate.
                pass

def test_rebalance_callbacks(self):
def on_rebalance(cns, old_partition_offsets, new_partition_offsets):
self.assertTrue(len(new_partition_offsets) > 0)
Expand Down

0 comments on commit 57d2ab7

Please sign in to comment.