This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Zookeeper topic and consumer names with b literal #888

Open
Atheuz opened this issue Nov 9, 2018 · 4 comments

@Atheuz

Atheuz commented Nov 9, 2018

I think the problem I'm facing is related to these two issues:

#567
#569

Specifically, my problem is that the b literal prefix ends up inside the topic and consumer group name strings that Zookeeper stores, as the log lines below show:

2018-11-09 18:18:31     INFO [pykafka.balancedconsumer] Rebalancing consumer "b'DESKTOP-K7VPQI3:f1ea4868-71c1-44f2-a55c-00e7a908b1dc'" for topic "b'this-is-a-topic'".

and:

2018-11-09 18:22:59     INFO [pykafka.cluster] Attempting to discover offset manager for consumer group 'b'this-is-a-consumer-group''
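In Python 3, a `b'...'` wrapper like the one in these log lines is what you get when a `bytes` value is interpolated with `str()`, `%s` or an f-string instead of being decoded first. A minimal illustration in plain Python (not pykafka's actual code; the topic name is taken from the log above):

```python
topic = b"this-is-a-topic"  # a topic name held as bytes

# Formatting bytes directly embeds their repr, including the b'' wrapper.
mangled = "%s" % topic
also_mangled = f"{topic}"

# Decoding first yields the plain text name.
clean = topic.decode("utf-8")

print(mangled)  # b'this-is-a-topic'
print(clean)    # this-is-a-topic
```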

My initialization of pykafka looks like this:

    from kazoo.client import KazooClient
    import pykafka
    from pykafka.common import OffsetType

    # KazooClient.start() blocks until the session is connected (or raises a
    # timeout error), so no busy-wait loop is needed afterwards.
    self.kazoo = KazooClient(hosts=self.zookeeper_hosts)
    self.kazoo.start()

    # Set up the client with the hosts
    self.client = pykafka.KafkaClient(
        zookeeper_hosts=self.zookeeper_hosts,
        socket_timeout_ms=self.socket_timeout_ms,
        broker_version=self.broker_version,
    )

    # Balanced consumer that reuses the shared Kazoo client for group coordination
    return self.client.topics[self.topic].get_balanced_consumer(
        consumer_group=self.group_id,
        zookeeper=self.kazoo,
        consumer_timeout_ms=self._consumer_timeout_ms,
        auto_offset_reset=OffsetType.LATEST,
        auto_commit_enable=self._auto_commit_enable,
        auto_commit_interval_ms=self._auto_commit_interval_ms,
    )

If I query Zookeeper through Kazoo with this snippet:

    print(kazoo.get_children("/consumers/this-is-a-consumer-group/owners"))

I get this:

    ["b'this-is-a-topic'"]
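When auditing what is already stored in Zookeeper, node names like this one can be mapped back to plain text. A minimal sketch using a hypothetical helper, `unmangle_node_name`, that only handles names consisting entirely of a bytes literal; it is a reading aid, not a fix for whatever writes the names:

```python
import ast


def unmangle_node_name(name: str) -> str:
    """Return the plain name behind a node name such as "b'this-is-a-topic'".

    Hypothetical helper for auditing existing znodes. Names that are not a
    complete bytes literal are returned unchanged.
    """
    if len(name) >= 3 and name[:2] in ("b'", 'b"') and name.endswith(name[1]):
        try:
            value = ast.literal_eval(name)  # safely parses the bytes literal
        except (ValueError, SyntaxError):
            return name
        if isinstance(value, bytes):
            return value.decode("utf-8")
    return name


# Node names as returned by the get_children() query above.
children = ["b'this-is-a-topic'"]
print([unmangle_node_name(n) for n in children])  # ['this-is-a-topic']
```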

I don't know whether my setup is wrong, but it isn't obvious to me why it behaves like this. Because of it, I can't get the consumer to claim the correct partitions and start consuming from the latest offset there.

PyKafka version: 2.8.0
Kafka version: 0.8.2
Python version: 3.6.6

@emmettbutler
Contributor

Thanks for reporting this issue, @Atheuz. What are the types of self.topic and self.group_id? Since #760, they should be str instances.
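Since #760 expects `str` names, one way to rule out the input types as the cause is to coerce both values before they reach pykafka. A minimal sketch with a hypothetical helper, `ensure_str`, assuming the names are UTF-8 when they arrive as bytes:

```python
from typing import Union


def ensure_str(value: Union[str, bytes], encoding: str = "utf-8") -> str:
    """Return value as str, decoding bytes; reject any other type."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    if isinstance(value, str):
        return value
    raise TypeError(f"expected str or bytes, got {type(value).__name__}")


# Example: normalize names that may come from config as either type.
topic = ensure_str(b"this-is-a-topic")
group_id = ensure_str("this-is-a-consumer-group")
```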

@Atheuz
Author

Atheuz commented Nov 9, 2018

I've tried with both str and b literal strings. When I run a consumer with either:

  1. It does not appear to commit the offsets of its messages: the lag for that consumer group just grows without bound, and no committed offsets show up.

  2. It doesn't matter if I use b literals or str instances, the behaviour is the same.

  3. It's giving me these partitions:

    My partitions: ["b'my-unused-topic'-4-112", "b'my-unused-topic'-3-63", "b'my-unused-topic'-5-125", "b'my-unused-topic'-4-40", "b'my-unused-topic'-2-110", ...]

Maybe I'm just not using it correctly. Can you tell me where ZK/pykafka saves the partitions and offsets? Is it in /consumers/my-unused-group/offsets/my-unused-topic? That's where my lag metric is read from. I saw a post where you mentioned that offsets are stored in Kafka rather than in ZK, but I assumed they would also have to be replicated to ZK, since I don't think partition information can be stored in Kafka in 0.8.2, and I know those offsets don't match.

@emmettbutler
Contributor

Regarding the b literal in Zookeeper, I think that's a bug. It's definitely annoying, but it doesn't strictly preclude use of the consumer.

PyKafka stores offset information in Kafka itself via the OffsetCommitRequest and OffsetFetchRequest interface; it does not use Zookeeper for offset storage in any capacity. PyKafka's BalancedConsumer, which you're using, does use Zookeeper to store partition ownership information, which is used to balance consumer groups and is distinct from offset information. Thus I believe your assertion that offset information is "replicated in ZK" is not correct.

The code you've provided appears to be correct with respect to automatic offset committing, assuming that you're setting auto_commit_enable to True and auto_commit_interval_ms to a reasonable number of milliseconds. If that's the case, it's hard to debug further without additional code. You mention that "the lag for that consumer group just grows unbounded"; what do you mean by that? More information about exactly how you're checking for committed offsets would help in fixing your issue.
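As a rough illustration of the two settings mentioned here, the auto-commit options could be collected as keyword arguments for `get_balanced_consumer`. The 10-second interval is an arbitrary example value, not a recommendation, and `OffsetType.LATEST` from the original snippet is left out so the sketch runs without pykafka installed:

```python
# Keyword arguments intended for topic.get_balanced_consumer(**consumer_kwargs).
# Values are illustrative examples only.
consumer_kwargs = {
    "consumer_group": "this-is-a-consumer-group",
    "auto_commit_enable": True,         # periodically commit consumed offsets to Kafka
    "auto_commit_interval_ms": 10_000,  # example value: commit about every 10 seconds
}

# Cheap sanity checks before handing the settings to pykafka.
if not consumer_kwargs["auto_commit_enable"]:
    raise ValueError("auto_commit_enable is off; offsets will not be committed automatically")
if consumer_kwargs["auto_commit_interval_ms"] <= 0:
    raise ValueError("auto_commit_interval_ms must be a positive number of milliseconds")
```

With pykafka installed, these would be passed alongside the other arguments from the original snippet, for example `topic.get_balanced_consumer(zookeeper=kazoo_client, **consumer_kwargs)`, where `kazoo_client` stands in for an already started Kazoo client.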

@Atheuz
Author

Atheuz commented Nov 18, 2018

@emmett9001 My mistake. The problem I was having was that I couldn't get the offset lag reported the way I expected. When I switched to the method used in kafka_tools.py, I got the correct offset lag. More precisely, the problem was that the offsets were not being stored in Zookeeper, where I had expected them to be.
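Consumer lag of this kind comes down to per-partition arithmetic: the latest available offset minus the group's committed offset. A minimal sketch, assuming both have already been fetched into dicts keyed by partition id; the function name and sample numbers are hypothetical, partitions without a committed offset are simply counted from 0, and the exact off-by-one convention depends on how a given tool defines the committed offset:

```python
from typing import Dict


def consumer_lag(latest: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: latest available offset minus committed offset.

    Partitions with no committed offset are counted from 0 (a simplification).
    """
    return {p: latest[p] - committed.get(p, 0) for p in latest}


# Hypothetical sample offsets for a three-partition topic.
latest = {0: 120, 1: 95, 2: 40}
committed = {0: 110, 1: 95}

lag = consumer_lag(latest, committed)
total = sum(lag.values())
print(lag)    # {0: 10, 1: 0, 2: 40}
print(total)  # 50
```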

I still see the odd b literal in the pykafka logs, and I'm not sure what to make of it, but it seems to be consuming and producing messages properly.

This is not a 'bug' for me any longer, I just had to fully grok what pykafka was actually doing.
