From 67ebde1ed4755f0e7f21ddea88fe2556a7f2f9f7 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 24 Aug 2017 12:16:45 -0700 Subject: [PATCH 1/2] use LeaderNotFoundError instead of LeaderNotAvailable to disambiguate the source of broker-side exceptions --- pykafka/cluster.py | 10 +++++----- pykafka/exceptions.py | 17 +++++++++++++++-- pykafka/partition.py | 8 ++++---- pykafka/topic.py | 4 ++-- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/pykafka/cluster.py b/pykafka/cluster.py index 0b71f37cb..a9905808c 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -31,7 +31,7 @@ GroupCoordinatorNotAvailable, NoBrokersAvailableError, SocketDisconnectedError, - LeaderNotAvailable) + LeaderNotFoundError) from .protocol import GroupCoordinatorRequest, GroupCoordinatorResponse from .topic import Topic from .utils.compat import iteritems, itervalues, range @@ -66,8 +66,8 @@ def __getitem__(self, key): meta = self._cluster()._get_metadata([key]) try: topic = Topic(self._cluster(), meta.topics[key]) - except LeaderNotAvailable: - log.warning("LeaderNotAvailable encountered during Topic creation") + except LeaderNotFoundError: + log.warning("LeaderNotFoundError encountered during Topic creation") if i == self._cluster()._max_connection_retries - 1: raise else: @@ -455,8 +455,8 @@ def update(self): self._update_brokers(metadata.brokers) try: self._topics._update_topics(metadata.topics) - except LeaderNotAvailable: - log.warning("LeaderNotAvailable encountered. This may be " + except LeaderNotFoundError: + log.warning("LeaderNotFoundError encountered. This may be " "because one or more partitions have no available replicas.") if i == self._max_connection_retries - 1: raise diff --git a/pykafka/exceptions.py b/pykafka/exceptions.py index c4d583916..cb914fd5b 100644 --- a/pykafka/exceptions.py +++ b/pykafka/exceptions.py @@ -29,6 +29,13 @@ class NoBrokersAvailableError(KafkaException): pass +class LeaderNotFoundError(KafkaException): + """Indicates that the leader broker for a given partition was not found during + an update in response to a MetadataRequest + """ + pass + + class SocketDisconnectedError(KafkaException): """Indicates that the socket connecting this client to a kafka broker has become disconnected @@ -74,8 +81,14 @@ def __init__(self, partition, *args, **kwargs): self.partition = partition -# Protocol Client Exceptions -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes +""" +Protocol Client Exceptions +https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes + +NOTE: Don't raise these from client code unless it's in direct response to an error +code from the broker. When that's not the case, the exception raised should instead be +a subclass of KafkaException. +""" class ProtocolClientError(KafkaException): diff --git a/pykafka/partition.py b/pykafka/partition.py index b22d34e1b..c0959e465 100644 --- a/pykafka/partition.py +++ b/pykafka/partition.py @@ -22,7 +22,7 @@ import weakref from .common import OffsetType -from .exceptions import LeaderNotAvailable +from .exceptions import LeaderNotFoundError from .protocol import PartitionOffsetRequest log = logging.getLogger(__name__) @@ -153,6 +153,6 @@ def update(self, brokers, metadata): log.info('Updating in sync replicas list for %s', self) self._isr = [brokers[b] for b in metadata.isr] except KeyError: - raise LeaderNotAvailable("Replica for partition %s not available. This is " - "probably because none of its replicas are " - "available.", self.id) + raise LeaderNotFoundError("Replica for partition %s not available. This is " + "probably because none of its replicas are " + "available.", self.id) diff --git a/pykafka/topic.py b/pykafka/topic.py index b6d256b2d..f898beb49 100644 --- a/pykafka/topic.py +++ b/pykafka/topic.py @@ -22,7 +22,7 @@ from .balancedconsumer import BalancedConsumer from .common import OffsetType -from .exceptions import LeaderNotAvailable +from .exceptions import LeaderNotFoundError from .managedbalancedconsumer import ManagedBalancedConsumer from .partition import Partition from .producer import Producer @@ -164,7 +164,7 @@ def update(self, metadata): log.info("Adding %d partitions", len(p_metas)) for id_, meta in iteritems(p_metas): if meta.leader not in brokers: - raise LeaderNotAvailable() + raise LeaderNotFoundError() if meta.id not in self._partitions: log.debug('Adding partition %s/%s', self.name, meta.id) self._partitions[meta.id] = Partition( From 27f7248550223e322ed95f73f03fa2c78937b74c Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 24 Aug 2017 12:40:08 -0700 Subject: [PATCH 2/2] fix missing exception --- pykafka/cluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pykafka/cluster.py b/pykafka/cluster.py index a9905808c..cb131b1f6 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -31,7 +31,8 @@ GroupCoordinatorNotAvailable, NoBrokersAvailableError, SocketDisconnectedError, - LeaderNotFoundError) + LeaderNotFoundError, + LeaderNotAvailable) from .protocol import GroupCoordinatorRequest, GroupCoordinatorResponse from .topic import Topic from .utils.compat import iteritems, itervalues, range