
[Proposal] New concurrency parameter for consuming #45

Open
pfreixes opened this issue Dec 28, 2020 · 1 comment

@pfreixes (Contributor)

Inspired by the solution drafted in this blog post [1], I would like to propose adding a new parameter, set when subscribers are configured, for enforcing concurrency.

The concurrency parameter - default None, meaning no concurrency - will tell Kafkaesk the maximum number of inflight messages that can be processed at the same time.

Concurrency will be applied between records that do not belong to the same partition, so at any time all inflight messages must belong to different partitions.

Maximum throughput would be achieved only if there are enough partitions assigned to the consumer. At some point, adding more consumers might actually mean less throughput, since consumers could not make use of the concurrency because there would not be enough partitions. Limiting the size of the consumer group should address the issue.
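As a sketch of what configuring the new parameter might look like (the `App` class, decorator name, and stream/group names below are hypothetical stand-ins for illustration, not kafkaesk's actual API):

```python
import asyncio


class App:
    """Minimal stand-in for a kafkaesk-style application object
    (hypothetical, for illustration only); it just records the
    subscription options instead of talking to Kafka."""

    def __init__(self):
        self.subscriptions = {}

    def subscribe(self, stream, group=None, concurrency=None):
        def register(func):
            self.subscriptions[stream] = {
                "group": group,
                # None (the default) means no concurrency, i.e. the
                # current one-message-at-a-time behavior
                "concurrency": concurrency,
                "handler": func,
            }
            return func

        return register


app = App()


@app.subscribe("content.changes", group="workers", concurrency=10)
async def handle(record):
    # stand-in for an IO-bound handler
    await asyncio.sleep(0)
```

With `concurrency=10`, at most ten messages - each from a different partition - would be inflight at once; omitting the parameter keeps today's behavior.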

Rationale

Some consumer types might need to spend some time on IO operations; adding more consumers within the consumer group would eventually mitigate the issue, but at some cost.

By allowing consumers to process partitions concurrently within the same consumer, the problem is addressed in a more cost-effective way.

Implementation details

  • Subsequent calls to getmany must return messages from partitions that do not have messages inflight within the consumer.
  • The consumer would group messages by partition, keeping the original order.
  • A limited number of asyncio tasks would gather messages from these per-partition groups using a round-robin algorithm.
  • Offset commits would be done per partition.
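The steps above can be sketched as a toy asyncio simulation (the function name, record tuples, and in-memory queues here are hypothetical - a real implementation would pull batches from aiokafka's getmany and commit through the consumer):

```python
import asyncio
from collections import defaultdict


async def run_partitioned(records, concurrency):
    """Process (partition, offset, value) records with at most
    `concurrency` worker tasks, never handling two messages from
    the same partition at the same time."""
    # group messages by partition, keeping the original order
    queues = defaultdict(asyncio.Queue)
    for partition, offset, value in records:
        queues[partition].put_nowait((offset, value))

    commits = {}    # partition -> last processed offset, committed per partition
    processed = []

    async def worker(partitions):
        # each task owns a disjoint set of partitions, so at any moment
        # all inflight messages belong to different partitions
        for partition in partitions:
            queue = queues[partition]
            while not queue.empty():
                offset, value = queue.get_nowait()
                await asyncio.sleep(0)          # stand-in for real IO work
                processed.append((partition, offset, value))
                commits[partition] = offset     # offset commit per partition

    # round-robin assignment of partitions to a limited number of tasks
    parts = sorted(queues)
    assignment = [parts[i::concurrency] for i in range(concurrency)]
    await asyncio.gather(*(worker(chunk) for chunk in assignment))
    return processed, commits
```

The sketch keeps per-partition ordering (each partition's queue is drained in order by a single task) while the number of tasks bounds overall concurrency.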

[1] https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/

@vangheem (Contributor)

This would be really nice, and that article is a great reference.

This might not be trivial because...

  • you need to manage gathering data manually against assigned partitions, with read timeouts, since the default implementation reads from all partitions and does not interleave data between them
  • you need to manually track offsets to commit. We had an implementation of this that I ended up reverting because I suspected it could be affecting our rebalancing issues (I'm not convinced it was actually the problem, though).

But the advantage of this is that you can still work on and commit messages against different topics/partitions that aren't failing.

An alternative, much simpler approach could be to do simple concurrency on messages coming in across all partitions (aiokafka batches against a single partition and does not interleave) and not worry about splitting by partition or managing the manual commit. A failure in one message means you might have messages that were processed but cannot be committed. Technically that is the case right now anyway, unfortunately, because we commit on an interval and do not track last known good offsets to commit only those (we had an implementation of this but removed it). So maybe no matter which implementation we pick, we'll need a more correct and safe manual commit anyway.
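The simpler alternative amounts to bounding inflight messages with a semaphore, ignoring partition boundaries entirely. A minimal sketch (the function and handler here are hypothetical, and a real version would feed it batches from the consumer):

```python
import asyncio


async def consume_concurrently(messages, handler, limit):
    # bound the number of inflight messages across all partitions with a
    # semaphore; per-partition ordering and per-partition commit tracking
    # are deliberately ignored in this simpler scheme
    semaphore = asyncio.Semaphore(limit)

    async def one(message):
        async with semaphore:
            return await handler(message)

    return await asyncio.gather(*(one(m) for m in messages))
```

This gets concurrency with very little code, at the cost described above: a failure in one message can leave later, already-processed messages uncommittable.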
