Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
Expand Down Expand Up @@ -56,6 +56,13 @@ class DefaultSchedulerCheckpointCoordinatorTest {
private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
TestingUtils.defaultExecutorExtension();

@RegisterExtension
static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE =
new TestingComponentMainThreadExecutor.Extension();

private final TestingComponentMainThreadExecutor mainThreadExecutor =
MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

/** Tests that the checkpoint coordinator is shut down if the execution graph is failed. */
@Test
void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph()
Expand All @@ -77,9 +84,14 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnFailedExecutionGraph()
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();

graph.failJob(new Exception("Test Exception"), System.currentTimeMillis());

scheduler.closeAsync().get();
mainThreadExecutor
.execute(
() -> {
graph.failJob(
new Exception("Test Exception"), System.currentTimeMillis());
return scheduler.closeAsync();
})
.get();

assertThat(checkpointCoordinator.isShutdown()).isTrue();
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FAILED);
Expand Down Expand Up @@ -107,9 +119,13 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnSuspendedExecutionGraph
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();

graph.suspend(new Exception("Test Exception"));

scheduler.closeAsync().get();
mainThreadExecutor
.execute(
() -> {
graph.suspend(new Exception("Test Exception"));
return scheduler.closeAsync();
})
.get();

assertThat(checkpointCoordinator.isShutdown()).isTrue();
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.SUSPENDED);
Expand Down Expand Up @@ -137,18 +153,22 @@ void testClosingSchedulerShutsDownCheckpointCoordinatorOnFinishedExecutionGraph(
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();

scheduler.startScheduling();

for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
final Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
scheduler.updateTaskExecutionState(
new TaskExecutionState(
currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED));
}
mainThreadExecutor.execute(
() -> {
scheduler.startScheduling();
for (ExecutionVertex executionVertex : graph.getAllExecutionVertices()) {
final Execution currentExecutionAttempt =
executionVertex.getCurrentExecutionAttempt();
scheduler.updateTaskExecutionState(
new TaskExecutionState(
currentExecutionAttempt.getAttemptId(),
ExecutionState.FINISHED));
}
});

assertThat(graph.getTerminationFuture()).isCompletedWithValue(JobStatus.FINISHED);

scheduler.closeAsync().get();
mainThreadExecutor.execute(scheduler::closeAsync).get();

assertThat(checkpointCoordinator.isShutdown()).isTrue();
assertThat(counterShutdownFuture).isCompletedWithValue(JobStatus.FINISHED);
Expand Down Expand Up @@ -176,7 +196,7 @@ void testClosingSchedulerSuspendsExecutionGraphAndShutsDownCheckpointCoordinator
assertThat(checkpointCoordinator).isNotNull();
assertThat(checkpointCoordinator.isShutdown()).isFalse();

scheduler.closeAsync().get();
mainThreadExecutor.execute(scheduler::closeAsync).get();

assertThat(graph.getState()).isEqualTo(JobStatus.SUSPENDED);
assertThat(checkpointCoordinator.isShutdown()).isTrue();
Expand Down Expand Up @@ -208,7 +228,7 @@ private DefaultScheduler createSchedulerAndEnableCheckpointing(

return new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
mainThreadExecutor.getMainThreadExecutor(),
EXECUTOR_EXTENSION.getExecutor())
.setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(store, counter))
.setRpcTimeout(timeout)
Expand Down