Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #714 from Parsely/refactor/broker_exceptions
Browse files Browse the repository at this point in the history
use LeaderNotFoundError instead of LeaderNotAvailable
  • Loading branch information
emmettbutler authored Aug 24, 2017
2 parents 467c047 + 27f7248 commit 3c125f5
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 12 deletions.
9 changes: 5 additions & 4 deletions pykafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
GroupCoordinatorNotAvailable,
NoBrokersAvailableError,
SocketDisconnectedError,
LeaderNotFoundError,
LeaderNotAvailable)
from .protocol import GroupCoordinatorRequest, GroupCoordinatorResponse
from .topic import Topic
Expand Down Expand Up @@ -66,8 +67,8 @@ def __getitem__(self, key):
meta = self._cluster()._get_metadata([key])
try:
topic = Topic(self._cluster(), meta.topics[key])
except LeaderNotAvailable:
log.warning("LeaderNotAvailable encountered during Topic creation")
except LeaderNotFoundError:
log.warning("LeaderNotFoundError encountered during Topic creation")
if i == self._cluster()._max_connection_retries - 1:
raise
else:
Expand Down Expand Up @@ -455,8 +456,8 @@ def update(self):
self._update_brokers(metadata.brokers)
try:
self._topics._update_topics(metadata.topics)
except LeaderNotAvailable:
log.warning("LeaderNotAvailable encountered. This may be "
except LeaderNotFoundError:
log.warning("LeaderNotFoundError encountered. This may be "
"because one or more partitions have no available replicas.")
if i == self._max_connection_retries - 1:
raise
Expand Down
17 changes: 15 additions & 2 deletions pykafka/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class NoBrokersAvailableError(KafkaException):
pass


class LeaderNotFoundError(KafkaException):
"""Indicates that the leader broker for a given partition was not found during
an update in response to a MetadataRequest
"""
pass


class SocketDisconnectedError(KafkaException):
"""Indicates that the socket connecting this client to a kafka broker has
become disconnected
Expand Down Expand Up @@ -79,8 +86,14 @@ def __init__(self, partition, *args, **kwargs):
self.partition = partition


# Protocol Client Exceptions
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
"""
Protocol Client Exceptions
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
NOTE: Don't raise these from client code unless it's in direct response to an error
code from the broker. When that's not the case, the exception raised should instead be
a subclass of KafkaException.
"""


class ProtocolClientError(KafkaException):
Expand Down
8 changes: 4 additions & 4 deletions pykafka/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import weakref

from .common import OffsetType
from .exceptions import LeaderNotAvailable
from .exceptions import LeaderNotFoundError
from .protocol import PartitionOffsetRequest

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -153,6 +153,6 @@ def update(self, brokers, metadata):
log.info('Updating in sync replicas list for %s', self)
self._isr = [brokers[b] for b in metadata.isr]
except KeyError:
raise LeaderNotAvailable("Replica for partition %s not available. This is "
"probably because none of its replicas are "
"available.", self.id)
raise LeaderNotFoundError("Replica for partition %s not available. This is "
"probably because none of its replicas are "
"available.", self.id)
4 changes: 2 additions & 2 deletions pykafka/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from .balancedconsumer import BalancedConsumer
from .common import OffsetType
from .exceptions import LeaderNotAvailable
from .exceptions import LeaderNotFoundError
from .managedbalancedconsumer import ManagedBalancedConsumer
from .partition import Partition
from .producer import Producer
Expand Down Expand Up @@ -164,7 +164,7 @@ def update(self, metadata):
log.info("Adding %d partitions", len(p_metas))
for id_, meta in iteritems(p_metas):
if meta.leader not in brokers:
raise LeaderNotAvailable()
raise LeaderNotFoundError()
if meta.id not in self._partitions:
log.debug('Adding partition %s/%s', self.name, meta.id)
self._partitions[meta.id] = Partition(
Expand Down

0 comments on commit 3c125f5

Please sign in to comment.