Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions icechunk/src/asset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,21 @@ impl AssetManager {
)
}

pub fn clone_for_spec_version(&self, spec_version: SpecVersionBin) -> Self {
Self::new(
Arc::clone(&self.storage),
self.storage_settings.clone(),
spec_version,
self.num_snapshot_nodes,
self.num_chunk_refs,
self.num_transaction_changes,
self.num_bytes_attributes,
self.num_bytes_chunks,
self.compression_level,
self.max_concurrent_requests,
)
}

pub fn spec_version(&self) -> SpecVersionBin {
self.spec_version
}
Expand Down Expand Up @@ -292,6 +307,7 @@ impl AssetManager {
let manifest_c = Arc::clone(&manifest);
let res = write_new_manifest(
manifest_c,
self.spec_version(),
self.compression_level,
self.storage.as_ref(),
&self.storage_settings,
Expand Down Expand Up @@ -370,6 +386,7 @@ impl AssetManager {
let snapshot_c = Arc::clone(&snapshot);
write_new_snapshot(
snapshot_c,
self.spec_version(),
self.compression_level,
self.storage.as_ref(),
&self.storage_settings,
Expand Down Expand Up @@ -416,6 +433,7 @@ impl AssetManager {
write_new_tx_log(
transaction_id.clone(),
log_c,
self.spec_version(),
self.compression_level,
self.storage.as_ref(),
&self.storage_settings,
Expand Down Expand Up @@ -482,6 +500,7 @@ impl AssetManager {
) -> RepositoryResult<VersionInfo> {
write_repo_info(
info,
self.spec_version(),
&storage::VersionInfo::for_creation(),
self.compression_level,
None,
Expand All @@ -504,6 +523,7 @@ impl AssetManager {
trace!(attempts, "Attempting to update repo object");
match write_repo_info(
Arc::clone(&new_repo),
self.spec_version(),
&repo_version,
self.compression_level,
Some(backup_path.as_str()),
Expand Down Expand Up @@ -853,6 +873,7 @@ fn check_header(

async fn write_new_manifest(
new_manifest: Arc<Manifest>,
spec_version: SpecVersionBin,
compression_level: u8,
storage: &(dyn Storage + Send + Sync),
storage_settings: &storage::Settings,
Expand All @@ -862,7 +883,7 @@ async fn write_new_manifest(
let metadata = vec![
(
LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
(SpecVersionBin::current() as u8).to_string(),
(spec_version as u8).to_string(),
),
(ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
(
Expand All @@ -883,18 +904,14 @@ async fn write_new_manifest(
let buffer = tokio::task::spawn_blocking(move || {
let _entered = span.entered();
let buffer = binary_file_header(
SpecVersionBin::current(),
spec_version,
FileTypeBin::Manifest,
CompressionAlgorithmBin::Zstd,
);
let mut compressor =
zstd::stream::Encoder::new(buffer, compression_level as i32)?;

serialize_manifest(
new_manifest.as_ref(),
SpecVersionBin::current(),
&mut compressor,
)?;
serialize_manifest(new_manifest.as_ref(), spec_version, &mut compressor)?;

compressor.finish().map_err(RepositoryErrorKind::IOError)
})
Expand Down Expand Up @@ -959,6 +976,7 @@ fn check_and_get_decompressor(

async fn write_new_snapshot(
new_snapshot: Arc<Snapshot>,
spec_version: SpecVersionBin,
compression_level: u8,
storage: &(dyn Storage + Send + Sync),
storage_settings: &storage::Settings,
Expand All @@ -968,7 +986,7 @@ async fn write_new_snapshot(
let metadata = vec![
(
LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
(SpecVersionBin::current() as u8).to_string(),
(spec_version as u8).to_string(),
),
(ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
(
Expand All @@ -986,18 +1004,14 @@ async fn write_new_snapshot(
let buffer = tokio::task::spawn_blocking(move || {
let _entered = span.entered();
let buffer = binary_file_header(
SpecVersionBin::current(),
spec_version,
FileTypeBin::Snapshot,
CompressionAlgorithmBin::Zstd,
);
let mut compressor =
zstd::stream::Encoder::new(buffer, compression_level as i32)?;

serialize_snapshot(
new_snapshot.as_ref(),
SpecVersionBin::current(),
&mut compressor,
)?;
serialize_snapshot(new_snapshot.as_ref(), spec_version, &mut compressor)?;

compressor.finish().map_err(RepositoryErrorKind::IOError)
})
Expand Down Expand Up @@ -1044,6 +1058,7 @@ async fn fetch_snapshot(
async fn write_new_tx_log(
transaction_id: SnapshotId,
new_log: Arc<TransactionLog>,
spec_version: SpecVersionBin,
compression_level: u8,
storage: &(dyn Storage + Send + Sync),
storage_settings: &storage::Settings,
Expand All @@ -1053,7 +1068,7 @@ async fn write_new_tx_log(
let metadata = vec![
(
LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
(SpecVersionBin::current() as u8).to_string(),
(spec_version as u8).to_string(),
),
(ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
(
Expand All @@ -1070,17 +1085,13 @@ async fn write_new_tx_log(
let buffer = tokio::task::spawn_blocking(move || {
let _entered = span.entered();
let buffer = binary_file_header(
SpecVersionBin::current(),
spec_version,
FileTypeBin::TransactionLog,
CompressionAlgorithmBin::Zstd,
);
let mut compressor =
zstd::stream::Encoder::new(buffer, compression_level as i32)?;
serialize_transaction_log(
new_log.as_ref(),
SpecVersionBin::current(),
&mut compressor,
)?;
serialize_transaction_log(new_log.as_ref(), spec_version, &mut compressor)?;
compressor.finish().map_err(RepositoryErrorKind::IOError)
})
.await??;
Expand Down Expand Up @@ -1126,6 +1137,7 @@ async fn fetch_transaction_log(

async fn write_repo_info(
info: Arc<RepoInfo>,
spec_version: SpecVersionBin,
version: &VersionInfo,
compression_level: u8,
backup_path: Option<&str>,
Expand All @@ -1136,7 +1148,7 @@ async fn write_repo_info(
let metadata = vec![
(
LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
(SpecVersionBin::current() as u8).to_string(),
(spec_version as u8).to_string(),
),
(ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
(
Expand All @@ -1153,13 +1165,13 @@ async fn write_repo_info(
let buffer = tokio::task::spawn_blocking(move || {
let _entered = span.entered();
let buffer = binary_file_header(
SpecVersionBin::current(),
spec_version,
FileTypeBin::RepoInfo,
CompressionAlgorithmBin::Zstd,
);
let mut compressor =
zstd::stream::Encoder::new(buffer, compression_level as i32)?;
serialize_repo_info(info.as_ref(), SpecVersionBin::current(), &mut compressor)?;
serialize_repo_info(info.as_ref(), spec_version, &mut compressor)?;
compressor.finish().map_err(RepositoryErrorKind::IOError)
})
.await??;
Expand Down
Loading