From fa187060808999caea1739b96861dba6bcd141f8 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 8 Mar 2016 13:49:06 -0800 Subject: [PATCH] clean up managedbalancedconsumer --- pykafka/managedbalancedconsumer.py | 39 +++++++++++++++--------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/pykafka/managedbalancedconsumer.py b/pykafka/managedbalancedconsumer.py index 8266849d2..e75c250b8 100644 --- a/pykafka/managedbalancedconsumer.py +++ b/pykafka/managedbalancedconsumer.py @@ -188,7 +188,7 @@ def __init__(self, self._generation_id = -1 self._rebalancing_lock = cluster.handler.Lock() # ManagedBalancedConsumers in the same process cannot share connections. - # This connection hash is passed to self.Broker calls that use the group + # This connection hash is passed to Broker calls that use the group # membership API self._connection_id = uuid.uuid4() self._consumer = None @@ -210,7 +210,18 @@ def fetcher(): try: if not self._running: break - self._send_heartbeat() + + log.info("Sending heartbeat from consumer '%s'", self._consumer_id) + res = self._group_coordinator.heartbeat(self._connection_id, + self._consumer_group, + self._generation_id, + self._consumer_id) + if res.error_code != 0: + log.info("Error code %d encountered on heartbeat.", + res.error_code) + self._handle_error(res.error_code) + self._rebalance() + self._cluster.handler.sleep(self._heartbeat_interval_ms / 1000) except ReferenceError: break @@ -224,12 +235,14 @@ def fetcher(): fetcher, name="pykafka.ManagedBalancedConsumer.heartbeats") def start(self): - """Start this consumer. Must be called before consume() if `auto_start=False`.""" + """Start this consumer. + + Must be called before consume() if `auto_start=False`. + """ try: self._running = True self._group_coordinator = self._cluster.get_group_coordinator( self._consumer_group) - log.debug("_rebalance called from _start") self._rebalance() self._setup_heartbeat_worker() except Exception: @@ -245,22 +258,9 @@ def stop(self): if self._consumer is not None: self._consumer.stop() if self._group_coordinator is not None: - self._group_coordinator.leave_group(self._connection_id, self._consumer_group, - self._consumer_id) - - def _send_heartbeat(self): - """Send a heartbeat request to the group coordinator and react to the response""" - log.info("Sending heartbeat from consumer '%s'", self._consumer_id) - res = self._group_coordinator.heartbeat(self._connection_id, + self._group_coordinator.leave_group(self._connection_id, self._consumer_group, - self._generation_id, self._consumer_id) - if res.error_code == 0: - return - log.info("Error code %d encountered on heartbeat." % res.error_code) - self._default_error_handlers[res.error_code]() - log.debug("_rebalance called from _send_heartbeat") - self._rebalance() def _update_member_assignment(self): """Join a managed consumer group and start consuming assigned partitions @@ -297,6 +297,7 @@ def _update_member_assignment(self): self._raise_worker_exceptions() def _build_default_error_handlers(self): + """Set up default responses to common error codes""" self = weakref.proxy(self) def _handle_GroupCoordinatorNotAvailable(): @@ -371,6 +372,6 @@ def _sync_group(self, group_assignments): sync_result.error_code) if i == self._cluster._max_connection_retries - 1: raise ERROR_CODES[sync_result.error_code] - self._default_error_handlers[sync_result.error_code]() + self._handle_error(sync_result.error_code) self._cluster.handler.sleep(i * 2) return sync_result.member_assignment.partition_assignment