diff --git a/core/src/test/scala/org/apache/spark/monotasks/compute/ComputeSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/monotasks/compute/ComputeSchedulerSuite.scala index f8dcf749a..087007cf6 100644 --- a/core/src/test/scala/org/apache/spark/monotasks/compute/ComputeSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/monotasks/compute/ComputeSchedulerSuite.scala @@ -18,9 +18,14 @@ package org.apache.spark.monotasks.compute import java.nio.ByteBuffer +import scala.concurrent.duration._ +import scala.language.postfixOps + import org.mockito.Mockito.{mock, never, timeout, verify, when} import org.scalatest.FunSuite +import org.scalatest.concurrent.Eventually.{eventually, interval => scalatestInterval, + timeout => scalatestTimeout} import org.apache.spark.TaskContextImpl import org.apache.spark.storage.{BlockManager, MemoryStore, TestBlockId} @@ -96,8 +101,11 @@ class ComputeSchedulerSuite extends FunSuite { computeScheduler.submitTask(computeMonotask2) computeScheduler.submitTask(computeMonotask3) - assert(computeMonotask1.isStarted) - assert(computeMonotask2.isStarted) + // Wrap this in an eventually loop, since the compute threads pull tasks asynchronously. + eventually(scalatestTimeout(3 seconds), scalatestInterval(10 milliseconds)) { + assert(computeMonotask1.isStarted) + assert(computeMonotask2.isStarted) + } assert(!computeMonotask3.isStarted) // Now submit a prepare monotask, and finish one of the other compute monotasks. The prepare