-
Notifications
You must be signed in to change notification settings - Fork 14.7k
KAFKA-19662: Reset share group offsets for unsubscribed topics #20708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the change! Just two small things left.
) { | ||
List<CoordinatorRecord> records = new ArrayList<>(); | ||
ShareGroup group = groupMetadataManager.shareGroup(groupId); | ||
group.validateOffsetsAlterable(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the method validateOffsetsAlterable()
could be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did :)
} catch (ExecutionException ee) { | ||
Throwable cause = ee.getCause(); | ||
if (cause instanceof GroupIdNotFoundException) { | ||
resetOffsetsForInactiveGroup(groupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wondering, why is there a retry mechanism when GroupIdNotFoundException
is thrown? Is this for better reliability? I see ConsumerGroupCommand also does this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this PR aims to align the behavior between consumer groups and shared consumer groups.
For regular consumer groups, we already have similar logic — if the group doesn’t exist, it will be automatically created. see
kafka/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
Lines 465 to 466 in fd9b551
log.info("[GroupId {}] Creating a simple consumer group via manual offset commit.", request.groupId()); | |
group = groupMetadataManager.getOrMaybeCreateClassicGroup(request.groupId(), true); |
In this PR, the shared consumer group now follows the same behavior: https://github.com/apache/kafka/pull/20708/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R8309-R8310
Back to your question, I think your understanding is correct. This is something like a retry mechanism: if describeShareGroups throws a GroupIdNotFoundException
, then will call resetOffsetsForInactiveGroup
again, which will implicitly create the shared group.
Perhaps Andrew can elaborate more 😃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's because if you do the operation on a non-existent group, it creates the group (which is by definition empty) and then resets the offsets. I know this is a bit weird, but if you want to initialise the group in a particular state before starting any consumers, this is how you would do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the failed test testAlterShareGroupOffsets is related to this patch. I also wasn’t able to get it to pass in my local environment.
By the way, do you think we might need to add a test for the tools change in this PR as well?
return partitionsToReset; | ||
} | ||
|
||
private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: now groupId become unused, could you please remove this variable? thanks!
Yes, I agree that the test failure is related. I'm going to fix that test and probably add another few. The fact that so little failed is a good sign that overall this way of addressing the problem works well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
This PR allows the kafka-share-groups.sh --reset-offsets tool to be used
to set offsets for topics which are not currently subscribed in a share
group. It also works if the share group does not yet exist. This brings
the capability in line with the equivalent function in
Kafka-consumer-groups.sh. The primary purpose is to allow offsets to be
set before the share group is first used as a way of initialising in a
known state.