diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index cf0bbcdd4..aabe16ddd 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -100,7 +100,10 @@ pub struct LatestSnapshot { } impl LatestSnapshot { - pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self { + pub fn from_snapshot( + snapshot: &PartitionSnapshotMetadata, + snapshot_unique_key: String, + ) -> Self { LatestSnapshot { version: snapshot.version, cluster_name: snapshot.cluster_name.clone(), @@ -109,7 +112,7 @@ impl LatestSnapshot { snapshot_id: snapshot.snapshot_id, created_at: snapshot.created_at.clone(), min_applied_lsn: snapshot.min_applied_lsn, - path, + path: snapshot_unique_key, } } } @@ -208,29 +211,17 @@ impl SnapshotRepository { snapshot: &PartitionSnapshotMetadata, local_snapshot_path: &Path, ) -> Result<(), PutSnapshotError> { - // A unique snapshot path within the partition prefix. We pad the LSN to ensure correct - // lexicographic sorting. - let snapshot_prefix = Self::get_snapshot_prefix(snapshot); - let full_snapshot_path = format!( - "{prefix}{partition_id}/{snapshot_prefix}", - prefix = self.prefix, - partition_id = snapshot.partition_id, - ); - + let snapshot_prefix = self.get_base_prefix(snapshot); debug!( "Uploading snapshot from {:?} to {}", - local_snapshot_path, full_snapshot_path + local_snapshot_path, snapshot_prefix ); - let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone()); + let mut progress = SnapshotUploadProgress::with_snapshot_path(snapshot_prefix.clone()); let mut buf = BytesMut::new(); for file in &snapshot.files { let filename = file.name.trim_start_matches("/"); - let key = object_store::path::Path::from(format!( - "{}/{}", - full_snapshot_path.as_str(), - filename - )); + let key = self.get_snapshot_file(snapshot, filename); let put_result = put_snapshot_object( local_snapshot_path.join(filename).as_path(), @@ -249,10 +240,7 @@ impl SnapshotRepository { progress.push(file.name.clone()); } - let metadata_key = object_store::path::Path::from(format!( - "{}/metadata.json", - full_snapshot_path.as_str() - )); + let metadata_key = self.get_snapshot_file(snapshot, "metadata.json"); let metadata_json_payload = PutPayload::from( serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"), ); @@ -270,11 +258,7 @@ impl SnapshotRepository { "Successfully published snapshot metadata", ); - let latest_path = object_store::path::Path::from(format!( - "{prefix}{partition_id}/latest.json", - prefix = self.prefix, - partition_id = snapshot.partition_id, - )); + let latest_path = self.get_latest_snapshot_pointer(snapshot.partition_id); // By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates. let maybe_stored = self @@ -293,7 +277,7 @@ impl SnapshotRepository { return Ok(()); } - let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix); + let latest = LatestSnapshot::from_snapshot(snapshot, Self::padded_snapshot_key(snapshot)); let latest = PutPayload::from( serde_json::to_string_pretty(&latest) .map_err(|e| PutSnapshotError::from(e, progress.clone()))?, @@ -329,14 +313,6 @@ impl SnapshotRepository { Ok(()) } - fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String { - format!( - "lsn_{lsn:020}-{snapshot_id}", - lsn = snapshot.min_applied_lsn, - snapshot_id = snapshot.snapshot_id - ) - } - /// Discover and download the latest snapshot available. It is the caller's responsibility /// to delete the snapshot directory when it is no longer needed. #[instrument( @@ -349,11 +325,7 @@ impl SnapshotRepository { &self, partition_id: PartitionId, ) -> anyhow::Result> { - let latest_path = object_store::path::Path::from(format!( - "{prefix}{partition_id}/latest.json", - prefix = self.prefix, - partition_id = partition_id, - )); + let latest_path = self.get_latest_snapshot_pointer(partition_id); let latest = self.object_store.get(&latest_path).await; @@ -370,10 +342,10 @@ impl SnapshotRepository { debug!("Latest snapshot metadata: {:?}", latest); let snapshot_metadata_path = object_store::path::Path::from(format!( - "{prefix}{partition_id}/{path}/metadata.json", + "{prefix}{partition_id}/{snapshot_path}/metadata.json", prefix = self.prefix, partition_id = partition_id, - path = latest.path, + snapshot_path = latest.path, )); let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await; @@ -429,10 +401,10 @@ impl SnapshotRepository { let filename = file.name.trim_start_matches("/"); let expected_size = file.size; let key = object_store::path::Path::from(format!( - "{prefix}{partition_id}/{path}/{filename}", + "{prefix}{partition_id}/{snapshot_path}/{filename}", prefix = self.prefix, partition_id = partition_id, - path = latest.path, + snapshot_path = latest.path, filename = filename, )); let file_path = snapshot_dir.path().join(filename); @@ -550,6 +522,56 @@ impl SnapshotRepository { } } } + + /// Construct the full object path to the latest snapshot pointer for a given partition. + fn get_latest_snapshot_pointer(&self, partition_id: PartitionId) -> object_store::path::Path { + object_store::path::Path::from(format!( + "{partition_prefix}/latest.json", + partition_prefix = self.get_partition_snapshots_prefix(partition_id) + )) + } + + /// Construct the prefix relative to the destination root for a given partition's snapshots. + fn get_partition_snapshots_prefix(&self, partition_id: PartitionId) -> String { + // Note: the destination prefix must include a separator if not empty. + format!( + "{prefix}{partition_id}", + prefix = self.prefix, + partition_id = partition_id, + ) + } + + /// Construct the complete snapshot prefix from the base of the object store destination. + fn get_base_prefix(&self, snapshot_metadata: &PartitionSnapshotMetadata) -> String { + format!( + "{partition_prefix}/{snapshot_path}", + partition_prefix = self.get_partition_snapshots_prefix(snapshot_metadata.partition_id), + snapshot_path = Self::padded_snapshot_key(snapshot_metadata) + ) + } + + /// Construct the full object path for a specific file from the given snapshot. + fn get_snapshot_file( + &self, + snapshot_metadata: &PartitionSnapshotMetadata, + filename: &str, + ) -> object_store::path::Path { + object_store::path::Path::from(format!( + "{base_prefix}/{filename}", + base_prefix = self.get_base_prefix(snapshot_metadata) + )) + } + + /// Construct the unique path component for a snapshot, e.g. `lsn_00001234-snap_abc123`. + /// The LSN is zero-padded for correct lexicographical sorting. + fn padded_snapshot_key(snapshot: &PartitionSnapshotMetadata) -> String { + // We zero-pad the LSN to ensure correct lexicographic sorting + format!( + "lsn_{lsn:020}-{snapshot_id}", + lsn = snapshot.min_applied_lsn, + snapshot_id = snapshot.snapshot_id + ) + } } async fn create_object_store_client(destination: Url) -> anyhow::Result> { @@ -594,14 +616,14 @@ async fn create_object_store_client(destination: Url) -> anyhow::Result, } impl SnapshotUploadProgress { - fn with_snapshot_path(full_snapshot_path: String) -> Self { + fn with_snapshot_path(snapshot_complete_path: String) -> Self { SnapshotUploadProgress { - full_snapshot_path, + snapshot_complete_path, uploaded_files: vec![], } } @@ -624,7 +646,7 @@ impl PutSnapshotError { { PutSnapshotError { error: error.into(), - full_snapshot_path: progress.full_snapshot_path, + full_snapshot_path: progress.snapshot_complete_path, uploaded_files: progress.uploaded_files, } } @@ -865,7 +887,7 @@ mod tests { repository.put(&snapshot1, source_dir.clone()).await?; - let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1); + let snapshot_prefix = SnapshotRepository::padded_snapshot_key(&snapshot1); let data = object_store .get(&Path::from(format!( "{}/{}/{}/data.sst", @@ -921,7 +943,7 @@ mod tests { assert_eq!( LatestSnapshot::from_snapshot( &snapshot2, - SnapshotRepository::get_snapshot_prefix(&snapshot2) + SnapshotRepository::padded_snapshot_key(&snapshot2) ), latest );