Skip to content

Commit 324d58b

Browse files
committed
[FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when checkpoint notification is delayed
1 parent 5156147 commit 324d58b

File tree

6 files changed

+257
-28
lines changed

6 files changed

+257
-28
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,30 +196,29 @@ protected PreviousSnapshot snapshotMetaData(
196196
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
197197

198198
final long lastCompletedCheckpoint;
199-
final Collection<HandleAndLocalPath> confirmedSstFiles;
199+
final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;
200200

201201
// use the last completed checkpoint as the comparison base.
202202
synchronized (uploadedSstFiles) {
203203
lastCompletedCheckpoint = lastCompletedCheckpointId;
204-
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
205-
LOG.trace(
206-
"Use confirmed SST files for checkpoint {}: {}",
207-
checkpointId,
208-
confirmedSstFiles);
204+
currentUploadedSstFiles =
205+
new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
209206
}
207+
PreviousSnapshot previousSnapshot =
208+
new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint);
210209
LOG.trace(
211210
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
212211
+ "assuming the following (shared) confirmed files as base: {}.",
213212
checkpointId,
214213
lastCompletedCheckpoint,
215-
confirmedSstFiles);
214+
previousSnapshot);
216215

217216
// snapshot meta data to save
218217
for (Map.Entry<String, ForStOperationUtils.ForStKvStateInfo> stateMetaInfoEntry :
219218
kvStateInformation.entrySet()) {
220219
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
221220
}
222-
return new PreviousSnapshot(confirmedSstFiles);
221+
return previousSnapshot;
223222
}
224223

225224
/** Encapsulates the process to perform an incremental snapshot of a ForStKeyedStateBackend. */

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.apache.flink.util.ResourceGuard;
4747

4848
import org.forstdb.RocksDB;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
4951

5052
import javax.annotation.Nonnegative;
5153
import javax.annotation.Nonnull;
@@ -58,6 +60,7 @@
5860
import java.util.List;
5961
import java.util.Map;
6062
import java.util.Optional;
63+
import java.util.SortedMap;
6164
import java.util.UUID;
6265
import java.util.concurrent.atomic.AtomicBoolean;
6366
import java.util.stream.Collectors;
@@ -70,6 +73,8 @@
7073
public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
7174
implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {
7275

76+
private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);
77+
7378
@Nonnull private final String description;
7479

7580
/** ForSt instance from the backend. */
@@ -251,22 +256,94 @@ public void release() {
251256
}
252257

253258
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
254-
new PreviousSnapshot(Collections.emptyList());
259+
new PreviousSnapshot(null, -1L);
255260

256261
/** Previous snapshot with uploaded sst files. */
257262
protected static class PreviousSnapshot {
258263

259264
@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
260265

261-
protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
266+
/**
267+
* Constructor of PreviousSnapshot. Given a map of uploaded sst files in previous
268+
* checkpoints, prune the sst files which have been re-uploaded in the following
269+
* checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to
270+
* notification delay. Following steps for example:
271+
*
272+
* <ul>
273+
* <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
274+
* <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
275+
* CP 1 wasn't yet confirmed.
276+
* <li>3) TM gets a confirmation of checkpoint 1.
277+
* <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
278+
* <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
279+
* but it was deleted in (4) by JM.
280+
* </ul>
281+
*
282+
* @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
283+
* @param lastCompletedCheckpoint the last completed checkpoint id.
284+
*/
285+
protected PreviousSnapshot(
286+
@Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
287+
long lastCompletedCheckpoint) {
262288
this.confirmedSstFiles =
263-
confirmedSstFiles != null
264-
? confirmedSstFiles.stream()
289+
currentUploadedSstFiles != null
290+
? pruneFirstCheckpointSstFiles(
291+
currentUploadedSstFiles, lastCompletedCheckpoint)
292+
: Collections.emptyMap();
293+
}
294+
295+
/**
296+
* The last completed checkpoint's uploaded sst files are all included, then for each
297+
* following checkpoint, if a sst file has been re-uploaded, remove it from the first
298+
* checkpoint's sst files.
299+
*
300+
* @param currentUploadedSstFiles sst files uploaded per checkpoint id, expected to start at the last completed one.
301+
* @param lastCompletedCheckpoint the last completed checkpoint id.
302+
*/
303+
private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
304+
@Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
305+
long lastCompletedCheckpoint) {
306+
Map<String, StreamStateHandle> prunedSstFiles = null;
307+
int removedCount = 0;
308+
for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
309+
currentUploadedSstFiles.entrySet()) {
310+
// Iterate checkpoints in ascending order of checkpoint id.
311+
if (entry.getKey() == lastCompletedCheckpoint) {
312+
// The first checkpoint's uploaded sst files are all included.
313+
prunedSstFiles =
314+
entry.getValue().stream()
265315
.collect(
266316
Collectors.toMap(
267317
HandleAndLocalPath::getLocalPath,
268-
HandleAndLocalPath::getHandle))
269-
: Collections.emptyMap();
318+
HandleAndLocalPath::getHandle));
319+
} else if (prunedSstFiles == null) {
320+
// The last completed checkpoint's uploaded sst files do not exist.
321+
// So we skip the pruning process.
322+
break;
323+
} else if (!prunedSstFiles.isEmpty()) {
324+
// Prune sst files which have been re-uploaded in the following checkpoints.
325+
for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
326+
if (!(handleAndLocalPath.getHandle()
327+
instanceof PlaceholderStreamStateHandle)) {
328+
// If it's not a placeholder handle, it means the sst file has been
329+
// re-uploaded in the following checkpoint.
330+
if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
331+
removedCount++;
332+
}
333+
}
334+
}
335+
}
336+
}
337+
if (removedCount > 0 && LOG.isTraceEnabled()) {
338+
LOG.trace(
339+
"Removed {} re-uploaded sst files from base file set for incremental "
340+
+ "checkpoint. Base checkpoint id: {}",
341+
removedCount,
342+
currentUploadedSstFiles.firstKey());
343+
}
344+
return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
345+
? Collections.unmodifiableMap(prunedSstFiles)
346+
: Collections.emptyMap();
270347
}
271348

272349
protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -290,5 +367,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
290367
protected boolean isEmpty() {
291368
return confirmedSstFiles.isEmpty();
292369
}
370+
371+
@Override
372+
public String toString() {
373+
return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
374+
}
293375
}
294376
}

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,41 @@ private void testCheckpointIsIncremental(boolean forceFirstFull) throws Exceptio
105105
}
106106
}
107107

108+
@Test
109+
void testCheckpointIsIncrementalWithLateNotification() throws Exception {
110+
111+
try (CloseableRegistry closeableRegistry = new CloseableRegistry();
112+
ForStSnapshotStrategyBase<?, ?> checkpointSnapshotStrategy =
113+
createSnapshotStrategy()) {
114+
FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
115+
116+
// make checkpoint with id 1
117+
snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
118+
119+
// make checkpoint with id 2
120+
snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
121+
122+
// Late notify checkpoint with id 1
123+
checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
124+
125+
// make checkpoint with id 3, based on checkpoint 1
126+
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
127+
snapshot(
128+
3L,
129+
checkpointSnapshotStrategy,
130+
checkpointStreamFactory,
131+
closeableRegistry);
132+
133+
// Late notify checkpoint with id 2
134+
checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
135+
136+
// 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st
137+
// one, so it should be based on nothing, thus this is effectively a full checkpoint.
138+
assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
139+
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
140+
}
141+
}
142+
108143
@Test
109144
void testCheckpointIsFull() throws Exception {
110145

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.List;
6969
import java.util.Map;
7070
import java.util.Optional;
71+
import java.util.SortedMap;
7172
import java.util.UUID;
7273
import java.util.stream.Collectors;
7374

@@ -394,22 +395,94 @@ public void release() {
394395
}
395396

396397
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
397-
new PreviousSnapshot(Collections.emptyList());
398+
new PreviousSnapshot(null, -1L);
398399

399400
/** Previous snapshot with uploaded sst files. */
400401
protected static class PreviousSnapshot {
401402

402403
@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
403404

404-
protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
405+
/**
406+
* Constructor of PreviousSnapshot. Given a map of uploaded sst files in previous
407+
* checkpoints, prune the sst files which have been re-uploaded in the following
408+
* checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to
409+
* notification delay. Following steps for example:
410+
*
411+
* <ul>
412+
* <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
413+
* <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
414+
* CP 1 wasn't yet confirmed.
415+
* <li>3) TM gets a confirmation of checkpoint 1.
416+
* <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
417+
* <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
418+
* but it was deleted in (4) by JM.
419+
* </ul>
420+
*
421+
* @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
422+
* @param lastCompletedCheckpoint the last completed checkpoint id.
423+
*/
424+
protected PreviousSnapshot(
425+
@Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
426+
long lastCompletedCheckpoint) {
405427
this.confirmedSstFiles =
406-
confirmedSstFiles != null
407-
? confirmedSstFiles.stream()
428+
currentUploadedSstFiles != null
429+
? pruneFirstCheckpointSstFiles(
430+
currentUploadedSstFiles, lastCompletedCheckpoint)
431+
: Collections.emptyMap();
432+
}
433+
434+
/**
435+
* The last completed checkpoint's uploaded sst files are all included, then for each
436+
* following checkpoint, if a sst file has been re-uploaded, remove it from the first
437+
* checkpoint's sst files.
438+
*
439+
* @param currentUploadedSstFiles sst files uploaded per checkpoint id, expected to start at the last completed one.
440+
* @param lastCompletedCheckpoint the last completed checkpoint id.
441+
*/
442+
private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
443+
@Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
444+
long lastCompletedCheckpoint) {
445+
Map<String, StreamStateHandle> prunedSstFiles = null;
446+
int removedCount = 0;
447+
for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
448+
currentUploadedSstFiles.entrySet()) {
449+
// Iterate checkpoints in ascending order of checkpoint id.
450+
if (entry.getKey() == lastCompletedCheckpoint) {
451+
// The first checkpoint's uploaded sst files are all included.
452+
prunedSstFiles =
453+
entry.getValue().stream()
408454
.collect(
409455
Collectors.toMap(
410456
HandleAndLocalPath::getLocalPath,
411-
HandleAndLocalPath::getHandle))
412-
: Collections.emptyMap();
457+
HandleAndLocalPath::getHandle));
458+
} else if (prunedSstFiles == null) {
459+
// The last completed checkpoint's uploaded sst files do not exist.
460+
// So we skip the pruning process.
461+
break;
462+
} else if (!prunedSstFiles.isEmpty()) {
463+
// Prune sst files which have been re-uploaded in the following checkpoints.
464+
for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
465+
if (!(handleAndLocalPath.getHandle()
466+
instanceof PlaceholderStreamStateHandle)) {
467+
// If it's not a placeholder handle, it means the sst file has been
468+
// re-uploaded in the following checkpoint.
469+
if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
470+
removedCount++;
471+
}
472+
}
473+
}
474+
}
475+
}
476+
if (removedCount > 0 && LOG.isTraceEnabled()) {
477+
LOG.trace(
478+
"Removed {} re-uploaded sst files from base file set for incremental "
479+
+ "checkpoint. Base checkpoint id: {}",
480+
removedCount,
481+
currentUploadedSstFiles.firstKey());
482+
}
483+
return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
484+
? Collections.unmodifiableMap(prunedSstFiles)
485+
: Collections.emptyMap();
413486
}
414487

415488
protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -429,5 +502,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
429502
return Optional.empty();
430503
}
431504
}
505+
506+
@Override
507+
public String toString() {
508+
return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
509+
}
432510
}
433511
}

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,30 +196,29 @@ protected PreviousSnapshot snapshotMetaData(
196196
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
197197

198198
final long lastCompletedCheckpoint;
199-
final Collection<HandleAndLocalPath> confirmedSstFiles;
199+
final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;
200200

201201
// use the last completed checkpoint as the comparison base.
202202
synchronized (uploadedSstFiles) {
203203
lastCompletedCheckpoint = lastCompletedCheckpointId;
204-
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
205-
LOG.trace(
206-
"Use confirmed SST files for checkpoint {}: {}",
207-
checkpointId,
208-
confirmedSstFiles);
204+
currentUploadedSstFiles =
205+
new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
209206
}
207+
PreviousSnapshot previousSnapshot =
208+
new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint);
210209
LOG.trace(
211210
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
212211
+ "assuming the following (shared) confirmed files as base: {}.",
213212
checkpointId,
214213
lastCompletedCheckpoint,
215-
confirmedSstFiles);
214+
previousSnapshot);
216215

217216
// snapshot meta data to save
218217
for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
219218
kvStateInformation.entrySet()) {
220219
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
221220
}
222-
return new PreviousSnapshot(confirmedSstFiles);
221+
return previousSnapshot;
223222
}
224223

225224
/**
@@ -352,6 +351,7 @@ private long uploadSnapshotFiles(
352351
List<Path> miscFilePaths = new ArrayList<>(files.length);
353352

354353
createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
354+
LOG.info("Will re-use {} SST files. {}", sstFiles.size(), sstFiles);
355355

356356
final CheckpointedStateScope stateScope =
357357
sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING

0 commit comments

Comments
 (0)