Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract common object store key builder helpers #2438

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 73 additions & 51 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ pub struct LatestSnapshot {
}

impl LatestSnapshot {
pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self {
pub fn from_snapshot(
snapshot: &PartitionSnapshotMetadata,
snapshot_unique_key: String,
) -> Self {
LatestSnapshot {
version: snapshot.version,
cluster_name: snapshot.cluster_name.clone(),
Expand All @@ -109,7 +112,7 @@ impl LatestSnapshot {
snapshot_id: snapshot.snapshot_id,
created_at: snapshot.created_at.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
path,
path: snapshot_unique_key,
}
}
}
Expand Down Expand Up @@ -208,29 +211,17 @@ impl SnapshotRepository {
snapshot: &PartitionSnapshotMetadata,
local_snapshot_path: &Path,
) -> Result<(), PutSnapshotError> {
// A unique snapshot path within the partition prefix. We pad the LSN to ensure correct
// lexicographic sorting.
let snapshot_prefix = Self::get_snapshot_prefix(snapshot);
let full_snapshot_path = format!(
"{prefix}{partition_id}/{snapshot_prefix}",
prefix = self.prefix,
partition_id = snapshot.partition_id,
);

let snapshot_prefix = self.get_base_prefix(snapshot);
debug!(
"Uploading snapshot from {:?} to {}",
local_snapshot_path, full_snapshot_path
local_snapshot_path, snapshot_prefix
);

let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone());
let mut progress = SnapshotUploadProgress::with_snapshot_path(snapshot_prefix.clone());
let mut buf = BytesMut::new();
for file in &snapshot.files {
let filename = file.name.trim_start_matches("/");
let key = object_store::path::Path::from(format!(
"{}/{}",
full_snapshot_path.as_str(),
filename
));
let key = self.get_snapshot_file(snapshot, filename);

let put_result = put_snapshot_object(
local_snapshot_path.join(filename).as_path(),
Expand All @@ -249,10 +240,7 @@ impl SnapshotRepository {
progress.push(file.name.clone());
}

let metadata_key = object_store::path::Path::from(format!(
"{}/metadata.json",
full_snapshot_path.as_str()
));
let metadata_key = self.get_snapshot_file(snapshot, "metadata.json");
let metadata_json_payload = PutPayload::from(
serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"),
);
Expand All @@ -270,11 +258,7 @@ impl SnapshotRepository {
"Successfully published snapshot metadata",
);

let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = snapshot.partition_id,
));
let latest_path = self.get_latest_snapshot_pointer(snapshot.partition_id);

// By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates.
let maybe_stored = self
Expand All @@ -293,7 +277,7 @@ impl SnapshotRepository {
return Ok(());
}

let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix);
let latest = LatestSnapshot::from_snapshot(snapshot, Self::padded_snapshot_key(snapshot));
let latest = PutPayload::from(
serde_json::to_string_pretty(&latest)
.map_err(|e| PutSnapshotError::from(e, progress.clone()))?,
Expand Down Expand Up @@ -329,14 +313,6 @@ impl SnapshotRepository {
Ok(())
}

fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String {
format!(
"lsn_{lsn:020}-{snapshot_id}",
lsn = snapshot.min_applied_lsn,
snapshot_id = snapshot.snapshot_id
)
}

/// Discover and download the latest snapshot available. It is the caller's responsibility
/// to delete the snapshot directory when it is no longer needed.
#[instrument(
Expand All @@ -349,11 +325,7 @@ impl SnapshotRepository {
&self,
partition_id: PartitionId,
) -> anyhow::Result<Option<LocalPartitionSnapshot>> {
let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = partition_id,
));
let latest_path = self.get_latest_snapshot_pointer(partition_id);

let latest = self.object_store.get(&latest_path).await;

Expand All @@ -370,10 +342,10 @@ impl SnapshotRepository {
debug!("Latest snapshot metadata: {:?}", latest);

let snapshot_metadata_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/metadata.json",
"{prefix}{partition_id}/{snapshot_path}/metadata.json",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
snapshot_path = latest.path,
));
let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await;

Expand Down Expand Up @@ -429,10 +401,10 @@ impl SnapshotRepository {
let filename = file.name.trim_start_matches("/");
let expected_size = file.size;
let key = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/{filename}",
"{prefix}{partition_id}/{snapshot_path}/{filename}",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
snapshot_path = latest.path,
filename = filename,
));
let file_path = snapshot_dir.path().join(filename);
Expand Down Expand Up @@ -550,6 +522,56 @@ impl SnapshotRepository {
}
}
}

/// Construct the full object path to the latest snapshot pointer for a given partition.
fn get_latest_snapshot_pointer(&self, partition_id: PartitionId) -> object_store::path::Path {
object_store::path::Path::from(format!(
"{partition_prefix}/latest.json",
partition_prefix = self.get_partition_snapshots_prefix(partition_id)
))
}

/// Construct the prefix relative to the destination root for a given partition's snapshots.
fn get_partition_snapshots_prefix(&self, partition_id: PartitionId) -> String {
// Note: the destination prefix must include a separator if not empty.
format!(
"{prefix}{partition_id}",
prefix = self.prefix,
partition_id = partition_id,
)
}

/// Construct the complete snapshot prefix from the base of the object store destination.
fn get_base_prefix(&self, snapshot_metadata: &PartitionSnapshotMetadata) -> String {
format!(
"{partition_prefix}/{snapshot_path}",
partition_prefix = self.get_partition_snapshots_prefix(snapshot_metadata.partition_id),
snapshot_path = Self::padded_snapshot_key(snapshot_metadata)
)
}

/// Construct the full object path for a specific file from the given snapshot.
fn get_snapshot_file(
&self,
snapshot_metadata: &PartitionSnapshotMetadata,
filename: &str,
) -> object_store::path::Path {
object_store::path::Path::from(format!(
"{base_prefix}/{filename}",
base_prefix = self.get_base_prefix(snapshot_metadata)
))
}

/// Construct the unique path component for a snapshot, e.g. `lsn_00001234-snap_abc123`.
/// The LSN is zero-padded for correct lexicographical sorting.
fn padded_snapshot_key(snapshot: &PartitionSnapshotMetadata) -> String {
// We zero-pad the LSN to ensure correct lexicographic sorting
format!(
"lsn_{lsn:020}-{snapshot_id}",
lsn = snapshot.min_applied_lsn,
snapshot_id = snapshot.snapshot_id
)
}
}

async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn ObjectStore>> {
Expand Down Expand Up @@ -594,14 +616,14 @@ async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn

#[derive(Clone, Debug)]
struct SnapshotUploadProgress {
pub full_snapshot_path: String,
pub snapshot_complete_path: String,
pub uploaded_files: Vec<String>,
}

impl SnapshotUploadProgress {
fn with_snapshot_path(full_snapshot_path: String) -> Self {
fn with_snapshot_path(snapshot_complete_path: String) -> Self {
SnapshotUploadProgress {
full_snapshot_path,
snapshot_complete_path,
uploaded_files: vec![],
}
}
Expand All @@ -624,7 +646,7 @@ impl PutSnapshotError {
{
PutSnapshotError {
error: error.into(),
full_snapshot_path: progress.full_snapshot_path,
full_snapshot_path: progress.snapshot_complete_path,
uploaded_files: progress.uploaded_files,
}
}
Expand Down Expand Up @@ -865,7 +887,7 @@ mod tests {

repository.put(&snapshot1, source_dir.clone()).await?;

let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1);
let snapshot_prefix = SnapshotRepository::padded_snapshot_key(&snapshot1);
let data = object_store
.get(&Path::from(format!(
"{}/{}/{}/data.sst",
Expand Down Expand Up @@ -921,7 +943,7 @@ mod tests {
assert_eq!(
LatestSnapshot::from_snapshot(
&snapshot2,
SnapshotRepository::get_snapshot_prefix(&snapshot2)
SnapshotRepository::padded_snapshot_key(&snapshot2)
),
latest
);
Expand Down
Loading