diff --git a/icechunk/src/change_set.rs b/icechunk/src/change_set.rs
index 6958ed09e..bf5194cc2 100644
--- a/icechunk/src/change_set.rs
+++ b/icechunk/src/change_set.rs
@@ -71,6 +71,16 @@ impl EditChanges {
             })
         });
     }
+
+    // extract the `set_chunks` manifest and replace with an empty one
+    pub(crate) fn extract_manifest(
+        &mut self,
+    ) -> BTreeMap<NodeId, HashMap<ManifestExtents, SplitManifest>> {
+        let mut fresh: BTreeMap<NodeId, HashMap<ManifestExtents, SplitManifest>> =
+            Default::default();
+        std::mem::swap(&mut self.set_chunks, &mut fresh);
+        fresh
+    }
 }
 
 pub static EMPTY_EDITS: LazyLock<EditChanges> = LazyLock::new(Default::default);
@@ -336,8 +346,7 @@ impl ChangeSet {
         }
 
         // TODO: replace with `BTreeMap.drain_filter` after it is stable.
-        let mut extracted =
-            BTreeMap::<ChunkIndices, Option<ChunkPayload>>::new();
+        let mut extracted = SplitManifest::new();
         chunks.retain(|coord, payload| {
             let cond = new_extents.contains(coord.0.as_slice());
             if cond {
@@ -453,10 +462,7 @@ impl ChangeSet {
             .set_chunks
             .entry(node_id)
             .or_insert_with(|| {
-                HashMap::<
-                    ManifestExtents,
-                    BTreeMap<ChunkIndices, Option<ChunkPayload>>,
-                >::with_capacity(splits.len())
+                HashMap::<ManifestExtents, SplitManifest>::with_capacity(splits.len())
             })
             .entry(extent.clone())
             .or_default()
@@ -464,6 +470,27 @@ impl ChangeSet {
             .insert(coord, data);
         Ok(())
     }
+    // extracts the chunk refs in `set_chunks`, and re-sets them using new `splits`.
+    pub fn replay_set_chunk_refs(
+        &mut self,
+        splits: &HashMap<NodeId, ManifestSplits>,
+    ) -> SessionResult<()> {
+        let manifests = self.edits_mut()?.extract_manifest();
+        manifests
+            .into_iter()
+            .flat_map(|(node_id, manifest)| {
+                manifest.into_values().flatten().map(move |item| (node_id.clone(), item))
+            })
+            .try_for_each(|(node_id, (coord, data))| {
+                #[allow(clippy::expect_used)]
+                let node_splits = splits.get(&node_id).expect(
+                    "logic bug! manifest splits not set when replaying set chunk refs.",
+                );
+                self.set_chunk_ref(node_id, coord, data, node_splits)
+            })?;
+        Ok(())
+    }
+
     pub fn get_chunk_ref(
         &self,
         node_id: &NodeId,
diff --git a/icechunk/src/conflicts/detector.rs b/icechunk/src/conflicts/detector.rs
index db021640b..556f1f7e3 100644
--- a/icechunk/src/conflicts/detector.rs
+++ b/icechunk/src/conflicts/detector.rs
@@ -253,14 +253,14 @@ impl ConflictSolver for ConflictDetector {
     }
 }
 
-struct PathFinder<It>(Mutex<(HashMap<NodeId, Path>, Option<It>)>);
+pub(crate) struct PathFinder<It>(Mutex<(HashMap<NodeId, Path>, Option<It>)>);
 
 impl<It: Iterator<Item = SessionResult<NodeSnapshot>>> PathFinder<It> {
-    fn new(iter: It) -> Self {
+    pub(crate) fn new(iter: It) -> Self {
         Self(Mutex::new((HashMap::new(), Some(iter))))
     }
 
-    fn find(&self, node_id: &NodeId) -> SessionResult<Path> {
+    pub(crate) fn find(&self, node_id: &NodeId) -> SessionResult<Path> {
         // we can safely unwrap the result of `lock` because there is no failing code called while
         // the mutex is hold. The mutex is there purely to support interior mutability
         #![allow(clippy::expect_used)]
diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs
index e91ab2e9a..9420fd865 100644
--- a/icechunk/src/session.rs
+++ b/icechunk/src/session.rs
@@ -28,7 +28,7 @@ use crate::{
     asset_manager::AssetManager,
     change_set::{ArrayData, ChangeSet},
     config::{ManifestSplitDim, ManifestSplitDimCondition, ManifestSplittingConfig},
-    conflicts::{Conflict, ConflictResolution, ConflictSolver},
+    conflicts::{Conflict, ConflictResolution, ConflictSolver, detector::PathFinder},
     error::ICError,
     format::{
         ByteRange, ChunkIndices, ChunkOffset, IcechunkFormatError,
@@ -291,6 +291,8 @@ impl Session {
             snapshot_id,
             change_set: ChangeSet::for_edits(),
             default_commit_metadata,
+            // Splits are populated for a node during
+            // `add_array`, `update_array`, and `set_chunk_ref`
             splits: Default::default(),
         }
     }
@@ -1102,7 +1104,8 @@ impl Session {
         if self.read_only() {
             return Err(SessionErrorKind::ReadOnlySession.into());
         }
-        let Session { splits: other_splits, change_set, .. } = other;
+        let Session { splits: other_splits, change_set, config: other_config, .. } =
+            other;
 
         if self.splits.iter().any(|(node, our_splits)| {
             other_splits
@@ -1110,7 +1113,7 @@ impl Session {
                 .is_some_and(|their_splits| !our_splits.compatible_with(their_splits))
         }) {
             let ours = self.config().manifest().splitting().clone();
-            let theirs = self.config().manifest().splitting().clone();
+            let theirs = other_config.manifest().splitting().clone();
             return Err(
                 SessionErrorKind::IncompatibleSplittingConfig { ours, theirs }.into()
             );
@@ -1445,15 +1448,47 @@ impl Session {
             snap_id.clone(),
         );
 
+        let finder = PathFinder::new(session.list_nodes(&Path::root()).await?);
         let mut fresh = self.change_set().fresh();
         std::mem::swap(self.change_set_mut()?, &mut fresh);
         let change_set = fresh;
 
         // TODO: this should probably execute in a worker thread
+
         match solver.solve(&tx_log, &session, change_set, self).await? {
-            ConflictResolution::Patched(patched_changeset) => {
+            ConflictResolution::Patched(mut patched_changeset) => {
                 trace!("Snapshot rebased");
+
+                // important to set this here, so that the `get_array` below gets the
+                // *updated* node with updated shape and dimension_names
                 self.snapshot_id = snap_id;
+
+                let mut new_splits: HashMap<NodeId, ManifestSplits> = HashMap::new();
+                // we have rebased on top of `snap_id`, now we update splits to match the config in that snap.
+                // `splits` would have changed if an array was resized. `flush` uses `splits` so if left unupdated,
+                // we run the risk of losing data.
+                for node_id in patched_changeset.arrays_with_chunk_changes() {
+                    let node_path = finder.find(node_id)?;
+                    // by now the conflict solver has solved any needed conflicts,
+                    // we grab the *updated* node to get the *updated* metadata (shape, dimension_names)
+                    if let NodeSnapshot {
+                        node_data: NodeData::Array { shape, dimension_names, .. },
+                        ..
+                    } = self.get_array(&node_path).await?
+                    {
+                        let new_size = session
+                            .config()
+                            .manifest()
+                            .splitting()
+                            .get_split_sizes(&node_path, &shape, &dimension_names);
+                        new_splits.insert(node_id.clone(), new_size);
+                    };
+                }
+                // now we update the changeset to be consistent with these new splits
+                patched_changeset.replay_set_chunk_refs(&new_splits)?;
+
+                self.change_set = patched_changeset;
+                // important to keep this consistent since it is used in flush
+                self.splits = new_splits;
             }
             ConflictResolution::Unsolvable { reason, unmodified } => {
                 warn!("Snapshot cannot be rebased. Aborting rebase.");
@@ -2108,7 +2143,9 @@ impl<'a> FlushProcess<'a> {
 
         // ``modified_splits`` (i.e. splits used in this session)
         // must be a subset of ``splits`` (the splits set in the config)
-        debug_assert!(modified_splits.is_subset(&splits.iter().collect::<HashSet<_>>()));
+        // This is not a debug_assert! because custom conflict solvers might
+        // modify a changeset and break this invariant.
+        assert!(modified_splits.is_subset(&splits.iter().collect::<HashSet<_>>()));
 
         for extent in splits.iter() {
             if rewrite_manifests || modified_splits.contains(extent) {
@@ -2736,6 +2773,7 @@ mod tests {
     };
 
     use super::*;
+    use async_trait::async_trait;
     use icechunk_macros::tokio_test;
     use itertools::{Itertools, assert_equal};
     use pretty_assertions::assert_eq;
@@ -5194,6 +5232,101 @@ mod tests {
         Ok(())
     }
 
+    #[icechunk_macros::tokio_test]
+    // Test rebase over a commit with a resize.
+    //
+    // Error-triggering flow:
+    // 1. Session A starts from snapshot S1 (array shape [5, 1])
+    // 2. Splits are cached for the array with extent [0..5)
+    // 3. Session A writes chunk [3]
+    // 4. Meanwhile, another session resizes array to [20, 1] and writes chunk [10]
+    // 5. Session A rebases to the new snapshot
+    // 6. BUG: Session A's splits are NOT updated (still [0..5))
+    // 7. Session A commits - during flush, verified_node_chunk_iterator filters
+    //    parent chunks by extent, dropping chunk [10] because it's outside [0..5)
+    // 8. Result: chunk [10] is lost
+    //
+    // The fix requires updating Session.splits during rebase to match the new
+    // parent snapshot's array shapes.
+    async fn test_rebase_over_resize() -> Result<(), Box<dyn std::error::Error>> {
+        struct YoloSolver;
+        #[async_trait]
+        impl ConflictSolver for YoloSolver {
+            async fn solve(
+                &self,
+                _previous_change: &TransactionLog,
+                _previous_repo: &Session,
+                current_changes: ChangeSet,
+                _sccurrent_repo: &Session,
+            ) -> SessionResult<ConflictResolution> {
+                Ok(ConflictResolution::Patched(current_changes))
+            }
+        }
+
+        let repo = get_repo_for_conflict().await?;
+
+        let mut ds1 = repo.writable_session("main").await?;
+        let mut ds2 = repo.writable_session("main").await?;
+
+        let path: Path = "/foo/bar/some-array".try_into().unwrap();
+        ds1.set_chunk_ref(
+            path.clone(),
+            ChunkIndices(vec![1]),
+            Some(ChunkPayload::Inline("repo 1".into())),
+        )
+        .await?;
+        ds1.commit("writer 1 updated non-conflict chunk", None).await?;
+
+        let mut ds1 = repo.writable_session("main").await?;
+        ds1.update_array(
+            &path,
+            ArrayShape::new(vec![(20, 1)]).unwrap(),
+            None,
+            Bytes::new(),
+        )
+        .await?;
+        // Write a chunk beyond the original extent [0..5) to trigger the bug
+        ds1.set_chunk_ref(
+            path.clone(),
+            ChunkIndices(vec![10]),
+            Some(ChunkPayload::Inline("repo 1 chunk 10".into())),
+        )
+        .await?;
+        ds1.commit("writer 1 updates array size and adds chunk 10", None).await?;
+
+        // now set a chunk ref that is valid with both old and new shape.
+        ds2.set_chunk_ref(
+            path.clone(),
+            ChunkIndices(vec![3]),
+            Some(ChunkPayload::Inline("repo 2".into())),
+        )
+        .await?;
+        ds2.commit_rebasing(
+            &YoloSolver,
+            1u16,
+            "writer 2 writes chunk 0",
+            None,
+            async |_| {},
+            async |_| {},
+        )
+        .await?;
+
+        let ds3 = repo.writable_session("main").await?;
+        // All three chunks should be present: [1] and [10] from ds1, [3] from ds2
+        for i in [1u32, 3, 10] {
+            assert!(
+                get_chunk(
+                    ds3.get_chunk_reader(&path, &ChunkIndices(vec![i]), &ByteRange::ALL,)
+                        .await?
+                )
+                .await?
+                .is_some(),
+                "chunk [{i}] should be present"
+            );
+        }
+        Ok(())
+    }
+
     #[tokio_test]
     /// Tests `commit_rebasing` retries the proper number of times when there are conflicts
     async fn test_commit_rebasing_attempts() -> Result<(), Box<dyn std::error::Error>> {