Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6c4fba4
fix indentation
lucliu1108 Jul 25, 2025
c272552
clean up test
lucliu1108 Jul 25, 2025
503483c
add depency
lucliu1108 Jul 26, 2025
71e0ab0
add deleteStreamsGroup integration test
lucliu1108 Jul 28, 2025
6524912
remove redundant code
lucliu1108 Jul 28, 2025
d923d01
remove parts that shouldn't be included
lucliu1108 Jul 28, 2025
4196434
cleanup spacing
lucliu1108 Jul 28, 2025
a3da136
Merge branch 'trunk' into KSTREAMS-7247
lucliu1108 Jul 28, 2025
316a417
reorganize code
lucliu1108 Jul 29, 2025
a50ed7e
reorganize code
lucliu1108 Jul 29, 2025
ab67b5b
Merge branch 'trunk' into KSTREAMS-7247
lucliu1108 Aug 4, 2025
ad59b00
revise describeStreamsGroup wait until stable
lucliu1108 Aug 4, 2025
03f64e4
add integration test for listgroups for streams protocol
lucliu1108 Aug 5, 2025
f33fddb
move list streams group test into existing listGroups test
lucliu1108 Aug 5, 2025
885e626
Merge branch 'trunk' into KSTREAMS-7247
lucliu1108 Aug 5, 2025
d9246de
Merge branch 'trunk' into KSTREAMS-7247
lucliu1108 Aug 26, 2025
069a406
modify tests
lucliu1108 Aug 26, 2025
1e9a87a
Merge branch 'trunk' into KSTREAMS-7247
lucliu1108 Sep 1, 2025
ac83e37
revise streamsgroup creation
lucliu1108 Sep 1, 2025
7e6afe0
remove unncessary gradle dependencies
lucliu1108 Sep 1, 2025
7f75f03
move consumer poll to test
lucliu1108 Sep 1, 2025
6464811
reformat code
lucliu1108 Sep 2, 2025
926e7e4
reformat code
lucliu1108 Sep 2, 2025
00f271a
reformat code
lucliu1108 Sep 2, 2025
c6a42bd
remove redundant consumer config for tests
lucliu1108 Sep 2, 2025
a1f1e88
add back file last changeline
lucliu1108 Sep 2, 2025
aed6c1d
modify code based on copilot suggestions
lucliu1108 Sep 2, 2025
04433c7
fix spacing
lucliu1108 Sep 2, 2025
be27bd8
move topic creation inside try block
lucliu1108 Sep 3, 2025
b996f95
Apply suggestions from code review
lucasbru Sep 3, 2025
7b036a7
Apply suggestions from code review
lucasbru Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume
import kafka.utils.TestUtils
import kafka.utils.Implicits._

import java.util.{Optional, Properties}
import java.util
import java.util.{Optional, Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import kafka.security.JaasTestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData}
import org.apache.kafka.clients.consumer.internals.{AsyncKafkaConsumer, StreamsRebalanceData, StreamsRebalanceListener}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}

import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}

import scala.collection.mutable
import scala.collection.Seq
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters

/**
Expand All @@ -56,6 +59,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val superuserClientConfig = new Properties
val serverConfig = new Properties
val controllerConfig = new Properties
var streamsGroupConfig = new Properties

private val consumers = mutable.Buffer[Consumer[_, _]]()
private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]()
Expand Down Expand Up @@ -152,12 +156,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
shareConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
shareConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
shareConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

streamsConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
streamsConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
streamsConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
streamsConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())

doSuperuserSetup(testInfo)
Expand Down Expand Up @@ -235,6 +239,52 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
streamsConsumer
}

def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
configsToRemove: List[String] = List(),
inputTopic: String,
streamsGroupId: String): AsyncKafkaConsumer[K, V] = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
props ++= configOverrides
configsToRemove.foreach(props.remove(_))

val streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
util.Map.of(
"subtopology-0", new StreamsRebalanceData.Subtopology(
util.Set.of(inputTopic),
util.Set.of(),
util.Map.of(),
util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
util.Set.of()
)),
Map.empty[String, String].asJava
)

val consumer = createStreamsConsumer(
keyDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[K]],
valueDeserializer = new ByteArrayDeserializer().asInstanceOf[Deserializer[V]],
configOverrides = props,
streamsRebalanceData = streamsRebalanceData
)
consumer.subscribe(util.Set.of(inputTopic),
new StreamsRebalanceListener {
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
Optional.empty()
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = {
Optional.empty()
}
override def onAllTasksLost(): Optional[Exception] =
Optional.empty()
})
consumer
}

def createAdminClient(
listenerName: ListenerName = listenerName,
configOverrides: Properties = new Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
Expand Down Expand Up @@ -2573,8 +2574,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val consumerGroupId = "consumer_group_id"
val shareGroupId = "share_group_id"
val simpleGroupId = "simple_group_id"
val streamsGroupId = "streams_group_id"
val testTopicName = "test_topic"

consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name)
val classicGroupConfig = new Properties(consumerConfig)
classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
Expand All @@ -2589,13 +2590,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)

val streamsGroup = createStreamsGroup(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer may create testTopicName due to a metadata update, causing the subsequent client.createTopics call to fail since the topic already exists.

see https://github.com/apache/kafka/actions/runs/17495296037/job/49695550299?pr=20472

@lucliu1108 WDYT? Perhaps we could create the topic first?

Copy link
Contributor Author

@lucliu1108 lucliu1108 Sep 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Thanks for pointing out! I'll make a patch for that @chia7712

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Patch created: #20496. Thanks! @chia7712

inputTopic = testTopicName,
streamsGroupId = streamsGroupId
)

val config = createConfig
client = Admin.create(config)
try {
client.createTopics(util.Set.of(
new NewTopic(testTopicName, 1, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName), List())
val topicPartition = new TopicPartition(testTopicName, 0)

classicGroup.subscribe(util.Set.of(testTopicName))
Expand All @@ -2604,6 +2606,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
consumerGroup.poll(JDuration.ofMillis(1000))
shareGroup.subscribe(util.Set.of(testTopicName))
shareGroup.poll(JDuration.ofMillis(1000))
streamsGroup.poll(JDuration.ofMillis(1000))

val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId,
util.Map.of(topicPartition, new OffsetAndMetadata(0L)))
Expand All @@ -2612,18 +2615,27 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

TestUtils.waitUntilTrue(() => {
val groups = client.listGroups().all().get()
groups.size() == 4
groups.size() == 5
}, "Expected to find all groups")

val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE))
val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE))
val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY))
// Streams group could either be in STABLE or NOT_READY state
val streamsGroupListingStable = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))
val streamsGroupListingNotReady = new GroupListing(streamsGroupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY))

var listGroupsResult = client.listGroups()
assertTrue(listGroupsResult.errors().get().isEmpty)
assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet)
assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet)

val expectedStreamListings = Set(streamsGroupListingStable, streamsGroupListingNotReady)
val expectedListings = Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing)
val actualListings = listGroupsResult.all().get().asScala.toSet

// Check that actualListings contains all expectedListings and one of the streams listings
assertTrue(expectedListings.subsetOf(actualListings))
assertTrue(actualListings.exists(expectedStreamListings.contains))

listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.CLASSIC)))
assertTrue(listGroupsResult.errors().get().isEmpty)
Expand All @@ -2639,11 +2651,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertTrue(listGroupsResult.errors().get().isEmpty)
assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet)
assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet)

listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(util.Set.of(GroupType.STREAMS)))
assertTrue(listGroupsResult.errors().get().isEmpty)
assertTrue(listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingStable)) ||
listGroupsResult.all().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))
assertTrue(listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingStable)) ||
listGroupsResult.valid().get().asScala.toSet.equals(Set(streamsGroupListingNotReady)))

} finally {
Utils.closeQuietly(classicGroup, "classicGroup")
Utils.closeQuietly(consumerGroup, "consumerGroup")
Utils.closeQuietly(shareGroup, "shareGroup")
Utils.closeQuietly(client, "adminClient")
Utils.closeQuietly(streamsGroup, "streamsGroup")
}
}

Expand Down Expand Up @@ -3603,7 +3624,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
client.incrementalAlterConfigs(util.Map.of(broker0Resource,
util.List.of(new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "123"),
AlterConfigOp.OpType.SET),
AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "456"),
AlterConfigOp.OpType.SET)
))).all().get()
Expand Down Expand Up @@ -3809,7 +3830,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val longTopicName = String.join("", Collections.nCopies(249, "x"))
val invalidTopicName = String.join("", Collections.nCopies(250, "x"))
val newTopics2 = util.List.of(new NewTopic(invalidTopicName, 3, 3.toShort),
new NewTopic(longTopicName, 3, 3.toShort))
new NewTopic(longTopicName, 3, 3.toShort))
val results = client.createTopics(newTopics2).values()
assertTrue(results.containsKey(longTopicName))
results.get(longTopicName).get()
Expand Down Expand Up @@ -4363,6 +4384,136 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
}

@Test
def testDescribeStreamsGroups(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 1

val config = createConfig
client = Admin.create(config)

prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)

val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId
)
streams.poll(JDuration.ofMillis(500L))

try {
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
}, "Stream group not stable yet")

// Verify the describe call works correctly
val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
val group = describedGroups.get(streamsGroupId)
assertNotNull(group)
assertEquals(streamsGroupId, group.groupId())
assertFalse(group.members().isEmpty)
assertNotNull(group.subtopologies())
assertFalse(group.subtopologies().isEmpty)

// Verify the topology contains the expected source and sink topics
val subtopologies = group.subtopologies().asScala
assertTrue(subtopologies.exists(subtopology =>
subtopology.sourceTopics().contains(testTopicName)))

// Test describing a non-existing group
val nonExistingGroup = "non_existing_stream_group"
val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup))
assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all())

} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
}
}

@Test
def testDeleteStreamsGroups(): Unit = {
val testTopicName = "test_topic"
val testNumPartitions = 3
val testNumStreamsGroup = 3

val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3")
val targetRemainingGroups = util.List.of("stream_group_id_1")

val config = createConfig
client = Admin.create(config)

prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)

val streamsList = scala.collection.mutable.ListBuffer[(String, AsyncKafkaConsumer[_,_])]()

try {
for (i <- 1 to testNumStreamsGroup) {
val streamsGroupId = s"stream_group_id_$i"

val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)
streams.poll(JDuration.ofMillis(500L))
streamsList += ((streamsGroupId, streams))
}

TestUtils.waitUntilTrue(() => {
val groups = client.listGroups().all().get()
groups.stream()
.anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size()
}, "Streams groups not ready to delete yet")

// Test deletion of non-empty existing groups
var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups)
assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all())
assertEquals(2, deleteStreamsGroupResult.deletedGroups().size())

// Stop and clean up the streams for the groups that are going to be deleted
streamsList
.filter { case (groupId, _) => targetDeletedGroups.contains(groupId) }
.foreach { case (_, streams) =>
streams.close()
}

val listTopicResult = client.listTopics()
assertEquals(2, listTopicResult.names().get().size())

// Test deletion of emptied existing streams groups
deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups)
assertEquals(2, deleteStreamsGroupResult.deletedGroups().size())

// Wait for the deleted groups to be removed
TestUtils.waitUntilTrue(() => {
val groupIds = client.listGroups().all().get().asScala.map(_.groupId()).toSet
targetDeletedGroups.asScala.forall(id => !groupIds.contains(id))
}, "Deleted groups not yet deleted")

// Verify that the deleted groups are no longer present
val remainingGroups = client.listGroups().all().get()
assertEquals(targetRemainingGroups.size(), remainingGroups.size())
remainingGroups.stream().forEach(g => {
assertTrue(targetRemainingGroups.contains(g.groupId()))
})

// Test deletion of a non-existing group
val nonExistingGroup = "non_existing_stream_group"
val deleteNonExistingGroupResult = client.deleteStreamsGroups(util.List.of(nonExistingGroup))
assertFutureThrows(classOf[GroupIdNotFoundException], deleteNonExistingGroupResult.all())
assertEquals(deleteNonExistingGroupResult.deletedGroups().size(), 1)

} finally{
streamsList.foreach { case (_, streams) =>
streams.close()
}
Utils.closeQuietly(client, "adminClient")
}
}
}

object PlaintextAdminIntegrationTest {
Expand Down