From 82780ee670273876deeed5aabcc4e3febcc703e9 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Tue, 8 Mar 2016 13:41:41 -0800 Subject: [PATCH] deduplicate error handling --- pykafka/managedbalancedconsumer.py | 71 +++++++++++++++--------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/pykafka/managedbalancedconsumer.py b/pykafka/managedbalancedconsumer.py index 84d28ea2b..8266849d2 100644 --- a/pykafka/managedbalancedconsumer.py +++ b/pykafka/managedbalancedconsumer.py @@ -25,10 +25,8 @@ from .balancedconsumer import BalancedConsumer from .common import OffsetType -from .exceptions import (IllegalGeneration, RebalanceInProgress, UnknownMemberId, - NotCoordinatorForGroup, GroupCoordinatorNotAvailable, - GroupAuthorizationFailed, ERROR_CODES, GroupLoadInProgress, - InconsistentGroupProtocol, InvalidSessionTimeout) +from .exceptions import (IllegalGeneration, RebalanceInProgress, NotCoordinatorForGroup, + GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress) from .protocol import MemberAssignment from .utils.compat import iterkeys @@ -198,6 +196,7 @@ def __init__(self, self._consumer_id = b'' self._worker_trace_logged = False self._worker_exception = None + self._default_error_handlers = self._build_default_error_handlers() if auto_start is True: self.start() @@ -259,16 +258,7 @@ def _send_heartbeat(self): if res.error_code == 0: return log.info("Error code %d encountered on heartbeat." % res.error_code) - if res.error_code in (IllegalGeneration.ERROR_CODE, - RebalanceInProgress.ERROR_CODE, - UnknownMemberId.ERROR_CODE): - pass - elif res.error_code in (GroupCoordinatorNotAvailable.ERROR_CODE, - NotCoordinatorForGroup.ERROR_CODE): - self._group_coordinator = self._cluster.get_group_coordinator( - self._consumer_group) - elif res.error_code == GroupAuthorizationFailed.ERROR_CODE: - raise GroupAuthorizationFailed() + self._default_error_handlers[res.error_code]() log.debug("_rebalance called from _send_heartbeat") self._rebalance() @@ -306,6 +296,36 @@ def _update_member_assignment(self): self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000)) self._raise_worker_exceptions() + def _build_default_error_handlers(self): + self = weakref.proxy(self) + + def _handle_GroupCoordinatorNotAvailable(): + self._group_coordinator = self._cluster.get_group_coordinator( + self._consumer_group) + + def _handle_NotCoordinatorForGroup(): + self._group_coordinator = self._cluster.get_group_coordinator( + self._consumer_group) + + return { + GroupCoordinatorNotAvailable.ERROR_CODE: _handle_GroupCoordinatorNotAvailable, + NotCoordinatorForGroup.ERROR_CODE: _handle_NotCoordinatorForGroup, + GroupLoadInProgress.ERROR_CODE: None, + RebalanceInProgress.ERROR_CODE: None, + IllegalGeneration.ERROR_CODE: None + } + + def _handle_error(self, error_code): + """Call the appropriate handler function for the given error code + + :param error_code: The error code returned from a Group Membership API request + :type error_code: int + """ + if error_code not in self._default_error_handlers: + raise ERROR_CODES[error_code]() + if self._default_error_handlers[error_code] is not None: + self._default_error_handlers[error_code]() + def _join_group(self): """Send a JoinGroupRequest. @@ -322,17 +342,7 @@ def _join_group(self): " generation '%s'", join_result.error_code, self._generation_id) if i == self._cluster._max_connection_retries - 1: raise ERROR_CODES[join_result.error_code] - if join_result.error_code in (GroupLoadInProgress.ERROR_CODE,): - pass - elif join_result.error_code in (GroupCoordinatorNotAvailable.ERROR_CODE, - NotCoordinatorForGroup.ERROR_CODE): - self._group_coordinator = self._cluster.get_group_coordinator( - self._consumer_group) - elif join_result.error_code in (InconsistentGroupProtocol.ERROR_CODE, - UnknownMemberId.ERROR_CODE, - InvalidSessionTimeout.ERROR_CODE, - GroupAuthorizationFailed.ERROR_CODE): - raise ERROR_CODES[join_result.error_code] + self._handle_error(join_result.error_code) self._cluster.handler.sleep(i * 2) self._generation_id = join_result.generation_id self._consumer_id = join_result.member_id @@ -361,15 +371,6 @@ def _sync_group(self, group_assignments): sync_result.error_code) if i == self._cluster._max_connection_retries - 1: raise ERROR_CODES[sync_result.error_code] - if sync_result.error_code in (IllegalGeneration.ERROR_CODE, - GroupCoordinatorNotAvailable.ERROR_CODE, - RebalanceInProgress.ERROR_CODE): - pass - elif sync_result.error_code in (NotCoordinatorForGroup.ERROR_CODE,): - self._group_coordinator = self._cluster.get_group_coordinator( - self._consumer_group) - elif sync_result.error_code in (UnknownMemberId.ERROR_CODE, - GroupAuthorizationFailed.ERROR_CODE): - raise ERROR_CODES[sync_result.error_code] + self._default_error_handlers[sync_result.error_code]() self._cluster.handler.sleep(i * 2) return sync_result.member_assignment.partition_assignment