Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4463,6 +4463,47 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}

@Test
def testDescribeStreamsGroupsNotReady(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"

val config = createConfig
client = Admin.create(config)

val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))

try {
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.NOT_READY && firstGroup.groupId() == streamsGroupId
}, "Stream group not NOT_READY yet")

// Verify the describe call works correctly
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
val group = describedGroups.get(streamsGroupId)
assertNotNull(group)
assertEquals(streamsGroupId, group.groupId())
assertFalse(group.members().isEmpty)
assertNotNull(group.subtopologies())
assertFalse(group.subtopologies().isEmpty)

// Verify the topology contains the expected source and sink topics
val subtopologies = group.subtopologies().asScala
assertTrue(subtopologies.exists(subtopology =>
subtopology.sourceTopics().contains(testTopicName)))

} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
}
}

@Test
def testDeleteStreamsGroups(): Unit = {
val testTopicName = "test_topic"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,16 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
.setGroupEpoch(groupEpoch.get(committedOffset))
.setGroupState(state.get(committedOffset).toString())
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
.setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null));
.setTopology(
configuredTopology.get(committedOffset)
.filter(ConfiguredTopology::isReady)
.map(ConfiguredTopology::asStreamsGroupDescribeTopology)
.orElse(
topology.get(committedOffset)
.map(StreamsTopology::asStreamsGroupDescribeTopology)
.orElseThrow(() -> new IllegalStateException("There should always be a topology for a streams group."))
)
);
members.entrySet(committedOffset).forEach(
entry -> describedGroup.members().add(
entry.getValue().asStreamsGroupDescribeMember(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
package org.apache.kafka.coordinator.group.streams;

import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;

import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -95,4 +97,43 @@ public static StreamsTopology fromHeartbeatRequest(StreamsGroupHeartbeatRequestD
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x));
return new StreamsTopology(topology.epoch(), subtopologyMap);
}

public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() {
return new StreamsGroupDescribeResponseData.Topology()
.setEpoch(topologyEpoch)
.setSubtopologies(
subtopologies.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(entry -> asStreamsGroupDescribeSubtopology(entry.getKey(), entry.getValue()))
.toList()
);
}

private StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId, StreamsGroupTopologyValue.Subtopology subtopology) {
return new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId(subtopologyId)
.setSourceTopics(subtopology.sourceTopics().stream().sorted().toList())
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics().stream().sorted().toList())
.setRepartitionSourceTopics(subtopology.repartitionSourceTopics().stream()
.map(this::asStreamsGroupDescribeTopicInfo)
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList())
.setStateChangelogTopics(subtopology.stateChangelogTopics().stream()
.map(this::asStreamsGroupDescribeTopicInfo)
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
}

private StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo(StreamsGroupTopologyValue.TopicInfo topicInfo) {
return new StreamsGroupDescribeResponseData.TopicInfo()
.setName(topicInfo.name())
.setPartitions(topicInfo.partitions())
.setReplicationFactor(topicInfo.replicationFactor())
.setTopicConfigs(
topicInfo.topicConfigs().stream().map(
topicConfig -> new StreamsGroupDescribeResponseData.KeyValue()
.setKey(topicConfig.key())
.setValue(topicConfig.value())
).toList()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9645,19 +9645,43 @@ public void testStreamsGroupDescribeNoErrors() {
.setProcessId("processId")
.setMemberEpoch(epoch)
.setPreviousMemberEpoch(epoch - 1);
String subtopology1 = "subtopology1";
String fooTopicName = "foo";
StreamsTopology topology = new StreamsTopology(
0,
Map.of(subtopology1,
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId(subtopology1)
.setSourceTopics(List.of(fooTopicName))
)
);

GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch))
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)
.withTopology(topology)
)
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch)
.withMember(memberBuilder.build()))
.withMember(memberBuilder.build())
.withTopology(topology)
)
.build();

StreamsGroupDescribeResponseData.Topology expectedTopology =
new StreamsGroupDescribeResponseData.Topology()
.setEpoch(0)
.setSubtopologies(List.of(
new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId(subtopology1)
.setSourceTopics(List.of(fooTopicName))
));

List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
.setGroupId(streamsGroupIds.get(0))
.setGroupState(StreamsGroupState.EMPTY.toString())
.setAssignmentEpoch(0),
.setAssignmentEpoch(0)
.setTopology(expectedTopology),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
.setGroupId(streamsGroupIds.get(1))
Expand All @@ -9666,6 +9690,7 @@ public void testStreamsGroupDescribeNoErrors() {
TasksTuple.EMPTY
)
))
.setTopology(expectedTopology)
.setGroupState(StreamsGroupState.NOT_READY.toString())
);
List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds);
Expand Down Expand Up @@ -9695,13 +9720,24 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
String memberId1 = "memberId1";
String memberId2 = "memberId2";
String subtopologyId = "subtopology1";
String fooTopicName = "foo";
StreamsGroupTopologyValue topology = new StreamsGroupTopologyValue()
.setEpoch(0)
.setSubtopologies(
List.of(
new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId(subtopologyId)
.setSourceTopics(List.of(fooTopicName))
)
);

GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();

StreamsGroupMember.Builder memberBuilder1 = streamsGroupMemberBuilderWithDefaults(memberId1);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId, memberBuilder1.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1, 0));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId, topology));

TasksTuple assignment = new TasksTuple(
Map.of(subtopologyId, Set.of(0, 1)),
Expand Down Expand Up @@ -9733,6 +9769,17 @@ public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
memberBuilder1.build().asStreamsGroupDescribeMember(TasksTuple.EMPTY),
memberBuilder2.build().asStreamsGroupDescribeMember(assignment)
))
.setTopology(
new StreamsGroupDescribeResponseData.Topology()
.setEpoch(0)
.setSubtopologies(
List.of(
new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId(subtopologyId)
.setSourceTopics(List.of(fooTopicName))
)
)
)
.setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
.setGroupEpoch(epoch + 2);
assertEquals(1, actual.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,4 +1019,124 @@ public void testShutdownRequestedMethods() {
streamsGroup.removeMember(memberId2);
assertEquals(Optional.empty(), streamsGroup.getShutdownRequestMemberId());
}

@Test
public void testAsDescribedGroupWithStreamsTopologyHavingSubtopologies() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-with-topology");
snapshotRegistry.idempotentCreateSnapshot(0);

// Create a topology with subtopologies
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
"sub-1", new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("sub-1")
.setSourceTopics(List.of("input-topic"))
.setRepartitionSourceTopics(List.of(
new StreamsGroupTopologyValue.TopicInfo().setName("repartition-topic")
))
.setStateChangelogTopics(List.of(
new StreamsGroupTopologyValue.TopicInfo().setName("changelog-topic")
))
);

group.setGroupEpoch(2);
group.setTopology(new StreamsTopology(2, subtopologies));
group.setTargetAssignmentEpoch(2);
group.updateMember(new StreamsGroupMember.Builder("member1")
.setMemberEpoch(2)
.setPreviousMemberEpoch(1)
.setState(MemberState.STABLE)
.setInstanceId("instance1")
.setRackId("rack1")
.setClientId("client1")
.setClientHost("host1")
.setRebalanceTimeoutMs(1000)
.setTopologyEpoch(2)
.setProcessId("process1")
.setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092))
.setClientTags(Map.of("tag1", "value1"))
.setAssignedTasks(new TasksTuple(Map.of(), Map.of(), Map.of()))
.setTasksPendingRevocation(new TasksTuple(Map.of(), Map.of(), Map.of()))
.build());
snapshotRegistry.idempotentCreateSnapshot(1);

StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);

assertEquals("group-id-with-topology", describedGroup.groupId());
assertEquals(StreamsGroup.StreamsGroupState.NOT_READY.toString(), describedGroup.groupState());
assertEquals(2, describedGroup.groupEpoch());
assertEquals(2, describedGroup.assignmentEpoch());

// Verify topology is correctly described
assertNotNull(describedGroup.topology());
assertEquals(2, describedGroup.topology().epoch());
assertEquals(1, describedGroup.topology().subtopologies().size());

StreamsGroupDescribeResponseData.Subtopology subtopology = describedGroup.topology().subtopologies().get(0);
assertEquals("sub-1", subtopology.subtopologyId());
assertEquals(List.of("input-topic"), subtopology.sourceTopics());
assertEquals(1, subtopology.repartitionSourceTopics().size());
assertEquals("repartition-topic", subtopology.repartitionSourceTopics().get(0).name());
assertEquals(1, subtopology.stateChangelogTopics().size());
assertEquals("changelog-topic", subtopology.stateChangelogTopics().get(0).name());

assertEquals(1, describedGroup.members().size());
assertEquals("member1", describedGroup.members().get(0).memberId());
}

@Test
public void testAsDescribedGroupPrefersConfiguredTopologyOverStreamsTopology() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-configured");
snapshotRegistry.idempotentCreateSnapshot(0);

// Create both StreamsTopology and ConfiguredTopology
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
"sub-1", new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("sub-1")
.setSourceTopics(List.of("streams-topic"))
);

group.setGroupEpoch(3);
group.setTopology(new StreamsTopology(2, subtopologies));
group.setConfiguredTopology(new ConfiguredTopology(3, 0, Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
group.setTargetAssignmentEpoch(3);
snapshotRegistry.idempotentCreateSnapshot(1);

StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);

// Should prefer ConfiguredTopology over StreamsTopology
assertNotNull(describedGroup.topology());
assertEquals(3, describedGroup.topology().epoch()); // ConfiguredTopology epoch
assertEquals(0, describedGroup.topology().subtopologies().size()); // Empty configured topology
}

@Test
public void testAsDescribedGroupFallbackToStreamsTopologyWhenConfiguredTopologyEmpty() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-fallback");
snapshotRegistry.idempotentCreateSnapshot(0);

// Create StreamsTopology with subtopologies
Map<String, StreamsGroupTopologyValue.Subtopology> subtopologies = Map.of(
"sub-1", new StreamsGroupTopologyValue.Subtopology()
.setSubtopologyId("sub-1")
.setSourceTopics(List.of("fallback-topic"))
);

group.setGroupEpoch(4);
group.setTopology(new StreamsTopology(4, subtopologies));
// No ConfiguredTopology set, so should fallback to StreamsTopology
group.setTargetAssignmentEpoch(4);
snapshotRegistry.idempotentCreateSnapshot(1);

StreamsGroupDescribeResponseData.DescribedGroup describedGroup = group.asDescribedGroup(1);

// Should use StreamsTopology when ConfiguredTopology is not available
assertNotNull(describedGroup.topology());
assertEquals(4, describedGroup.topology().epoch()); // StreamsTopology epoch
assertEquals(1, describedGroup.topology().subtopologies().size());
assertEquals("sub-1", describedGroup.topology().subtopologies().get(0).subtopologyId());
assertEquals(List.of("fallback-topic"), describedGroup.topology().subtopologies().get(0).sourceTopics());
}
}
Loading