diff --git a/icechunk/src/asset_manager.rs b/icechunk/src/asset_manager.rs index 2828772ed..15eca5283 100644 --- a/icechunk/src/asset_manager.rs +++ b/icechunk/src/asset_manager.rs @@ -191,6 +191,21 @@ impl AssetManager { ) } + pub fn clone_for_spec_version(&self, spec_version: SpecVersionBin) -> Self { + Self::new( + Arc::clone(&self.storage), + self.storage_settings.clone(), + spec_version, + self.num_snapshot_nodes, + self.num_chunk_refs, + self.num_transaction_changes, + self.num_bytes_attributes, + self.num_bytes_chunks, + self.compression_level, + self.max_concurrent_requests, + ) + } + pub fn spec_version(&self) -> SpecVersionBin { self.spec_version } @@ -292,6 +307,7 @@ impl AssetManager { let manifest_c = Arc::clone(&manifest); let res = write_new_manifest( manifest_c, + self.spec_version(), self.compression_level, self.storage.as_ref(), &self.storage_settings, @@ -370,6 +386,7 @@ impl AssetManager { let snapshot_c = Arc::clone(&snapshot); write_new_snapshot( snapshot_c, + self.spec_version(), self.compression_level, self.storage.as_ref(), &self.storage_settings, @@ -416,6 +433,7 @@ impl AssetManager { write_new_tx_log( transaction_id.clone(), log_c, + self.spec_version(), self.compression_level, self.storage.as_ref(), &self.storage_settings, @@ -482,6 +500,7 @@ impl AssetManager { ) -> RepositoryResult { write_repo_info( info, + self.spec_version(), &storage::VersionInfo::for_creation(), self.compression_level, None, @@ -504,6 +523,7 @@ impl AssetManager { trace!(attempts, "Attempting to update repo object"); match write_repo_info( Arc::clone(&new_repo), + self.spec_version(), &repo_version, self.compression_level, Some(backup_path.as_str()), @@ -853,6 +873,7 @@ fn check_header( async fn write_new_manifest( new_manifest: Arc, + spec_version: SpecVersionBin, compression_level: u8, storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, @@ -862,7 +883,7 @@ async fn write_new_manifest( let metadata = vec![ ( LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(), - (SpecVersionBin::current() as u8).to_string(), + (spec_version as u8).to_string(), ), (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()), ( @@ -883,18 +904,14 @@ async fn write_new_manifest( let buffer = tokio::task::spawn_blocking(move || { let _entered = span.entered(); let buffer = binary_file_header( - SpecVersionBin::current(), + spec_version, FileTypeBin::Manifest, CompressionAlgorithmBin::Zstd, ); let mut compressor = zstd::stream::Encoder::new(buffer, compression_level as i32)?; - serialize_manifest( - new_manifest.as_ref(), - SpecVersionBin::current(), - &mut compressor, - )?; + serialize_manifest(new_manifest.as_ref(), spec_version, &mut compressor)?; compressor.finish().map_err(RepositoryErrorKind::IOError) }) @@ -959,6 +976,7 @@ fn check_and_get_decompressor( async fn write_new_snapshot( new_snapshot: Arc, + spec_version: SpecVersionBin, compression_level: u8, storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, @@ -968,7 +986,7 @@ async fn write_new_snapshot( let metadata = vec![ ( LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(), - (SpecVersionBin::current() as u8).to_string(), + (spec_version as u8).to_string(), ), (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()), ( @@ -986,18 +1004,14 @@ async fn write_new_snapshot( let buffer = tokio::task::spawn_blocking(move || { let _entered = span.entered(); let buffer = binary_file_header( - SpecVersionBin::current(), + spec_version, FileTypeBin::Snapshot, CompressionAlgorithmBin::Zstd, ); let mut compressor = zstd::stream::Encoder::new(buffer, compression_level as i32)?; - serialize_snapshot( - new_snapshot.as_ref(), - SpecVersionBin::current(), - &mut compressor, - )?; + serialize_snapshot(new_snapshot.as_ref(), spec_version, &mut compressor)?; compressor.finish().map_err(RepositoryErrorKind::IOError) }) @@ -1044,6 +1058,7 @@ async fn fetch_snapshot( async fn write_new_tx_log( transaction_id: SnapshotId, new_log: Arc, + spec_version: SpecVersionBin, compression_level: u8, storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, @@ -1053,7 +1068,7 @@ async fn write_new_tx_log( let metadata = vec![ ( LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(), - (SpecVersionBin::current() as u8).to_string(), + (spec_version as u8).to_string(), ), (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()), ( @@ -1070,17 +1085,13 @@ async fn write_new_tx_log( let buffer = tokio::task::spawn_blocking(move || { let _entered = span.entered(); let buffer = binary_file_header( - SpecVersionBin::current(), + spec_version, FileTypeBin::TransactionLog, CompressionAlgorithmBin::Zstd, ); let mut compressor = zstd::stream::Encoder::new(buffer, compression_level as i32)?; - serialize_transaction_log( - new_log.as_ref(), - SpecVersionBin::current(), - &mut compressor, - )?; + serialize_transaction_log(new_log.as_ref(), spec_version, &mut compressor)?; compressor.finish().map_err(RepositoryErrorKind::IOError) }) .await??; @@ -1126,6 +1137,7 @@ async fn fetch_transaction_log( async fn write_repo_info( info: Arc, + spec_version: SpecVersionBin, version: &VersionInfo, compression_level: u8, backup_path: Option<&str>, @@ -1136,7 +1148,7 @@ async fn write_repo_info( let metadata = vec![ ( LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(), - (SpecVersionBin::current() as u8).to_string(), + (spec_version as u8).to_string(), ), (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()), ( @@ -1153,13 +1165,13 @@ async fn write_repo_info( let buffer = tokio::task::spawn_blocking(move || { let _entered = span.entered(); let buffer = binary_file_header( - SpecVersionBin::current(), + spec_version, FileTypeBin::RepoInfo, CompressionAlgorithmBin::Zstd, ); let mut compressor = zstd::stream::Encoder::new(buffer, compression_level as i32)?; - serialize_repo_info(info.as_ref(), SpecVersionBin::current(), &mut compressor)?; + serialize_repo_info(info.as_ref(), spec_version, &mut compressor)?; compressor.finish().map_err(RepositoryErrorKind::IOError) }) .await??; diff --git a/icechunk/src/format/repo_info.rs b/icechunk/src/format/repo_info.rs index c51072d01..9aac54c47 100644 --- a/icechunk/src/format/repo_info.rs +++ b/icechunk/src/format/repo_info.rs @@ -91,10 +91,12 @@ pub struct UpdateInfo { type UpdateTuple<'a> = IcechunkResult<(UpdateType, DateTime, Option<&'a str>)>; impl RepoInfo { + #[allow(clippy::too_many_arguments)] pub fn new< 'a, I: IntoIterator, Option<&'a str>)>>, >( + spec_version: SpecVersionBin, tags: impl IntoIterator, branches: impl IntoIterator, deleted_tags: impl IntoIterator, @@ -110,6 +112,7 @@ impl RepoInfo { let mut deleted_tags: Vec<_> = deleted_tags.into_iter().collect(); deleted_tags.sort(); Self::from_parts( + spec_version, tags, branches, deleted_tags, @@ -120,10 +123,12 @@ impl RepoInfo { ) } + #[allow(clippy::too_many_arguments)] fn from_parts< 'a, I: IntoIterator, Option<&'a str>)>>, >( + spec_version: SpecVersionBin, sorted_tags: impl IntoIterator, sorted_branches: impl IntoIterator, sorted_deleted_tags: impl IntoIterator, @@ -263,7 +268,7 @@ impl RepoInfo { branches: Some(branches), deleted_tags: Some(deleted_tags), snapshots: Some(snapshots), - spec_version: SpecVersionBin::current() as u8, + spec_version: spec_version as u8, status: Some(status), metadata: Some(metadata), latest_updates: Some(latest_updates), @@ -362,11 +367,12 @@ impl RepoInfo { Ok((updates, repo_before_updates)) } - pub fn initial(snapshot: SnapshotInfo) -> Self { + pub fn initial(spec_version: SpecVersionBin, snapshot: SnapshotInfo) -> Self { let last_updated_at = snapshot.flushed_at; #[allow(clippy::expect_used)] // This method is basically constant, so it's OK to unwrap in it Self::from_parts( + spec_version, [], [("main", 0)], [], @@ -417,6 +423,7 @@ impl RepoInfo { pub fn add_snapshot( &self, + spec_version: SpecVersionBin, snap: SnapshotInfo, branch: Option<&str>, update_type: UpdateType, @@ -449,6 +456,7 @@ impl RepoInfo { }); let res = Self::from_parts( + spec_version, tags, branches, self.deleted_tags()?, @@ -466,6 +474,7 @@ impl RepoInfo { pub fn add_branch( &self, + spec_version: SpecVersionBin, name: &str, snap: &SnapshotId, previous_file: &str, @@ -485,6 +494,7 @@ impl RepoInfo { branches.sort_by(|(name1, _), (name2, _)| name1.cmp(name2)); let snaps: Vec<_> = self.all_snapshots()?.try_collect()?; Ok(Self::from_parts( + spec_version, self.all_tags()?, branches, self.deleted_tags()?, @@ -507,7 +517,12 @@ impl RepoInfo { } } - pub fn delete_branch(&self, name: &str, previous_file: &str) -> IcechunkResult { + pub fn delete_branch( + &self, + spec_version: SpecVersionBin, + name: &str, + previous_file: &str, + ) -> IcechunkResult { match self.resolve_branch(name) { Ok(previous_snap_id) => { let mut branches: Vec<_> = self.all_branches()?.collect(); @@ -515,6 +530,7 @@ impl RepoInfo { branches.retain(|(n, _)| n != &name); let snaps: Vec<_> = self.all_snapshots()?.try_collect()?; Self::from_parts( + spec_version, self.all_tags()?, branches, self.deleted_tags()?, @@ -544,6 +560,7 @@ impl RepoInfo { pub fn update_branch( &self, + spec_version: SpecVersionBin, name: &str, new_snap: &SnapshotId, previous_file: &str, @@ -556,6 +573,7 @@ impl RepoInfo { }); let snaps: Vec<_> = self.all_snapshots()?.try_collect()?; Ok(Self::from_parts( + spec_version, self.all_tags()?, branches, self.deleted_tags()?, @@ -581,6 +599,7 @@ impl RepoInfo { pub fn add_tag( &self, + spec_version: SpecVersionBin, name: &str, snap: &SnapshotId, previous_file: &str, @@ -600,6 +619,7 @@ impl RepoInfo { tags.sort_by(|(name1, _), (name2, _)| name1.cmp(name2)); let snaps: Vec<_> = self.all_snapshots()?.try_collect()?; Ok(Self::from_parts( + spec_version, tags, self.all_branches()?, self.deleted_tags()?, @@ -622,7 +642,12 @@ impl RepoInfo { } } - pub fn delete_tag(&self, name: &str, previous_file: &str) -> IcechunkResult { + pub fn delete_tag( + &self, + spec_version: SpecVersionBin, + name: &str, + previous_file: &str, + ) -> IcechunkResult { match self.resolve_tag(name) { Ok(previous_snap_id) => { let mut tags: Vec<_> = self.all_tags()?.collect(); @@ -634,6 +659,7 @@ impl RepoInfo { let snaps: Vec<_> = self.all_snapshots()?.try_collect()?; Self::from_parts( + spec_version, tags, self.all_branches()?, deleted_tags, @@ -662,11 +688,13 @@ impl RepoInfo { pub fn set_metadata( &self, + spec_version: SpecVersionBin, metadata: &SnapshotProperties, previous_file: &str, ) -> IcechunkResult { let snaps: Vec<_> = self.all_snapshots()?.try_collect()?; Self::from_parts( + spec_version, self.all_tags()?, self.all_branches()?, self.deleted_tags()?, @@ -1168,7 +1196,7 @@ mod tests { message: "snap 1".to_string(), metadata: Default::default(), }; - let repo = RepoInfo::initial(snap1.clone()); + let repo = RepoInfo::initial(SpecVersionBin::current(), snap1.clone()); assert_eq!(repo.all_snapshots()?.next().unwrap().unwrap(), snap1); let id2 = SnapshotId::random(); @@ -1179,6 +1207,7 @@ mod tests { ..snap1.clone() }; let repo = repo.add_snapshot( + SpecVersionBin::current(), snap2.clone(), Some("main"), UpdateType::NewCommitUpdate { branch: "main".to_string() }, @@ -1206,6 +1235,7 @@ mod tests { ..snap2.clone() }; let repo = repo.add_snapshot( + SpecVersionBin::current(), snap3.clone(), Some("main"), UpdateType::NewCommitUpdate { branch: "main".to_string() }, @@ -1240,11 +1270,16 @@ mod tests { message: "snap 1".to_string(), metadata: Default::default(), }; - let repo = RepoInfo::initial(snap1.clone()); - let repo = repo.add_branch("foo", &id1, "foo")?; - let repo = repo.add_branch("bar", &id1, "bar")?; + let repo = RepoInfo::initial(SpecVersionBin::current(), snap1.clone()); + let repo = repo.add_branch(SpecVersionBin::current(), "foo", &id1, "foo")?; + let repo = repo.add_branch(SpecVersionBin::current(), "bar", &id1, "bar")?; assert!(matches!( - repo.add_branch("bad-snap", &SnapshotId::random(), "bad"), + repo.add_branch( + SpecVersionBin::current(), + "bad-snap", + &SnapshotId::random(), + "bad" + ), Err(IcechunkFormatError { kind: IcechunkFormatErrorKind::SnapshotIdNotFound { .. }, .. @@ -1252,7 +1287,7 @@ mod tests { )); // cannot add existing assert!(matches!( - repo.add_branch("bar", &id1, "/foo/bar"), + repo.add_branch(SpecVersionBin::current(), "bar", &id1, "/foo/bar"), Err(IcechunkFormatError { kind: IcechunkFormatErrorKind::BranchAlreadyExists { .. }, .. @@ -1272,31 +1307,44 @@ mod tests { ..snap1.clone() }; let repo = repo.add_snapshot( + SpecVersionBin::current(), snap2, Some("main"), UpdateType::NewCommitUpdate { branch: "main".to_string() }, "foo", )?; - let repo = repo.add_branch("baz", &id2, "/foo/bar")?; + let repo = repo.add_branch(SpecVersionBin::current(), "baz", &id2, "/foo/bar")?; assert_eq!(repo.resolve_branch("main")?, id2.clone()); assert_eq!(repo.resolve_branch("foo")?, id1.clone()); assert_eq!(repo.resolve_branch("bar")?, id1.clone()); assert_eq!(repo.resolve_branch("baz")?, id2.clone()); - let repo = repo.delete_branch("bar", "bar")?; + let repo = repo.delete_branch(SpecVersionBin::current(), "bar", "bar")?; assert!(repo.resolve_branch("bar").is_err()); assert_eq!( repo.all_branches()?.map(|(n, _)| n).collect::>(), ["main", "foo", "baz"].into() ); - assert!(repo.delete_branch("bad-branch", "bad").is_err()); + assert!( + repo.delete_branch(SpecVersionBin::current(), "bad-branch", "bad").is_err() + ); // tags - let repo = repo.add_tag("tag1", &id1, "tag1")?; - let repo = repo.add_tag("tag2", &id2, "tag2")?; - assert!(repo.add_tag("bad-snap", &SnapshotId::random(), "bad").is_err()); - assert!(repo.add_tag("tag1", &id1, "tag1-again").is_err()); + let repo = repo.add_tag(SpecVersionBin::current(), "tag1", &id1, "tag1")?; + let repo = repo.add_tag(SpecVersionBin::current(), "tag2", &id2, "tag2")?; + assert!( + repo.add_tag( + SpecVersionBin::current(), + "bad-snap", + &SnapshotId::random(), + "bad" + ) + .is_err() + ); + assert!( + repo.add_tag(SpecVersionBin::current(), "tag1", &id1, "tag1-again").is_err() + ); assert_eq!(repo.resolve_tag("tag1")?, id1.clone()); assert_eq!(repo.resolve_tag("tag2")?, id2.clone()); assert_eq!( @@ -1305,16 +1353,21 @@ mod tests { ); // delete tags - let repo = repo.add_tag("tag3", &id1, "tag3")?; - let repo = repo.delete_tag("tag3", "delete-tag3")?; + let repo = repo.add_tag(SpecVersionBin::current(), "tag3", &id1, "tag3")?; + let repo = repo.delete_tag(SpecVersionBin::current(), "tag3", "delete-tag3")?; assert_eq!( repo.all_tags()?.map(|(n, _)| n).collect::>(), ["tag1", "tag2"].into() ); // cannot add deleted - assert!(repo.add_tag("tag3", &id1, "tag3-again").is_err()); + assert!( + repo.add_tag(SpecVersionBin::current(), "tag3", &id1, "tag3-again").is_err() + ); // cannot delete deleted - assert!(repo.delete_tag("tag3", "delete-tag3-again").is_err()); + assert!( + repo.delete_tag(SpecVersionBin::current(), "tag3", "delete-tag3-again") + .is_err() + ); assert_eq!( repo.all_tags()?.map(|(n, _)| n).collect::>(), ["tag1", "tag2"].into() @@ -1335,7 +1388,7 @@ mod tests { }; // check updates for a new repo - let mut repo = RepoInfo::initial(snap1); + let mut repo = RepoInfo::initial(SpecVersionBin::current(), snap1); assert_eq!(repo.latest_updates()?.count(), 1); let (last_update, _, file) = repo.latest_updates()?.next().unwrap()?; assert!(file.is_none()); @@ -1346,6 +1399,7 @@ mod tests { // fill the first page of updates by adding branches for i in 1..=(UPDATES_PER_FILE - 1) { repo = repo.add_branch( + SpecVersionBin::current(), i.to_string().as_str(), &id1, (i - 1).to_string().as_str(), @@ -1381,7 +1435,7 @@ mod tests { assert!(repo.repo_before_updates()?.is_none()); // Now, if we add another change, it won't fit in the first "page" of repo updates - repo = repo.add_tag("tag", &id1, "first-branches")?; + repo = repo.add_tag(SpecVersionBin::current(), "tag", &id1, "first-branches")?; // the file only contains the first "page" worth of updates assert_eq!(repo.latest_updates()?.count(), UPDATES_PER_FILE); // next file is the oldest change diff --git a/icechunk/src/format/snapshot.rs b/icechunk/src/format/snapshot.rs index f69cc1600..0ce042057 100644 --- a/icechunk/src/format/snapshot.rs +++ b/icechunk/src/format/snapshot.rs @@ -364,9 +364,11 @@ impl Snapshot { self.buffer.as_slice() } + #[allow(clippy::too_many_arguments)] pub fn from_iter( id: Option, parent_id: Option, + spec_version: SpecVersionBin, message: String, properties: Option, mut manifest_files: Vec, @@ -395,7 +397,11 @@ impl Snapshot { .iter() .map(|(k, v)| { let name = builder.create_shared_string(k.as_str()); - let serialized = flexbuffers::to_vec(v).map_err(Box::new)?; + let serialized = if spec_version == SpecVersionBin::V1dot0 { + rmp_serde::to_vec(v).map_err(Box::new)? + } else { + flexbuffers::to_vec(v).map_err(Box::new)? + }; let value = builder.create_vector(serialized.as_slice()); Ok::<_, IcechunkFormatError>(generated::MetadataItem::create( @@ -435,15 +441,16 @@ impl Snapshot { let (mut buffer, offset) = builder.collapse(); buffer.drain(0..offset); buffer.shrink_to_fit(); - Ok(Snapshot { buffer, spec_version: SpecVersionBin::current() }) + Ok(Snapshot { buffer, spec_version }) } - pub fn initial() -> IcechunkResult { + pub fn initial(spec_version: SpecVersionBin) -> IcechunkResult { let properties = [("__root".to_string(), serde_json::Value::from(true))].into(); let nodes: Vec> = Vec::new(); Self::from_iter( Some(Self::INITIAL_SNAPSHOT_ID), None, + spec_version, Self::INITIAL_COMMIT_MESSAGE.to_string(), Some(properties), Default::default(), @@ -525,6 +532,7 @@ impl Snapshot { Snapshot::from_iter( Some(new_child.id()), Some(self.id()), + SpecVersionBin::V1dot0, // 2.0 doesn't use adopt new_child.message().clone(), Some(new_child.metadata()?.clone()), new_child.manifest_files().collect(), @@ -839,6 +847,7 @@ mod tests { let st = Snapshot::from_iter( None, None, + SpecVersionBin::current(), String::default(), Default::default(), manifests, diff --git a/icechunk/src/migrations/mod.rs b/icechunk/src/migrations/mod.rs index 34c276a3c..82b687242 100644 --- a/icechunk/src/migrations/mod.rs +++ b/icechunk/src/migrations/mod.rs @@ -118,7 +118,9 @@ async fn do_migrate( delete_unused_v1_files: bool, ) -> MigrationResult<()> { info!("Writing new repository info file"); - let new_version_info = repo.asset_manager().create_repo_info(repo_info).await?; + let new_asset_manager = + repo.asset_manager().clone_for_spec_version(SpecVersionBin::V2dot0); + let new_version_info = new_asset_manager.create_repo_info(repo_info).await?; info!(version=?new_version_info, "Written repository info file"); @@ -222,6 +224,7 @@ pub async fn migrate_1_to_2( info!("Creating repository info file"); let repo_info = Arc::new(RepoInfo::new( + SpecVersionBin::V2dot0, tags, branches, deleted_tags.iter().map(|s| s.as_str()), diff --git a/icechunk/src/ops/gc.rs b/icechunk/src/ops/gc.rs index a3ec9e9eb..c2d323408 100644 --- a/icechunk/src/ops/gc.rs +++ b/icechunk/src/ops/gc.rs @@ -422,6 +422,7 @@ async fn delete_snapshots_from_repo_info( .try_collect()?; let new_repo_info = RepoInfo::new( + asset_manager.spec_version(), repo_info.tags()?, repo_info.branches()?, repo_info.deleted_tags()?, @@ -808,6 +809,7 @@ pub async fn expire_v2( }), ); let new_repo_info = RepoInfo::new( + asset_manager.spec_version(), tags, branches, deleted_tag_names, diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 00b71d05d..77a52a3a5 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -232,12 +232,12 @@ impl Repository { let settings_ref = &storage_settings; let create_repo_info = async move { // On create we need to create the default branch - let new_snapshot = Arc::new(Snapshot::initial()?); + let new_snapshot = Arc::new(Snapshot::initial(spec_version)?); asset_manager_c.write_snapshot(Arc::clone(&new_snapshot)).await?; if spec_version >= SpecVersionBin::V2dot0 { let snap_info = new_snapshot.as_ref().try_into()?; - let repo_info = Arc::new(RepoInfo::initial(snap_info)); + let repo_info = Arc::new(RepoInfo::initial(spec_version, snap_info)); let _ = asset_manager_c.create_repo_info(Arc::clone(&repo_info)).await?; } else { refs::update_branch( @@ -569,7 +569,11 @@ impl Repository { let do_update = |repo_info: Arc, backup_path: &str| { final_metadata = repo_info.metadata()?; final_metadata.extend(metadata.clone()); - Ok(Arc::new(repo_info.set_metadata(&final_metadata, backup_path)?)) + Ok(Arc::new(repo_info.set_metadata( + self.spec_version(), + &final_metadata, + backup_path, + )?)) }; let _ = self @@ -593,7 +597,11 @@ impl Repository { } let do_update = |repo_info: Arc, backup_path: &str| { - Ok(Arc::new(repo_info.set_metadata(metadata, backup_path)?)) + Ok(Arc::new(repo_info.set_metadata( + self.spec_version(), + metadata, + backup_path, + )?)) }; let _ = self @@ -818,7 +826,12 @@ impl Repository { ) -> RepositoryResult<()> { let do_update = |repo_info: Arc, backup_path: &str| { raise_if_invalid_snapshot_id_v2(repo_info.as_ref(), snapshot_id)?; - Ok(Arc::new(repo_info.add_branch(branch_name, snapshot_id, backup_path)?)) + Ok(Arc::new(repo_info.add_branch( + self.spec_version(), + branch_name, + snapshot_id, + backup_path, + )?)) }; let _ = self @@ -1014,7 +1027,7 @@ impl Repository { } let new_repo = repo_info - .update_branch(branch, to_snapshot_id, backup_path) + .update_branch(self.spec_version(), branch, to_snapshot_id, backup_path) .map_err(|err| match err { IcechunkFormatError { kind: IcechunkFormatErrorKind::BranchNotFound { .. }, @@ -1066,8 +1079,9 @@ impl Repository { #[instrument(skip(self))] async fn delete_branch_v2(&self, branch: &str) -> RepositoryResult<()> { let do_update = |repo_info: Arc, backup_path: &str| { - let new_repo = - repo_info.delete_branch(branch, backup_path).map_err(|err| match err { + let new_repo = repo_info + .delete_branch(self.spec_version(), branch, backup_path) + .map_err(|err| match err { IcechunkFormatError { kind: IcechunkFormatErrorKind::BranchNotFound { .. }, .. @@ -1112,8 +1126,9 @@ impl Repository { #[instrument(skip(self))] async fn delete_tag_v2(&self, tag: &str) -> RepositoryResult<()> { let do_update = |repo_info: Arc, backup_path: &str| { - let new_repo = - repo_info.delete_tag(tag, backup_path).map_err(|err| match err { + let new_repo = repo_info + .delete_tag(self.spec_version(), tag, backup_path) + .map_err(|err| match err { IcechunkFormatError { kind: IcechunkFormatErrorKind::TagNotFound { .. }, .. @@ -1178,17 +1193,16 @@ impl Repository { ) -> RepositoryResult<()> { let do_update = |repo_info: Arc, backup_path: &str| { raise_if_invalid_snapshot_id_v2(repo_info.as_ref(), snapshot_id)?; - let new_repo = - repo_info.add_tag(tag_name, snapshot_id, backup_path).map_err(|err| { - match err { - IcechunkFormatError { - kind: IcechunkFormatErrorKind::TagAlreadyExists { .. }, - .. - } => RepositoryError::from(RefError::from( - RefErrorKind::TagAlreadyExists(tag_name.to_string()), - )), - err => RepositoryError::from(err), - } + let new_repo = repo_info + .add_tag(self.spec_version(), tag_name, snapshot_id, backup_path) + .map_err(|err| match err { + IcechunkFormatError { + kind: IcechunkFormatErrorKind::TagAlreadyExists { .. }, + .. + } => RepositoryError::from(RefError::from( + RefErrorKind::TagAlreadyExists(tag_name.to_string()), + )), + err => RepositoryError::from(err), }); Ok(Arc::new(new_repo?)) }; diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index e91ab2e9a..f92464fb8 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -1155,6 +1155,7 @@ impl Session { ..new_snap.as_ref().try_into()? }; Ok(Arc::new(repo_info.add_snapshot( + self.spec_version(), new_snapshot_info, None, update_type.clone(), @@ -1175,6 +1176,10 @@ impl Session { Ok(()) } + fn spec_version(&self) -> SpecVersionBin { + self.asset_manager.spec_version() + } + pub async fn flush( &mut self, message: &str, @@ -1201,7 +1206,7 @@ impl Session { do_flush(flush_data, message, properties, false, CommitMethod::NewCommit) .await?; - match self.asset_manager.spec_version() { + match self.spec_version() { SpecVersionBin::V1dot0 => self.flush_v1(Arc::clone(&new_snap)).await, SpecVersionBin::V2dot0 => self.flush_v2(Arc::clone(&new_snap)).await, }?; @@ -1265,7 +1270,7 @@ impl Session { // amend is only allowed in spec v2, this should be checked at this point so we only assert assert!( - self.asset_manager.spec_version() >= SpecVersionBin::V2dot0 + self.spec_version() >= SpecVersionBin::V2dot0 || commit_method == CommitMethod::NewCommit ); @@ -1420,7 +1425,7 @@ impl Session { debug!("Rebase started"); - let new_commits = match self.asset_manager.spec_version() { + let new_commits = match self.spec_version() { SpecVersionBin::V1dot0 => { self.commits_to_rebase_v1(branch_name.as_str()).await? } @@ -2402,6 +2407,7 @@ async fn do_flush( let new_snapshot = Snapshot::from_iter( None, parent_id, + flush_data.asset_manager.spec_version(), message.to_string(), Some(properties), flush_data.manifest_files.into_iter().collect(), @@ -2622,6 +2628,7 @@ async fn do_commit_v2( }, }; Ok(Arc::new(repo_info.add_snapshot( + asset_manager.spec_version(), new_snapshot_info, Some(branch_name), update_type, @@ -3182,11 +3189,12 @@ mod tests { }, ]; - let initial = Snapshot::initial().unwrap(); + let initial = Snapshot::initial(SpecVersionBin::current()).unwrap(); let manifests = vec![ManifestFileInfo::new(manifest.as_ref(), manifest_size)]; let snapshot = Arc::new(Snapshot::from_iter( None, None, + SpecVersionBin::current(), "message".to_string(), None, manifests, @@ -3209,12 +3217,15 @@ mod tests { &storage::VersionInfo::for_creation(), ) .await?; - let repo_info = RepoInfo::initial((&initial).try_into()?).add_snapshot( - snapshot.as_ref().try_into()?, - Some("main"), - UpdateType::NewCommitUpdate { branch: "main".to_string() }, - "backup_path", - )?; + let repo_info = + RepoInfo::initial(SpecVersionBin::current(), (&initial).try_into()?) + .add_snapshot( + SpecVersionBin::current(), + snapshot.as_ref().try_into()?, + Some("main"), + UpdateType::NewCommitUpdate { branch: "main".to_string() }, + "backup_path", + )?; asset_manager.create_repo_info(Arc::new(repo_info)).await?; let repo = Repository::open(None, storage, HashMap::new()).await?;