diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py
index 3fce1fc54..3bfaf2eac 100644
--- a/pykafka/balancedconsumer.py
+++ b/pykafka/balancedconsumer.py
@@ -535,14 +535,7 @@ def _set_watches(self):
         self._setting_watches = False
 
     def _add_self(self):
-        """Register this consumer in zookeeper.
-
-        This method ensures that the number of participants is at most the
-        number of partitions.
-        """
-        participants = self._get_participants()
-        if len(self._topic.partitions) <= len(participants):
-            raise KafkaException("Cannot add consumer: more consumers than partitions")
+        """Register this consumer in zookeeper."""
         self._zookeeper.create(
             self._path_self, self._topic.name, ephemeral=True, makepath=True)
 
diff --git a/tests/pykafka/test_issue_527.py b/tests/pykafka/test_issue_527.py
new file mode 100644
index 000000000..b3d2c81d0
--- /dev/null
+++ b/tests/pykafka/test_issue_527.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+import threading, logging, time, traceback
+
+from pykafka import KafkaClient
+
+#FIXME: update these to match your cluster
+kafka_hosts='kafka_host:9000'
+kafka_zookeeper='zkhost:2181/kafka'
+topic_name='test_topic'
+
+lock2 = threading.Lock()
+
+mdict = {}
+tdict={}
+
+#This test demonstrates how to setup one message to one consumer (a distributed logical queue) in a cluster of N consumers.
+#It also proves that it's necessary to address this issue to make it work: https://github.com/Parsely/pykafka/issues/527
+
+class Producer(threading.Thread):
+    """Produce 10k JSON messages, then loop forever printing delivery stats."""
+    daemon = True
+
+    def run(self):
+        count = 0
+        client = KafkaClient(hosts=kafka_hosts)
+        topic = client.topics[topic_name]
+        producer = topic.get_producer()
+        while True:
+            msg = """{"test":%d}""" % (count)
+
+            #keep track of all 10k messages produced; "with" releases the lock
+            #even if an exception is raised while it is held
+            with lock2:
+                mdict[msg] = 1
+
+            producer.produce(msg.encode("utf-8"))
+            count += 1
+            if (count >= 10000):
+                break
+        while True:
+            time.sleep(5)
+            if len(mdict) > 0:
+                print(len(mdict))
+            else:
+                #check if only one consumer processed a given message; total of 10k
+                print(0)
+                ccnt = 0
+                for v in tdict.values():
+                    ccnt += v
+                print(ccnt)
+
+class Consumer(threading.Thread):
+    """Balanced consumer; re-creates its consumer on any error and keeps going."""
+    daemon = True
+
+    def run(self):
+        #threading.currentThread() is a deprecated alias
+        th_name = threading.current_thread().name
+
+        client = KafkaClient(hosts=kafka_hosts)
+        topic = client.topics[topic_name]
+
+        consumer = topic.get_balanced_consumer(consumer_group='group1', auto_commit_enable=True, zookeeper_connect=kafka_zookeeper)
+
+        while True:
+            try:
+                message = consumer.consume(block=True)
+
+                txt = message.value.decode("utf-8")
+
+                #keep track of not only that message was processed, but also by how many consumers (should be only one to one)
+                #NOTE: the original acquire()/release() pair leaked the lock when
+                #an exception fired between them (e.g. the has_key() AttributeError
+                #below), deadlocking every other thread; "with" always releases it
+                with lock2:
+                    #fix: dict.has_key() was removed in Python 3; use "in"
+                    if txt in mdict:
+                        mdict.pop(txt)
+
+                    cnt = tdict.get(txt)
+
+                    if cnt is None:
+                        tdict[txt]=1
+                    else:
+                        tdict[txt]=(cnt+1)
+
+                print ("Consumer %s; Offset %s; messsage %s" % (th_name, message.offset, txt))
+
+            except Exception as e:
+                print(e)
+                logging.error(traceback.format_exc())
+                consumer = topic.get_balanced_consumer(consumer_group='group1', zookeeper_connect=kafka_zookeeper)
+
+def main():
+    threads = [
+        Producer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer(),
+        Consumer()
+    ]
+
+    #fire off the producer and all eleven consumers, then park the main
+    #thread; every worker is a daemon, so Ctrl-C still exits cleanly
+    for t in threads:
+        t.start()
+
+    time.sleep(1000000)
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+        level=logging.INFO
+    )
+    main()