
Commit 8430dbf

[SPARK-53961][SQL][TESTS] Fix FileStreamSinkSuite flakiness by using walkFileTree instead of walk
### What changes were proposed in this pull request?

This PR aims to fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`.

### Why are the changes needed?

`Files.walk` is flaky and fails like the following when the directory is modified concurrently during traversal. `walkFileTree` allows more robust error handling.

https://github.com/apache/spark/blob/2bb73fbdeb19f0a972786d3ea33d3263bf84ab66/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala#L543-L547

```
[info] - cleanup complete but invalid output for aborted job *** FAILED *** (438 milliseconds)
[info]   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: ***/spark-4c7ad10b-5848-45d7-ba43-dae4020ad011/output #output/part-00007-e582f3e3-87e3-40fa-8269-7fac9b545775-c000.snappy.parquet
[info]   at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
[info]   at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
[info]   at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1855)
[info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:292)
[info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
[info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169)
[info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:298)
[info]   at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
[info]   at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
[info]   at scala.collection.Iterator$$anon$6.hasNext(Iterator.scala:480)
[info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
[info]   at scala.collection.mutable.Growable.addAll(Growable.scala:61)
[info]   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
[info]   at scala.collection.immutable.SetBuilderImpl.addAll(Set.scala:405)
[info]   at scala.collection.immutable.Set$.from(Set.scala:362)
[info]   at scala.collection.IterableOnceOps.toSet(IterableOnce.scala:1469)
[info]   at scala.collection.IterableOnceOps.toSet$(IterableOnce.scala:1469)
[info]   at scala.collection.AbstractIterator.toSet(Iterator.scala:1306)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite.$anonfun$new$52(FileStreamSinkSuite.scala:537)
```

### Does this PR introduce _any_ user-facing change?

No, this is a test case change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52671 from dongjoon-hyun/SPARK-53961.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
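The pattern used by this fix can be sketched on its own. The following is a minimal, self-contained illustration (the object name, helper method, and sample file names are invented for the example, not taken from the suite) of walking a directory with `Files.walkFileTree` and a `SimpleFileVisitor` whose `visitFileFailed` tolerates files that disappear mid-walk, instead of letting the whole traversal fail the way a `Files.walk` stream does:

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, NoSuchFileException, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object WalkExample {
  // Collect the names of all ".parquet" files under `root`, skipping
  // entries deleted concurrently instead of aborting the whole walk.
  def parquetNames(root: Path): Set[String] = {
    val names = scala.collection.mutable.Set.empty[String]
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        val name = file.getFileName.toString
        if (name.endsWith(".parquet")) names += name
        FileVisitResult.CONTINUE
      }
      override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = exc match {
        // The file vanished between listing and visiting: skip it and keep going.
        case _: NoSuchFileException => FileVisitResult.CONTINUE
        // Any other I/O problem is real: stop the traversal.
        case _ => FileVisitResult.TERMINATE
      }
    })
    names.toSet
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("walk-example")
    Files.createFile(dir.resolve("part-00000.parquet"))
    Files.createFile(dir.resolve("_SUCCESS"))
    println(parquetNames(dir))
  }
}
```

By contrast, the iterator behind `Files.walk` throws `UncheckedIOException` as soon as a listed entry can no longer be opened, which is exactly the race the stack trace above shows.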
1 parent 94cccad commit 8430dbf

File tree

1 file changed: +20 −4 lines changed


sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 20 additions & 4 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming

 import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
+import java.nio.file.attribute.BasicFileAttributes
 import java.util.Locale

 import scala.collection.mutable.ArrayBuffer
@@ -534,10 +535,25 @@ abstract class FileStreamSinkSuite extends StreamTest {
     }

     import PendingCommitFilesTrackingManifestFileCommitProtocol._
-    val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
-      .filter(_.toString.endsWith(".parquet"))
-      .map(_.getFileName.toString)
-      .toSet
+    import java.nio.file.{Path, _}
+    val outputFileNames = scala.collection.mutable.Set.empty[String]
+    Files.walkFileTree(
+      outputDir.toPath,
+      new SimpleFileVisitor[Path] {
+        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+          val fileName = file.getFileName.toString
+          if (fileName.endsWith(".parquet")) outputFileNames += fileName
+          FileVisitResult.CONTINUE
+        }
+        override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
+          exc match {
+            case _: NoSuchFileException =>
+              FileVisitResult.CONTINUE
+            case _ =>
+              FileVisitResult.TERMINATE
+          }
+        }
+      })
     val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet

     // there would be possible to have race condition:
```

0 commit comments
