Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Allow more consumers than partitions for a balanced consumer #554

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,13 +537,6 @@ def _set_watches(self):
def _add_self(self):
"""Register this consumer in zookeeper.

This method ensures that the number of participants is at most the
number of partitions.
"""
participants = self._get_participants()
if len(self._topic.partitions) <= len(participants):
raise KafkaException("Cannot add consumer: more consumers than partitions")

self._zookeeper.create(
self._path_self, self._topic.name, ephemeral=True, makepath=True)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This endquote needs to be kept.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops; fixing

Expand Down
118 changes: 118 additions & 0 deletions tests/pykafka/test_issue_527.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@

#!/usr/bin/env python
import threading, logging, time, traceback

from pykafka import KafkaClient
import time

#FIXME: update these to match your cluster
kafka_hosts='kafka_host:9000'
kafka_zookeeper='zkhost:2181/kafka'
topic_name='test_topic'

lock2 = threading.Lock()

mdict = {}
tdict={}

#This test demonstrates how to set up one-message-to-one-consumer delivery (a distributed logical queue) across a cluster of N consumers.
#It also shows that this issue must be addressed for it to work: https://github.com/Parsely/pykafka/issues/527

class Producer(threading.Thread):
    """Produce 10k JSON messages, record each in ``mdict``, then monitor
    until the consumers have checked every message off.

    Runs as a daemon thread; started from :func:`main`.
    """
    daemon = True

    def run(self):
        count = 0
        client = KafkaClient(hosts=kafka_hosts)
        topic = client.topics[topic_name]
        producer = topic.get_producer()

        # Phase 1: produce exactly 10k messages, tracking each one.
        while count < 10000:
            msg = """{"test":%d}""" % (count)

            # Keep track of all 10k messages produced.
            # Use the lock as a context manager so it is released even if
            # an exception is raised (the original acquire/release pair
            # would leak the lock on error).
            with lock2:
                mdict[msg] = 1

            producer.produce(msg.encode("utf-8"))
            count += 1

        # Phase 2: monitoring loop — report how many messages are still
        # unconsumed; once empty, report the total consumption count
        # (should be exactly 10k if each message went to one consumer).
        while True:
            time.sleep(5)
            if len(mdict) > 0:
                print(len(mdict))
            else:
                # Check that only one consumer processed each message.
                print(0)
                ccnt = sum(tdict.values())
                print(ccnt)

class Consumer(threading.Thread):
    """Consume messages via a balanced consumer and check them off in
    the shared dictionaries.

    On any error the balanced consumer is recreated and consumption
    continues. Runs as a daemon thread; started from :func:`main`.
    """
    daemon = True

    def run(self):
        # currentThread().getName() is deprecated; use the py3 spelling.
        th_name = threading.current_thread().name

        client = KafkaClient(hosts=kafka_hosts)
        topic = client.topics[topic_name]

        consumer = topic.get_balanced_consumer(consumer_group='group1', auto_commit_enable=True, zookeeper_connect=kafka_zookeeper)

        while True:
            try:
                message = consumer.consume(block=True)

                txt = message.value.decode("utf-8")

                # Keep track not only that the message was processed, but by
                # how many consumers (should be exactly one per message).
                # Context-manager form releases the lock even on exception.
                with lock2:
                    # BUG FIX: dict.has_key() was removed in Python 3; the
                    # `in` operator is the correct membership test.
                    if txt in mdict:
                        mdict.pop(txt)

                    # Count how many consumers saw this message.
                    tdict[txt] = tdict.get(txt, 0) + 1

                print("Consumer %s; Offset %s; message %s" % (th_name, message.offset, txt))

            except Exception as e:
                print(e)
                logging.error(traceback.format_exc())
                # Recreate the consumer and keep going after a failure.
                consumer = topic.get_balanced_consumer(consumer_group='group1', zookeeper_connect=kafka_zookeeper)
def main():
    """Start one producer and eleven consumer threads, then block forever.

    All threads are daemons, so the process exits when the sleep ends or
    the script is interrupted.
    """
    workers = [Producer()]
    workers.extend(Consumer() for _ in range(11))

    for worker in workers:
        worker.start()

    # Keep the main thread alive while the daemon workers run.
    time.sleep(1000000)

if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
main()