KAFKA-15793 Fix ZkMigrationIntegrationTest#testMigrateTopicDeletions (#17004)


Reviewers: Igor Soarez <[email protected]>, Ajit Singh <>
mumrah authored Sep 6, 2024
1 parent a9a4a52 commit e4d108d
Showing 3 changed files with 43 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deflake.yml
@@ -54,7 +54,7 @@ jobs:
- name: Test
timeout-minutes: 60
run: |
./gradlew --build-cache --scan --continue \
./gradlew --info --build-cache --scan --continue \
-PtestLoggingEvents=started,passed,skipped,failed \
-PignoreFailures=true -PmaxParallelForks=2 \
-Pkafka.cluster.test.repeat=${{ inputs.test-repeat }} \
4 changes: 2 additions & 2 deletions core/src/test/resources/log4j.properties
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

# zkclient can be verbose, during debugging it is common to adjust it separately
log4j.logger.org.apache.zookeeper=WARN
@@ -65,7 +65,6 @@ import scala.jdk.CollectionConverters._
object ZkMigrationIntegrationTest {
def zkClustersForAllMigrationVersions(): java.util.List[ClusterConfig] = {
Seq(
MetadataVersion.IBP_3_4_IV0,
MetadataVersion.IBP_3_5_IV2,
MetadataVersion.IBP_3_6_IV2,
MetadataVersion.IBP_3_7_IV0,
@@ -92,7 +91,7 @@ object ZkMigrationIntegrationTest {
}

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Timeout(300)
@Timeout(600)
class ZkMigrationIntegrationTest {

val log: Logger = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
@@ -296,13 +295,29 @@ class ZkMigrationIntegrationTest {
def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = {
// Create some topics in ZK mode
var admin = zkCluster.createAdminClient()
val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(300, TimeUnit.SECONDS)
admin.close()
try {
val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(60, TimeUnit.SECONDS)
TestUtils.waitUntilTrue(() => {
val topicDescribe = admin.describeTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3").asJava)
if (topicDescribe.topicNameValues() == null || topicDescribe.topicNameValues().size() < 3) {
false
} else {
topicDescribe.topicNameValues().values().stream().allMatch {
topic => topic.get(60, TimeUnit.SECONDS).partitions().stream().allMatch(part => {
part.leader() != null && part.isr().size() == 3
})
}
}
}, msg="waiting for topics to be available", waitTimeMs=300000)
} finally {
admin.close()
}

val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient

// Bootstrap the ZK cluster ID into KRaft
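The guard added in this hunk polls Admin#describeTopics until every partition of the three test topics reports a leader and a full three-broker ISR before the migration is started. A minimal standalone sketch of the same readiness check, assuming an already-running cluster; the object name, bootstrap address, and timeout values are illustrative and not part of this change:

import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import scala.jdk.CollectionConverters._

object TopicReadinessSketch {
  // True once every partition of the given topics has an elected leader and the
  // expected number of in-sync replicas, mirroring the check in the test above.
  def topicsReady(admin: Admin, topics: Seq[String], expectedIsrSize: Int): Boolean = {
    val futures = admin.describeTopics(topics.asJava).topicNameValues()
    if (futures == null || futures.size() < topics.size) {
      false
    } else {
      futures.values().asScala.forall { future =>
        future.get(60, TimeUnit.SECONDS).partitions().asScala.forall { partition =>
          partition.leader() != null && partition.isr().size() == expectedIsrSize
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
    val admin = Admin.create(props)
    try {
      val deadline = System.currentTimeMillis() + 300000L
      while (!topicsReady(admin, Seq("test-topic-1", "test-topic-2", "test-topic-3"), expectedIsrSize = 3) &&
        System.currentTimeMillis() < deadline) {
        Thread.sleep(1000) // poll once per second, much like TestUtils.waitUntilTrue in the test
      }
    } finally {
      admin.close()
    }
  }
}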
@@ -339,7 +354,7 @@ class ZkMigrationIntegrationTest {
zkClient.createDeleteTopicPath("test-topic-3")

zkCluster.waitForReadyBrokers()
readyFuture.get(60, TimeUnit.SECONDS)
readyFuture.get(60, TimeUnit.SECONDS)

// Only continue with the test if there are some pending deletions to verify. If there are not any pending
// deletions, this will mark the test as "skipped" instead of failed.
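The "skipped instead of failed" behaviour is typically achieved by aborting the test through a JUnit 5 assumption. A small hedged sketch of that pattern; the object and helper names are illustrative and not taken from this commit:

import org.junit.jupiter.api.Assumptions.assumeTrue

object SkipWhenNothingPending {
  // Illustrative guard: abort (skip) the test when there are no pending topic
  // deletions left to verify, instead of letting later assertions fail.
  def requirePendingDeletions(pendingDeletions: Set[String]): Unit =
    assumeTrue(pendingDeletions.nonEmpty,
      "No pending topic deletions to verify; skipping the test")
}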
@@ -352,12 +367,13 @@ class ZkMigrationIntegrationTest {
TestUtils.waitUntilTrue(
() => zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY).initialZkMigrationComplete(),
"Timed out waiting for migration to complete",
30000)
60000)

// At this point, some of the topics may have been deleted by ZK controller and the rest will be
// implicitly deleted by the KRaft controller and removed from the ZK brokers as stray partitions
def topicsAllDeleted(admin: Admin): Boolean = {
val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
log.info("Topics are {}", topics)
topics.retainAll(util.Arrays.asList(
"test-topic-1", "test-topic-2", "test-topic-3"
))
@@ -369,18 +385,19 @@ class ZkMigrationIntegrationTest {
TestUtils.waitUntilTrue(
() => topicsAllDeleted(admin),
"Timed out waiting for topics to be deleted",
30000,
300000,
1000)
log.info("Topics were deleted. Now re-creating them.")

val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort))
newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort))
newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(60, TimeUnit.SECONDS)
createTopicResult.all().get(60, TimeUnit.SECONDS)

def topicsAllRecreated(admin: Admin): Boolean = {
val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
topics.retainAll(util.Arrays.asList(
"test-topic-1", "test-topic-2", "test-topic-3"
))
@@ -391,21 +408,22 @@ class ZkMigrationIntegrationTest {
TestUtils.waitUntilTrue(
() => topicsAllRecreated(admin),
"Timed out waiting for topics to be created",
30000,
60000,
1000)

TestUtils.retry(300000) {
log.info("Topics were re-created. Now waiting for consistent topic state.")
TestUtils.retry(300000) {
// Need a retry here since topic metadata may be inconsistent between brokers
val topicDescriptions = try {
admin.describeTopics(util.Arrays.asList(
"test-topic-1", "test-topic-2", "test-topic-3"
)).topicNameValues().asScala.map { case (name, description) =>
name -> description.get(60, TimeUnit.SECONDS)
name -> description.get(60, TimeUnit.SECONDS)
}.toMap
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => Map.empty[String, TopicDescription]
case t: Throwable => fail("Error describing topics", t.getCause)
}
log.debug("Topic describe: {}", topicDescriptions);

assertEquals(2, topicDescriptions("test-topic-1").partitions().size())
assertEquals(1, topicDescriptions("test-topic-2").partitions().size())
@@ -417,14 +435,13 @@ class ZkMigrationIntegrationTest {
})
}

val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala
val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala
assertTrue(absentTopics.contains("test-topic-1"))
assertTrue(absentTopics.contains("test-topic-2"))
assertTrue(absentTopics.contains("test-topic-3"))
}

admin.close()
} finally {
admin.close()
shutdownInSequence(zkCluster, kraftCluster)
}
}
@@ -1153,6 +1170,7 @@ class ZkMigrationIntegrationTest {

def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = {
zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
kraftCluster.nonFatalFaultHandler().setIgnore(true)
kraftCluster.close()
zkCluster.stop()
}
