diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java index 363fe516b8e66..51562ef19ab34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -56,6 +56,13 @@ class DefaultSchedulerCheckpointCoordinatorTest { private static final TestExecutorExtension EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension(); + @RegisterExtension + static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Extension(); + + private final TestingComponentMainThreadExecutor mainThreadExecutor = + MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor(); + /** Tests that the checkpoint coordinator is shut down if the execution graph is failed. */ @Test void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph() @@ -77,9 +84,14 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph() assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - graph.failJob(new Exception("Test Exception"), System.currentTimeMillis()); - - scheduler.closeAsync().get(); + mainThreadExecutor + .execute( + () -> { + graph.failJob( + new Exception("Test Exception"), System.currentTimeMillis()); + return scheduler.closeAsync(); + }) + .get(); assertThat(checkpointCoordinator.isShutdown()).isTrue(); assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FAILED); @@ -107,9 +119,13 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnSuspendedExecutionGraph assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - graph.suspend(new Exception("Test Exception")); - - scheduler.closeAsync().get(); + mainThreadExecutor + .execute( + () -> { + graph.suspend(new Exception("Test Exception")); + return scheduler.closeAsync(); + }) + .get(); assertThat(checkpointCoordinator.isShutdown()).isTrue(); assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.SUSPENDED); @@ -137,18 +153,22 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnFinishedExecutionGraph( assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - scheduler.startScheduling(); - - for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) { - final Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); - scheduler.updateTaskExecutionState( - new TaskExecutionState( - currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED)); - } + mainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) { + final Execution currentExecutionAttempt = + executionVertex.getCurrentExecutionAttempt(); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + currentExecutionAttempt.getAttemptId(), + ExecutionState.FINISHED)); + } + }); assertThat(graph.getTerminationFuture()).isCompletedWithValue(JobStatus.FINISHED); - scheduler.closeAsync().get(); + mainThreadExecutor.execute(scheduler::closeAsync).get(); assertThat(checkpointCoordinator.isShutdown()).isTrue(); assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FINISHED); @@ -176,7 +196,7 @@ void testClosingSchedulerSuspendsExecutionGraphAndShutsDownCheckpointCoordinator assertThat(checkpointCoordinator).isNotNull(); assertThat(checkpointCoordinator.isShutdown()).isFalse(); - scheduler.closeAsync().get(); + mainThreadExecutor.execute(scheduler::closeAsync).get(); assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED); assertThat(checkpointCoordinator.isShutdown()).isTrue(); @@ -208,7 +228,7 @@ private DefaultScheduler createSchedulerAndEnableCheckpointing( return new DefaultSchedulerBuilder( jobGraph, - ComponentMainThreadExecutorServiceAdapter.forMainThread(), + mainThreadExecutor.getMainThreadExecutor(), EXECUTOR_EXTENSION.getExecutor()) .setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(store, counter)) .setRpcTimeout(timeout)