This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

consuming rate decreasing #790

Closed
arita37 opened this issue Apr 24, 2018 · 4 comments

@arita37

arita37 commented Apr 24, 2018

We have the following scheme running in several Python jobs:

    while True:
        balancedConsumer
        processMsg
        produceMsg

Everything runs with auto-commit enabled.

The Kafka server has a fixed output rate of 1000 msg/sec.
For some reason, the input rate is very high at the beginning (1200 msg/sec) and
then drops to 500 msg/sec, around half of the normal output rate.

Why does the rate drop?
How can we check that the consumer's input rate matches the Kafka server's?

processMsg is very low latency (<1 ms of processing per message).

PyKafka version: v2.7.0
Kafka version: 0.8.1
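One way to answer the second question is to measure the consumer's own throughput inside the loop and compare it against the broker's known 1000 msg/sec. A minimal sketch, using a plain Python iterable as a stand-in for the consumer (`measure_rate` and `report_every` are illustrative names, not pykafka API):

```python
import time

def measure_rate(messages, report_every=1000):
    """Record throughput (msg/sec) at every `report_every`-th message."""
    rates = []
    count = 0
    start = time.perf_counter()
    for _ in messages:
        count += 1
        if count % report_every == 0:
            # Guard against a zero elapsed time on very fast loops.
            elapsed = max(time.perf_counter() - start, 1e-9)
            rates.append(count / elapsed)
    return rates

# With 5000 fake messages we get 5 checkpoints; against a real consumer,
# compare these numbers with the broker's fixed 1000 msg/sec.
checkpoints = measure_rate(range(5000))
print(len(checkpoints))  # → 5
```

Logging such checkpoints over time would show exactly when the rate falls from ~1200 to ~500 msg/sec.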

@arita37
Author

arita37 commented Apr 24, 2018

What happens is that the consumer is consuming
at half of the real data flow rate.

@emmettbutler
Contributor

Thanks for the ticket @arita37. Without actual example code, it's hard to know what's really happening. Based on the logic you referenced, I'm guessing you might be doing something like this:

while True:
    consumer = topic.get_balanced_consumer()
    consumer.consume()

If you're indeed calling topic.get_balanced_consumer() on every loop iteration, that would definitely be a cause of the slowdown you're seeing. The consumer only needs to be instantiated once and can be used thereafter in a loop, like this:

consumer = topic.get_balanced_consumer()
while True:   
    consumer.consume()

Please feel free to reopen or comment on this issue if I've gotten it wrong.

@arita37
Author

arita37 commented Apr 25, 2018

Hello,

We are using normal code as follows:

consumer = topic.get_balanced_consumer()
while True:
    try:
        for message in consumer:
            ii += 1
            msgraw = consumer.consume()
            msg = msgraw.value.decode('utf8')
            msg = json.loads(msg)
            ....  # very fast process
            producer.produce(json_data)
    except Exception as e:
        print(e)

What happens:
0min - 2min: the input flow matches the Kafka server's (a test server with a fixed flow).
After 2 minutes, it is around half of the expected rate.

  1. Just wondering if you have run any tests to verify that the consuming flow matches the Kafka flow?

  2. Could this be related to producer latency?
     In most cases, it's around 1/2 of the expected flow.

Can we re-open this issue?

@emmettbutler
Contributor

You can delete the line msgraw = consumer.consume() from your example code. It has the same function as for message in consumer, which uses the iterator protocol to provide one consumed message per iteration. What your example code is doing is effectively skipping every other message from the consumer: message is ignored, then the next message is consumed into msgraw and processed. This would predictably lead to the halving of throughput you're experiencing. We automatically test that pykafka doesn't drop messages between fetch and consume.
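This halving can be reproduced with a plain Python iterator standing in for the consumer: each pass through the for loop takes one message, and the extra explicit consume takes a second one, so only every other message reaches the processing code. A minimal sketch (`simulate` is an illustrative name; the stand-in iterator replaces the pykafka consumer):

```python
def simulate(messages):
    """Mimic `for message in consumer` combined with an extra consume() call."""
    it = iter(messages)
    processed = []
    for message in it:           # takes messages 0, 2, 4, ... and ignores them
        extra = next(it, None)   # the extra consume() takes messages 1, 3, 5, ...
        if extra is not None:
            processed.append(extra)
    return processed

print(simulate(list(range(10))))  # → [1, 3, 5, 7, 9]: half the input is processed
```

Dropping the extra `next()` call (i.e. deleting `msgraw = consumer.consume()` and processing `message` directly) lets all ten messages through.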
