Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
clean up managedbalancedconsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed Mar 8, 2016
1 parent 82780ee commit fa18706
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions pykafka/managedbalancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def __init__(self,
self._generation_id = -1
self._rebalancing_lock = cluster.handler.Lock()
# ManagedBalancedConsumers in the same process cannot share connections.
# This connection hash is passed to self.Broker calls that use the group
# This connection hash is passed to Broker calls that use the group
# membership API
self._connection_id = uuid.uuid4()
self._consumer = None
Expand All @@ -210,7 +210,18 @@ def fetcher():
try:
if not self._running:
break
self._send_heartbeat()

log.info("Sending heartbeat from consumer '%s'", self._consumer_id)
res = self._group_coordinator.heartbeat(self._connection_id,
self._consumer_group,
self._generation_id,
self._consumer_id)
if res.error_code != 0:
log.info("Error code %d encountered on heartbeat.",
res.error_code)
self._handle_error(res.error_code)
self._rebalance()

self._cluster.handler.sleep(self._heartbeat_interval_ms / 1000)
except ReferenceError:
break
Expand All @@ -224,12 +235,14 @@ def fetcher():
fetcher, name="pykafka.ManagedBalancedConsumer.heartbeats")

def start(self):
"""Start this consumer. Must be called before consume() if `auto_start=False`."""
"""Start this consumer.
Must be called before consume() if `auto_start=False`.
"""
try:
self._running = True
self._group_coordinator = self._cluster.get_group_coordinator(
self._consumer_group)
log.debug("_rebalance called from _start")
self._rebalance()
self._setup_heartbeat_worker()
except Exception:
Expand All @@ -245,22 +258,9 @@ def stop(self):
if self._consumer is not None:
self._consumer.stop()
if self._group_coordinator is not None:
self._group_coordinator.leave_group(self._connection_id, self._consumer_group,
self._consumer_id)

def _send_heartbeat(self):
"""Send a heartbeat request to the group coordinator and react to the response"""
log.info("Sending heartbeat from consumer '%s'", self._consumer_id)
res = self._group_coordinator.heartbeat(self._connection_id,
self._group_coordinator.leave_group(self._connection_id,
self._consumer_group,
self._generation_id,
self._consumer_id)
if res.error_code == 0:
return
log.info("Error code %d encountered on heartbeat." % res.error_code)
self._default_error_handlers[res.error_code]()
log.debug("_rebalance called from _send_heartbeat")
self._rebalance()

def _update_member_assignment(self):
"""Join a managed consumer group and start consuming assigned partitions
Expand Down Expand Up @@ -297,6 +297,7 @@ def _update_member_assignment(self):
self._raise_worker_exceptions()

def _build_default_error_handlers(self):
"""Set up default responses to common error codes"""
self = weakref.proxy(self)

def _handle_GroupCoordinatorNotAvailable():
Expand Down Expand Up @@ -371,6 +372,6 @@ def _sync_group(self, group_assignments):
sync_result.error_code)
if i == self._cluster._max_connection_retries - 1:
raise ERROR_CODES[sync_result.error_code]
self._default_error_handlers[sync_result.error_code]()
self._handle_error(sync_result.error_code)
self._cluster.handler.sleep(i * 2)
return sync_result.member_assignment.partition_assignment

0 comments on commit fa18706

Please sign in to comment.