diff --git a/pykafka/cluster.py b/pykafka/cluster.py index 0b71f37cb..cb131b1f6 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -31,6 +31,7 @@ GroupCoordinatorNotAvailable, NoBrokersAvailableError, SocketDisconnectedError, + LeaderNotFoundError, LeaderNotAvailable) from .protocol import GroupCoordinatorRequest, GroupCoordinatorResponse from .topic import Topic @@ -66,8 +67,8 @@ def __getitem__(self, key): meta = self._cluster()._get_metadata([key]) try: topic = Topic(self._cluster(), meta.topics[key]) - except LeaderNotAvailable: - log.warning("LeaderNotAvailable encountered during Topic creation") + except LeaderNotFoundError: + log.warning("LeaderNotFoundError encountered during Topic creation") if i == self._cluster()._max_connection_retries - 1: raise else: @@ -455,8 +456,8 @@ def update(self): self._update_brokers(metadata.brokers) try: self._topics._update_topics(metadata.topics) - except LeaderNotAvailable: - log.warning("LeaderNotAvailable encountered. This may be " + except LeaderNotFoundError: + log.warning("LeaderNotFoundError encountered. This may be " "because one or more partitions have no available replicas.") if i == self._max_connection_retries - 1: raise diff --git a/pykafka/exceptions.py b/pykafka/exceptions.py index c4d583916..cb914fd5b 100644 --- a/pykafka/exceptions.py +++ b/pykafka/exceptions.py @@ -29,6 +29,13 @@ class NoBrokersAvailableError(KafkaException): pass +class LeaderNotFoundError(KafkaException): + """Indicates that the leader broker for a given partition was not found during + an update in response to a MetadataRequest + """ + pass + + class SocketDisconnectedError(KafkaException): """Indicates that the socket connecting this client to a kafka broker has become disconnected @@ -74,8 +81,14 @@ def __init__(self, partition, *args, **kwargs): self.partition = partition -# Protocol Client Exceptions -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes +""" +Protocol Client Exceptions +https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes + +NOTE: Don't raise these from client code unless it's in direct response to an error +code from the broker. When that's not the case, the exception raised should instead be +a subclass of KafkaException. +""" class ProtocolClientError(KafkaException): diff --git a/pykafka/partition.py b/pykafka/partition.py index b22d34e1b..c0959e465 100644 --- a/pykafka/partition.py +++ b/pykafka/partition.py @@ -22,7 +22,7 @@ import weakref from .common import OffsetType -from .exceptions import LeaderNotAvailable +from .exceptions import LeaderNotFoundError from .protocol import PartitionOffsetRequest log = logging.getLogger(__name__) @@ -153,6 +153,6 @@ def update(self, brokers, metadata): log.info('Updating in sync replicas list for %s', self) self._isr = [brokers[b] for b in metadata.isr] except KeyError: - raise LeaderNotAvailable("Replica for partition %s not available. This is " - "probably because none of its replicas are " - "available.", self.id) + raise LeaderNotFoundError("Replica for partition %s not available. This is " + "probably because none of its replicas are " + "available.", self.id) diff --git a/pykafka/topic.py b/pykafka/topic.py index b6d256b2d..f898beb49 100644 --- a/pykafka/topic.py +++ b/pykafka/topic.py @@ -22,7 +22,7 @@ from .balancedconsumer import BalancedConsumer from .common import OffsetType -from .exceptions import LeaderNotAvailable +from .exceptions import LeaderNotFoundError from .managedbalancedconsumer import ManagedBalancedConsumer from .partition import Partition from .producer import Producer @@ -164,7 +164,7 @@ def update(self, metadata): log.info("Adding %d partitions", len(p_metas)) for id_, meta in iteritems(p_metas): if meta.leader not in brokers: - raise LeaderNotAvailable() + raise LeaderNotFoundError() if meta.id not in self._partitions: log.debug('Adding partition %s/%s', self.name, meta.id) self._partitions[meta.id] = Partition(