diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index c8e26445922f6..21f4d697ad408 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2934,7 +2934,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val testTopicName = "test_topic" val testGroupId = "test_group_id" val testClientId = "test_client_id" - val fakeGroupId = "fake_group_id" + val nonexistentGroupId = "nonexistent_group_id" val fakeTopicName = "foo" val tp1 = new TopicPartition(testTopicName, 0) @@ -2968,12 +2968,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.partitionResult(tp1)) assertFutureThrows(classOf[GroupNotEmptyException], offsetAlterResult.partitionResult(tp2)) - // Test the fake group ID - val fakeAlterResult = client.alterShareGroupOffsets(fakeGroupId, util.Map.of(tp1, 0, tp2, 0)) + // Test the non-existent group ID + val nonexistentAlterResult = client.alterShareGroupOffsets(nonexistentGroupId, util.Map.of(tp1, 0, tp2, 0)) - assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.all()) - assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.partitionResult(tp1)) - assertFutureThrows(classOf[GroupIdNotFoundException], fakeAlterResult.partitionResult(tp2)) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], nonexistentAlterResult.all()) + assertNull(nonexistentAlterResult.partitionResult(tp1).get()) + assertFutureThrows(classOf[UnknownTopicOrPartitionException], nonexistentAlterResult.partitionResult(tp2)) } // Test offset alter when group is empty diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index c591f8d376768..3165d10d8f477 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -779,10 +779,7 @@ public CoordinatorResult } /** - * Make the following checks to make sure the AlterShareGroupOffsetsRequest request is valid: - * 1. Checks whether the provided group is empty - * 2. Checks the requested topics are presented in the metadataImage - * 3. Checks the corresponding share partitions in AlterShareGroupOffsetsRequest are existing + * Alters the offsets for a share group. * * @param groupId - The group ID * @param alterShareGroupOffsetsRequestData - The request data for AlterShareGroupOffsetsRequestData @@ -793,19 +790,7 @@ public CoordinatorResult records = new ArrayList<>(); - ShareGroup group = groupMetadataManager.shareGroup(groupId); - group.validateOffsetsAlterable(); - - Map.Entry response = groupMetadataManager.completeAlterShareGroupOffsets( - groupId, - alterShareGroupOffsetsRequestData, - records - ); - return new CoordinatorResult<>( - records, - response - ); + return groupMetadataManager.alterShareGroupOffsets(groupId, alterShareGroupOffsetsRequestData.topics()); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index be9c4f9abc311..de955eb1321a4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.FencedMemberEpochException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.InconsistentGroupProtocolException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -1513,6 +1514,21 @@ private void throwIfShareGroupIsFull( } } + /** + * Checks whether the share group is empty. + * + * @param group The share group. + * + * @throws GroupNotEmptyException if the group is not empty. + */ + private void throwIfShareGroupIsNotEmpty( + ShareGroup group + ) throws GroupNotEmptyException { + if (group.numMembers() > 0) { + throw new GroupNotEmptyException(Errors.NON_EMPTY_GROUP.message()); + } + } + /** * Validates the member epoch provided in the heartbeat request. * @@ -8269,19 +8285,37 @@ public List sharePartitionsEli return deleteShareGroupStateRequestTopicsData; } - public Map.Entry completeAlterShareGroupOffsets( + /** + * Handles an AlterShareGroupOffsets request. + * + * Make the following checks to make sure the AlterShareGroupOffsetsRequest request is valid: + * 1. Checks whether the provided group is empty + * 2. Checks the requested topics are presented in the metadataImage + * 3. Checks the corresponding share partitions in AlterShareGroupOffsetsRequest are existing + * + * @param groupId The group id from the request. + * @param topics The topic information for altering the share group's offsets from the request. + * + * @return A Result containing a pair of ShareGroupHeartbeat response and maybe InitializeShareGroupStateParameters + * and a list of records to update the state machine. + */ + public CoordinatorResult, CoordinatorRecord> alterShareGroupOffsets( String groupId, - AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequest, - List records - ) { + AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection topics + ) throws ApiException { final long currentTimeMs = time.milliseconds(); - Group group = groups.get(groupId); + final List records = new ArrayList<>(); + + // Get or create the share group. If the group exists, check that it's empty. If it is created, it is empty. + final ShareGroup group = getOrMaybeCreateShareGroup(groupId, true); + throwIfShareGroupIsNotEmpty(group); + AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection alterShareGroupOffsetsResponseTopics = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(); Map initializingTopics = new HashMap<>(); Map> offsetByTopicPartitions = new HashMap<>(); - alterShareGroupOffsetsRequest.topics().forEach(topic -> { + topics.forEach(topic -> { Optional topicMetadataOpt = metadataImage.topicMetadata(topic.topicName()); if (topicMetadataOpt.isPresent()) { var topicMetadata = topicMetadataOpt.get(); @@ -8335,10 +8369,13 @@ public Map.Entry( + records, + Map.entry( + new AlterShareGroupOffsetsResponseData() + .setResponses(alterShareGroupOffsetsResponseTopics), + buildInitializeShareGroupState(groupId, group.groupEpoch(), offsetByTopicPartitions) + ) ); } @@ -8401,7 +8438,7 @@ public CoordinatorResult maybeCleanupShareGroupState( return new CoordinatorResult<>(records); } - /* + /** * Returns a list of {@link DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic} corresponding to the * topics for which persister delete share group state request was successful * @param groupId group ID of the share group diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index 8a02e941008da..7ddc1238f5fc7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -244,10 +244,6 @@ public void validateDeleteGroup() throws ApiException { validateEmptyGroup(); } - public void validateOffsetsAlterable() throws ApiException { - validateEmptyGroup(); - } - public void validateEmptyGroup() { if (state() != ShareGroupState.EMPTY) { throw Errors.NON_EMPTY_GROUP.exception(); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 50565dcc0ba02..87cf0f1e83742 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Utils; @@ -385,10 +386,25 @@ void resetOffsets() { if (!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) || GroupState.DEAD.equals(shareGroupDescription.groupState()))) { CommandLineUtils.printErrorAndExit(String.format("Share group '%s' is not empty.", groupId)); } - Map offsetsToReset = prepareOffsetsToReset(groupId); - if (offsetsToReset == null) { - return; + resetOffsetsForInactiveGroup(groupId); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof GroupIdNotFoundException) { + resetOffsetsForInactiveGroup(groupId); + } else if (cause instanceof KafkaException) { + CommandLineUtils.printErrorAndExit(cause.getMessage()); + } else { + throw new RuntimeException(cause); } + } + } + + private void resetOffsetsForInactiveGroup(String groupId) { + try { + Collection partitionsToReset = getPartitionsToReset(groupId); + Map offsetsToReset = prepareOffsetsToReset(partitionsToReset); boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt); if (!dryRun) { adminClient.alterShareGroupOffsets(groupId, @@ -404,24 +420,28 @@ void resetOffsets() { } catch (ExecutionException ee) { Throwable cause = ee.getCause(); if (cause instanceof KafkaException) { - CommandLineUtils.printErrorAndExit(cause.getMessage()); + throw (KafkaException) cause; } else { throw new RuntimeException(cause); } } } - protected Map prepareOffsetsToReset(String groupId) throws ExecutionException, InterruptedException { - Map groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec()); - Map offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); + private Collection getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException { Collection partitionsToReset; if (opts.options.has(opts.topicOpt)) { partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt)); } else { + Map groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec()); + Map offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); partitionsToReset = offsetsByTopicPartitions.keySet(); } + return partitionsToReset; + } + + private Map prepareOffsetsToReset(Collection partitionsToReset) { offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset); if (opts.options.has(opts.resetToEarliestOpt)) { return offsetsUtils.resetToEarliest(partitionsToReset); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index 3b1843b95cd17..29fe151ba2b7b 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -1313,7 +1313,7 @@ public void testAlterShareGroupOffsetsFailureWithoutTopic() { } @Test - public void testAlterShareGroupOffsetsFailureWithNoneEmptyGroup() { + public void testAlterShareGroupOffsetsFailureWithNonEmptyGroup() { String group = "share-group"; String topic = "topic"; String bootstrapServer = "localhost:9092"; @@ -1418,6 +1418,50 @@ topic, new TopicDescription(topic, false, List.of( } } + @Test + public void testAlterShareGroupNonExistentGroupSuccess() { + String group = "share-group"; + String topic = "none"; + String bootstrapServer = "localhost:9092"; + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--reset-offsets", "--to-earliest", "--execute", "--topic", topic, "--group", group}; + Admin adminClient = mock(KafkaAdminClient.class); + + ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( + Map.of( + group, + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) + ) + ); + when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); + + AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = mockAlterShareGroupOffsets(adminClient, group); + TopicPartition tp0 = new TopicPartition(topic, 0); + Map partitionOffsets = Map.of(tp0, new OffsetAndMetadata(0L)); + ListOffsetsResult listOffsetsResult = AdminClientTestUtils.createListOffsetsResult(partitionOffsets); + when(adminClient.listOffsets(any(), any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult); + + KafkaFutureImpl missingGroupFuture = new KafkaFutureImpl<>(); + missingGroupFuture.completeExceptionally(new GroupIdNotFoundException("Group " + group + " not found.")); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, missingGroupFuture)); + when(adminClient.describeShareGroups(any(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + Map descriptions = Map.of( + topic, new TopicDescription(topic, false, List.of( + new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()) + ))); + DescribeTopicsResult describeTopicResult = mock(DescribeTopicsResult.class); + when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions)); + when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult); + when(adminClient.describeTopics(anyCollection(), any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult); + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + service.resetOffsets(); + verify(adminClient).alterShareGroupOffsets(eq(group), anyMap()); + verify(adminClient).describeTopics(anyCollection(), any(DescribeTopicsOptions.class)); + verify(alterShareGroupOffsetsResult, times(1)).all(); + verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class)); + } + } + private AlterShareGroupOffsetsResult mockAlterShareGroupOffsets(Admin client, String groupId) { AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = mock(AlterShareGroupOffsetsResult.class); KafkaFutureImpl resultFuture = new KafkaFutureImpl<>();