This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Allow more BalancedConsumers than partitions on a topic #527

Closed
jianbin-wei opened this issue Apr 18, 2016 · 10 comments · Fixed by #555

Comments

@jianbin-wei

Given one topic with a single partition, if two balanced consumers are created within the same group for that topic, the later one raises an exception. When the first balanced consumer goes down, messages are no longer consumed.

With the Java Kafka client, however, the later consumer would continue consuming the topic's messages. That behavior is more resilient.

2016-04-18 06:25:09,420 ERROR balancedconsumer start:304 31791 Stopping consumer in response to error
Traceback (most recent call last):
  File "/service-venv/lib/python2.7/site-packages/pykafka/balancedconsumer.py", line 299, in start
    self._add_self()
  File "/service-venv/lib/python2.7/site-packages/pykafka/balancedconsumer.py", line 541, in _add_self
    raise KafkaException("Cannot add consumer: more consumers than partitions")

PyKafka version: 2.3.1
Kafka version: 0.8.2.1

@emmettbutler
Contributor

@jianbin-wei If you have a topic with a single partition, I'd suggest using a pair of SimpleConsumers to consume it with some degree of resilience. A BalancedConsumer cannot be added to a full group, since doing so would violate the semantics Kafka defines for balanced consumption. The subset of messages that the N+1th consumer of an N-partition topic should consume is not defined by Kafka's balancing semantics - should it consume the whole stream? A random subset of partitions? A single partition? Since this behavior is undefined, pykafka chooses to disallow it. Pykafka still supports more than N consumers of a single topic if those consumers are SimpleConsumer instances.

@jianbin-wei
Author

> A BalancedConsumer cannot be added to a full group, since doing so would violate the semantics Kafka defines for balanced consumption. The subset of messages that the N+1th consumer of an N-partition topic should consume is not defined by Kafka's balancing semantics - should it consume the whole stream? A random subset of partitions? A single partition?

From what I see, the behavior of the Kafka console consumer is good: the N+1th consumer sits idle because it is assigned no partition. When one consumer dies, the N+1th consumer starts consuming (after rebalancing).
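The idle-then-take-over behavior described above can be illustrated with a small standalone sketch. This is not pykafka's code; `assign_partitions` and the consumer names are hypothetical, and the range-style split is only an assumption about how a balanced assignment might be computed.

```python
def assign_partitions(partitions, participants, consumer_id):
    """Return the partitions owned by consumer_id (hypothetical helper).

    Uses a simple range-style split over sorted participants. A consumer
    beyond the number of partitions gets an empty list, i.e. it idles.
    """
    participants = sorted(participants)
    partitions = sorted(partitions)
    idx = participants.index(consumer_id)
    per_consumer, remainder = divmod(len(partitions), len(participants))
    # The first `remainder` consumers each take one extra partition.
    start = per_consumer * idx + min(idx, remainder)
    count = per_consumer + (1 if idx < remainder else 0)
    return partitions[start:start + count]


# One topic with a single partition and two consumers in the same group.
partitions = [0]
group = ["consumer-a", "consumer-b"]
before = {c: assign_partitions(partitions, group, c) for c in group}

# consumer-a dies; the remaining members rebalance.
group.remove("consumer-a")
after = {c: assign_partitions(partitions, group, c) for c in group}

print(before)
print(after)
```

In this sketch the extra consumer owns nothing until the group shrinks, at which point the same function hands it the orphaned partition.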

@emmettbutler emmettbutler reopened this Apr 18, 2016
@emmettbutler emmettbutler changed the title BalancedConsumer is not auto balanced Allow more BalancedConsumers than partitions on a topic Apr 18, 2016
@jianbin-wei
Author

Thanks a lot.

@jianbin-wei
Author

Actually, this one is needed for #354. In that case, where one consumer consumes from multiple topics with different numbers of partitions, it would be better for it to idle and consume only when needed.

@yungchin
Contributor

Given that this already works as desired for the edge case where consumers are added in quick succession (fixed in #392), and with a quick look at _decide_partitions(), I think all that's required here is no longer raising this exception in _add_self().

@jianbin-wei could you test whether that's good enough, i.e. just removing

    if len(self._topic.partitions) <= len(participants):
        raise KafkaException("Cannot add consumer: more consumers than partitions")
?

@jianbin-wei
Author

@yungchin Yes, in my simple test removing those two lines is enough. It would still need a regression test, though.

@emmettbutler
Contributor

+1 to the idea of idling extra consumers

@vitalyli

vitalyli commented May 24, 2016

FYI: I've tested this with 10k messages sent and received by 10 consumers. Removing these two lines allows more consumers than partitions, and the amount of data produced equals the amount of data consumed, with one consumer thread getting exactly one message:

In balancedconsumer.py, line 540, the two lines below need to be removed:

    if len(self._topic.partitions) <= len(participants):
        raise KafkaException("Cannot add consumer: more consumers than partitions")

Let's verify and merge this; it is a blocker for me at the moment.
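The kind of check reported above (produced count equals consumed count when there are more consumers than partitions) can be mimicked without a broker. The sketch below is a toy simulation, not the actual test that was run and not pykafka code: `owned_partitions` and `simulate` are hypothetical helpers, and the partition count of 4 is an assumption, since the comment does not state how many partitions the topic had.

```python
from collections import Counter


def owned_partitions(partitions, consumers, consumer_id):
    """Range-style split (hypothetical); surplus consumers own nothing."""
    consumers = sorted(consumers)
    idx = consumers.index(consumer_id)
    per_consumer, remainder = divmod(len(partitions), len(consumers))
    start = per_consumer * idx + min(idx, remainder)
    count = per_consumer + (1 if idx < remainder else 0)
    return partitions[start:start + count]


def simulate(num_messages, num_partitions, num_consumers):
    """Produce messages round-robin, then let each consumer read its partitions."""
    partitions = list(range(num_partitions))
    consumers = ["consumer-%02d" % i for i in range(num_consumers)]
    # Toy "log": message i lands in partition i % num_partitions.
    log = {p: [] for p in partitions}
    for msg in range(num_messages):
        log[msg % num_partitions].append(msg)
    # Each consumer reads everything in the partitions it owns.
    return {
        c: [m for p in owned_partitions(partitions, consumers, c) for m in log[p]]
        for c in consumers
    }


# Assumed setup: 10k messages, 4 partitions, 10 consumers in one group.
consumed = simulate(10000, 4, 10)
all_msgs = [m for msgs in consumed.values() for m in msgs]
delivery_counts = Counter(all_msgs)
idle = [c for c, msgs in consumed.items() if not msgs]

print(len(all_msgs), len(idle))
```

A real regression test would run the same comparison against a live cluster, where rebalancing timing and offset commits also come into play.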

@emmettbutler
Contributor

@vitalyli Thanks for that investigation. Mind opening a pull request that removes those lines and adds a test case that verifies that it works as expected?

@vitalyli

Please see PR for it: #554
