Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,30 +196,29 @@ protected PreviousSnapshot snapshotMetaData(
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {

final long lastCompletedCheckpoint;
final Collection<HandleAndLocalPath> confirmedSstFiles;
final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;

// use the last completed checkpoint as the comparison base.
synchronized (uploadedSstFiles) {
lastCompletedCheckpoint = lastCompletedCheckpointId;
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
LOG.trace(
"Use confirmed SST files for checkpoint {}: {}",
checkpointId,
confirmedSstFiles);
currentUploadedSstFiles =
new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
}
PreviousSnapshot previousSnapshot =
new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint);
LOG.trace(
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
+ "assuming the following (shared) confirmed files as base: {}.",
checkpointId,
lastCompletedCheckpoint,
confirmedSstFiles);
previousSnapshot);

// snapshot meta data to save
for (Map.Entry<String, ForStOperationUtils.ForStKvStateInfo> stateMetaInfoEntry :
kvStateInformation.entrySet()) {
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
}
return new PreviousSnapshot(confirmedSstFiles);
return previousSnapshot;
}

/** Encapsulates the process to perform an incremental snapshot of a ForStKeyedStateBackend. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.flink.util.ResourceGuard;

import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
Expand All @@ -58,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand All @@ -70,6 +73,8 @@
public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);

@Nonnull private final String description;

/** ForSt instance from the backend. */
Expand Down Expand Up @@ -251,22 +256,94 @@ public void release() {
}

protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
new PreviousSnapshot(Collections.emptyList());
new PreviousSnapshot(null, -1L);

/** Previous snapshot with uploaded sst files. */
protected static class PreviousSnapshot {

@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;

protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
/**
* Constructor of PreviousSnapshot. Given a map of the sst files uploaded in previous
* checkpoints, prune the sst files which have been re-uploaded in the following
* checkpoints. The pruning is used to resolve the mismatch between TM and JM caused by
* notification delay. For example, consider the following steps:
*
* <ul>
* <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
* <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
* CP 1 wasn't yet confirmed.
* <li>3) TM gets a confirmation of checkpoint 1.
* <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
* <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
* but it was deleted in (4) by JM.
* </ul>
*
* @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
* @param lastCompletedCheckpoint the last completed checkpoint id.
*/
protected PreviousSnapshot(
@Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
long lastCompletedCheckpoint) {
this.confirmedSstFiles =
confirmedSstFiles != null
? confirmedSstFiles.stream()
currentUploadedSstFiles != null
? pruneFirstCheckpointSstFiles(
currentUploadedSstFiles, lastCompletedCheckpoint)
: Collections.emptyMap();
}

/**
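* Builds the base file set for the next incremental checkpoint. All sst files uploaded by
* the last completed checkpoint are included first; then, for each following checkpoint,
* any sst file that has been re-uploaded (i.e. its handle is not a placeholder) is removed
* from that base set.
*
* @param currentUploadedSstFiles the sst files uploaded by the last completed checkpoint
*     and by the checkpoints following it, keyed by checkpoint id.
* @param lastCompletedCheckpoint the last completed checkpoint id.
* @return the pruned mapping from local sst file path to its uploaded handle; empty if the
*     last completed checkpoint has no entry or every file has been re-uploaded.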
*/
private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
@Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
long lastCompletedCheckpoint) {
Map<String, StreamStateHandle> prunedSstFiles = null;
int removedCount = 0;
for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
currentUploadedSstFiles.entrySet()) {
// Iterate checkpoints in ascending order of checkpoint id.
if (entry.getKey() == lastCompletedCheckpoint) {
// The first checkpoint's uploaded sst files are all included.
prunedSstFiles =
entry.getValue().stream()
.collect(
Collectors.toMap(
HandleAndLocalPath::getLocalPath,
HandleAndLocalPath::getHandle))
: Collections.emptyMap();
HandleAndLocalPath::getHandle));
} else if (prunedSstFiles == null) {
// The last completed checkpoint's uploaded sst files do not exist,
// so we skip the pruning process.
break;
} else if (!prunedSstFiles.isEmpty()) {
// Prune sst files which have been re-uploaded in the following checkpoints.
for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
if (!(handleAndLocalPath.getHandle()
instanceof PlaceholderStreamStateHandle)) {
// If it's not a placeholder handle, it means the sst file has been
// re-uploaded in the following checkpoint.
if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
removedCount++;
}
}
}
}
}
if (removedCount > 0 && LOG.isTraceEnabled()) {
LOG.trace(
"Removed {} re-uploaded sst files from base file set for incremental "
+ "checkpoint. Base checkpoint id: {}",
removedCount,
currentUploadedSstFiles.firstKey());
}
return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
? Collections.unmodifiableMap(prunedSstFiles)
: Collections.emptyMap();
}

protected Optional<StreamStateHandle> getUploaded(String filename) {
Expand All @@ -290,5 +367,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
/** Returns {@code true} if this snapshot holds no confirmed sst files. */
protected boolean isEmpty() {
return confirmedSstFiles.isEmpty();
}

/** Renders this snapshot as text, listing the confirmed sst files it holds. */
@Override
public String toString() {
    final String fields = "confirmedSstFiles=" + confirmedSstFiles;
    return "PreviousSnapshot{" + fields + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,41 @@ private void testCheckpointIsIncremental(boolean forceFirstFull) throws Exceptio
}
}

/**
 * Verifies the behavior when checkpoint notifications arrive late: checkpoint 1 is only
 * confirmed after checkpoint 2 has already been taken, so checkpoint 3 (nominally based on
 * checkpoint 1) must not re-use files that checkpoint 2 re-uploaded.
 */
@Test
void testCheckpointIsIncrementalWithLateNotification() throws Exception {
    try (CloseableRegistry registry = new CloseableRegistry();
            ForStSnapshotStrategyBase<?, ?> strategy = createSnapshotStrategy()) {
        final FsCheckpointStreamFactory streamFactory = createFsCheckpointStreamFactory();

        // Take checkpoints 1 and 2 back to back, before any of them is confirmed.
        snapshot(1L, strategy, streamFactory, registry);
        snapshot(2L, strategy, streamFactory, registry);

        // The confirmation of checkpoint 1 arrives late, after checkpoint 2 was taken.
        strategy.notifyCheckpointComplete(1L);

        // Checkpoint 3 uses checkpoint 1 as its comparison base.
        final IncrementalRemoteKeyedStateHandle thirdHandle =
                snapshot(3L, strategy, streamFactory, registry);

        // The confirmation of checkpoint 2 arrives late as well.
        strategy.notifyCheckpointComplete(2L);

        // Checkpoint 2 re-uploaded everything checkpoint 1 had uploaded, so nothing from
        // checkpoint 1 may be re-used: checkpoint 3 is effectively a full checkpoint.
        assertThat(thirdHandle.getStateSize()).isEqualTo(thirdHandle.getCheckpointedSize());
    }
}

@Test
void testCheckpointIsFull() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -394,22 +395,94 @@ public void release() {
}

protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
new PreviousSnapshot(Collections.emptyList());
new PreviousSnapshot(null, -1L);

/** Previous snapshot with uploaded sst files. */
protected static class PreviousSnapshot {

@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;

protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
/**
* Constructor of PreviousSnapshot. Given a map of the sst files uploaded in previous
* checkpoints, prune the sst files which have been re-uploaded in the following
* checkpoints. The pruning is used to resolve the mismatch between TM and JM caused by
* notification delay. For example, consider the following steps:
*
* <ul>
* <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
* <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
* CP 1 wasn't yet confirmed.
* <li>3) TM gets a confirmation of checkpoint 1.
* <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
* <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
* but it was deleted in (4) by JM.
* </ul>
*
* @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
* @param lastCompletedCheckpoint the last completed checkpoint id.
*/
protected PreviousSnapshot(
@Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
long lastCompletedCheckpoint) {
this.confirmedSstFiles =
confirmedSstFiles != null
? confirmedSstFiles.stream()
currentUploadedSstFiles != null
? pruneFirstCheckpointSstFiles(
currentUploadedSstFiles, lastCompletedCheckpoint)
: Collections.emptyMap();
}

/**
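* Builds the base file set for the next incremental checkpoint. All sst files uploaded by
* the last completed checkpoint are included first; then, for each following checkpoint,
* any sst file that has been re-uploaded (i.e. its handle is not a placeholder) is removed
* from that base set.
*
* @param currentUploadedSstFiles the sst files uploaded by the last completed checkpoint
*     and by the checkpoints following it, keyed by checkpoint id.
* @param lastCompletedCheckpoint the last completed checkpoint id.
* @return the pruned mapping from local sst file path to its uploaded handle; empty if the
*     last completed checkpoint has no entry or every file has been re-uploaded.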
*/
private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
@Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
long lastCompletedCheckpoint) {
Map<String, StreamStateHandle> prunedSstFiles = null;
int removedCount = 0;
for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
currentUploadedSstFiles.entrySet()) {
// Iterate checkpoints in ascending order of checkpoint id.
if (entry.getKey() == lastCompletedCheckpoint) {
// The first checkpoint's uploaded sst files are all included.
prunedSstFiles =
entry.getValue().stream()
.collect(
Collectors.toMap(
HandleAndLocalPath::getLocalPath,
HandleAndLocalPath::getHandle))
: Collections.emptyMap();
HandleAndLocalPath::getHandle));
} else if (prunedSstFiles == null) {
// The last completed checkpoint's uploaded sst files do not exist,
// so we skip the pruning process.
break;
} else if (!prunedSstFiles.isEmpty()) {
// Prune sst files which have been re-uploaded in the following checkpoints.
for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
if (!(handleAndLocalPath.getHandle()
instanceof PlaceholderStreamStateHandle)) {
// If it's not a placeholder handle, it means the sst file has been
// re-uploaded in the following checkpoint.
if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
removedCount++;
}
}
}
}
}
if (removedCount > 0 && LOG.isTraceEnabled()) {
LOG.trace(
"Removed {} re-uploaded sst files from base file set for incremental "
+ "checkpoint. Base checkpoint id: {}",
removedCount,
currentUploadedSstFiles.firstKey());
}
return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
? Collections.unmodifiableMap(prunedSstFiles)
: Collections.emptyMap();
}

protected Optional<StreamStateHandle> getUploaded(String filename) {
Expand All @@ -429,5 +502,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
return Optional.empty();
}
}

/** Renders this snapshot as text, listing the confirmed sst files it holds. */
@Override
public String toString() {
    final String fields = "confirmedSstFiles=" + confirmedSstFiles;
    return "PreviousSnapshot{" + fields + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,30 +196,29 @@ protected PreviousSnapshot snapshotMetaData(
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {

final long lastCompletedCheckpoint;
final Collection<HandleAndLocalPath> confirmedSstFiles;
final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;

// use the last completed checkpoint as the comparison base.
synchronized (uploadedSstFiles) {
lastCompletedCheckpoint = lastCompletedCheckpointId;
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
LOG.trace(
"Use confirmed SST files for checkpoint {}: {}",
checkpointId,
confirmedSstFiles);
currentUploadedSstFiles =
new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
}
PreviousSnapshot previousSnapshot =
new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint);
LOG.trace(
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
+ "assuming the following (shared) confirmed files as base: {}.",
checkpointId,
lastCompletedCheckpoint,
confirmedSstFiles);
previousSnapshot);

// snapshot meta data to save
for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
kvStateInformation.entrySet()) {
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
}
return new PreviousSnapshot(confirmedSstFiles);
return previousSnapshot;
}

/**
Expand Down Expand Up @@ -352,6 +351,7 @@ private long uploadSnapshotFiles(
List<Path> miscFilePaths = new ArrayList<>(files.length);

createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
LOG.info("Will re-use {} SST files. {}", sstFiles.size(), sstFiles);

final CheckpointedStateScope stateScope =
sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
Expand Down
Loading