-
Notifications
You must be signed in to change notification settings - Fork 14.7k
KAFKA-19730: StreamsGroupDescribe result is missing topology #20574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ology not configured In the streams group heartbeat, we validate the topology set for the group against the topic metadata, to generate the "configured topology" which has a specific number of partitions for each topic. In streams group describe, we use the configured topology to expose this information to the user. However, we leave the topology information as null in the streams group describe response, if the topology is not configured. This triggers a null-pointer exception in the admin client implementation. Instead, we should expose the unconfigured topology when the configured topology is not available, which will still expose useful information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes an issue where StreamsGroupDescribe result was missing topology information when the topology is not configured. Instead of returning null topology, it now returns the unconfigured topology to provide useful information to users.
Key changes:
- Added fallback mechanism to use StreamsTopology when ConfiguredTopology is not ready
- Added conversion method to transform StreamsTopology to describe response format
- Enhanced test coverage for topology fallback scenarios
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
StreamsGroup.java | Modified topology retrieval logic to fallback to unconfigured topology when configured topology is not ready |
StreamsTopology.java | Added conversion method to transform topology data to describe response format |
StreamsTopologyTest.java | Added comprehensive tests for topology conversion methods |
StreamsGroupTest.java | Added tests verifying topology fallback behavior in describe operations |
PlaintextAdminIntegrationTest.scala | Added integration test to verify describe works for not-ready groups |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
Outdated
Show resolved
Hide resolved
…roup/streams/StreamsTopology.java Co-authored-by: Copilot <[email protected]>
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
Show resolved
Hide resolved
.orElse( | ||
topology.get(committedOffset) | ||
.map(StreamsTopology::asStreamsGroupDescribeTopology) | ||
.orElse(null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is orElse(null)
a valid case? Should we not always have a topology, even if it's not configured yet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's a valid case in the current code base. It could only happen if something is fundamentally wrong, for example, somehow the StreamsGroupTopologyRecord was removed from the offset commit topic. But I'm not sure if I want to throw an IllegalStateException here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I'm not sure if I want to throw an IllegalStateException here.
That's basically my question. If we just return null
, and it's bug, it might be difficult to find out, where the bug originates from, while throwing an exception would provide a clear signal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, fine.
.setReplicationFactor(topicInfo.replicationFactor()) | ||
.setTopicConfigs( | ||
topicInfo.topicConfigs().stream().map( | ||
y -> new StreamsGroupDescribeResponseData.KeyValue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why y
? Can't we find a better name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax Thanks for the comments, ready for re-review.
.orElse( | ||
topology.get(committedOffset) | ||
.map(StreamsTopology::asStreamsGroupDescribeTopology) | ||
.orElse(null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, fine.
.setReplicationFactor(topicInfo.replicationFactor()) | ||
.setTopicConfigs( | ||
topicInfo.topicConfigs().stream().map( | ||
y -> new StreamsGroupDescribeResponseData.KeyValue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
When toology not configured.
In the streams group heartbeat, we validate the topology set for the
group against the topic metadata, to generate the "configured topology"
which has a specific number of partitions for each topic.
In streams group describe, we use the configured topology to expose this
information to the user. However, we leave the topology information as
null in the streams group describe response, if the topology is not
configured. This triggers an IllegalStateException in the admin client
implementation.
Instead, we should expose the unconfigured topology when the configured
topology is not available, which will still expose useful information.
Reviewers: Matthias J. Sax [email protected]