RdKafkaException: receive.message.max.bytes must be >= fetch.max.bytes + 512 #960

Closed
aguyyala-disco opened this issue Aug 20, 2019 · 2 comments

@aguyyala-disco commented Aug 20, 2019

I'm running into the following exception when using a balanced consumer with use_rdkafka=True.

pykafka.exceptions.RdKafkaException: receive.message.max.bytes must be >= fetch.max.bytes + 512
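(If I'm reading the check right, with librdkafka's default fetch.max.bytes of 52428800, receive.message.max.bytes would have to be at least 52429312, so presumably pykafka is setting it lower than that internally.)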

The same code works fine if I just set use_rdkafka=False.

Here is the code:

import io
import time

import avro.io
import avro.schema

from pykafka import KafkaClient

import logging
kafkalogger = logging.getLogger('pykafka')
kafkalogger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.DEBUG)
kafkalogger.addHandler(console_handler)

client = KafkaClient(hosts='localhost:9092')

topic = client.topics['ingest']

key_schema = avro.schema.Parse(open("ifs_ingest_key.avsc", "r").read())
value_schema = avro.schema.Parse(open("ifs_ingest_value.avsc", "r").read())

balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    zookeeper_connect='10.10.146.168:2181,10.10.132.73:2181,10.10.175.137:2181',
    use_rdkafka=True
)

start_time = time.time()

cnt = 0
for message in balanced_consumer:
    if message is not None:
        bytes_reader = io.BytesIO(message.value)
        decoder = avro.io.BinaryDecoder(bytes_reader)
        reader = avro.io.DatumReader(value_schema)
        record = reader.read(decoder)
        print(record)
        cnt += 1

balanced_consumer.stop()
print("%d --- %s seconds ---" % (cnt, time.time() - start_time))

The following is the debug output:

Traceback (most recent call last):
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 342, in start
    self._rebalance()
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 615, in _rebalance
    self._update_member_assignment()
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 586, in _update_member_assignment
    if self._setup_internal_consumer(new_partitions):
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 395, in _setup_internal_consumer
    cns = self._get_internal_consumer(partitions=list(partitions), start=start)
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 464, in _get_internal_consumer
    cns.start()
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 300, in start
    self._fetch_workers = self._setup_fetch_workers()
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/rdkafka/simple_consumer.py", line 88, in _setup_fetch_workers
    self._rdk_consumer.start(**start_kwargs)
pykafka.exceptions.RdKafkaException: receive.message.max.bytes must be >= fetch.max.bytes + 512
2019-08-20 16:17:47,041 - pykafka.balancedconsumer - DEBUG - Stopping <pykafka.balancedconsumer.BalancedConsumer at 0x1093f7cc0 (consumer_group=testgroup)>
Traceback (most recent call last):
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 748, in consume
    message = self._consumer.consume(block=block, unblock_event=self._rebalancing_in_progress)
AttributeError: 'NoneType' object has no attribute 'consume'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/guyyala/Library/Application Support/JetBrains/Toolbox/apps/PyCharm-P/ch-0/192.5728.105/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 2060, in <module>
    main()
  File "/Users/guyyala/Library/Application Support/JetBrains/Toolbox/apps/PyCharm-P/ch-0/192.5728.105/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 2054, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "/Users/guyyala/Library/Application Support/JetBrains/Toolbox/apps/PyCharm-P/ch-0/192.5728.105/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1405, in run
    return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
  File "/Users/guyyala/Library/Application Support/JetBrains/Toolbox/apps/PyCharm-P/ch-0/192.5728.105/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1412, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Users/guyyala/Library/Application Support/JetBrains/Toolbox/apps/PyCharm-P/ch-0/192.5728.105/PyCharm.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/guyyala/git/IFS/pykafka_consumer.py", line 40, in <module>
    for message in balanced_consumer:
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 767, in __iter__
    message = self.consume(block=True)
  File "/Users/guyyala/git/IFS/venv/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 757, in consume
    raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException
2019-08-20 16:17:48,174 - pykafka.simpleconsumer - DEBUG - Finalising <pykafka.rdkafka.simple_consumer.RdKafkaSimpleConsumer at 0x109411940 (consumer_group=b'testgroup')>
2019-08-20 16:17:48,174 - pykafka.rdkafka.simple_consumer - DEBUG - Issued stop to _rdk_consumer.
2019-08-20 16:17:48,175 - pykafka.balancedconsumer - DEBUG - Finalising <pykafka.balancedconsumer.BalancedConsumer at 0x1093f7cc0 (consumer_group=testgroup)>

PyKafka version: 2.8.0
Kafka version: 2.2.1 (AWS MSK)
librdkafka version: 1.1.0 (installed on macOS via brew)
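(From what I can tell, this receive.message.max.bytes >= fetch.max.bytes + 512 sanity check was only added in librdkafka 1.1.0, so the librdkafka version may be what matters here; that's a guess on my part.)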

I googled this error and searched Stack Overflow but wasn't able to find any solution. Can someone please help me out here?

Thank you!!!

@mkmoisen commented

Have you found a workaround for this yet?

I'm getting a similar error, which I've detailed here.

@aguyyala-disco (Author) commented

Hi @mkmoisen, no, I haven't found a workaround for this. We switched back to using the confluent-kafka-python client.
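In case it helps anyone who lands here, below is a rough sketch of what the replacement looks like. confluent-kafka-python passes raw librdkafka config through, so both sides of the failing check can be set explicitly (topic and group names carried over from my example above):

from confluent_kafka import Consumer

# Sketch: confluent-kafka-python exposes librdkafka config directly, so the
# constraint from the error message can be satisfied by hand.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'testgroup',
    'auto.offset.reset': 'earliest',
    'fetch.max.bytes': 52428800,                  # librdkafka default (50 MB)
    'receive.message.max.bytes': 52428800 + 512,  # must be >= fetch.max.bytes + 512
})
consumer.subscribe(['ingest'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        print(msg.value())
finally:
    consumer.close()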
