Skip to content

Commit b0e8240

Browse files
committed
[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensures semantic correctness and prevent test failure
1 parent 6d738f4 commit b0e8240

File tree

2 files changed

+104
-1
lines changed

2 files changed

+104
-1
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1385,8 +1385,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
13851385
lastSubsumed = null;
13861386
}
13871387

1388-
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
13891388
reportCompletedCheckpoint(completedCheckpoint);
1389+
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
13901390
} catch (Exception exception) {
13911391
// For robustness reasons, we need catch exception and try marking the checkpoint
13921392
// completed.

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
import java.util.concurrent.Future;
120120
import java.util.concurrent.ScheduledExecutorService;
121121
import java.util.concurrent.ScheduledFuture;
122+
import java.util.concurrent.TimeUnit;
122123
import java.util.concurrent.atomic.AtomicBoolean;
123124
import java.util.concurrent.atomic.AtomicLong;
124125
import java.util.concurrent.atomic.AtomicReference;
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
44094410
}
44104411
}
44114412
}
4413+
4414+
/**
4415+
* Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
4416+
* finishes. This ensures that when external components are notified via the CompletableFuture
4417+
* that a checkpoint is complete, all statistics have already been updated.
4418+
*/
4419+
@Test
4420+
void testCompletionFutureCompletesAfterReporting() throws Exception {
4421+
JobVertexID jobVertexID = new JobVertexID();
4422+
ExecutionGraph graph =
4423+
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
4424+
.addJobVertex(jobVertexID)
4425+
.build(EXECUTOR_RESOURCE.getExecutor());
4426+
4427+
ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
4428+
4429+
CheckpointCoordinator coordinator =
4430+
new CheckpointCoordinatorBuilder()
4431+
.setCheckpointStatsTracker(tracker)
4432+
.setTimer(manuallyTriggeredScheduledExecutor)
4433+
.build(graph);
4434+
4435+
CompletableFuture<CompletedCheckpoint> checkpointFuture =
4436+
coordinator.triggerCheckpoint(false);
4437+
manuallyTriggeredScheduledExecutor.triggerAll();
4438+
4439+
CompletableFuture<Void> ackTask =
4440+
CompletableFuture.runAsync(
4441+
() -> {
4442+
try {
4443+
ackCheckpoint(
4444+
1L,
4445+
coordinator,
4446+
jobVertexID,
4447+
graph,
4448+
handle(),
4449+
handle(),
4450+
handle());
4451+
} catch (Exception e) {
4452+
throw new RuntimeException(e);
4453+
}
4454+
});
4455+
4456+
assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
4457+
.as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
4458+
.isNull();
4459+
4460+
for (int i = 0; i < 30; i++) {
4461+
assertThat(checkpointFuture)
4462+
.as(
4463+
"Checkpoint future should not complete while reportCompletedCheckpoint is blocked")
4464+
.isNotDone();
4465+
Thread.sleep(100);
4466+
}
4467+
4468+
tracker.getReportBlockingFuture().complete(null);
4469+
4470+
CompletedCheckpoint result = checkpointFuture.get(5, TimeUnit.SECONDS);
4471+
assertThat(result)
4472+
.as("Checkpoint future should complete after reportCompletedCheckpoint finishes")
4473+
.isNotNull();
4474+
4475+
ackTask.get(5, TimeUnit.SECONDS);
4476+
}
4477+
4478+
/**
4479+
* A controllable checkpoint stats tracker for testing purposes. Allows precise control over
4480+
* when reportCompletedCheckpoint() completes, enabling verification of execution order and
4481+
* timing in tests.
4482+
*/
4483+
private static class ControllableCheckpointStatsTracker extends DefaultCheckpointStatsTracker {
4484+
private final CompletableFuture<Void> reportStartedFuture;
4485+
private final CompletableFuture<Void> reportBlockingFuture;
4486+
4487+
public ControllableCheckpointStatsTracker() {
4488+
super(
4489+
Integer.MAX_VALUE,
4490+
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
4491+
this.reportStartedFuture = new CompletableFuture<>();
4492+
this.reportBlockingFuture = new CompletableFuture<>();
4493+
}
4494+
4495+
public CompletableFuture<Void> getReportStartedFuture() {
4496+
return reportStartedFuture;
4497+
}
4498+
4499+
public CompletableFuture<Void> getReportBlockingFuture() {
4500+
return reportBlockingFuture;
4501+
}
4502+
4503+
@Override
4504+
public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
4505+
reportStartedFuture.complete(null);
4506+
4507+
try {
4508+
reportBlockingFuture.get();
4509+
} catch (Exception e) {
4510+
throw new RuntimeException(e);
4511+
}
4512+
super.reportCompletedCheckpoint(completed);
4513+
}
4514+
}
44124515
}

0 commit comments

Comments
 (0)