diff --git a/pykafka/rdkafka/simple_consumer.py b/pykafka/rdkafka/simple_consumer.py index d3444463d..7e71cd854 100644 --- a/pykafka/rdkafka/simple_consumer.py +++ b/pykafka/rdkafka/simple_consumer.py @@ -13,6 +13,8 @@ log = logging.getLogger(__name__) +LIBRDKAFKA_MAX_ALLOWED_QUEUED_MAX_MESSAGE_BYTES = 2097151 # see librdkafka commit cc43f4 + class RdKafkaSimpleConsumer(SimpleConsumer): """A pykafka.SimpleConsumer with librdkafka-based fetchers @@ -248,9 +250,10 @@ def _mk_rdkafka_config_lists(self): # queued.max.messages.kbytes so for now we infer the implied # maximum (which, with default settings, is ~2GB per partition): "queued.min.messages": self._queued_max_messages, - "queued.max.messages.kbytes": str( + "queued.max.messages.kbytes": str(min( self._queued_max_messages - * self._fetch_message_max_bytes // 1024), + * self._fetch_message_max_bytes // 1024, + LIBRDKAFKA_MAX_ALLOWED_QUEUED_MAX_MESSAGE_BYTES)), "fetch.wait.max.ms": self._fetch_wait_max_ms, "fetch.message.max.bytes": self._fetch_message_max_bytes,