Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
deduplicate error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed Mar 8, 2016
1 parent cf2ff7a commit 82780ee
Showing 1 changed file with 36 additions and 35 deletions.
71 changes: 36 additions & 35 deletions pykafka/managedbalancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@

from .balancedconsumer import BalancedConsumer
from .common import OffsetType
from .exceptions import (IllegalGeneration, RebalanceInProgress, UnknownMemberId,
NotCoordinatorForGroup, GroupCoordinatorNotAvailable,
GroupAuthorizationFailed, ERROR_CODES, GroupLoadInProgress,
InconsistentGroupProtocol, InvalidSessionTimeout)
from .exceptions import (IllegalGeneration, RebalanceInProgress, NotCoordinatorForGroup,
GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress)
from .protocol import MemberAssignment
from .utils.compat import iterkeys

Expand Down Expand Up @@ -198,6 +196,7 @@ def __init__(self,
self._consumer_id = b''
self._worker_trace_logged = False
self._worker_exception = None
self._default_error_handlers = self._build_default_error_handlers()

if auto_start is True:
self.start()
Expand Down Expand Up @@ -259,16 +258,7 @@ def _send_heartbeat(self):
if res.error_code == 0:
return
log.info("Error code %d encountered on heartbeat." % res.error_code)
if res.error_code in (IllegalGeneration.ERROR_CODE,
RebalanceInProgress.ERROR_CODE,
UnknownMemberId.ERROR_CODE):
pass
elif res.error_code in (GroupCoordinatorNotAvailable.ERROR_CODE,
NotCoordinatorForGroup.ERROR_CODE):
self._group_coordinator = self._cluster.get_group_coordinator(
self._consumer_group)
elif res.error_code == GroupAuthorizationFailed.ERROR_CODE:
raise GroupAuthorizationFailed()
self._default_error_handlers[res.error_code]()
log.debug("_rebalance called from _send_heartbeat")
self._rebalance()

Expand Down Expand Up @@ -306,6 +296,36 @@ def _update_member_assignment(self):
self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000))
self._raise_worker_exceptions()

def _build_default_error_handlers(self):
self = weakref.proxy(self)

def _handle_GroupCoordinatorNotAvailable():
self._group_coordinator = self._cluster.get_group_coordinator(
self._consumer_group)

def _handle_NotCoordinatorForGroup():
self._group_coordinator = self._cluster.get_group_coordinator(
self._consumer_group)

return {
GroupCoordinatorNotAvailable.ERROR_CODE: _handle_GroupCoordinatorNotAvailable,
NotCoordinatorForGroup.ERROR_CODE: _handle_NotCoordinatorForGroup,
GroupLoadInProgress.ERROR_CODE: None,
RebalanceInProgress.ERROR_CODE: None,
IllegalGeneration.ERROR_CODE: None
}

def _handle_error(self, error_code):
"""Call the appropriate handler function for the given error code
:param error_code: The error code returned from a Group Membership API request
:type error_code: int
"""
if error_code not in self._default_error_handlers:
raise ERROR_CODES[error_code]()
if self._default_error_handlers[error_code] is not None:
self._default_error_handlers[error_code]()

def _join_group(self):
"""Send a JoinGroupRequest.
Expand All @@ -322,17 +342,7 @@ def _join_group(self):
" generation '%s'", join_result.error_code, self._generation_id)
if i == self._cluster._max_connection_retries - 1:
raise ERROR_CODES[join_result.error_code]
if join_result.error_code in (GroupLoadInProgress.ERROR_CODE,):
pass
elif join_result.error_code in (GroupCoordinatorNotAvailable.ERROR_CODE,
NotCoordinatorForGroup.ERROR_CODE):
self._group_coordinator = self._cluster.get_group_coordinator(
self._consumer_group)
elif join_result.error_code in (InconsistentGroupProtocol.ERROR_CODE,
UnknownMemberId.ERROR_CODE,
InvalidSessionTimeout.ERROR_CODE,
GroupAuthorizationFailed.ERROR_CODE):
raise ERROR_CODES[join_result.error_code]
self._handle_error(join_result.error_code)
self._cluster.handler.sleep(i * 2)
self._generation_id = join_result.generation_id
self._consumer_id = join_result.member_id
Expand Down Expand Up @@ -361,15 +371,6 @@ def _sync_group(self, group_assignments):
sync_result.error_code)
if i == self._cluster._max_connection_retries - 1:
raise ERROR_CODES[sync_result.error_code]
if sync_result.error_code in (IllegalGeneration.ERROR_CODE,
GroupCoordinatorNotAvailable.ERROR_CODE,
RebalanceInProgress.ERROR_CODE):
pass
elif sync_result.error_code in (NotCoordinatorForGroup.ERROR_CODE,):
self._group_coordinator = self._cluster.get_group_coordinator(
self._consumer_group)
elif sync_result.error_code in (UnknownMemberId.ERROR_CODE,
GroupAuthorizationFailed.ERROR_CODE):
raise ERROR_CODES[sync_result.error_code]
self._default_error_handlers[sync_result.error_code]()
self._cluster.handler.sleep(i * 2)
return sync_result.member_assignment.partition_assignment

0 comments on commit 82780ee

Please sign in to comment.