From 92b75f3186c594b89c4c9850514c3f65e8e7dd96 Mon Sep 17 00:00:00 2001 From: Emmett Butler Date: Thu, 24 Aug 2017 11:43:29 -0700 Subject: [PATCH] fix exception when messageset is too large for decode buffer new exception does not inherit from ProtocolClientError, removing ambiguity about whether this error originated at the broker or the client fixes #697 --- pykafka/exceptions.py | 5 +++++ pykafka/protocol.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pykafka/exceptions.py b/pykafka/exceptions.py index c4d583916..934085e5d 100644 --- a/pykafka/exceptions.py +++ b/pykafka/exceptions.py @@ -51,6 +51,11 @@ class NoMessagesConsumedError(KafkaException): pass +class MessageSetDecodeFailure(KafkaException): + """Indicates a generic failure in the decoding of a MessageSet from the broker""" + pass + + class ProducerQueueFullError(KafkaException): """Indicates that one or more of the AsyncProducer's internal queues contain at least max_queued_messages messages""" pass diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 1dc31e5b6..fa2ce0bdb 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -65,7 +65,7 @@ from .common import CompressionType, Message -from .exceptions import ERROR_CODES, MessageSizeTooLarge +from .exceptions import ERROR_CODES, MessageSetDecodeFailure from .utils import Serializable, compression, struct_helpers from .utils.compat import iteritems, itervalues, buffer @@ -379,7 +379,7 @@ def decode(cls, buff, partition_id=-1): messages.append(message) offset += size if len(messages) == 0 and attempted: - raise MessageSizeTooLarge(size) + raise MessageSetDecodeFailure(size) return MessageSet(messages=messages) def pack_into(self, buff, offset):