diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index 893791325..99a4ea5ff 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -2079,9 +2079,10 @@ impl PyRepository { py.detach(move || { let result = pyo3_async_runtimes::tokio::get_runtime().block_on(async move { - let asset_manager = { + let (asset_manager, num_updates) = { let lock = self.0.read().await; - Arc::clone(lock.asset_manager()) + let num_updates = lock.config().num_updates_per_repo_info_file(); + (Arc::clone(lock.asset_manager()), num_updates) }; let result = expire( @@ -2097,6 +2098,7 @@ impl PyRepository { } else { ExpiredRefAction::Ignore }, + num_updates, ) .await .map_err(PyIcechunkStoreError::GCError)?; @@ -2123,9 +2125,10 @@ impl PyRepository { ) -> PyResult> { let repository = self.0.clone(); pyo3_async_runtimes::tokio::future_into_py::<_, HashSet>(py, async move { - let asset_manager = { + let (asset_manager, num_updates) = { let lock = repository.read().await; - Arc::clone(lock.asset_manager()) + let num_updates = lock.config().num_updates_per_repo_info_file(); + (Arc::clone(lock.asset_manager()), num_updates) }; let result = expire( @@ -2141,6 +2144,7 @@ impl PyRepository { } else { ExpiredRefAction::Ignore }, + num_updates, ) .await .map_err(PyIcechunkStoreError::GCError)?; @@ -2170,11 +2174,12 @@ impl PyRepository { max_concurrent_manifest_fetches, dry_run, ); - let asset_manager = { + let (asset_manager, num_updates) = { let lock = self.0.read().await; - Arc::clone(lock.asset_manager()) + let num_updates = lock.config().num_updates_per_repo_info_file(); + (Arc::clone(lock.asset_manager()), num_updates) }; - let result = garbage_collect(asset_manager, &gc_config) + let result = garbage_collect(asset_manager, &gc_config, num_updates) .await .map_err(PyIcechunkStoreError::GCError)?; Ok::<_, PyIcechunkStoreError>(result.into()) @@ -2204,11 +2209,12 @@ impl PyRepository { max_concurrent_manifest_fetches, dry_run, ); - let asset_manager = { + let (asset_manager, num_updates) = { let lock = repository.read().await; - Arc::clone(lock.asset_manager()) + let num_updates = lock.config().num_updates_per_repo_info_file(); + (Arc::clone(lock.asset_manager()), num_updates) }; - let result = garbage_collect(asset_manager, &gc_config) + let result = garbage_collect(asset_manager, &gc_config, num_updates) .await .map_err(PyIcechunkStoreError::GCError)?; Ok(result.into()) diff --git a/icechunk/src/config.rs b/icechunk/src/config.rs index 4fd9ebaaf..cc2671f54 100644 --- a/icechunk/src/config.rs +++ b/icechunk/src/config.rs @@ -402,12 +402,17 @@ pub struct RepositoryConfig { #[serde(default)] pub previous_file: Option, + + #[serde(default)] + pub num_updates_per_repo_info_file: Option, } static DEFAULT_COMPRESSION: OnceLock = OnceLock::new(); static DEFAULT_CACHING: OnceLock = OnceLock::new(); static DEFAULT_MANIFEST_CONFIG: OnceLock = OnceLock::new(); pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u16 = 256; +// usize? +pub const DEFAULT_NUM_UPDATES_PER_REPO_INFO_FILE: usize = 100; impl RepositoryConfig { pub fn inline_chunk_threshold_bytes(&self) -> u16 { @@ -441,6 +446,11 @@ impl RepositoryConfig { self.max_concurrent_requests.unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) } + pub fn num_updates_per_repo_info_file(&self) -> usize { + self.num_updates_per_repo_info_file + .unwrap_or(DEFAULT_NUM_UPDATES_PER_REPO_INFO_FILE) + } + pub fn merge(&self, other: Self) -> Self { Self { inline_chunk_threshold_bytes: other @@ -501,6 +511,15 @@ impl RepositoryConfig { (Some(c), None) => Some(c.clone()), (Some(_), Some(theirs)) => Some(theirs), }, + num_updates_per_repo_info_file: match ( + &self.num_updates_per_repo_info_file, + other.num_updates_per_repo_info_file, + ) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(*c), + (Some(_), Some(theirs)) => Some(theirs), + }, } } } diff --git a/icechunk/src/format/repo_info.rs b/icechunk/src/format/repo_info.rs index c51072d01..1b3c2351e 100644 --- a/icechunk/src/format/repo_info.rs +++ b/icechunk/src/format/repo_info.rs @@ -23,7 +23,7 @@ pub struct RepoInfo { } // FIXME: make configurable -const UPDATES_PER_FILE: usize = 100; +// const UPDATES_PER_FILE: usize = 100; impl std::fmt::Debug for RepoInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -102,6 +102,7 @@ impl RepoInfo { metadata: &SnapshotProperties, update: UpdateInfo, backup_path: Option<&'a str>, + num_updates_per_file: usize, ) -> IcechunkResult { let mut snapshots: Vec<_> = snapshots.into_iter().collect(); snapshots.sort_by(|a, b| a.id.0.cmp(&b.id.0)); @@ -117,6 +118,7 @@ impl RepoInfo { metadata, update, backup_path, + num_updates_per_file, ) } @@ -131,6 +133,7 @@ impl RepoInfo { metadata: &SnapshotProperties, update: UpdateInfo, backup_path: Option<&'a str>, + num_updates_per_file: usize, ) -> IcechunkResult { let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(4_096); let tags = sorted_tags @@ -254,8 +257,12 @@ impl RepoInfo { let metadata = builder.create_vector(metadata_items.as_slice()); - let (latest_updates, repo_before_updates) = - Self::mk_latest_updates(&mut builder, update, backup_path)?; + let (latest_updates, repo_before_updates) = Self::mk_latest_updates( + &mut builder, + update, + backup_path, + num_updates_per_file, + )?; // TODO: provide accessors for last_updated_at, status, metadata, etc. let repo_args = generated::RepoArgs { @@ -286,6 +293,7 @@ impl RepoInfo { builder: &mut flatbuffers::FlatBufferBuilder<'bldr>, update: UpdateInfo, backup_path: Option<&'a str>, + num_updates_per_file: usize, ) -> IcechunkResult<( WIPOffset< flatbuffers::Vector< @@ -327,12 +335,12 @@ impl RepoInfo { let mut repo_before_updates = None; let updates: Vec<_> = all_updates - .take(UPDATES_PER_FILE + 1) + .take(num_updates_per_file + 1) .enumerate() .map(|(idx, maybe_data)| maybe_data.map(|(d1, d2, d3)| (idx, d1, d2, d3))) .map(|maybe_data| { let (idx, u_type, u_time, file) = maybe_data?; - if idx == UPDATES_PER_FILE { + if idx == num_updates_per_file { repo_before_updates = file; } let (update_type_type, update_type) = @@ -352,17 +360,17 @@ impl RepoInfo { .try_collect()?; debug_assert!( - updates.len() <= UPDATES_PER_FILE + 1, + updates.len() <= num_updates_per_file + 1, "Too many latest updates in repo file" ); - let size = (UPDATES_PER_FILE - 1).min(updates.len() - 1); + let size = (num_updates_per_file - 1).min(updates.len() - 1); let updates = builder.create_vector(&updates[0..=size]); let repo_before_updates = repo_before_updates.map(|s| builder.create_string(s)); Ok((updates, repo_before_updates)) } - pub fn initial(snapshot: SnapshotInfo) -> Self { + pub fn initial(snapshot: SnapshotInfo, num_updates_per_file: usize) -> Self { let last_updated_at = snapshot.flushed_at; #[allow(clippy::expect_used)] // This method is basically constant, so it's OK to unwrap in it @@ -378,6 +386,7 @@ impl RepoInfo { previous_updates: [], }, None, + num_updates_per_file, ) .expect("Cannot generate initial snapshot") } @@ -421,6 +430,7 @@ impl RepoInfo { branch: Option<&str>, update_type: UpdateType, previous_file: &str, + num_updates_per_file: usize, ) -> IcechunkResult { let flushed_at = snap.flushed_at; let mut snapshots: Vec<_> = self.all_snapshots()?.try_collect()?; @@ -460,6 +470,7 @@ impl RepoInfo { previous_updates: self.latest_updates()?, }, Some(previous_file), + num_updates_per_file, )?; Ok(res) } @@ -469,6 +480,7 @@ impl RepoInfo { name: &str, snap: &SnapshotId, previous_file: &str, + num_updates_per_file: usize, ) -> IcechunkResult { if let Ok(snapshot_id) = self.resolve_branch(name) { return Err(IcechunkFormatErrorKind::BranchAlreadyExists { @@ -498,6 +510,7 @@ impl RepoInfo { previous_updates: self.latest_updates()?, }, Some(previous_file), + num_updates_per_file, )?) } None => Err(IcechunkFormatErrorKind::SnapshotIdNotFound { @@ -507,7 +520,12 @@ impl RepoInfo { } } - pub fn delete_branch(&self, name: &str, previous_file: &str) -> IcechunkResult { + pub fn delete_branch( + &self, + name: &str, + previous_file: &str, + num_updates_per_file: usize, + ) -> IcechunkResult { match self.resolve_branch(name) { Ok(previous_snap_id) => { let mut branches: Vec<_> = self.all_branches()?.collect(); @@ -529,6 +547,7 @@ impl RepoInfo { previous_updates: self.latest_updates()?, }, Some(previous_file), + num_updates_per_file, ) } Err(IcechunkFormatError { @@ -547,6 +566,7 @@ impl RepoInfo { name: &str, new_snap: &SnapshotId, previous_file: &str, + num_updates_per_file: usize, ) -> IcechunkResult { let previous_snap_id = self.resolve_branch(name)?; match self.resolve_snapshot_index(new_snap)? { @@ -570,6 +590,7 @@ impl RepoInfo { previous_updates: self.latest_updates()?, }, Some(previous_file), + num_updates_per_file, )?) } None => Err(IcechunkFormatErrorKind::SnapshotIdNotFound { @@ -584,6 +605,7 @@ impl RepoInfo { name: &str, snap: &SnapshotId, previous_file: &str, + num_updates_per_file: usize, ) -> IcechunkResult { if self.resolve_tag(name).is_ok() || self.tag_was_deleted(name)? { // TODO: better error on tag already deleted @@ -613,6 +635,7 @@ impl RepoInfo { previous_updates: self.latest_updates()?, }, Some(previous_file), + num_updates_per_file, )?) } None => Err(IcechunkFormatErrorKind::SnapshotIdNotFound { @@ -622,7 +645,12 @@ impl RepoInfo { } } - pub fn delete_tag(&self, name: &str, previous_file: &str) -> IcechunkResult { + pub fn delete_tag( + &self, + name: &str, + previous_file: &str, + num_updates_per_file: usize, + ) -> IcechunkResult { match self.resolve_tag(name) { Ok(previous_snap_id) => { let mut tags: Vec<_> = self.all_tags()?.collect(); @@ -648,6 +676,7 @@ impl RepoInfo { previous_updates: self.latest_updates()?, }, Some(previous_file), + num_updates_per_file, ) } Err(IcechunkFormatError { @@ -664,6 +693,7 @@ impl RepoInfo { &self, metadata: &SnapshotProperties, previous_file: &str, + num_updates_per_file: usize, ) -> IcechunkResult { let snaps: Vec<_> = self.all_snapshots()?.try_collect()?; Self::from_parts( @@ -678,6 +708,7 @@ impl RepoInfo { previous_updates: self.latest_updates()?, }, Some(previous_file), + num_updates_per_file, ) } @@ -1168,7 +1199,7 @@ mod tests { message: "snap 1".to_string(), metadata: Default::default(), }; - let repo = RepoInfo::initial(snap1.clone()); + let repo = RepoInfo::initial(snap1.clone(), 100); assert_eq!(repo.all_snapshots()?.next().unwrap().unwrap(), snap1); let id2 = SnapshotId::random(); @@ -1335,16 +1366,17 @@ mod tests { }; // check updates for a new repo - let mut repo = RepoInfo::initial(snap1); + let mut repo = RepoInfo::initial(snap1, 100); assert_eq!(repo.latest_updates()?.count(), 1); let (last_update, _, file) = repo.latest_updates()?.next().unwrap()?; assert!(file.is_none()); assert_eq!(last_update, UpdateType::RepoInitializedUpdate); assert!(repo.repo_before_updates()?.is_none()); - // check updates after UPDATES_PER_FILE changes + let num_updates_per_file: usize = 10; + // check updates after num_updates_per_file changes // fill the first page of updates by adding branches - for i in 1..=(UPDATES_PER_FILE - 1) { + for i in 1..=(num_updates_per_file - 1) { repo = repo.add_branch( i.to_string().as_str(), &id1, @@ -1352,20 +1384,20 @@ mod tests { )? } - assert_eq!(repo.latest_updates()?.count(), UPDATES_PER_FILE); + assert_eq!(repo.latest_updates()?.count(), num_updates_per_file); let updates = repo.latest_updates()?; // check all other updates for (idx, update) in updates.enumerate() { let (update, _, file) = update?; - if idx == UPDATES_PER_FILE - 1 { + if idx == num_updates_per_file - 1 { assert_eq!(update, UpdateType::RepoInitializedUpdate); assert_eq!(file, Some("0")); } else { assert_eq!( update, UpdateType::BranchCreatedUpdate { - name: (UPDATES_PER_FILE - 1 - idx).to_string() + name: (num_updates_per_file - 1 - idx).to_string() } ); if idx == 0 { @@ -1373,7 +1405,7 @@ mod tests { } else { assert_eq!( file, - Some((UPDATES_PER_FILE - 1 - idx).to_string().as_str()) + Some((num_updates_per_file - 1 - idx).to_string().as_str()) ); } } @@ -1383,7 +1415,7 @@ mod tests { // Now, if we add another change, it won't fit in the first "page" of repo updates repo = repo.add_tag("tag", &id1, "first-branches")?; // the file only contains the first "page" worth of updates - assert_eq!(repo.latest_updates()?.count(), UPDATES_PER_FILE); + assert_eq!(repo.latest_updates()?.count(), num_updates_per_file); // next file is the oldest change assert_eq!(repo.repo_before_updates()?, Some("0")); let mut updates = repo.latest_updates()?; @@ -1403,11 +1435,11 @@ mod tests { // all other changes are branch creation (repo creation is in the next page) for (idx, update) in updates.enumerate() { let (update, _, file) = update?; - assert_eq!(file, Some((UPDATES_PER_FILE - 2 - idx).to_string().as_str())); + assert_eq!(file, Some((num_updates_per_file - 2 - idx).to_string().as_str())); assert_eq!( update, UpdateType::BranchCreatedUpdate { - name: (UPDATES_PER_FILE - 2 - idx).to_string() + name: (num_updates_per_file - 2 - idx).to_string() } ); } diff --git a/icechunk/src/migrations/mod.rs b/icechunk/src/migrations/mod.rs index 34c276a3c..6cda8f6fb 100644 --- a/icechunk/src/migrations/mod.rs +++ b/icechunk/src/migrations/mod.rs @@ -236,6 +236,7 @@ pub async fn migrate_1_to_2( previous_updates: [], }, None, + repo.config().num_updates_per_repo_info_file(), )?); if dry_run { diff --git a/icechunk/src/ops/gc.rs b/icechunk/src/ops/gc.rs index a3ec9e9eb..cbbca2c42 100644 --- a/icechunk/src/ops/gc.rs +++ b/icechunk/src/ops/gc.rs @@ -320,6 +320,7 @@ pub async fn find_retained( pub async fn garbage_collect( asset_manager: Arc, config: &GCConfig, + num_updates_per_repo_info_file: usize, ) -> GCResult { if !asset_manager.can_write_to_storage().await? { return Err(GCError::Repository( @@ -379,8 +380,12 @@ pub async fn garbage_collect( // work: we want to dolete snapshots before deleting chunks, etc if config.deletes_snapshots() { if !config.dry_run && repo_info.is_some() { - delete_snapshots_from_repo_info(asset_manager.as_ref(), &keep_snapshots) - .await?; + delete_snapshots_from_repo_info( + asset_manager.as_ref(), + &keep_snapshots, + num_updates_per_repo_info_file, + ) + .await?; } let res = gc_snapshots(asset_manager.as_ref(), config, &keep_snapshots).await?; summary.snapshots_deleted = res.deleted_objects; @@ -410,6 +415,7 @@ pub async fn garbage_collect( async fn delete_snapshots_from_repo_info( asset_manager: &AssetManager, keep_snapshots: &HashSet, + num_updates_per_repo_info_file: usize, ) -> GCResult<()> { // FIXME: IMPORTANT // Notice this loses any new snapshots that may have been created while GC was running @@ -433,6 +439,7 @@ async fn delete_snapshots_from_repo_info( previous_updates: repo_info.latest_updates()?, }, Some(backup_path), + num_updates_per_repo_info_file, )?; Ok(Arc::new(new_repo_info)) @@ -614,6 +621,7 @@ pub async fn expire( older_than: DateTime, expired_branches: ExpiredRefAction, expired_tags: ExpiredRefAction, + num_updates_per_repo_info_file: usize, ) -> GCResult { match asset_manager.spec_version() { SpecVersionBin::V1dot0 => { @@ -626,7 +634,14 @@ pub async fn expire( .await } SpecVersionBin::V2dot0 => { - expire_v2(asset_manager, older_than, expired_branches, expired_tags).await + expire_v2( + asset_manager, + older_than, + expired_branches, + expired_tags, + num_updates_per_repo_info_file, + ) + .await } } } @@ -637,6 +652,7 @@ pub async fn expire_v2( older_than: DateTime, expired_branches: ExpiredRefAction, expired_tags: ExpiredRefAction, + num_updates_per_repo_info_file: usize, ) -> GCResult { if !asset_manager.can_write_to_storage().await? { return Err(GCError::Repository( @@ -819,6 +835,7 @@ pub async fn expire_v2( previous_updates: repo_info.latest_updates()?, }, Some(backup_path), + num_updates_per_repo_info_file, )?; Ok(Arc::new(new_repo_info)) diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 00b71d05d..1989998b4 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -230,6 +230,7 @@ impl Repository { let asset_manager_c = Arc::clone(&asset_manager); let storage_c = Arc::clone(&storage); let settings_ref = &storage_settings; + let num_updates = config.num_updates_per_repo_info_file(); let create_repo_info = async move { // On create we need to create the default branch let new_snapshot = Arc::new(Snapshot::initial()?); @@ -237,7 +238,7 @@ impl Repository { if spec_version >= SpecVersionBin::V2dot0 { let snap_info = new_snapshot.as_ref().try_into()?; - let repo_info = Arc::new(RepoInfo::initial(snap_info)); + let repo_info = Arc::new(RepoInfo::initial(snap_info, num_updates)); let _ = asset_manager_c.create_repo_info(Arc::clone(&repo_info)).await?; } else { refs::update_branch( @@ -566,10 +567,15 @@ impl Repository { .into()); } let mut final_metadata = Default::default(); + let num_updates = self.config().num_updates_per_repo_info_file(); let do_update = |repo_info: Arc, backup_path: &str| { final_metadata = repo_info.metadata()?; final_metadata.extend(metadata.clone()); - Ok(Arc::new(repo_info.set_metadata(&final_metadata, backup_path)?)) + Ok(Arc::new(repo_info.set_metadata( + &final_metadata, + backup_path, + num_updates, + )?)) }; let _ = self @@ -592,8 +598,9 @@ impl Repository { .into()); } + let num_updates = self.config().num_updates_per_repo_info_file(); let do_update = |repo_info: Arc, backup_path: &str| { - Ok(Arc::new(repo_info.set_metadata(metadata, backup_path)?)) + Ok(Arc::new(repo_info.set_metadata(metadata, backup_path, num_updates)?)) }; let _ = self @@ -816,9 +823,15 @@ impl Repository { branch_name: &str, snapshot_id: &SnapshotId, ) -> RepositoryResult<()> { + let num_updates = self.config.num_updates_per_repo_info_file(); let do_update = |repo_info: Arc, backup_path: &str| { raise_if_invalid_snapshot_id_v2(repo_info.as_ref(), snapshot_id)?; - Ok(Arc::new(repo_info.add_branch(branch_name, snapshot_id, backup_path)?)) + Ok(Arc::new(repo_info.add_branch( + branch_name, + snapshot_id, + backup_path, + num_updates, + )?)) }; let _ = self @@ -1012,9 +1025,10 @@ impl Repository { } .into()); } + let num_updates = self.config.num_updates_per_repo_info_file(); let new_repo = repo_info - .update_branch(branch, to_snapshot_id, backup_path) + .update_branch(branch, to_snapshot_id, backup_path, num_updates) .map_err(|err| match err { IcechunkFormatError { kind: IcechunkFormatErrorKind::BranchNotFound { .. }, @@ -1065,9 +1079,11 @@ impl Repository { #[instrument(skip(self))] async fn delete_branch_v2(&self, branch: &str) -> RepositoryResult<()> { + let num_updates = self.config.num_updates_per_repo_info_file(); let do_update = |repo_info: Arc, backup_path: &str| { - let new_repo = - repo_info.delete_branch(branch, backup_path).map_err(|err| match err { + let new_repo = repo_info + .delete_branch(branch, backup_path, num_updates) + .map_err(|err| match err { IcechunkFormatError { kind: IcechunkFormatErrorKind::BranchNotFound { .. }, .. @@ -1111,16 +1127,19 @@ impl Repository { #[instrument(skip(self))] async fn delete_tag_v2(&self, tag: &str) -> RepositoryResult<()> { + let num_updates = self.config.num_updates_per_repo_info_file(); let do_update = |repo_info: Arc, backup_path: &str| { let new_repo = - repo_info.delete_tag(tag, backup_path).map_err(|err| match err { - IcechunkFormatError { - kind: IcechunkFormatErrorKind::TagNotFound { .. }, - .. - } => RepositoryError::from(RefError::from( - RefErrorKind::RefNotFound(tag.to_string()), - )), - err => RepositoryError::from(err), + repo_info.delete_tag(tag, backup_path, num_updates).map_err(|err| { + match err { + IcechunkFormatError { + kind: IcechunkFormatErrorKind::TagNotFound { .. }, + .. + } => RepositoryError::from(RefError::from( + RefErrorKind::RefNotFound(tag.to_string()), + )), + err => RepositoryError::from(err), + } }); Ok(Arc::new(new_repo?)) }; @@ -1176,19 +1195,19 @@ impl Repository { tag_name: &str, snapshot_id: &SnapshotId, ) -> RepositoryResult<()> { + let num_updates = self.config.num_updates_per_repo_info_file(); let do_update = |repo_info: Arc, backup_path: &str| { raise_if_invalid_snapshot_id_v2(repo_info.as_ref(), snapshot_id)?; - let new_repo = - repo_info.add_tag(tag_name, snapshot_id, backup_path).map_err(|err| { - match err { - IcechunkFormatError { - kind: IcechunkFormatErrorKind::TagAlreadyExists { .. }, - .. - } => RepositoryError::from(RefError::from( - RefErrorKind::TagAlreadyExists(tag_name.to_string()), - )), - err => RepositoryError::from(err), - } + let new_repo = repo_info + .add_tag(tag_name, snapshot_id, backup_path, num_updates) + .map_err(|err| match err { + IcechunkFormatError { + kind: IcechunkFormatErrorKind::TagAlreadyExists { .. }, + .. + } => RepositoryError::from(RefError::from( + RefErrorKind::TagAlreadyExists(tag_name.to_string()), + )), + err => RepositoryError::from(err), }); Ok(Arc::new(new_repo?)) }; diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index e91ab2e9a..ff9588036 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -1148,7 +1148,7 @@ impl Session { async fn flush_v2(&mut self, new_snap: Arc) -> SessionResult<()> { let update_type = UpdateType::NewDetachedSnapshotUpdate { new_snap_id: new_snap.id().clone() }; - + let num_updates = self.config.num_updates_per_repo_info_file(); let do_update = |repo_info: Arc, backup_path: &str| { let new_snapshot_info = SnapshotInfo { parent_id: Some(self.snapshot_id().clone()), @@ -1159,6 +1159,7 @@ impl Session { None, update_type.clone(), backup_path, + num_updates, )?)) }; @@ -1279,6 +1280,7 @@ impl Session { merged }) .unwrap_or(default_metadata); + let num_updates = self.config().num_updates_per_repo_info_file(); { // we need to play this trick because we need to borrow from self twice // once to get the mutable change set, and other to compute @@ -1300,6 +1302,7 @@ impl Session { &self.splits, rewrite_manifests, commit_method, + num_updates, ) .await?; @@ -2488,6 +2491,7 @@ async fn do_commit( splits: &HashMap, rewrite_manifests: bool, commit_method: CommitMethod, + num_updates_per_repo_info_file: usize, ) -> SessionResult { info!(branch_name, old_snapshot_id=%snapshot_id, "Commit started"); let properties = properties.unwrap_or_default(); @@ -2518,6 +2522,7 @@ async fn do_commit( snapshot_id, new_snapshot, commit_method, + num_updates_per_repo_info_file, ) .await } @@ -2583,6 +2588,7 @@ async fn do_commit_v2( parent_snapshot_id: &SnapshotId, new_snapshot: Arc, commit_method: CommitMethod, + num_updates_per_repo_info_file: usize, ) -> RepositoryResult { let mut attempt = 0; let new_snapshot_id = new_snapshot.id(); @@ -2626,6 +2632,7 @@ async fn do_commit_v2( Some(branch_name), update_type, backup_path, + num_updates_per_repo_info_file, )?)) };