Contributor

@Zakelly Zakelly commented Oct 27, 2025

What is the purpose of the change

During incremental checkpoints, the RocksDB/ForSt state backend uploads checkpoint files based on previously uploaded SST files. Which SST files count as already uploaded is determined by checkpoint notifications. If a notification arrives late, the backend may reuse the wrong SST files, which have already been subsumed by the shared state registry. See FLINK-38574 for one example. This PR fixes that.

The issue unfolds as follows:

1. checkpoint 1 uses file 00001.SST uploaded as xxx.sst
2. checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because CP 1 wasn't yet confirmed
3. TM gets a confirmation of checkpoint 1
4. JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst
5. checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1, but it was deleted in (4) by JM
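
The five steps above can be replayed in a small, self-contained toy model. This is only a sketch for illustration: the class, the method, and the map-based bookkeeping are hypothetical and are not Flink classes or the actual snapshot strategy code. It assumes the pre-fix behavior described above, where the TM takes the uploads of the last confirmed checkpoint as the base for the next one.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy replay of the five steps; every name here is hypothetical, not a Flink class.
class LateNotificationRace {

    /**
     * Replays steps 1-5 against the given remote storage and returns the remote
     * file that checkpoint 3 would reuse for 00001.SST under the pre-fix logic.
     */
    static String remoteFileReusedByCheckpoint3(Set<String> remoteStorage) {
        // TM side: per checkpoint, local SST name -> remote name it was uploaded as.
        Map<Long, Map<String, String>> uploadedByCheckpoint = new HashMap<>();

        // (1) Checkpoint 1 uploads 00001.SST as xxx.sst.
        uploadedByCheckpoint.put(1L, Map.of("00001.SST", "xxx.sst"));
        remoteStorage.add("xxx.sst");

        // (2) Checkpoint 2 re-uploads the same file as yyy.sst (CP 1 is unconfirmed).
        uploadedByCheckpoint.put(2L, Map.of("00001.SST", "yyy.sst"));
        remoteStorage.add("yyy.sst");

        // (3) The TM receives the late confirmation of checkpoint 1.
        long lastConfirmedCheckpoint = 1L;

        // (4) The JM completes checkpoint 2 and subsumes checkpoint 1: xxx.sst is deleted.
        remoteStorage.remove("xxx.sst");

        // (5) Checkpoint 3 looks up 00001.SST in the confirmed checkpoint's uploads.
        return uploadedByCheckpoint.get(lastConfirmedCheckpoint).get("00001.SST");
    }

    public static void main(String[] args) {
        Set<String> remoteStorage = new HashSet<>();
        String reused = remoteFileReusedByCheckpoint3(remoteStorage);
        System.out.println("checkpoint 3 reuses " + reused
                + ", still on remote storage: " + remoteStorage.contains(reused));
    }
}
```

In this model, checkpoint 3 ends up referencing xxx.sst even though only yyy.sst is left on remote storage, which corresponds to the failure described in step 5.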

Brief change log

  • Change the ForSt/RocksDBIncrementalSnapshotStrategy to avoid reusing files that have been re-uploaded by subsequent checkpoints.
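
The idea behind this change can be sketched as follows. This is a simplified illustration under stated assumptions, not the code from the PR: `SstEntry`, `reusableFiles`, and the `placeholder` flag are hypothetical stand-ins. The flag loosely models the distinction between a placeholder handle (the file was reused from an earlier upload) and a real handle (the file was uploaded by that checkpoint).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Simplified sketch of the pruning idea; the types are hypothetical stand-ins.
class ReusableSstPruning {

    /** One file of a checkpoint: local SST path, remote name, and how it was stored. */
    static final class SstEntry {
        final String localPath;
        final String remoteName;
        final boolean placeholder; // true: reused an earlier upload; false: uploaded here

        SstEntry(String localPath, String remoteName, boolean placeholder) {
            this.localPath = localPath;
            this.remoteName = remoteName;
            this.placeholder = placeholder;
        }
    }

    /** Returns local path -> remote name for the files of confirmedId that are safe to reuse. */
    static Map<String, String> reusableFiles(
            SortedMap<Long, List<SstEntry>> uploadedByCheckpoint, long confirmedId) {
        Map<String, String> reusable = new HashMap<>();
        for (SstEntry e : uploadedByCheckpoint.getOrDefault(confirmedId, List.of())) {
            reusable.put(e.localPath, e.remoteName);
        }
        // A non-placeholder entry in a later checkpoint means the file was re-uploaded
        // there, so the confirmed checkpoint's copy may vanish once it is subsumed.
        for (List<SstEntry> later : uploadedByCheckpoint.tailMap(confirmedId + 1).values()) {
            for (SstEntry e : later) {
                if (!e.placeholder) {
                    reusable.remove(e.localPath);
                }
            }
        }
        return reusable;
    }

    public static void main(String[] args) {
        SortedMap<Long, List<SstEntry>> uploads = new TreeMap<>();
        uploads.put(1L, List.of(
                new SstEntry("00000.SST", "base.sst", true),
                new SstEntry("00001.SST", "xxx.sst", false)));
        uploads.put(2L, List.of(
                new SstEntry("00000.SST", "base.sst", true),
                new SstEntry("00001.SST", "yyy.sst", false)));
        // Checkpoint 1 is confirmed late, after checkpoint 2 re-uploaded 00001.SST.
        System.out.println(reusableFiles(uploads, 1L));
    }
}
```

In this sketch, only 00000.SST remains reusable after the late confirmation of checkpoint 1. 00001.SST is dropped from the set, so a checkpoint built on it would not reference xxx.sst.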

Verifying this change

  • Added testCheckpointIsIncrementalWithLateNotification for both RocksDB and ForSt.
  • The existing EventTimeWindowCheckpointingITCase.testSlidingTimeWindow and testPreAggregatedTumblingTimeWindow tests fail occasionally without this fix.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Collaborator

flinkbot commented Oct 27, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Contributor Author

Zakelly commented Oct 28, 2025

@rkhachatryan would you mind taking a look?

@rkhachatryan rkhachatryan self-assigned this Oct 28, 2025
Contributor

@rkhachatryan rkhachatryan left a comment

Thanks for the fix @Zakelly

Can you verify my understanding of the problem?

  1. checkpoint 1 uses file 00001.SST uploaded as xxx.sst
  2. checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because CP 1 wasn't yet confirmed
  3. TM gets a confirmation of checkpoint 1
  4. JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst
  5. checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1, but it was deleted in (4) by JM

If that's correct, could you add it to the PR / commit (NIT)?

Contributor Author

Zakelly commented Oct 30, 2025

> Thanks for the fix @Zakelly
>
> Can you verify my understanding of the problem?
>
>   1. checkpoint 1 uses file 00001.SST uploaded as xxx.sst
>   2. checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because CP 1 wasn't yet confirmed
>   3. TM gets a confirmation of checkpoint 1
>   4. JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst
>   5. checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1, but it was deleted in (4) by JM
>
> If that's correct, could you add it to the PR / commit (NIT)?

Yes, that's correct. I've added those steps to the PR description and to the Javadoc of PreviousSnapshot.

Contributor

@rkhachatryan rkhachatryan left a comment

Thanks for updating the PR, LGTM

instanceof PlaceholderStreamStateHandle)) {
// If it's not a placeholder handle, it means the sst file has been
// re-uploaded in the following checkpoint.
prunedSstFiles.remove(handleAndLocalPath.getLocalPath());
Contributor

NIT: log the number of removed files if it's > 0 (on DEBUG level)?

Contributor Author

I added a log of the number of removed files at TRACE level, since the outer snapshotMetaData also logs at TRACE level.

@Zakelly Zakelly force-pushed the f38574 branch 2 times, most recently from 74f24ee to 223eedd Compare October 30, 2025 10:13
Contributor Author

Zakelly commented Oct 30, 2025

Ah, CI failed, but I don't think the failure is related to this change. Force-pushing to re-trigger...

@Zakelly Zakelly merged commit 9b21a97 into apache:master Oct 31, 2025
@Zakelly Zakelly deleted the f38574 branch October 31, 2025 09:22

3 participants