
Commit bd6b53c

yaooqinn authored and maropu committed
[SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry
### What changes were proposed in this pull request?

The `Kafka*Suite`s are flaky because of the Hadoop MiniKdc issue https://issues.apache.org/jira/browse/HADOOP-12656:

> Looking at MiniKdc implementation, if port is 0, the constructor use ServerSocket to find an unused port, assign the port number to the member variable port and close the ServerSocket object; later, in initKDCServer(), instantiate a TcpTransport object and bind at that port.
>
> It appears that the port may be used in between, and then throw the exception.

Related test failures are suspected, such as https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122225/testReport/org.apache.spark.sql.kafka010/KafkaDelegationTokenSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/

```scala
[info] org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite *** ABORTED *** (15 seconds, 426 milliseconds)
[info]   java.net.BindException: Address already in use
[info]   at sun.nio.ch.Net.bind0(Native Method)
[info]   at sun.nio.ch.Net.bind(Net.java:433)
[info]   at sun.nio.ch.Net.bind(Net.java:425)
[info]   at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
[info]   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
[info]   at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:198)
[info]   at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:51)
[info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor.registerHandles(AbstractPollingIoAcceptor.java:547)
[info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor.access$400(AbstractPollingIoAcceptor.java:68)
[info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor$Acceptor.run(AbstractPollingIoAcceptor.java:422)
[info]   at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
```

Comparing this stack trace with similar issues reported in other projects, such as https://issues.apache.org/jira/browse/KAFKA-3453 and https://issues.apache.org/jira/browse/HBASE-14734, we can be confident that they are caused by the same problem described in HADOOP-12656.

In this PR, we apply the HBase approach as a stopgap until we finally drop Hadoop 2.7.x.

### Why are the changes needed?

To fix test flakiness.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The modified tests themselves, passing Jenkins.

Closes apache#28442 from yaooqinn/SPARK-31631.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
1 parent 052ff49 commit bd6b53c
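To make the race in the commit message concrete, here is a minimal, illustrative Scala sketch (the object and method names are invented for this example, and this is not MiniKdc's actual code) of the "pick a free port with a throwaway ServerSocket, close it, then bind the same port later" pattern that HADOOP-12656 describes. The window between the close and the later bind is where another process can take the port and cause the BindException seen in the stack trace above.

```scala
import java.net.{InetSocketAddress, ServerSocket}

object PortRaceSketch {
  // Mimics the constructor-time behaviour: ask the kernel for a free
  // ephemeral port, remember its number, then immediately release it.
  def reservePort(): Int = {
    val probe = new ServerSocket(0)
    try probe.getLocalPort finally probe.close()
  }

  def main(args: Array[String]): Unit = {
    val port = reservePort()
    // Anything that binds `port` between reservePort() and the bind() below
    // (another test JVM, an unrelated service, ...) makes the next call throw
    // java.net.BindException: Address already in use.
    val server = new ServerSocket()
    server.bind(new InetSocketAddress(port))
    println(s"KDC-like service bound to port $port")
    server.close()
  }
}
```

Per the comments in the diff below, HADOOP-12656 is fixed in Hadoop 2.8.0, which is why the workaround is scoped to builds still defaulting to Hadoop 2.7.x.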

File tree: 2 files changed (+54, -5 lines)


core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala (+28, -2)

```diff
@@ -19,10 +19,14 @@ package org.apache.spark.deploy.security
 
 import java.security.PrivilegedExceptionAction
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -88,8 +92,30 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
     // krb5.conf. MiniKdc sets "java.security.krb5.conf" in start and removes it when stop called.
     val kdcDir = Utils.createTempDir()
     val kdcConf = MiniKdc.createConf()
-    kdc = new MiniKdc(kdcConf, kdcDir)
-    kdc.start()
+    // The port for MiniKdc service gets selected in the constructor, but will be bound
+    // to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+    // In meantime, when some other service might capture the port during this progress, and
+    // cause BindException.
+    // This makes our tests which have dedicated JVMs and rely on MiniKDC being flaky
+    //
+    // https://issues.apache.org/jira/browse/HADOOP-12656 get fixed in Hadoop 2.8.0.
+    //
+    // The workaround here is to periodically repeat this process with a timeout , since we are
+    // using Hadoop 2.7.4 as default.
+    // https://issues.apache.org/jira/browse/SPARK-31631
+    eventually(timeout(10.seconds), interval(1.second)) {
+      try {
+        kdc = new MiniKdc(kdcConf, kdcDir)
+        kdc.start()
+      } catch {
+        case NonFatal(e) =>
+          if (kdc != null) {
+            kdc.stop()
+            kdc = null
+          }
+          throw e
+      }
+    }
 
     val krbConf = new Configuration()
     krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
```
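For readers unfamiliar with ScalaTest's `Eventually`, here is a rough, self-contained sketch of how the retry in the block above behaves (the counter and the simulated failure are invented for illustration): the block is re-run roughly once per `interval` until it completes without throwing, and if `timeout` elapses first, the last exception is surfaced as a test failure.

```scala
import java.net.BindException

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyRetrySketch {
  def main(args: Array[String]): Unit = {
    var attempts = 0
    // Re-runs the block about once per second until it succeeds or 10 seconds pass.
    eventually(timeout(10.seconds), interval(1.second)) {
      attempts += 1
      // Simulate the first two startup attempts losing the port race.
      if (attempts < 3) throw new BindException("Address already in use")
    }
    println(s"succeeded on attempt $attempts")
  }
}
```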

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala (+26, -3)

```diff
@@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration
 import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
+import scala.util.control.NonFatal
 
 import com.google.common.io.Files
 import kafka.api.Request
@@ -36,7 +37,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -134,8 +135,30 @@ class KafkaTestUtils(
     val kdcDir = Utils.createTempDir()
     val kdcConf = MiniKdc.createConf()
     kdcConf.setProperty(MiniKdc.DEBUG, "true")
-    kdc = new MiniKdc(kdcConf, kdcDir)
-    kdc.start()
+    // The port for MiniKdc service gets selected in the constructor, but will be bound
+    // to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+    // In meantime, when some other service might capture the port during this progress, and
+    // cause BindException.
+    // This makes our tests which have dedicated JVMs and rely on MiniKDC being flaky
+    //
+    // https://issues.apache.org/jira/browse/HADOOP-12656 get fixed in Hadoop 2.8.0.
+    //
+    // The workaround here is to periodically repeat this process with a timeout , since we are
+    // using Hadoop 2.7.4 as default.
+    // https://issues.apache.org/jira/browse/SPARK-31631
+    eventually(timeout(10.seconds), interval(1.second)) {
+      try {
+        kdc = new MiniKdc(kdcConf, kdcDir)
+        kdc.start()
+      } catch {
+        case NonFatal(e) =>
+          if (kdc != null) {
+            kdc.stop()
+            kdc = null
+          }
+          throw e
+      }
+    }
     // TODO https://issues.apache.org/jira/browse/SPARK-30037
     // Need to build spark's own MiniKDC and customize krb5.conf like Kafka
     rewriteKrb5Conf()
```
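Since both suites now carry an identical retry block, a possible follow-up, purely hypothetical and not part of this commit, would be to factor it into a shared test helper along these lines; `MiniKdcTestHelper` and `startMiniKdcWithRetry` are invented names:

```scala
import java.io.File
import java.util.Properties

import scala.util.control.NonFatal

import org.apache.hadoop.minikdc.MiniKdc
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object MiniKdcTestHelper {
  /**
   * Builds and starts a MiniKdc, retrying for up to 10 seconds when the
   * constructor-selected port is grabbed by another process before start()
   * can bind it (HADOOP-12656). Returns the successfully started instance.
   */
  def startMiniKdcWithRetry(kdcConf: Properties, kdcDir: File): MiniKdc = {
    var kdc: MiniKdc = null
    eventually(timeout(10.seconds), interval(1.second)) {
      try {
        kdc = new MiniKdc(kdcConf, kdcDir)
        kdc.start()
      } catch {
        case NonFatal(e) =>
          // Clean up the half-started instance before the next attempt.
          if (kdc != null) {
            kdc.stop()
            kdc = null
          }
          throw e
      }
    }
    kdc
  }
}
```

Each suite could then replace its inline block with `kdc = MiniKdcTestHelper.startMiniKdcWithRetry(kdcConf, kdcDir)`.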
