Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,7 @@ public CoordinatorResult<DeleteShareGroupOffsetsResponseData, CoordinatorRecord>
}

/**
* Make the following checks to make sure the AlterShareGroupOffsetsRequest request is valid:
* 1. Checks whether the provided group is empty
* 2. Checks the requested topics are presented in the metadataImage
* 3. Checks the corresponding share partitions in AlterShareGroupOffsetsRequest are existing
* Alters the offsets for a share group.
*
* @param groupId - The group ID
* @param alterShareGroupOffsetsRequestData - The request data for AlterShareGroupOffsetsRequestData
Expand All @@ -793,19 +790,7 @@ public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData, Initializ
String groupId,
AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData
) {
List<CoordinatorRecord> records = new ArrayList<>();
ShareGroup group = groupMetadataManager.shareGroup(groupId);
group.validateOffsetsAlterable();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the method validateOffsetsAlterable() could be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did :)


Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> response = groupMetadataManager.completeAlterShareGroupOffsets(
groupId,
alterShareGroupOffsetsRequestData,
records
);
return new CoordinatorResult<>(
records,
response
);
return groupMetadataManager.alterShareGroupOffsets(groupId, alterShareGroupOffsetsRequestData.topics());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidRequestException;
Expand Down Expand Up @@ -1513,6 +1514,21 @@ private void throwIfShareGroupIsFull(
}
}

/**
* Checks whether the share group is empty.
*
* @param group The share group.
*
* @throws GroupNotEmptyException if the group is not empty.
*/
private void throwIfShareGroupIsNotEmpty(
ShareGroup group
) throws GroupNotEmptyException {
if (group.numMembers() > 0) {
throw new GroupNotEmptyException(Errors.NON_EMPTY_GROUP.message());
}
}

/**
* Validates the member epoch provided in the heartbeat request.
*
Expand Down Expand Up @@ -8269,19 +8285,37 @@ public List<DeleteShareGroupStateRequestData.DeleteStateData> sharePartitionsEli
return deleteShareGroupStateRequestTopicsData;
}

public Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> completeAlterShareGroupOffsets(
/**
* Handles an AlterShareGroupOffsets request.
*
* Make the following checks to make sure the AlterShareGroupOffsetsRequest request is valid:
* 1. Checks whether the provided group is empty
* 2. Checks the requested topics are presented in the metadataImage
* 3. Checks the corresponding share partitions in AlterShareGroupOffsetsRequest are existing
*
* @param groupId The group id from the request.
* @param topics The topic information for altering the share group's offsets from the request.
*
* @return A Result containing a pair of ShareGroupHeartbeat response and maybe InitializeShareGroupStateParameters
* and a list of records to update the state machine.
*/
public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters>, CoordinatorRecord> alterShareGroupOffsets(
String groupId,
AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequest,
List<CoordinatorRecord> records
) {
AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection topics
) throws ApiException {
final long currentTimeMs = time.milliseconds();
Group group = groups.get(groupId);
final List<CoordinatorRecord> records = new ArrayList<>();

// Get or create the share group. If the group exists, check that it's empty. If it is created, it is empty.
final ShareGroup group = getOrMaybeCreateShareGroup(groupId, true);
throwIfShareGroupIsNotEmpty(group);

AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection alterShareGroupOffsetsResponseTopics = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection();

Map<Uuid, InitMapValue> initializingTopics = new HashMap<>();
Map<Uuid, Map<Integer, Long>> offsetByTopicPartitions = new HashMap<>();

alterShareGroupOffsetsRequest.topics().forEach(topic -> {
topics.forEach(topic -> {
Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOpt = metadataImage.topicMetadata(topic.topicName());
if (topicMetadataOpt.isPresent()) {
var topicMetadata = topicMetadataOpt.get();
Expand Down Expand Up @@ -8335,10 +8369,13 @@ public Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStatePa
});

addInitializingTopicsRecords(groupId, records, initializingTopics);
return Map.entry(
new AlterShareGroupOffsetsResponseData()
.setResponses(alterShareGroupOffsetsResponseTopics),
buildInitializeShareGroupState(groupId, ((ShareGroup) group).groupEpoch(), offsetByTopicPartitions)
return new CoordinatorResult<>(
records,
Map.entry(
new AlterShareGroupOffsetsResponseData()
.setResponses(alterShareGroupOffsetsResponseTopics),
buildInitializeShareGroupState(groupId, group.groupEpoch(), offsetByTopicPartitions)
)
);
}

Expand Down Expand Up @@ -8401,7 +8438,7 @@ public CoordinatorResult<Void, CoordinatorRecord> maybeCleanupShareGroupState(
return new CoordinatorResult<>(records);
}

/*
/**
* Returns a list of {@link DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic} corresponding to the
* topics for which persister delete share group state request was successful
* @param groupId group ID of the share group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ public void validateDeleteGroup() throws ApiException {
validateEmptyGroup();
}

public void validateOffsetsAlterable() throws ApiException {
validateEmptyGroup();
}

public void validateEmptyGroup() {
if (state() != ShareGroupState.EMPTY) {
throw Errors.NON_EMPTY_GROUP.exception();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
Expand Down Expand Up @@ -385,10 +386,25 @@ void resetOffsets() {
if (!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) || GroupState.DEAD.equals(shareGroupDescription.groupState()))) {
CommandLineUtils.printErrorAndExit(String.format("Share group '%s' is not empty.", groupId));
}
Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId);
if (offsetsToReset == null) {
return;
resetOffsetsForInactiveGroup(groupId);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof GroupIdNotFoundException) {
resetOffsetsForInactiveGroup(groupId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, why is there a retry mechanism when GroupIdNotFoundException is thrown? Is this for better reliability? I see ConsumerGroupCommand also does this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR aims to align the behavior between consumer groups and shared consumer groups.

For regular consumer groups, we already have similar logic — if the group doesn’t exist, it will be automatically created. see

log.info("[GroupId {}] Creating a simple consumer group via manual offset commit.", request.groupId());
group = groupMetadataManager.getOrMaybeCreateClassicGroup(request.groupId(), true);

In this PR, the shared consumer group now follows the same behavior: https://github.com/apache/kafka/pull/20708/files#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R8309-R8310

Back to your question, I think your understanding is correct. This is something like a retry mechanism: if describeShareGroups throws a GroupIdNotFoundException, then will call resetOffsetsForInactiveGroup again, which will implicitly create the shared group.

Perhaps Andrew can elaborate more 😃

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because if you do the operation on a non-existent group, it creates the group (which is by definition empty) and then resets the offsets. I know this is a bit weird, but if you want to initialise the group in a particular state before starting any consumers, this is how you would do it.

} else if (cause instanceof KafkaException) {
CommandLineUtils.printErrorAndExit(cause.getMessage());
} else {
throw new RuntimeException(cause);
}
}
}

private void resetOffsetsForInactiveGroup(String groupId) {
try {
Collection<TopicPartition> partitionsToReset = getPartitionsToReset(groupId);
Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId, partitionsToReset);
boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
if (!dryRun) {
adminClient.alterShareGroupOffsets(groupId,
Expand All @@ -404,24 +420,28 @@ void resetOffsets() {
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof KafkaException) {
CommandLineUtils.printErrorAndExit(cause.getMessage());
throw (KafkaException) cause;
} else {
throw new RuntimeException(cause);
}
}
}

protected Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId) throws ExecutionException, InterruptedException {
Map<String, ListShareGroupOffsetsSpec> groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec());
Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
private Collection<TopicPartition> getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException {
Collection<TopicPartition> partitionsToReset;

if (opts.options.has(opts.topicOpt)) {
partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
} else {
Map<String, ListShareGroupOffsetsSpec> groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec());
Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
partitionsToReset = offsetsByTopicPartitions.keySet();
}

return partitionsToReset;
}

private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: now groupId become unused, could you please remove this variable? thanks!

offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
if (opts.options.has(opts.resetToEarliestOpt)) {
return offsetsUtils.resetToEarliest(partitionsToReset);
Expand Down
Loading