Allow more consumers than partitions for a balanced consumer #554
Conversation
@@ -537,13 +537,6 @@ def _set_watches(self):
    def _add_self(self):
        """Register this consumer in zookeeper.

        This method ensures that the number of participants is at most the
        number of partitions.
        """
This endquote needs to be kept.
oops; fixing
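For reference, a minimal sketch of what the trimmed docstring could look like once the closing quotes are restored. This is an illustration only, assuming the intended change is simply to drop the sentence about participants and partitions:

    def _add_self(self):
        """Register this consumer in zookeeper."""
        # The logic that previously enforced "participants <= partitions"
        # is what this PR relaxes; the rest of the method body is not
        # shown in this hunk.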
Thanks @vitalyli, this is a good start. For this to be merged, the test will need to be runnable in Travis. That means it should probably avoid creating new threads wherever possible, since daemon threads can cause Travis to hang, and producing 10000 messages will probably make test runs take much longer than they need to. Additionally, this test only barely verifies that there are no crashes in the code you've written, but since there are components running on separate threads, crashes aren't guaranteed to be visible to Travis. I suggest moving your test case to

Broadly, this is the logic I'd think would be required for a test of this functionality (pseudocode):

# produce some test messages to a 2-partition topic
c1 = topic.get_balanced_consumer(group)
c2 = topic.get_balanced_consumer(group)
c3 = topic.get_balanced_consumer(group)
# verify that the first two consumers can consume correctly
m1 = c1.consume()
m2 = c2.consume()
self.assertNotEqual(m1, None)
self.assertNotEqual(m2, None)
# verify that the last consumer doesn't get any messages
m3 = c3.consume()
self.assertEqual(m3, None)
# verify that the last consumer can start consuming if some partitions become available
c2.stop()
m2 = c2.consume()
m3 = c3.consume()
self.assertEqual(m2, None)
self.assertNotEqual(m3, None)

This is a high-level view of what I think the test for this change needs to look like.
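To make that pseudocode concrete, here is a hedged sketch of how such a test might look against pykafka's public API (KafkaClient, Topic.get_balanced_consumer, the producer context manager). The broker and ZooKeeper addresses, topic name, consumer group, and timeout values are assumptions for illustration and would need tuning for a real Travis run; the topic is assumed to already exist with two partitions:

import unittest

from pykafka import KafkaClient


class OverSubscribedBalancedConsumerTest(unittest.TestCase):
    # All of these values are illustrative assumptions, not project fixtures.
    KAFKA_HOSTS = "localhost:9092"
    ZOOKEEPER = "localhost:2181"
    TOPIC = b"test.balanced.two.partitions"  # assumed to exist with 2 partitions
    GROUP = b"test-oversubscribe-group"

    def _get_consumer(self, topic):
        # consumer_timeout_ms makes consume() return None after the timeout
        # instead of blocking forever, so "no message" can be asserted
        return topic.get_balanced_consumer(
            self.GROUP,
            zookeeper_connect=self.ZOOKEEPER,
            consumer_timeout_ms=5000,
        )

    def test_extra_consumer_waits_for_free_partition(self):
        client = KafkaClient(hosts=self.KAFKA_HOSTS)
        topic = client.topics[self.TOPIC]

        # produce a small number of test messages to the 2-partition topic
        with topic.get_producer() as producer:
            for i in range(20):
                producer.produce(("msg %d" % i).encode("utf-8"))

        c1 = self._get_consumer(topic)
        c2 = self._get_consumer(topic)
        c3 = self._get_consumer(topic)  # one more consumer than partitions
        try:
            # the first two consumers should each own a partition and see messages
            self.assertIsNotNone(c1.consume())
            self.assertIsNotNone(c2.consume())

            # the third consumer owns no partition, so it should time out with None
            self.assertIsNone(c3.consume())

            # once a partition is released, the idle consumer should pick it up
            # (rebalancing takes time, so the timeout may need to be larger here)
            c2.stop()
            self.assertIsNotNone(c3.consume())
        finally:
            for consumer in (c1, c3):
                consumer.stop()


if __name__ == "__main__":
    unittest.main()

The key design point is consumer_timeout_ms: with it set, consume() returns None on timeout rather than blocking forever, which is what lets the test assert that the over-subscribed consumer gets nothing until a partition is freed.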
@emmett9001 I'm not really familiar with Travis, and I'm quite new to this project; I literally spent one day banging my head against Kafka and found what works on my actual cluster. The test is by no means exhaustive. I'm in the process of setting up 10 or 20 Docker containers on ten machines; I'll verify it there, but that will take a bit longer. If you guys want to take it over and make a better test for it, I'm OK with closing this PR.
@vitalyli Sounds fine, thanks for being so responsive. Of course, if you find that this change works in your own environment, you can certainly run a forked version of pykafka until this change makes it into a proper release. Even if the test were finished right now, it would probably take a few weeks to cut a new release. I'll prioritize this the next time I do some work on open issues.
Sounds good, thanks!
Addresses issue: #527