Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

use LeaderNotFoundError instead of LeaderNotAvailable #714

Merged
merged 2 commits into from
Aug 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pykafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
GroupCoordinatorNotAvailable,
NoBrokersAvailableError,
SocketDisconnectedError,
LeaderNotFoundError,
LeaderNotAvailable)
from .protocol import GroupCoordinatorRequest, GroupCoordinatorResponse
from .topic import Topic
Expand Down Expand Up @@ -66,8 +67,8 @@ def __getitem__(self, key):
meta = self._cluster()._get_metadata([key])
try:
topic = Topic(self._cluster(), meta.topics[key])
except LeaderNotAvailable:
log.warning("LeaderNotAvailable encountered during Topic creation")
except LeaderNotFoundError:
log.warning("LeaderNotFoundError encountered during Topic creation")
if i == self._cluster()._max_connection_retries - 1:
raise
else:
Expand Down Expand Up @@ -455,8 +456,8 @@ def update(self):
self._update_brokers(metadata.brokers)
try:
self._topics._update_topics(metadata.topics)
except LeaderNotAvailable:
log.warning("LeaderNotAvailable encountered. This may be "
except LeaderNotFoundError:
log.warning("LeaderNotFoundError encountered. This may be "
"because one or more partitions have no available replicas.")
if i == self._max_connection_retries - 1:
raise
Expand Down
17 changes: 15 additions & 2 deletions pykafka/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class NoBrokersAvailableError(KafkaException):
pass


class LeaderNotFoundError(KafkaException):
"""Indicates that the leader broker for a given partition was not found during
an update in response to a MetadataRequest
"""
pass


class SocketDisconnectedError(KafkaException):
"""Indicates that the socket connecting this client to a kafka broker has
become disconnected
Expand Down Expand Up @@ -74,8 +81,14 @@ def __init__(self, partition, *args, **kwargs):
self.partition = partition


# Protocol Client Exceptions
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
"""
Protocol Client Exceptions
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

NOTE: Don't raise these from client code unless it's in direct response to an error
code from the broker. When that's not the case, the exception raised should instead be
a subclass of KafkaException.
"""


class ProtocolClientError(KafkaException):
Expand Down
8 changes: 4 additions & 4 deletions pykafka/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import weakref

from .common import OffsetType
from .exceptions import LeaderNotAvailable
from .exceptions import LeaderNotFoundError
from .protocol import PartitionOffsetRequest

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -153,6 +153,6 @@ def update(self, brokers, metadata):
log.info('Updating in sync replicas list for %s', self)
self._isr = [brokers[b] for b in metadata.isr]
except KeyError:
raise LeaderNotAvailable("Replica for partition %s not available. This is "
"probably because none of its replicas are "
"available.", self.id)
raise LeaderNotFoundError("Replica for partition %s not available. This is "
"probably because none of its replicas are "
"available.", self.id)
4 changes: 2 additions & 2 deletions pykafka/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from .balancedconsumer import BalancedConsumer
from .common import OffsetType
from .exceptions import LeaderNotAvailable
from .exceptions import LeaderNotFoundError
from .managedbalancedconsumer import ManagedBalancedConsumer
from .partition import Partition
from .producer import Producer
Expand Down Expand Up @@ -164,7 +164,7 @@ def update(self, metadata):
log.info("Adding %d partitions", len(p_metas))
for id_, meta in iteritems(p_metas):
if meta.leader not in brokers:
raise LeaderNotAvailable()
raise LeaderNotFoundError()
if meta.id not in self._partitions:
log.debug('Adding partition %s/%s', self.name, meta.id)
self._partitions[meta.id] = Partition(
Expand Down