Consume multiple topics at once #395
Comments
Duplicate of #354
Hi @magcius, thanks for asking. The current pykafka consumer API doesn't support using a single consumer for multiple topics. You could round-robin over one consumer per topic:

    # mymodule.get_consumers() is a placeholder returning one consumer per topic
    consumers = mymodule.get_consumers()
    while True:
        for consumer in consumers:
            # Non-blocking: returns None when no message is ready
            msg = consumer.consume(block=False)
            if msg is not None:
                print("Got message {} for topic {}".format(msg.value, consumer.topic.name))

I think this should be ok CPU-wise, since the consumer internally uses a semaphore to avoid busy-waiting. If that doesn't prove to be true, you could look at accessing its internal semaphore to avoid unnecessary CPU usage. I'm going to close this as a duplicate, so please post any other concerns related to this issue on #354.
Oh, I didn't realize there was a
Actually, this will still spin CPU-wise, since we still need to keep checking the semaphore over and over. There's no way to wait on multiple consumers at once; you need an event flag for that. A better approach would be to split out the topic balancer and the thing that pulls from the consumer, so that all the partitions go through the same semaphore.
Yeah, can confirm that your approach, as written, spins the CPU at 100% on my EC2 boxes. Access to the internal semaphore doesn't help, since you effectively need to poll on multiple semaphores at once.
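One way around the spin, as a rough sketch and not something pykafka provides: give each consumer its own thread that blocks in `consume()` and forwards messages into one shared `queue.Queue`, so the main loop only ever waits on a single object. The helper name `fan_in` is made up for illustration and the code is Python 3. It assumes `consume()` blocks until a message arrives and returns `None` on timeout, which I believe is what pykafka does when `consumer_timeout_ms` is set, but please check against your version.

```python
import queue
import threading


def fan_in(consumers, out_queue, stop_event):
    """Start one daemon thread per consumer (hypothetical helper, not pykafka API).

    Each thread blocks in consumer.consume() and forwards every message to
    out_queue as a (consumer, message) tuple.
    """
    def pump(consumer):
        while not stop_event.is_set():
            # Assumption: consume() blocks, and returns None on timeout.
            msg = consumer.consume()
            if msg is not None:
                out_queue.put((consumer, msg))

    threads = [
        threading.Thread(target=pump, args=(c,), daemon=True)
        for c in consumers
    ]
    for t in threads:
        t.start()
    return threads
```

The main loop then calls `merged.get()` on the shared queue, which sleeps until any consumer has produced something, and gets back the `(consumer, message)` pair. Note that if `consume()` has no timeout, a pump thread only notices `stop_event` after its next message arrives.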
FYI librdkafka has support for this.
Hey,
Since you retrieve a consumer from a topic, the API offers no way to consume multiple topics at once. Is it possible to consume multiple topics at once from the same consumer, with messages from each topic interleaved?
I suppose I could create multiple consumers and somehow round-robin between them, but I can't figure out a way to poll all of them at once, to know when one is ready.
Am I missing something obvious?