
Allow for producing to topics dynamically. #354

Open
ottomata opened this issue Nov 13, 2015 · 17 comments

Comments

@ottomata
Contributor

Currently, a new producer needs to be instantiated for each topic that you want to produce to. I am working on a service that intakes messages from many producers, and then sends them on to different Kafka topics based on the content of the message. Instantiating a new producer for each topic is a lot of overhead. It would be handy if there was a way to produce to a topic by specifying topic as a parameter to a produce method.
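
For illustration (the client-level `get_producer()` and the topic-per-call `produce()` shown in comments are hypothetical, not pykafka's actual API):

```python
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")

# Today: a producer is bound to one topic at construction time, so
# routing by message content means one producer instance per topic.
producer_a = client.topics[b"topic-a"].get_producer()
producer_a.produce(b"message bound for topic-a")

# Requested (hypothetical -- no such methods exist in pykafka):
# producer = client.get_producer()
# producer.produce(b"topic-a", b"message bound for topic-a")
# producer.produce(b"topic-b", b"message bound for topic-b")
```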

@yungchin
Contributor

This would be extra efficient, not just because there would be fewer producer instances and threads around, but because you could combine and send message sets for multiple topics in the same request.

It's not the smallest change, but I can see how we could upgrade the current Producer to become a MultiTopicProducer (horrible name I know), and then provide a new Producer providing the current interface, which would just be a thin class around a MultiTopicProducer.
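
For illustration, one possible layering, with all names hypothetical. This naive shim just delegates to one pykafka producer per topic; the real win would come from batching message sets for multiple topics into the same produce request, as described above.

```python
class MultiTopicProducer(object):
    """Hypothetical core producer that accepts a topic per call."""

    def __init__(self, client):
        self._client = client
        self._producers = {}

    def produce(self, topic_name, message):
        # A real implementation would share connections and combine
        # message sets; here we lazily build one producer per topic.
        if topic_name not in self._producers:
            topic = self._client.topics[topic_name]
            self._producers[topic_name] = topic.get_producer()
        self._producers[topic_name].produce(message)


class SingleTopicProducer(object):
    """Thin facade preserving the current single-topic interface."""

    def __init__(self, multi_producer, topic_name):
        self._multi = multi_producer
        self._topic_name = topic_name

    def produce(self, message):
        self._multi.produce(self._topic_name, message)
```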

@emmettbutler emmettbutler added this to the 2.2.0 milestone Nov 19, 2015
@ryan-stateless

+1

@emmettbutler
Contributor

The same applies to consumers, noted in #395

@emmettbutler emmettbutler modified the milestones: 2.2.1, 2.2.0 Feb 5, 2016
@emmettbutler emmettbutler removed this from the 2.3.0 milestone Mar 8, 2016
@albertein

Are there any plans for this? I would find this useful as well.

@jianbin-wei

Any update on this? I see it has been postponed multiple times.

@emmettbutler
Contributor

It's not a very high priority at the moment. It's a really big change for not that big of a benefit from my perspective, though I gather from the activity on this thread that a lot of folks feel differently.

@jianbin-wei

In our use case we are trying to produce to and consume from thousands of topics at the same time. Without this feature, there is a big performance impact on our system.

Is there a way to prioritize this feature?

@jianbin-wei

Just noticed this about the Kafka consumer in 0.9.0:

> Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list of topics it wants to subscribe to through subscribe(List, ConsumerRebalanceListener), or subscribe to all topics matching certain pattern through subscribe(Pattern, ConsumerRebalanceListener).

Wonder how we can do that with pykafka?

@emmettbutler
Contributor

@jianbin-wei What's the source of that quote? I'd like to know if this is a recommended practice, something an existing client already does, or something else.

@albertein

@emmett9001 It's in the Java consumer docs under "Consumer Groups and Topic Subscriptions".

@emmettbutler
Contributor

Yes, last I checked kafka-python's API is similar. This is a reasonable case for supporting it in pykafka, though it's far from a trivial change.
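
For reference, a sketch based on kafka-python's documented `KafkaConsumer.subscribe()` (exact signatures may vary by version):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="127.0.0.1:9092",
                         group_id="my-group")

# Subscribe to an explicit list of topics...
consumer.subscribe(topics=["topic-a", "topic-b"])
# ...or, alternatively, to every topic matching a regex pattern:
# consumer.subscribe(pattern="^topic-test.*")

for message in consumer:
    # Each record carries its topic, so one consumer serves many topics.
    print(message.topic, message.partition, message.offset, message.value)
```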

@albertein

Any opinions on how the API should look if this gets implemented?

@emmettbutler
Contributor

The basic issue is that the only way pykafka currently has to get a consumer or producer instance is to call a method on a Topic instance. Supporting multi-topic consumers and producers would require adding a method to KafkaClient like get_consumer(topics) that accepts a sequence of Topic instances. Beyond that, SimpleConsumer would have to hold a sequence of Topics rather than a single one, and consume() might have to incorporate a topics parameter to control which topic is being consumed from. This in turn would require the consumer to store its queued messages with some way to retrieve them on a per-topic basis. The way the consumer holds its partitions would also have to be altered.

I admit that this sounds like only slightly less problematic of a change than I'd anticipated. It's the kind of thing that I'd be able to work on over the course of a few Fridays. Of course, if anyone on this thread would like to take a crack at it, I'm very open to a substantive pull request addressing this issue.
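
To make that concrete, a sketch of the interface being described; everything commented out is hypothetical, and none of these methods exist in pykafka today:

```python
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topics = [client.topics[b"topic-a"], client.topics[b"topic-b"]]

# Proposed: KafkaClient grows a method accepting a sequence of Topics.
# consumer = client.get_consumer(topics, consumer_group=b"my-group")

# consume() might take a topics parameter selecting which of the
# consumer's per-topic internal queues to read from:
# message = consumer.consume(topics=[topics[0]])
```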

@jianbin-wei

@emmett9001 one thing to note from your comment is that this subscription can be dynamic, meaning a pattern. Say it subscribes to "topic-test*"; then all existing and future topics starting with "topic-test" would be consumed by this instance. A watch on the topic list would do.

Right now I wrap BalancedConsumer for this purpose, but it is not efficient.

It would be great if this feature were implemented.
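
For illustration, a rough sketch of that kind of wrapper: it polls the broker's topic list instead of using a watch, and builds one BalancedConsumer per matching topic, which is exactly the inefficiency being described. Names here are illustrative.

```python
import re

from pykafka import KafkaClient


def consumers_for_pattern(client, pattern, group):
    """Build one BalancedConsumer per topic whose name matches `pattern`.

    Re-running this periodically picks up newly created topics; a
    ZooKeeper watch on the topic list would beat polling.
    """
    regex = re.compile(pattern)
    consumers = {}
    for name in client.topics.keys():
        if regex.match(name.decode("utf-8")):
            consumers[name] = client.topics[name].get_balanced_consumer(
                consumer_group=group)
    return consumers


client = KafkaClient(hosts="127.0.0.1:9092")
consumers = consumers_for_pattern(client, r"^topic-test", b"my-group")
```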

@jianbin-wei

Following are my thoughts after a quick reading of the Java client and my experience writing a wrapper to implement dynamic producing/consuming from topics. Here I only comment on the consuming side. @emmett9001 @yungchin @ottomata please let me know your thoughts.

> Supporting multi-topic consumers and producers would require adding a method to KafkaClient like get_consumer(topics) that accepts a sequence of Topic instances.

I am thinking about passing a list of topic names (not Topic instances) or a name pattern. A method to dynamically change the consumed topics later would be handy, so there is no need to call get_consumer again and again.

Some changes to BalancedConsumer may work.

> Beyond that, SimpleConsumer would have to hold a sequence of Topics rather than a single one,

The interface of SimpleConsumer would take a sequence of topic/partition pairs. Consuming from multiple topics also requires balancing across all available partitions to get partition-level scalability.

> and consume() might have to incorporate a topics parameter to control which topic is being consumed from.

IMO, it would be fine without a topics parameter. The user should expect a mix of messages from all subscribed topics. Each returned message should carry metadata about its topic/partition (and maybe other information too) so users can handle messages accordingly.
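
Sketched out, handling could dispatch on each message's own metadata. Here `get_consumer` and `message.topic` are hypothetical; `partition_id`, `offset`, and `value` mirror pykafka's existing Message attributes.

```python
def handle(message):
    """Route a message by the topic carried in its metadata, instead
    of passing a topics argument to consume()."""
    print(message.topic, message.partition_id, message.offset)
    # ... application-specific handling of message.value ...

# consumer = client.get_consumer([b"topic-a", b"topic-b"])  # hypothetical
# while True:
#     handle(consumer.consume())
```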

> This in turn would require the consumer to store its queued messages with some way to retrieve them on a per-topic basis.

With the above idea, there is no need to hold internal queues for each topic/partition. Only the offsets of consumed messages need to be held per topic/partition.

In our use case, we saw large memory usage because the consumer holds messages from each topic/partition it consumes from, up to the max queued message count. By default that is 2000 messages, and each message in our case is 10 KB. With about 1000 topic/partitions, that comes to a 20 GB memory footprint.
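
The arithmetic behind that figure:

```python
max_queued = 2000   # default max queued messages per topic/partition
msg_kb = 10         # message size in our case, in KB
partitions = 1000   # topic/partitions being consumed
print(max_queued * msg_kb * partitions / 1e6)  # -> 20.0 GB
```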

> The way the consumer holds its partitions would also have to be altered.

If messages are still stored per-partition, that would not need to change. If they are to be stored in one single queue, then yes.

@emmettbutler
Contributor

See #786 requesting regex-based topic subscription, which is blocked by this ticket.

@sumantpandey

Confluent Kafka Connect and sink connectors are the best solution to this problem.
