Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #416 from Parsely/feature/group_membership_api
Browse files Browse the repository at this point in the history
 Group Membership API
  • Loading branch information
emmettbutler committed Mar 21, 2016
2 parents 412466e + fa18706 commit 6035a91
Show file tree
Hide file tree
Showing 19 changed files with 1,466 additions and 187 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ sudo: false
cache:
directories:
- $HOME/.cache/pip
- $HOME/kafka-bin
- $HOME/.ccache
python:
- "2.7"
Expand All @@ -18,7 +17,7 @@ env:
- KAFKA_BIN="$HOME/kafka-bin"
matrix:
- KAFKA_VERSION=0.8.2.2
- KAFKA_VERSION=0.9.0.0
- KAFKA_VERSION=0.9.0.1

addons:
apt:
Expand Down
158 changes: 84 additions & 74 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from .exceptions import KafkaException, PartitionOwnedError, ConsumerStoppedException
from .handlers import GEventHandler
from .simpleconsumer import SimpleConsumer
from .utils.compat import range, get_bytes, itervalues, iteritems
from .utils.compat import range, get_bytes, itervalues, iteritems, get_string
try:
from . import rdkafka
except ImportError:
Expand Down Expand Up @@ -210,6 +210,7 @@ def __init__(self,
self._zookeeper_connection_timeout_ms = zookeeper_connection_timeout_ms
self._reset_offset_on_start = reset_offset_on_start
self._post_rebalance_callback = post_rebalance_callback
self._generation_id = -1
self._running = False
self._worker_exception = None
self._worker_trace_logged = False
Expand All @@ -223,10 +224,10 @@ def __init__(self,

self._rebalancing_lock = cluster.handler.Lock()
self._consumer = None
self._consumer_id = "{hostname}:{uuid}".format(
self._consumer_id = get_bytes("{hostname}:{uuid}".format(
hostname=socket.gethostname(),
uuid=uuid4()
)
))
self._setting_watches = True

self._topic_path = '/consumers/{group}/owners/{topic}'.format(
Expand Down Expand Up @@ -289,7 +290,7 @@ def held_offsets(self):
return self._consumer.held_offsets

def start(self):
"""Open connections and join a cluster."""
"""Open connections and join a consumer group."""
try:
if self._zookeeper is None:
self._setup_zookeeper(self._zookeeper_connect,
Expand Down Expand Up @@ -347,7 +348,29 @@ def _setup_zookeeper(self, zookeeper_connect, timeout):

def _setup_internal_consumer(self, partitions=None, start=True):
    """Instantiate an internal SimpleConsumer instance for the given partitions.

    The consumer is only re-created when the requested partition set differs
    from the set currently held; otherwise the existing consumer is kept.
    If a post-rebalance callback is registered, it is invoked with the old
    and new held offsets and may return a dict of offsets to reset to.

    :param partitions: Partitions the new consumer should own. ``None`` is
        treated as an empty collection.
    :param start: Whether to start the new consumer immediately.
    :returns: ``True`` on success (including the no-change case); ``False``
        if the post-rebalance callback raised, in which case the exception
        info is stored in ``self._worker_exception`` and the internal
        consumer is left unchanged.
    """
    if partitions is None:
        partitions = []
    # Only re-create internal consumer if something changed.
    # NOTE: the diff residue line assigning self._consumer unconditionally
    # before this check has been removed - it belonged to the pre-merge
    # version and would have made old_offsets below always reflect the
    # just-created consumer instead of the previous one.
    if partitions != self._partitions:
        cns = self._get_internal_consumer(partitions=list(partitions), start=start)
        if self._post_rebalance_callback is not None:
            # Offsets held before the rebalance; empty on first assignment.
            old_offsets = (self._consumer.held_offsets
                           if self._consumer else dict())
            new_offsets = cns.held_offsets
            try:
                reset_offsets = self._post_rebalance_callback(
                    self, old_offsets, new_offsets)
            except Exception:
                log.exception("post rebalance callback threw an exception")
                # Stash exception info so the owning worker can surface it.
                self._worker_exception = sys.exc_info()
                return False

            if reset_offsets:
                # Callback asked us to start from specific offsets.
                cns.reset_offsets(partition_offsets=[
                    (cns.partitions[id_], offset) for
                    (id_, offset) in iteritems(reset_offsets)])
        self._consumer = cns
    return True

def _get_internal_consumer(self, partitions=None, start=True):
"""Instantiate a SimpleConsumer for internal use.
Expand Down Expand Up @@ -385,10 +408,12 @@ def _get_internal_consumer(self, partitions=None, start=True):
auto_offset_reset=self._auto_offset_reset,
reset_offset_on_start=reset_offset_on_start,
auto_start=start,
compacted_topic=self._is_compacted_topic
compacted_topic=self._is_compacted_topic,
generation_id=self._generation_id,
consumer_id=self._consumer_id
)

def _decide_partitions(self, participants):
def _decide_partitions(self, participants, consumer_id=None):
"""Decide which partitions belong to this consumer.
Uses the consumer rebalancing algorithm described here
Expand All @@ -402,6 +427,8 @@ def _decide_partitions(self, participants):
:param participants: Sorted list of ids of all other consumers in this
consumer group.
:type participants: Iterable of `bytes`
:param consumer_id: The ID of the consumer for which to generate a partition
assignment. Defaults to `self._consumer_id`
"""
# Freeze and sort partitions so we always have the same results
p_to_str = lambda p: '-'.join([str(p.topic.name), str(p.leader.id), str(p.id)])
Expand All @@ -410,7 +437,7 @@ def _decide_partitions(self, participants):

# get start point, # of partitions, and remainder
participants = sorted(participants) # just make sure it's sorted.
idx = participants.index(self._consumer_id)
idx = participants.index(consumer_id or self._consumer_id)
parts_per_consumer = len(all_parts) // len(participants)
remainder_ppc = len(all_parts) % len(participants)

Expand All @@ -420,15 +447,16 @@ def _decide_partitions(self, participants):
# assign partitions from i*N to (i+1)*N - 1 to consumer Ci
new_partitions = itertools.islice(all_parts, start, start + num_parts)
new_partitions = set(new_partitions)
log.info('Balancing %i participants for %i partitions.\nOwning %i partitions.',
len(participants), len(all_parts), len(new_partitions))
log.info('%s: Balancing %i participants for %i partitions. Owning %i partitions.',
self._consumer_id, len(participants), len(all_parts),
len(new_partitions))
log.debug('My partitions: %s', [p_to_str(p) for p in new_partitions])
return new_partitions

def _get_participants(self):
"""Use zookeeper to get the other consumers of this topic.
:return: A sorted list of the ids of the other consumers of this
:return: A sorted list of the ids of other consumers of this
consumer's topic
"""
try:
Expand All @@ -443,9 +471,9 @@ def _get_participants(self):
try:
topic, stat = self._zookeeper.get("%s/%s" % (self._consumer_id_path, id_))
if topic == self._topic.name:
participants.append(id_)
participants.append(get_bytes(id_))
except NoNodeException:
pass # disappeared between ``get_children`` and ``get``
pass # node disappeared between ``get_children`` and ``get``
participants = sorted(participants)
return participants

Expand Down Expand Up @@ -520,11 +548,50 @@ def _path_self(self):
"""Path where this consumer should be registered in zookeeper"""
return '{path}/{id_}'.format(
path=self._consumer_id_path,
id_=self._consumer_id
# get_string is necessary to avoid writing literal "b'" to zookeeper
id_=get_string(self._consumer_id)
)

def _update_member_assignment(self):
    """Compute a fresh partition assignment for this consumer and apply it.

    Makes up to ``self._rebalance_max_retries`` attempts; when another
    consumer still owns a partition we try to claim, we back off and retry,
    raising on the final failed attempt.
    """
    for attempt in range(self._rebalance_max_retries):
        try:
            participants = self._get_participants()
            if self._consumer_id not in participants:
                # Our registration node is gone - this only happens when
                # the zookeeper session expired. Re-register ourselves.
                self._add_self()
                participants.append(self._consumer_id)

            new_partitions = self._decide_partitions(participants)
            if not new_partitions:
                log.warning("No partitions assigned to consumer %s",
                            self._consumer_id)

            # Reconcile zookeeper with the new assignment. We deliberately
            # read our held partitions from zookeeper instead of trusting
            # `self._partitions`: after an expired session zk shows us
            # holding nothing while `self._partitions` may still be stale
            # and non-empty.
            held_in_zk = self._get_held_partitions()
            self._remove_partitions(held_in_zk - new_partitions)
            self._add_partitions(new_partitions - held_in_zk)
            if self._setup_internal_consumer(new_partitions):
                log.info('Rebalancing Complete.')
                break
        except PartitionOwnedError as err:
            last_attempt = attempt == self._rebalance_max_retries - 1
            if last_attempt:
                log.warning('Failed to acquire partition %s after %d retries.',
                            err.partition, attempt)
                raise
            log.info('Unable to acquire partition %s. Retrying', err.partition)
            self._cluster.handler.sleep(
                attempt * (self._rebalance_backoff_ms / 1000))

def _rebalance(self):
"""Claim partitions for this consumer.
"""Start the rebalancing process for this consumer
This method is called whenever a zookeeper watch is triggered.
"""
Expand All @@ -536,65 +603,8 @@ def _rebalance(self):
if not self._running:
raise ConsumerStoppedException
log.info('Rebalancing consumer "%s" for topic "%s".' % (
self._consumer_id, self._topic.name)
)

for i in range(self._rebalance_max_retries):
try:
# If retrying, be sure to make sure the
# partition allocation is correct.
participants = self._get_participants()
if self._consumer_id not in participants:
# situation that only occurs if our zk session expired
self._add_self()
participants.append(self._consumer_id)

new_partitions = self._decide_partitions(participants)
if not new_partitions:
log.warning("No partitions assigned to consumer %s",
self._consumer_id)

# Update zk with any changes:
# Note that we explicitly fetch our set of held partitions
# from zk, rather than assuming it will be identical to
# `self.partitions`. This covers the (rare) situation
# where due to an interrupted connection our zk session
# has expired, in which case we'd hold zero partitions on
# zk, but `self._partitions` may be outdated and non-empty
current_zk_parts = self._get_held_partitions()
self._remove_partitions(current_zk_parts - new_partitions)
self._add_partitions(new_partitions - current_zk_parts)

# Only re-create internal consumer if something changed.
if new_partitions != self._partitions:
cns = self._get_internal_consumer(list(new_partitions))
if self._post_rebalance_callback is not None:
old_offsets = (self._consumer.held_offsets
if self._consumer else dict())
new_offsets = cns.held_offsets
try:
reset_offsets = self._post_rebalance_callback(
self, old_offsets, new_offsets)
except Exception as ex:
log.exception("post rebalance callback threw an exception")
self._worker_exception = sys.exc_info()
break

if reset_offsets:
cns.reset_offsets(partition_offsets=[
(cns.partitions[id_], offset) for
(id_, offset) in iteritems(reset_offsets)])
self._consumer = cns

log.info('Rebalancing Complete.')
break
except PartitionOwnedError as ex:
if i == self._rebalance_max_retries - 1:
log.warning('Failed to acquire partition %s after %d retries.',
ex.partition, i)
raise
log.info('Unable to acquire partition %s. Retrying', ex.partition)
self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000))
self._consumer_id, self._topic.name))
self._update_member_assignment()

def _path_from_partition(self, p):
"""Given a partition, return its path in zookeeper.
Expand Down
Loading

0 comments on commit 6035a91

Please sign in to comment.