Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Allow more consumers than partitions for a balanced consumer #554

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,7 @@ def _set_watches(self):
self._setting_watches = False

def _add_self(self):
"""Register this consumer in zookeeper.

This method ensures that the number of participants is at most the
number of partitions.
"""
participants = self._get_participants()
if len(self._topic.partitions) <= len(participants):
raise KafkaException("Cannot add consumer: more consumers than partitions")
"""Register this consumer in zookeeper."""

self._zookeeper.create(
self._path_self, self._topic.name, ephemeral=True, makepath=True)
Expand Down
118 changes: 118 additions & 0 deletions tests/pykafka/test_issue_527.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@

#!/usr/bin/env python
import threading, logging, time, traceback

from pykafka import KafkaClient
import time

# FIXME: update these to match your cluster
kafka_hosts = 'kafka_host:9000'
kafka_zookeeper = 'zkhost:2181/kafka'
topic_name = 'test_topic'

# Shared lock taken around updates to the two bookkeeping dicts below.
lock2 = threading.Lock()

# Message text -> 1 for every produced message not yet seen by a consumer.
mdict = dict()
# Message text -> number of times the message was received by any consumer.
tdict = dict()

# This test demonstrates how to set up one-message-to-one-consumer delivery (a distributed logical queue) in a cluster of N consumers.
# It also shows that the following issue must be addressed for this to work: https://github.com/Parsely/pykafka/issues/527

class Producer(threading.Thread):
    """Thread that publishes test messages and then reports delivery progress.

    ``run`` first produces ``message_count`` JSON messages to ``topic_name``,
    recording each one in the shared ``mdict``. It then loops forever,
    printing every five seconds how many messages are still outstanding and,
    once none are, the total number of deliveries recorded in ``tdict``.
    """
    daemon = True

    # Number of messages to publish before switching to progress reporting.
    message_count = 10000

    def run(self):
        """Produce the test messages, then report progress indefinitely."""
        client = KafkaClient(hosts=kafka_hosts)
        topic = client.topics[topic_name]
        producer = topic.get_producer()

        for count in range(self.message_count):
            msg = """{"test":%d}""" % (count)

            # Keep track of every message produced; consumers remove the
            # entry again when they receive the message.
            with lock2:
                mdict[msg] = 1

            producer.produce(msg.encode("utf-8"))

        while True:
            time.sleep(5)

            # Snapshot the shared dicts under the lock: consumer threads
            # mutate them concurrently, and iterating tdict unguarded can
            # raise "dictionary changed size during iteration".
            with lock2:
                outstanding = len(mdict)
                delivered = sum(tdict.values()) if outstanding == 0 else None

            if outstanding > 0:
                print(outstanding)
            else:
                # Every message was consumed; if each one was delivered to
                # exactly one consumer the total equals message_count.
                print(0)
                print(delivered)

class Consumer(threading.Thread):
    """Thread that consumes ``topic_name`` through a balanced consumer.

    Every received message is removed from the shared ``mdict`` and counted
    in the shared ``tdict``, so the test can tell whether each message was
    handled by exactly one consumer of the group.
    """
    daemon = True

    @staticmethod
    def _record_delivery(txt, pending, counts, lock):
        """Mark ``txt`` as received and bump its delivery count.

        :param txt: decoded message text
        :param pending: dict of messages not yet received; ``txt`` is
            removed from it if present
        :param counts: dict mapping message text to times received
        :param lock: lock held while both dicts are updated
        """
        # ``with`` guarantees the lock is released even if an update raises;
        # the previous acquire()/release() pair left it held on error.
        with lock:
            # dict.has_key() does not exist on Python 3; pop with a default
            # covers the "present" and "absent" cases on both versions.
            pending.pop(txt, None)
            counts[txt] = counts.get(txt, 0) + 1

    def run(self):
        """Consume messages forever, recreating the consumer after errors."""
        th_name = threading.current_thread().name

        client = KafkaClient(hosts=kafka_hosts)
        topic = client.topics[topic_name]

        def connect():
            # Single place for consumer settings so the consumer created
            # after an error matches the initial one (auto-commit included).
            return topic.get_balanced_consumer(
                consumer_group='group1',
                auto_commit_enable=True,
                zookeeper_connect=kafka_zookeeper)

        consumer = connect()

        while True:
            try:
                message = consumer.consume(block=True)

                txt = message.value.decode("utf-8")

                # Track not only that the message was processed, but also by
                # how many consumers (should be exactly one).
                self._record_delivery(txt, mdict, tdict, lock2)

                print("Consumer %s; Offset %s; messsage %s" % (th_name, message.offset, txt))

            except Exception as e:
                print(e)
                logging.error(traceback.format_exc())
                # NOTE(review): the failed consumer is replaced without being
                # stopped, as in the original script — confirm whether it
                # should be shut down first.
                consumer = connect()
def main(num_consumers=11, run_seconds=1000000):
    """Start one producer and ``num_consumers`` consumer threads.

    :param num_consumers: number of ``Consumer`` threads to start; the
        default of 11 matches the original hard-coded list
    :param run_seconds: how long the main thread sleeps after starting the
        workers. All workers are daemon threads, so the process exits when
        this sleep ends.
    """
    threads = [Producer()]
    threads.extend(Consumer() for _ in range(num_consumers))

    for t in threads:
        t.start()

    # Keep the main thread alive; the daemon workers die with it.
    time.sleep(run_seconds)

if __name__ == "__main__":
    # Log at INFO and above with timestamp, logger name, thread id, level
    # and process id on every line, then start the demo.
    log_format = (
        '%(asctime)s.%(msecs)s:%(name)s:%(thread)d:'
        '%(levelname)s:%(process)d:%(message)s'
    )
    logging.basicConfig(level=logging.INFO, format=log_format)
    main()