diff --git a/icechunk/src/format/snapshot.rs b/icechunk/src/format/snapshot.rs index 9ba0c9da6..2992a7900 100644 --- a/icechunk/src/format/snapshot.rs +++ b/icechunk/src/format/snapshot.rs @@ -313,6 +313,7 @@ impl Snapshot { message: String, properties: Option, mut manifest_files: Vec, + flushed_at: Option>, sorted_iter: I, ) -> IcechunkResult where @@ -349,7 +350,7 @@ impl Snapshot { let message = builder.create_string(&message); let parent_id = parent_id.map(|oid| generated::ObjectId12::new(&oid.0)); - let flushed_at = Utc::now().timestamp_micros() as u64; + let flushed_at = flushed_at.unwrap_or_else(Utc::now).timestamp_micros() as u64; let id = generated::ObjectId12::new(&id.unwrap_or_else(SnapshotId::random).0); let nodes: Vec<_> = sorted_iter @@ -387,6 +388,7 @@ impl Snapshot { Self::INITIAL_COMMIT_MESSAGE.to_string(), Some(properties), Default::default(), + None, nodes, ) } @@ -459,6 +461,7 @@ impl Snapshot { new_child.message().clone(), Some(new_child.metadata()?.clone()), new_child.manifest_files().collect(), + Some(new_child.flushed_at()?), new_child.iter(), ) } @@ -736,6 +739,7 @@ mod tests { String::default(), Default::default(), manifests, + None, nodes.into_iter().map(Ok::), ) .unwrap(); diff --git a/icechunk/src/ops/gc.rs b/icechunk/src/ops/gc.rs index 109b8662f..a7943764f 100644 --- a/icechunk/src/ops/gc.rs +++ b/icechunk/src/ops/gc.rs @@ -344,9 +344,12 @@ async fn gc_transaction_logs( #[derive(Debug, PartialEq, Eq, Clone)] pub enum ExpireRefResult { - RefIsExpired, NothingToDo, - Done { released_snapshots: HashSet, edited_snapshot: SnapshotId }, + Done { + released_snapshots: HashSet, + edited_snapshot: SnapshotId, + ref_is_expired: bool, + }, } /// Expire snapshots older than a threshold. @@ -394,10 +397,10 @@ pub async fn expire_ref( pin!(ancestry); - // If we point to an expired snapshot already, there is nothing to do + let mut ref_is_expired = false; if let Some(Ok(info)) = ancestry.as_mut().peek().await { if info.flushed_at < older_than { - return Ok(ExpireRefResult::RefIsExpired); + ref_is_expired = true; } } @@ -434,8 +437,6 @@ pub async fn expire_ref( // and, we only set a root as parent assert!(root.parent_id().is_none()); - assert!(editable_snap.flushed_at()? >= older_than); - // TODO: add properties to the snapshot that tell us it was history edited let new_snapshot = Arc::new(root.adopt(&editable_snap)?); asset_manager.write_snapshot(new_snapshot).await?; @@ -443,6 +444,7 @@ pub async fn expire_ref( Ok(ExpireRefResult::Done { released_snapshots: released, edited_snapshot: editable_snap.id().clone(), + ref_is_expired, }) } @@ -500,33 +502,41 @@ pub async fn expire( }) .try_fold(ExpireResult::default(), |mut result, (r, ref_result)| async move { match ref_result { - ExpireRefResult::Done { released_snapshots, edited_snapshot } => { + ExpireRefResult::Done { + released_snapshots, + edited_snapshot, + ref_is_expired, + } => { result.released_snapshots.extend(released_snapshots.into_iter()); result.edited_snapshots.insert(edited_snapshot); - Ok(result) - } - ExpireRefResult::RefIsExpired => match &r { - Ref::Tag(name) => { - if expired_tags == ExpiredRefAction::Delete { - delete_tag(storage, storage_settings, name.as_str()) - .await - .map_err(GCError::Ref)?; - result.deleted_refs.insert(r); + if ref_is_expired { + match &r { + Ref::Tag(name) => { + if expired_tags == ExpiredRefAction::Delete { + delete_tag(storage, storage_settings, name.as_str()) + .await + .map_err(GCError::Ref)?; + result.deleted_refs.insert(r); + } + } + Ref::Branch(name) => { + if expired_branches == ExpiredRefAction::Delete + && name != Ref::DEFAULT_BRANCH + { + delete_branch( + storage, + storage_settings, + name.as_str(), + ) + .await + .map_err(GCError::Ref)?; + result.deleted_refs.insert(r); + } + } } - Ok(result) } - Ref::Branch(name) => { - if expired_branches == ExpiredRefAction::Delete - && name != Ref::DEFAULT_BRANCH - { - delete_branch(storage, storage_settings, name.as_str()) - .await - .map_err(GCError::Ref)?; - result.deleted_refs.insert(r); - } - Ok(result) - } - }, + Ok(result) + } ExpireRefResult::NothingToDo => Ok(result), } }) diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index ce5293180..72c7d2c61 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -1576,6 +1576,7 @@ async fn flush( message.to_string(), Some(properties), flush_data.manifest_files.into_iter().collect(), + None, all_nodes.into_iter().map(Ok::<_, Infallible>), )?; @@ -2080,6 +2081,7 @@ mod tests { "message".to_string(), None, manifests, + None, nodes.iter().cloned().map(Ok::), )?); asset_manager.write_snapshot(Arc::clone(&snapshot)).await?; diff --git a/icechunk/tests/test_gc.rs b/icechunk/tests/test_gc.rs index e1700037f..2bd042b56 100644 --- a/icechunk/tests/test_gc.rs +++ b/icechunk/tests/test_gc.rs @@ -293,14 +293,12 @@ pub async fn test_expire_ref() -> Result<(), Box> { ) .await? { - ExpireRefResult::RefIsExpired => { - panic!() - } ExpireRefResult::NothingToDo => { panic!() } - ExpireRefResult::Done { released_snapshots, .. } => { + ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => { assert_eq!(released_snapshots.len(), 4); + assert!(!ref_is_expired); } } @@ -332,9 +330,9 @@ pub async fn test_expire_ref() -> Result<(), Box> { ) .await? { - ExpireRefResult::RefIsExpired => panic!(), ExpireRefResult::NothingToDo => panic!(), - ExpireRefResult::Done { released_snapshots, .. } => { + ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => { + assert!(!ref_is_expired); assert_eq!(released_snapshots.len(), 4); } } @@ -367,9 +365,9 @@ pub async fn test_expire_ref() -> Result<(), Box> { ) .await? { - ExpireRefResult::RefIsExpired => panic!(), ExpireRefResult::NothingToDo => panic!(), - ExpireRefResult::Done { released_snapshots, .. } => { + ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => { + assert!(!ref_is_expired); assert_eq!(released_snapshots.len(), 5); } } @@ -402,9 +400,9 @@ pub async fn test_expire_ref() -> Result<(), Box> { ) .await? { - ExpireRefResult::RefIsExpired => panic!(), ExpireRefResult::NothingToDo => panic!(), - ExpireRefResult::Done { released_snapshots, .. } => { + ExpireRefResult::Done { released_snapshots, ref_is_expired, .. } => { + assert!(!ref_is_expired); assert_eq!(released_snapshots.len(), 5); } } @@ -432,8 +430,8 @@ pub async fn test_expire_ref() -> Result<(), Box> { } #[tokio::test] -pub async fn test_expire_ref_with_odd_timestamp() -> Result<(), Box> -{ +pub async fn test_expire_ref_with_odd_timestamps() +-> Result<(), Box> { let storage: Arc = new_in_memory_storage().await?; let storage_settings = storage.default_settings(); let repo = Repository::create(None, Arc::clone(&storage), HashMap::new()).await?; @@ -489,7 +487,16 @@ pub async fn test_expire_ref_with_odd_timestamp() -> Result<(), Box {} + ExpireRefResult::Done { ref_is_expired, .. } => { + assert!(ref_is_expired); + // create another repo to avoid caching issues + let repo = + Repository::open(None, Arc::clone(&storage), HashMap::new()).await?; + assert_eq!( + branch_commit_messages(&repo, "main").await, + Vec::from(["third", "Repository initialized"]) + ); + } _ => panic!(), } @@ -497,7 +504,7 @@ pub async fn test_expire_ref_with_odd_timestamp() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { + let storage: Arc = new_in_memory_storage().await?; + let storage_settings = storage.default_settings(); + let mut repo = Repository::create(None, Arc::clone(&storage), HashMap::new()).await?; + + let expire_older_than = make_design_doc_repo(&mut repo).await?; + + let asset_manager = Arc::new(AssetManager::new_no_cache( + storage.clone(), + storage_settings.clone(), + 1, + )); + + let result = expire( + storage.as_ref(), + &storage_settings, + asset_manager.clone(), + expire_older_than, + // This is different compared to the previous test + ExpiredRefAction::Delete, + ExpiredRefAction::Delete, + ) + .await?; + + assert_eq!(result.released_snapshots.len(), 7); + assert_eq!(result.deleted_refs.len(), 2); + + let now = Utc::now(); + let gc_config = GCConfig::clean_all(now, now, None); + let summary = garbage_collect( + storage.as_ref(), + &storage_settings, + asset_manager.clone(), + &gc_config, + ) + .await?; + + assert_eq!(summary.snapshots_deleted, 7); + assert_eq!(summary.transaction_logs_deleted, 7); + + // only the non expired snapshots left + assert_eq!(storage.list_snapshots(&storage_settings).await?.count().await, 8); + Ok(()) +}