Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

extra consumers #555

Merged
merged 7 commits into from
May 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,15 +535,7 @@ def _set_watches(self):
self._setting_watches = False

def _add_self(self):
"""Register this consumer in zookeeper.

This method ensures that the number of participants is at most the
number of partitions.
"""
participants = self._get_participants()
if len(self._topic.partitions) <= len(participants):
raise KafkaException("Cannot add consumer: more consumers than partitions")

"""Register this consumer in zookeeper."""
self._zookeeper.create(
self._path_self, self._topic.name, ephemeral=True, makepath=True)

Expand Down
5 changes: 2 additions & 3 deletions pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ def _raise_worker_exceptions(self):
raise ex

def _update(self):
"""Update the consumer and cluster after an ERROR_CODE
"""
# only allow one thread to be updating the producer at a time
"""Update the consumer and cluster after an ERROR_CODE"""
# only allow one thread to be updating the consumer at a time
with self._update_lock:
self._cluster.update()
self._setup_partitions_by_leader()
Expand Down
3 changes: 1 addition & 2 deletions pykafka/utils/error_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ def build_parts_by_error(response, partitions_by_id):
parts_by_error = defaultdict(list)
for topic_name in response.topics.keys():
for partition_id, pres in iteritems(response.topics[topic_name]):
owned_partition = None
if partitions_by_id is not None and partition_id in partitions_by_id:
owned_partition = partitions_by_id[partition_id]
parts_by_error[pres.err].append((owned_partition, pres))
parts_by_error[pres.err].append((owned_partition, pres))
return parts_by_error


Expand Down
49 changes: 49 additions & 0 deletions tests/pykafka/test_balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,55 @@ def get_balanced_consumer(self, consumer_group, **kwargs):
**kwargs
)

def test_extra_consumer(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @vitalyli this is what the test ended up looking like.

"""Ensure proper operation of "extra" consumers in a group

An "extra" consumer is the N+1th member of a consumer group consuming a topic
of N partitions, and any consumer beyond the N+1th.
"""
group = b"test_extra_consumer"
extras = 1

def verify_extras(consumers, extras_count):
messages = [c.consume() for c in consumers]
successes = [a for a in messages if a is not None]
nones = [a for a in messages if a is None]
attempts = 0
while len(nones) != extras_count and attempts < 5:
messages = [c.consume() for c in consumers]
successes = [a for a in messages if a is not None]
nones = [a for a in messages if a is None]
attempts += 1
self.assertEqual(len(nones), extras_count)
self.assertEqual(len(successes), self.n_partitions)

try:
consumers = [self.get_balanced_consumer(group, consumer_timeout_ms=5000)
for i in range(self.n_partitions + extras)]
verify_extras(consumers, extras)

# when one consumer stops, the extra should pick up its partitions
removed = consumers[:extras]
for consumer in removed:
consumer.stop()
consumers = [a for a in consumers if a not in removed]
self.wait_for_rebalancing(*consumers)
self.assertEqual(len(consumers), self.n_partitions)
verify_extras(consumers, 0)

# added "extra" consumers should idle
for i in range(extras):
consumers.append(self.get_balanced_consumer(group,
consumer_timeout_ms=5000))
self.wait_for_rebalancing(*consumers)
verify_extras(consumers, extras)
finally:
for consumer in consumers:
try:
consumer.stop()
except:
pass

def test_rebalance_callbacks(self):
def on_rebalance(cns, old_partition_offsets, new_partition_offsets):
self.assertTrue(len(new_partition_offsets) > 0)
Expand Down