diff --git a/pykafka/exceptions.py b/pykafka/exceptions.py
index c4d583916..934085e5d 100644
--- a/pykafka/exceptions.py
+++ b/pykafka/exceptions.py
@@ -51,6 +51,11 @@ class NoMessagesConsumedError(KafkaException):
     pass
 
+class MessageSetDecodeFailure(KafkaException):
+    """Indicates a generic failure in the decoding of a MessageSet from the broker"""
+    pass
+
+
 class ProducerQueueFullError(KafkaException):
     """Indicates that one or more of the AsyncProducer's internal
     queues contain at least max_queued_messages messages"""
     pass
diff --git a/pykafka/protocol.py b/pykafka/protocol.py
index 1dc31e5b6..fa2ce0bdb 100644
--- a/pykafka/protocol.py
+++ b/pykafka/protocol.py
@@ -65,7 +65,7 @@
 
 
 from .common import CompressionType, Message
-from .exceptions import ERROR_CODES, MessageSizeTooLarge
+from .exceptions import ERROR_CODES, MessageSetDecodeFailure
 from .utils import Serializable, compression, struct_helpers
 from .utils.compat import iteritems, itervalues, buffer
 
@@ -379,7 +379,7 @@ def decode(cls, buff, partition_id=-1):
             messages.append(message)
             offset += size
         if len(messages) == 0 and attempted:
-            raise MessageSizeTooLarge(size)
+            raise MessageSetDecodeFailure(size)
         return MessageSet(messages=messages)
 
     def pack_into(self, buff, offset):
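
A minimal reproduction sketch for reviewers, not part of the change itself: per the `decode` hunk above, a fetch buffer too short to hold its first message should now raise the new `MessageSetDecodeFailure` rather than the misleading `MessageSizeTooLarge`. The 12-byte header layout (`int64` offset, `int32` size) follows the Kafka MessageSet wire format; the sketch assumes a header-only buffer takes the partial-message path and sets the `attempted` flag seen in the hunk.

```python
import struct

from pykafka.exceptions import MessageSetDecodeFailure
from pykafka.protocol import MessageSet

# A MessageSet containing a single truncated entry: a header declaring a
# 1000-byte message, followed by no message bytes. This mimics a fetch
# response cut off below the size of the first message in the set.
truncated = struct.pack('!qi', 0, 1000)  # offset=0, size=1000, no payload

try:
    MessageSet.decode(truncated)
except MessageSetDecodeFailure as exc:
    # The size of the message that could not be decoded is passed through.
    print("no messages decoded: %s" % exc)
```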