Skip to content

Commit b1a9728

Browse files
committed
[FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when checkpoint notification is delayed
1 parent 5156147 commit b1a9728

File tree

6 files changed

+143
-12
lines changed

6 files changed

+143
-12
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,29 +197,34 @@ protected PreviousSnapshot snapshotMetaData(
197197

198198
final long lastCompletedCheckpoint;
199199
final Collection<HandleAndLocalPath> confirmedSstFiles;
200+
final List<Collection<HandleAndLocalPath>> followingSstFiles;
200201

201202
// use the last completed checkpoint as the comparison base.
202203
synchronized (uploadedSstFiles) {
203204
lastCompletedCheckpoint = lastCompletedCheckpointId;
204205
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
205-
LOG.trace(
206-
"Use confirmed SST files for checkpoint {}: {}",
207-
checkpointId,
208-
confirmedSstFiles);
206+
// All the uploaded sst files in the checkpoints following the last completed checkpoint
207+
// in ascending order of checkpoint IDs.
208+
followingSstFiles =
209+
new ArrayList<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint + 1).values());
209210
}
211+
PreviousSnapshot previousSnapshot = new PreviousSnapshot(confirmedSstFiles);
212+
// This will remove all the SST files that have been re-uploaded in the checkpoints
213+
// following the last completed checkpoint.
214+
followingSstFiles.forEach(previousSnapshot::removeReUploadedConfirmedSstFiles);
210215
LOG.trace(
211216
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
212217
+ "assuming the following (shared) confirmed files as base: {}.",
213218
checkpointId,
214219
lastCompletedCheckpoint,
215-
confirmedSstFiles);
220+
previousSnapshot);
216221

217222
// snapshot meta data to save
218223
for (Map.Entry<String, ForStOperationUtils.ForStKvStateInfo> stateMetaInfoEntry :
219224
kvStateInformation.entrySet()) {
220225
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
221226
}
222-
return new PreviousSnapshot(confirmedSstFiles);
227+
return previousSnapshot;
223228
}
224229

225230
/** Encapsulates the process to perform an incremental snapshot of a ForStKeyedStateBackend. */

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,26 @@ protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSst
269269
: Collections.emptyMap();
270270
}
271271

272+
/**
273+
* Remove the sst files which have been re-uploaded in the following checkpoint from the
274+
* confirmed sst files.
275+
*
276+
* @param followingUploadedSstFiles the sst files uploaded in the following checkpoint.
277+
*/
278+
protected void removeReUploadedConfirmedSstFiles(
279+
@Nonnull Collection<HandleAndLocalPath> followingUploadedSstFiles) {
280+
if (!confirmedSstFiles.isEmpty()) {
281+
followingUploadedSstFiles.forEach(
282+
e -> {
283+
if (!(e.getHandle() instanceof PlaceholderStreamStateHandle)) {
284+
// If it's not a placeholder handle, it means the sst file has been
285+
// re-uploaded in the following checkpoint.
286+
confirmedSstFiles.remove(e.getLocalPath());
287+
}
288+
});
289+
}
290+
}
291+
272292
protected Optional<StreamStateHandle> getUploaded(String filename) {
273293
if (confirmedSstFiles.containsKey(filename)) {
274294
StreamStateHandle handle = confirmedSstFiles.get(filename);
@@ -290,5 +310,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
290310
protected boolean isEmpty() {
291311
return confirmedSstFiles.isEmpty();
292312
}
313+
314+
@Override
315+
public String toString() {
316+
return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
317+
}
293318
}
294319
}

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,41 @@ private void testCheckpointIsIncremental(boolean forceFirstFull) throws Exceptio
105105
}
106106
}
107107

108+
@Test
109+
void testCheckpointIsIncrementalWithLateNotification() throws Exception {
110+
111+
try (CloseableRegistry closeableRegistry = new CloseableRegistry();
112+
ForStSnapshotStrategyBase<?, ?> checkpointSnapshotStrategy =
113+
createSnapshotStrategy()) {
114+
FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
115+
116+
// make and checkpoint with id 1
117+
snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
118+
119+
// make and checkpoint with id 2
120+
snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
121+
122+
// Late notify checkpoint with id 1
123+
checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
124+
125+
// make checkpoint with id 3, based on checkpoint 1
126+
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
127+
snapshot(
128+
3L,
129+
checkpointSnapshotStrategy,
130+
checkpointStreamFactory,
131+
closeableRegistry);
132+
133+
// Late notify checkpoint with id 2
134+
checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
135+
136+
// 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st
137+
// one, so it should be based on nothing, thus this is effectively a full checkpoint.
138+
assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
139+
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
140+
}
141+
}
142+
108143
@Test
109144
void testCheckpointIsFull() throws Exception {
110145

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,26 @@ protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSst
412412
: Collections.emptyMap();
413413
}
414414

415+
/**
416+
* Remove the sst files which have been re-uploaded in the following checkpoint from the
417+
* confirmed sst files.
418+
*
419+
* @param followingUploadedSstFiles the sst files uploaded in the following checkpoint.
420+
*/
421+
protected void removeReUploadedConfirmedSstFiles(
422+
@Nonnull Collection<HandleAndLocalPath> followingUploadedSstFiles) {
423+
if (!confirmedSstFiles.isEmpty()) {
424+
followingUploadedSstFiles.forEach(
425+
e -> {
426+
if (!(e.getHandle() instanceof PlaceholderStreamStateHandle)) {
427+
// If it's not a placeholder handle, it means the sst file has been
428+
// re-uploaded in the following checkpoint.
429+
confirmedSstFiles.remove(e.getLocalPath());
430+
}
431+
});
432+
}
433+
}
434+
415435
protected Optional<StreamStateHandle> getUploaded(String filename) {
416436
if (confirmedSstFiles.containsKey(filename)) {
417437
StreamStateHandle handle = confirmedSstFiles.get(filename);
@@ -429,5 +449,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
429449
return Optional.empty();
430450
}
431451
}
452+
453+
@Override
454+
public String toString() {
455+
return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
456+
}
432457
}
433458
}

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,29 +197,34 @@ protected PreviousSnapshot snapshotMetaData(
197197

198198
final long lastCompletedCheckpoint;
199199
final Collection<HandleAndLocalPath> confirmedSstFiles;
200+
final List<Collection<HandleAndLocalPath>> followingSstFiles;
200201

201202
// use the last completed checkpoint as the comparison base.
202203
synchronized (uploadedSstFiles) {
203204
lastCompletedCheckpoint = lastCompletedCheckpointId;
204205
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
205-
LOG.trace(
206-
"Use confirmed SST files for checkpoint {}: {}",
207-
checkpointId,
208-
confirmedSstFiles);
206+
// All the uploaded sst files in the checkpoints following the last completed checkpoint
207+
// in ascending order of checkpoint IDs.
208+
followingSstFiles =
209+
new ArrayList<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint + 1).values());
209210
}
211+
PreviousSnapshot previousSnapshot = new PreviousSnapshot(confirmedSstFiles);
212+
// This will remove all the SST files that have been re-uploaded in the checkpoints
213+
// following the last completed checkpoint.
214+
followingSstFiles.forEach(previousSnapshot::removeReUploadedConfirmedSstFiles);
210215
LOG.trace(
211216
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
212217
+ "assuming the following (shared) confirmed files as base: {}.",
213218
checkpointId,
214219
lastCompletedCheckpoint,
215-
confirmedSstFiles);
220+
previousSnapshot);
216221

217222
// snapshot meta data to save
218223
for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
219224
kvStateInformation.entrySet()) {
220225
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
221226
}
222-
return new PreviousSnapshot(confirmedSstFiles);
227+
return previousSnapshot;
223228
}
224229

225230
/**
@@ -352,6 +357,7 @@ private long uploadSnapshotFiles(
352357
List<Path> miscFilePaths = new ArrayList<>(files.length);
353358

354359
createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
360+
LOG.info("Will re-use {} SST files. {}", sstFiles.size(), sstFiles);
355361

356362
final CheckpointedStateScope stateScope =
357363
sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,41 @@ void testCheckpointIsIncremental() throws Exception {
9696
}
9797
}
9898

99+
@Test
100+
void testCheckpointIsIncrementalWithLateNotification() throws Exception {
101+
102+
try (CloseableRegistry closeableRegistry = new CloseableRegistry();
103+
RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy =
104+
createSnapshotStrategy()) {
105+
FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
106+
107+
// make and checkpoint with id 1
108+
snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
109+
110+
// make and checkpoint with id 2
111+
snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
112+
113+
// Late notify checkpoint with id 1
114+
checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
115+
116+
// make checkpoint with id 3, based on checkpoint 1
117+
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
118+
snapshot(
119+
3L,
120+
checkpointSnapshotStrategy,
121+
checkpointStreamFactory,
122+
closeableRegistry);
123+
124+
// Late notify checkpoint with id 2
125+
checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
126+
127+
// 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st
128+
// one, so it should be based on nothing, thus this is effectively a full checkpoint.
129+
assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
130+
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
131+
}
132+
}
133+
99134
public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
100135
throws IOException, RocksDBException {
101136

0 commit comments

Comments
 (0)