diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java index 253381c801ab9..45171f04700c9 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java @@ -196,30 +196,29 @@ protected PreviousSnapshot snapshotMetaData( long checkpointId, @Nonnull List stateMetaInfoSnapshots) { final long lastCompletedCheckpoint; - final Collection confirmedSstFiles; + final SortedMap> currentUploadedSstFiles; // use the last completed checkpoint as the comparison base. synchronized (uploadedSstFiles) { lastCompletedCheckpoint = lastCompletedCheckpointId; - confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint); - LOG.trace( - "Use confirmed SST files for checkpoint {}: {}", - checkpointId, - confirmedSstFiles); + currentUploadedSstFiles = + new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint)); } + PreviousSnapshot previousSnapshot = + new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint); LOG.trace( "Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + "assuming the following (shared) confirmed files as base: {}.", checkpointId, lastCompletedCheckpoint, - confirmedSstFiles); + previousSnapshot); // snapshot meta data to save for (Map.Entry stateMetaInfoEntry : kvStateInformation.entrySet()) { stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot()); } - return new PreviousSnapshot(confirmedSstFiles); + return previousSnapshot; } /** Encapsulates the process to perform an incremental snapshot of a ForStKeyedStateBackend. */ diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java index 7350e8aaf6c1b..78c1a2e3e7316 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java @@ -46,6 +46,8 @@ import org.apache.flink.util.ResourceGuard; import org.forstdb.RocksDB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; @@ -58,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -70,6 +73,8 @@ public abstract class ForStSnapshotStrategyBase implements CheckpointListener, SnapshotStrategy, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class); + @Nonnull private final String description; /** ForSt instance from the backend. */ @@ -251,22 +256,94 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = - new PreviousSnapshot(Collections.emptyList()); + new PreviousSnapshot(null, -1L); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { @Nonnull private final Map confirmedSstFiles; - protected PreviousSnapshot(@Nullable Collection confirmedSstFiles) { + /** + * Constructor of PreviousSnapshot. Giving a map of uploaded sst files in previous + * checkpoints, prune the sst files which have been re-uploaded in the following + * checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to + * notification delay. Following steps for example: + * + *
    + *
  • 1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst. + *
  • 2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because + * CP 1 wasn't yet confirmed. + *
  • 3) TM get a confirmation of checkpoint 1. + *
  • 4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst. + *
  • 5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1, + * but it was deleted in (4) by JM. + *
+ * + * @param currentUploadedSstFiles the sst files uploaded in previous checkpoints. + * @param lastCompletedCheckpoint the last completed checkpoint id. + */ + protected PreviousSnapshot( + @Nullable SortedMap> currentUploadedSstFiles, + long lastCompletedCheckpoint) { this.confirmedSstFiles = - confirmedSstFiles != null - ? confirmedSstFiles.stream() + currentUploadedSstFiles != null + ? pruneFirstCheckpointSstFiles( + currentUploadedSstFiles, lastCompletedCheckpoint) + : Collections.emptyMap(); + } + + /** + * The last completed checkpoint's uploaded sst files are all included, then for each + * following checkpoint, if a sst file has been re-uploaded, remove it from the first + * checkpoint's sst files. + * + * @param currentUploadedSstFiles the sst files uploaded in the following checkpoint. + * @param lastCompletedCheckpoint the last completed checkpoint id. + */ + private Map pruneFirstCheckpointSstFiles( + @Nonnull SortedMap> currentUploadedSstFiles, + long lastCompletedCheckpoint) { + Map prunedSstFiles = null; + int removedCount = 0; + for (Map.Entry> entry : + currentUploadedSstFiles.entrySet()) { + // Iterate checkpoints in ascending order of checkpoint id. + if (entry.getKey() == lastCompletedCheckpoint) { + // The first checkpoint's uploaded sst files are all included. + prunedSstFiles = + entry.getValue().stream() .collect( Collectors.toMap( HandleAndLocalPath::getLocalPath, - HandleAndLocalPath::getHandle)) - : Collections.emptyMap(); + HandleAndLocalPath::getHandle)); + } else if (prunedSstFiles == null) { + // The last completed checkpoint's uploaded sst files are not existed. + // So we skip the pruning process. + break; + } else if (!prunedSstFiles.isEmpty()) { + // Prune sst files which have been re-uploaded in the following checkpoints. + for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) { + if (!(handleAndLocalPath.getHandle() + instanceof PlaceholderStreamStateHandle)) { + // If it's not a placeholder handle, it means the sst file has been + // re-uploaded in the following checkpoint. + if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) { + removedCount++; + } + } + } + } + } + if (removedCount > 0 && LOG.isTraceEnabled()) { + LOG.trace( + "Removed {} re-uploaded sst files from base file set for incremental " + + "checkpoint. Base checkpoint id: {}", + removedCount, + currentUploadedSstFiles.firstKey()); + } + return (prunedSstFiles != null && !prunedSstFiles.isEmpty()) + ? Collections.unmodifiableMap(prunedSstFiles) + : Collections.emptyMap(); } protected Optional getUploaded(String filename) { @@ -290,5 +367,10 @@ protected Optional getUploaded(String filename) { protected boolean isEmpty() { return confirmedSstFiles.isEmpty(); } + + @Override + public String toString() { + return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}'; + } } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java index acd19b48b7e47..034df702554ac 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java @@ -105,6 +105,41 @@ private void testCheckpointIsIncremental(boolean forceFirstFull) throws Exceptio } } + @Test + void testCheckpointIsIncrementalWithLateNotification() throws Exception { + + try (CloseableRegistry closeableRegistry = new CloseableRegistry(); + ForStSnapshotStrategyBase checkpointSnapshotStrategy = + createSnapshotStrategy()) { + FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory(); + + // make and checkpoint with id 1 + snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry); + + // make and checkpoint with id 2 + snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry); + + // Late notify checkpoint with id 1 + checkpointSnapshotStrategy.notifyCheckpointComplete(1L); + + // make checkpoint with id 3, based on checkpoint 1 + IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 = + snapshot( + 3L, + checkpointSnapshotStrategy, + checkpointStreamFactory, + closeableRegistry); + + // Late notify checkpoint with id 2 + checkpointSnapshotStrategy.notifyCheckpointComplete(2L); + + // 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st + // one, so it should be based on nothing, thus this is effectively a full checkpoint. + assertThat(incrementalRemoteKeyedStateHandle3.getStateSize()) + .isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize()); + } + } + @Test void testCheckpointIsFull() throws Exception { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java index e06cd9896fab3..85619a669df7f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksDBSnapshotStrategyBase.java @@ -68,6 +68,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.SortedMap; import java.util.UUID; import java.util.stream.Collectors; @@ -394,22 +395,94 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = - new PreviousSnapshot(Collections.emptyList()); + new PreviousSnapshot(null, -1L); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { @Nonnull private final Map confirmedSstFiles; - protected PreviousSnapshot(@Nullable Collection confirmedSstFiles) { + /** + * Constructor of PreviousSnapshot. Giving a map of uploaded sst files in previous + * checkpoints, prune the sst files which have been re-uploaded in the following + * checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to + * notification delay. Following steps for example: + * + *
    + *
  • 1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst. + *
  • 2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because + * CP 1 wasn't yet confirmed. + *
  • 3) TM get a confirmation of checkpoint 1. + *
  • 4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst. + *
  • 5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1, + * but it was deleted in (4) by JM. + *
+ * + * @param currentUploadedSstFiles the sst files uploaded in previous checkpoints. + * @param lastCompletedCheckpoint the last completed checkpoint id. + */ + protected PreviousSnapshot( + @Nullable SortedMap> currentUploadedSstFiles, + long lastCompletedCheckpoint) { this.confirmedSstFiles = - confirmedSstFiles != null - ? confirmedSstFiles.stream() + currentUploadedSstFiles != null + ? pruneFirstCheckpointSstFiles( + currentUploadedSstFiles, lastCompletedCheckpoint) + : Collections.emptyMap(); + } + + /** + * The last completed checkpoint's uploaded sst files are all included, then for each + * following checkpoint, if a sst file has been re-uploaded, remove it from the first + * checkpoint's sst files. + * + * @param currentUploadedSstFiles the sst files uploaded in the following checkpoint. + * @param lastCompletedCheckpoint the last completed checkpoint id. + */ + private Map pruneFirstCheckpointSstFiles( + @Nonnull SortedMap> currentUploadedSstFiles, + long lastCompletedCheckpoint) { + Map prunedSstFiles = null; + int removedCount = 0; + for (Map.Entry> entry : + currentUploadedSstFiles.entrySet()) { + // Iterate checkpoints in ascending order of checkpoint id. + if (entry.getKey() == lastCompletedCheckpoint) { + // The first checkpoint's uploaded sst files are all included. + prunedSstFiles = + entry.getValue().stream() .collect( Collectors.toMap( HandleAndLocalPath::getLocalPath, - HandleAndLocalPath::getHandle)) - : Collections.emptyMap(); + HandleAndLocalPath::getHandle)); + } else if (prunedSstFiles == null) { + // The last completed checkpoint's uploaded sst files are not existed. + // So we skip the pruning process. + break; + } else if (!prunedSstFiles.isEmpty()) { + // Prune sst files which have been re-uploaded in the following checkpoints. + for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) { + if (!(handleAndLocalPath.getHandle() + instanceof PlaceholderStreamStateHandle)) { + // If it's not a placeholder handle, it means the sst file has been + // re-uploaded in the following checkpoint. + if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) { + removedCount++; + } + } + } + } + } + if (removedCount > 0 && LOG.isTraceEnabled()) { + LOG.trace( + "Removed {} re-uploaded sst files from base file set for incremental " + + "checkpoint. Base checkpoint id: {}", + removedCount, + currentUploadedSstFiles.firstKey()); + } + return (prunedSstFiles != null && !prunedSstFiles.isEmpty()) + ? Collections.unmodifiableMap(prunedSstFiles) + : Collections.emptyMap(); } protected Optional getUploaded(String filename) { @@ -429,5 +502,10 @@ protected Optional getUploaded(String filename) { return Optional.empty(); } } + + @Override + public String toString() { + return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}'; + } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java index 5ae5cdd904f6f..af81c29fc8228 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategy.java @@ -196,30 +196,29 @@ protected PreviousSnapshot snapshotMetaData( long checkpointId, @Nonnull List stateMetaInfoSnapshots) { final long lastCompletedCheckpoint; - final Collection confirmedSstFiles; + final SortedMap> currentUploadedSstFiles; // use the last completed checkpoint as the comparison base. synchronized (uploadedSstFiles) { lastCompletedCheckpoint = lastCompletedCheckpointId; - confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint); - LOG.trace( - "Use confirmed SST files for checkpoint {}: {}", - checkpointId, - confirmedSstFiles); + currentUploadedSstFiles = + new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint)); } + PreviousSnapshot previousSnapshot = + new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint); LOG.trace( "Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + "assuming the following (shared) confirmed files as base: {}.", checkpointId, lastCompletedCheckpoint, - confirmedSstFiles); + previousSnapshot); // snapshot meta data to save for (Map.Entry stateMetaInfoEntry : kvStateInformation.entrySet()) { stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot()); } - return new PreviousSnapshot(confirmedSstFiles); + return previousSnapshot; } /** @@ -352,6 +351,7 @@ private long uploadSnapshotFiles( List miscFilePaths = new ArrayList<>(files.length); createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths); + LOG.info("Will re-use {} SST files. {}", sstFiles.size(), sstFiles); final CheckpointedStateScope stateScope = sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java index 9007a7da70a33..e6a7d3fdb2fb1 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/snapshot/RocksIncrementalSnapshotStrategyTest.java @@ -96,6 +96,41 @@ void testCheckpointIsIncremental() throws Exception { } } + @Test + void testCheckpointIsIncrementalWithLateNotification() throws Exception { + + try (CloseableRegistry closeableRegistry = new CloseableRegistry(); + RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy = + createSnapshotStrategy()) { + FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory(); + + // make and checkpoint with id 1 + snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry); + + // make and checkpoint with id 2 + snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry); + + // Late notify checkpoint with id 1 + checkpointSnapshotStrategy.notifyCheckpointComplete(1L); + + // make checkpoint with id 3, based on checkpoint 1 + IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 = + snapshot( + 3L, + checkpointSnapshotStrategy, + checkpointStreamFactory, + closeableRegistry); + + // Late notify checkpoint with id 2 + checkpointSnapshotStrategy.notifyCheckpointComplete(2L); + + // 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st + // one, so it should be based on nothing, thus this is effectively a full checkpoint. + assertThat(incrementalRemoteKeyedStateHandle3.getStateSize()) + .isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize()); + } + } + public RocksIncrementalSnapshotStrategy createSnapshotStrategy() throws IOException, RocksDBException {