Skip to content

Commit 74f24ee

Browse files
committed
[FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when checkpoint notification is delayed
1 parent 5156147 commit 74f24ee

File tree

6 files changed

+237
-30
lines changed

6 files changed

+237
-30
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,30 +196,28 @@ protected PreviousSnapshot snapshotMetaData(
196196
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
197197

198198
final long lastCompletedCheckpoint;
199-
final Collection<HandleAndLocalPath> confirmedSstFiles;
199+
final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;
200200

201201
// use the last completed checkpoint as the comparison base.
202202
synchronized (uploadedSstFiles) {
203203
lastCompletedCheckpoint = lastCompletedCheckpointId;
204-
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
205-
LOG.trace(
206-
"Use confirmed SST files for checkpoint {}: {}",
207-
checkpointId,
208-
confirmedSstFiles);
204+
currentUploadedSstFiles =
205+
new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
209206
}
207+
PreviousSnapshot previousSnapshot = new PreviousSnapshot(currentUploadedSstFiles);
210208
LOG.trace(
211209
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
212210
+ "assuming the following (shared) confirmed files as base: {}.",
213211
checkpointId,
214212
lastCompletedCheckpoint,
215-
confirmedSstFiles);
213+
previousSnapshot);
216214

217215
// snapshot meta data to save
218216
for (Map.Entry<String, ForStOperationUtils.ForStKvStateInfo> stateMetaInfoEntry :
219217
kvStateInformation.entrySet()) {
220218
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
221219
}
222-
return new PreviousSnapshot(confirmedSstFiles);
220+
return previousSnapshot;
223221
}
224222

225223
/** Encapsulates the process to perform an incremental snapshot of a ForStKeyedStateBackend. */

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.apache.flink.util.ResourceGuard;
4747

4848
import org.forstdb.RocksDB;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
4951

5052
import javax.annotation.Nonnegative;
5153
import javax.annotation.Nonnull;
@@ -58,6 +60,7 @@
5860
import java.util.List;
5961
import java.util.Map;
6062
import java.util.Optional;
63+
import java.util.SortedMap;
6164
import java.util.UUID;
6265
import java.util.concurrent.atomic.AtomicBoolean;
6366
import java.util.stream.Collectors;
@@ -70,6 +73,8 @@
7073
public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
7174
implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {
7275

76+
private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);
77+
7378
@Nonnull private final String description;
7479

7580
/** ForSt instance from the backend. */
@@ -250,23 +255,85 @@ public void release() {
250255
}
251256
}
252257

253-
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
254-
new PreviousSnapshot(Collections.emptyList());
258+
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = new PreviousSnapshot(null);
255259

256260
/** Previous snapshot with uploaded sst files. */
257261
protected static class PreviousSnapshot {
258262

259263
@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
260264

261-
protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
265+
/**
266+
* Constructor of PreviousSnapshot. Giving a map of uploaded sst files in previous
267+
* checkpoints, prune the sst files which have been re-uploaded in the following
268+
* checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to
269+
* notification delay. Following steps for example:
270+
*
271+
* <ul>
272+
* <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
273+
* <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
274+
* CP 1 wasn't yet confirmed.
275+
* <li>3) TM get a confirmation of checkpoint 1.
276+
* <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
277+
* <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
278+
* but it was deleted in (4) by JM.
279+
* </ul>
280+
*
281+
* @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
282+
*/
283+
protected PreviousSnapshot(
284+
@Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles) {
262285
this.confirmedSstFiles =
263-
confirmedSstFiles != null
264-
? confirmedSstFiles.stream()
286+
currentUploadedSstFiles != null
287+
? pruneFirstCheckpointSstFiles(currentUploadedSstFiles)
288+
: Collections.emptyMap();
289+
}
290+
291+
/**
292+
* The first checkpoint's uploaded sst files are all included, then for each following
293+
* checkpoint, if a sst file has been re-uploaded, remove it from the first checkpoint's sst
294+
* files.
295+
*
296+
* @param currentUploadedSstFiles the sst files uploaded in the following checkpoint.
297+
*/
298+
private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
299+
@Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles) {
300+
Map<String, StreamStateHandle> prunedSstFiles = null;
301+
int removedCount = 0;
302+
for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
303+
currentUploadedSstFiles.entrySet()) {
304+
// Iterate checkpoints in ascending order of checkpoint id.
305+
if (prunedSstFiles == null) {
306+
// The first checkpoint's uploaded sst files are all included.
307+
prunedSstFiles =
308+
entry.getValue().stream()
265309
.collect(
266310
Collectors.toMap(
267311
HandleAndLocalPath::getLocalPath,
268-
HandleAndLocalPath::getHandle))
269-
: Collections.emptyMap();
312+
HandleAndLocalPath::getHandle));
313+
} else if (!prunedSstFiles.isEmpty()) {
314+
// Prune sst files which have been re-uploaded in the following checkpoints.
315+
for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
316+
if (!(handleAndLocalPath.getHandle()
317+
instanceof PlaceholderStreamStateHandle)) {
318+
// If it's not a placeholder handle, it means the sst file has been
319+
// re-uploaded in the following checkpoint.
320+
if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
321+
removedCount++;
322+
}
323+
}
324+
}
325+
}
326+
}
327+
if (removedCount > 0 && LOG.isTraceEnabled()) {
328+
LOG.trace(
329+
"Removed {} re-uploaded sst files from base file set for incremental "
330+
+ "checkpoint. Base checkpoint id: {}",
331+
removedCount,
332+
currentUploadedSstFiles.firstKey());
333+
}
334+
return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
335+
? Collections.unmodifiableMap(prunedSstFiles)
336+
: Collections.emptyMap();
270337
}
271338

272339
protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -290,5 +357,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
290357
protected boolean isEmpty() {
291358
return confirmedSstFiles.isEmpty();
292359
}
360+
361+
@Override
362+
public String toString() {
363+
return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
364+
}
293365
}
294366
}

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,41 @@ private void testCheckpointIsIncremental(boolean forceFirstFull) throws Exceptio
105105
}
106106
}
107107

108+
@Test
109+
void testCheckpointIsIncrementalWithLateNotification() throws Exception {
110+
111+
try (CloseableRegistry closeableRegistry = new CloseableRegistry();
112+
ForStSnapshotStrategyBase<?, ?> checkpointSnapshotStrategy =
113+
createSnapshotStrategy()) {
114+
FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
115+
116+
// make and checkpoint with id 1
117+
snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
118+
119+
// make and checkpoint with id 2
120+
snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
121+
122+
// Late notify checkpoint with id 1
123+
checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
124+
125+
// make checkpoint with id 3, based on checkpoint 1
126+
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
127+
snapshot(
128+
3L,
129+
checkpointSnapshotStrategy,
130+
checkpointStreamFactory,
131+
closeableRegistry);
132+
133+
// Late notify checkpoint with id 2
134+
checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
135+
136+
// 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st
137+
// one, so it should be based on nothing, thus this is effectively a full checkpoint.
138+
assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
139+
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
140+
}
141+
}
142+
108143
@Test
109144
void testCheckpointIsFull() throws Exception {
110145

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.List;
6969
import java.util.Map;
7070
import java.util.Optional;
71+
import java.util.SortedMap;
7172
import java.util.UUID;
7273
import java.util.stream.Collectors;
7374

@@ -393,23 +394,85 @@ public void release() {
393394
}
394395
}
395396

396-
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
397-
new PreviousSnapshot(Collections.emptyList());
397+
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = new PreviousSnapshot(null);
398398

399399
/** Previous snapshot with uploaded sst files. */
400400
protected static class PreviousSnapshot {
401401

402402
@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
403403

404-
protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
404+
/**
405+
* Constructor of PreviousSnapshot. Giving a map of uploaded sst files in previous
406+
* checkpoints, prune the sst files which have been re-uploaded in the following
407+
* checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to
408+
* notification delay. Following steps for example:
409+
*
410+
* <ul>
411+
* <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
412+
* <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
413+
* CP 1 wasn't yet confirmed.
414+
* <li>3) TM get a confirmation of checkpoint 1.
415+
* <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
416+
* <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
417+
* but it was deleted in (4) by JM.
418+
* </ul>
419+
*
420+
* @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
421+
*/
422+
protected PreviousSnapshot(
423+
@Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles) {
405424
this.confirmedSstFiles =
406-
confirmedSstFiles != null
407-
? confirmedSstFiles.stream()
425+
currentUploadedSstFiles != null
426+
? pruneFirstCheckpointSstFiles(currentUploadedSstFiles)
427+
: Collections.emptyMap();
428+
}
429+
430+
/**
431+
* The first checkpoint's uploaded sst files are all included, then for each following
432+
* checkpoint, if a sst file has been re-uploaded, remove it from the first checkpoint's sst
433+
* files.
434+
*
435+
* @param currentUploadedSstFiles the sst files uploaded in the following checkpoint.
436+
*/
437+
private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
438+
@Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles) {
439+
Map<String, StreamStateHandle> prunedSstFiles = null;
440+
int removedCount = 0;
441+
for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
442+
currentUploadedSstFiles.entrySet()) {
443+
// Iterate checkpoints in ascending order of checkpoint id.
444+
if (prunedSstFiles == null) {
445+
// The first checkpoint's uploaded sst files are all included.
446+
prunedSstFiles =
447+
entry.getValue().stream()
408448
.collect(
409449
Collectors.toMap(
410450
HandleAndLocalPath::getLocalPath,
411-
HandleAndLocalPath::getHandle))
412-
: Collections.emptyMap();
451+
HandleAndLocalPath::getHandle));
452+
} else if (!prunedSstFiles.isEmpty()) {
453+
// Prune sst files which have been re-uploaded in the following checkpoints.
454+
for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
455+
if (!(handleAndLocalPath.getHandle()
456+
instanceof PlaceholderStreamStateHandle)) {
457+
// If it's not a placeholder handle, it means the sst file has been
458+
// re-uploaded in the following checkpoint.
459+
if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
460+
removedCount++;
461+
}
462+
}
463+
}
464+
}
465+
}
466+
if (removedCount > 0 && LOG.isTraceEnabled()) {
467+
LOG.trace(
468+
"Removed {} re-uploaded sst files from base file set for incremental "
469+
+ "checkpoint. Base checkpoint id: {}",
470+
removedCount,
471+
currentUploadedSstFiles.firstKey());
472+
}
473+
return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
474+
? Collections.unmodifiableMap(prunedSstFiles)
475+
: Collections.emptyMap();
413476
}
414477

415478
protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -429,5 +492,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
429492
return Optional.empty();
430493
}
431494
}
495+
496+
@Override
497+
public String toString() {
498+
return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
499+
}
432500
}
433501
}

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,30 +196,28 @@ protected PreviousSnapshot snapshotMetaData(
196196
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
197197

198198
final long lastCompletedCheckpoint;
199-
final Collection<HandleAndLocalPath> confirmedSstFiles;
199+
final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;
200200

201201
// use the last completed checkpoint as the comparison base.
202202
synchronized (uploadedSstFiles) {
203203
lastCompletedCheckpoint = lastCompletedCheckpointId;
204-
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
205-
LOG.trace(
206-
"Use confirmed SST files for checkpoint {}: {}",
207-
checkpointId,
208-
confirmedSstFiles);
204+
currentUploadedSstFiles =
205+
new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
209206
}
207+
PreviousSnapshot previousSnapshot = new PreviousSnapshot(currentUploadedSstFiles);
210208
LOG.trace(
211209
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
212210
+ "assuming the following (shared) confirmed files as base: {}.",
213211
checkpointId,
214212
lastCompletedCheckpoint,
215-
confirmedSstFiles);
213+
previousSnapshot);
216214

217215
// snapshot meta data to save
218216
for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
219217
kvStateInformation.entrySet()) {
220218
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
221219
}
222-
return new PreviousSnapshot(confirmedSstFiles);
220+
return previousSnapshot;
223221
}
224222

225223
/**
@@ -352,6 +350,7 @@ private long uploadSnapshotFiles(
352350
List<Path> miscFilePaths = new ArrayList<>(files.length);
353351

354352
createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
353+
LOG.info("Will re-use {} SST files. {}", sstFiles.size(), sstFiles);
355354

356355
final CheckpointedStateScope stateScope =
357356
sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING

0 commit comments

Comments
 (0)