Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions icechunk/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ impl EditChanges {
})
});
}

// Extract the `set_chunks` manifest and replace it with an empty one.
//
// Returns the previous contents, leaving `self.set_chunks` empty so the
// caller can re-insert the refs (e.g. under a new split configuration).
pub(crate) fn extract_manifest(
    &mut self,
) -> BTreeMap<NodeId, HashMap<ManifestExtents, SplitManifest>> {
    // `mem::take` swaps in the type's `Default` (an empty map) and returns
    // the old value in one step — the idiomatic form of the manual
    // `mem::swap` with a freshly-constructed default.
    std::mem::take(&mut self.set_chunks)
}
}

pub static EMPTY_EDITS: LazyLock<EditChanges> = LazyLock::new(Default::default);
Expand Down Expand Up @@ -336,8 +346,7 @@ impl ChangeSet {
}

// TODO: replace with `BTreeMap.drain_filter` after it is stable.
let mut extracted =
BTreeMap::<ChunkIndices, Option<ChunkPayload>>::new();
let mut extracted = SplitManifest::new();
chunks.retain(|coord, payload| {
let cond = new_extents.contains(coord.0.as_slice());
if cond {
Expand Down Expand Up @@ -453,17 +462,35 @@ impl ChangeSet {
.set_chunks
.entry(node_id)
.or_insert_with(|| {
HashMap::<
ManifestExtents,
BTreeMap<ChunkIndices, Option<ChunkPayload>>,
>::with_capacity(splits.len())
HashMap::<ManifestExtents, SplitManifest>::with_capacity(splits.len())
})
.entry(extent.clone())
.or_default()
.insert(coord, data);
Ok(())
}

// Extracts the chunk refs in `set_chunks`, and re-sets them using the new
// `splits`, so the stored `ManifestExtents` keys stay consistent with the
// provided split configuration.
pub fn replay_set_chunk_refs(
    &mut self,
    splits: &HashMap<NodeId, ManifestSplits>,
) -> SessionResult<()> {
    // Drain the current manifests; `set_chunk_ref` below repopulates them.
    let manifests = self.edits_mut()?.extract_manifest();
    for (node_id, manifest) in manifests {
        for (coord, data) in manifest.into_values().flatten() {
            #[allow(clippy::expect_used)]
            let node_splits = splits.get(&node_id).expect(
                "logic bug! manifest splits not set when replaying set chunk refs.",
            );
            self.set_chunk_ref(node_id.clone(), coord, data, node_splits)?;
        }
    }
    Ok(())
}

pub fn get_chunk_ref(
&self,
node_id: &NodeId,
Expand Down
6 changes: 3 additions & 3 deletions icechunk/src/conflicts/detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,14 @@ impl ConflictSolver for ConflictDetector {
}
}

struct PathFinder<It>(Mutex<(HashMap<NodeId, Path>, Option<It>)>);
pub(crate) struct PathFinder<It>(Mutex<(HashMap<NodeId, Path>, Option<It>)>);

impl<It: Iterator<Item = SessionResult<NodeSnapshot>>> PathFinder<It> {
fn new(iter: It) -> Self {
pub(crate) fn new(iter: It) -> Self {
Self(Mutex::new((HashMap::new(), Some(iter))))
}

fn find(&self, node_id: &NodeId) -> SessionResult<Path> {
pub(crate) fn find(&self, node_id: &NodeId) -> SessionResult<Path> {
// we can safely unwrap the result of `lock` because there is no failing code called while
// the mutex is held. The mutex is there purely to support interior mutability
#![allow(clippy::expect_used)]
Expand Down
145 changes: 139 additions & 6 deletions icechunk/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
asset_manager::AssetManager,
change_set::{ArrayData, ChangeSet},
config::{ManifestSplitDim, ManifestSplitDimCondition, ManifestSplittingConfig},
conflicts::{Conflict, ConflictResolution, ConflictSolver},
conflicts::{Conflict, ConflictResolution, ConflictSolver, detector::PathFinder},
error::ICError,
format::{
ByteRange, ChunkIndices, ChunkOffset, IcechunkFormatError,
Expand Down Expand Up @@ -291,6 +291,8 @@ impl Session {
snapshot_id,
change_set: ChangeSet::for_edits(),
default_commit_metadata,
// Splits are populated for a node during
// `add_array`, `update_array`, and `set_chunk_ref`
splits: Default::default(),
}
}
Expand Down Expand Up @@ -1102,15 +1104,16 @@ impl Session {
if self.read_only() {
return Err(SessionErrorKind::ReadOnlySession.into());
}
let Session { splits: other_splits, change_set, .. } = other;
let Session { splits: other_splits, change_set, config: other_config, .. } =
other;

if self.splits.iter().any(|(node, our_splits)| {
other_splits
.get(node)
.is_some_and(|their_splits| !our_splits.compatible_with(their_splits))
}) {
let ours = self.config().manifest().splitting().clone();
let theirs = self.config().manifest().splitting().clone();
let theirs = other_config.manifest().splitting().clone();
Copy link
Contributor Author

@dcherian dcherian Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spotted this minor error while reading the code

return Err(
SessionErrorKind::IncompatibleSplittingConfig { ours, theirs }.into()
);
Expand Down Expand Up @@ -1445,15 +1448,47 @@ impl Session {
snap_id.clone(),
);

let finder = PathFinder::new(session.list_nodes(&Path::root()).await?);
let mut fresh = self.change_set().fresh();
std::mem::swap(self.change_set_mut()?, &mut fresh);
let change_set = fresh;
// TODO: this should probably execute in a worker thread

match solver.solve(&tx_log, &session, change_set, self).await? {
ConflictResolution::Patched(patched_changeset) => {
ConflictResolution::Patched(mut patched_changeset) => {
trace!("Snapshot rebased");
self.change_set = patched_changeset;

// important to set this here, so that the `get_array` below gets the
// *updated* node with updated shape and dimension_names
self.snapshot_id = snap_id;

let mut new_splits: HashMap<NodeId, ManifestSplits> = HashMap::new();
Copy link
Contributor Author

@dcherian dcherian Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered somehow using self.merge here but the splits update logic is different. This current logic does what I need it to do.

HOWEVER, ChangeSet has set_chunks: BTreeMap<NodeId, HashMap<ManifestExtents, SplitManifest>>, and those ManifestExtents are out of date. (I updated a debug_assert! to an assert! below to enforce this invariant at runtime). So I need to recreate this ChangeSet.

To me, that should be the responsibility of the ConflictSolver. But to do that, I need to call get_split_sizes exactly as below to get updated splits yet I don't have access to NodePath exactly as in here.

  • Should PathFinder be a public utility? That seems wrong, but this NodeId vs NodePath issue seems irreversibly exposed to outside users.
  • we could pass splits from previous_repo: Session to the solver; but we are careful to only set them for arrays with chunk changes I believe, so it's not necessarily complete enough to cover changes in current_repo: Session
  • we could make the Changeset consistent here; but that seems yucky to me (the user has provided us a "fixed" changeset, we should not modify it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed my mind; this whole thing seems too hard to punt to a user so now rebase remakes the set_chunks HashMap with the same chunk refs and (possibly new) splits.

// we have rebased on top of `snap_id`, now we update splits to match the config in that snap.
// `splits` would have changed if an array was resized. `flush` uses `splits` so if left unupdated,
// we run the risk of losing data.
for node_id in patched_changeset.arrays_with_chunk_changes() {
let node_path = finder.find(node_id)?;
// by now the conflict solver has solved any needed conflicts,
// we grab the *updated* node to get the *updated* metadata (shape, dimension_names)
if let NodeSnapshot {
node_data: NodeData::Array { shape, dimension_names, .. },
..
} = self.get_array(&node_path).await?
{
let new_size = session
.config()
.manifest()
.splitting()
.get_split_sizes(&node_path, &shape, &dimension_names);
new_splits.insert(node_id.clone(), new_size);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if both changes updated the splits?

};
}
// now we update the changeset to be consistent with these new splits
patched_changeset.replay_set_chunk_refs(&new_splits)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expensive, can we do it only for the arrays that had a change to their splits?


self.change_set = patched_changeset;
// important to keep this consistent since it is used in flush
self.splits = new_splits;
}
ConflictResolution::Unsolvable { reason, unmodified } => {
warn!("Snapshot cannot be rebased. Aborting rebase.");
Expand Down Expand Up @@ -2108,7 +2143,9 @@ impl<'a> FlushProcess<'a> {

// ``modified_splits`` (i.e. splits used in this session)
// must be a subset of ``splits`` (the splits set in the config)
debug_assert!(modified_splits.is_subset(&splits.iter().collect::<HashSet<_>>()));
// This is not a debug_assert! because custom conflict solvers might
// modify a changeset and break this invariant.
assert!(modified_splits.is_subset(&splits.iter().collect::<HashSet<_>>()));

for extent in splits.iter() {
if rewrite_manifests || modified_splits.contains(extent) {
Expand Down Expand Up @@ -2736,6 +2773,7 @@ mod tests {
};

use super::*;
use async_trait::async_trait;
use icechunk_macros::tokio_test;
use itertools::{Itertools, assert_equal};
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -5194,6 +5232,101 @@ mod tests {
Ok(())
}

#[icechunk_macros::tokio_test]
// Test rebase over a commit with a resize.
//
// Error-triggering flow:
// 1. Session A starts from snapshot S1 (array shape [5, 1])
// 2. Splits are cached for the array with extent [0..5)
// 3. Session A writes chunk [3]
// 4. Meanwhile, another session resizes array to [20, 1] and writes chunk [10]
// 5. Session A rebases to the new snapshot
// 6. BUG: Session A's splits are NOT updated (still [0..5))
// 7. Session A commits - during flush, verified_node_chunk_iterator filters
//    parent chunks by extent, dropping chunk [10] because it's outside [0..5)
// 8. Result: chunk [10] is lost
//
// The fix requires updating Session.splits during rebase to match the new
// parent snapshot's array shapes.
async fn test_rebase_over_resize() -> Result<(), Box<dyn Error>> {
    // A solver that blindly accepts the current session's changes, so the
    // rebase under test always takes the `Patched` branch.
    struct YoloSolver;
    #[async_trait]
    impl ConflictSolver for YoloSolver {
        async fn solve(
            &self,
            _previous_change: &TransactionLog,
            _previous_repo: &Session,
            current_changes: ChangeSet,
            _current_repo: &Session,
        ) -> SessionResult<ConflictResolution> {
            Ok(ConflictResolution::Patched(current_changes))
        }
    }

    let repo = get_repo_for_conflict().await?;

    let mut ds1 = repo.writable_session("main").await?;
    let mut ds2 = repo.writable_session("main").await?;

    let path: Path = "/foo/bar/some-array".try_into().unwrap();
    ds1.set_chunk_ref(
        path.clone(),
        ChunkIndices(vec![1]),
        Some(ChunkPayload::Inline("repo 1".into())),
    )
    .await?;
    ds1.commit("writer 1 updated non-conflict chunk", None).await?;

    let mut ds1 = repo.writable_session("main").await?;
    ds1.update_array(
        &path,
        ArrayShape::new(vec![(20, 1)]).unwrap(),
        None,
        Bytes::new(),
    )
    .await?;
    // Write a chunk beyond the original extent [0..5) to trigger the bug
    ds1.set_chunk_ref(
        path.clone(),
        ChunkIndices(vec![10]),
        Some(ChunkPayload::Inline("repo 1 chunk 10".into())),
    )
    .await?;
    ds1.commit("writer 1 updates array size and adds chunk 10", None).await?;

    // now set a chunk ref that is valid with both old and new shape.
    ds2.set_chunk_ref(
        path.clone(),
        ChunkIndices(vec![3]),
        Some(ChunkPayload::Inline("repo 2".into())),
    )
    .await?;
    ds2.commit_rebasing(
        &YoloSolver,
        1u16,
        "writer 2 writes chunk 3",
        None,
        async |_| {},
        async |_| {},
    )
    .await?;

    let ds3 = repo.writable_session("main").await?;
    // All three chunks should be present: [1] and [10] from ds1, [3] from ds2
    for i in [1u32, 3, 10] {
        assert!(
            get_chunk(
                ds3.get_chunk_reader(&path, &ChunkIndices(vec![i]), &ByteRange::ALL,)
                    .await?
            )
            .await?
            .is_some(),
            "chunk [{i}] should be present"
        );
    }
    Ok(())
}

#[tokio_test]
/// Tests `commit_rebasing` retries the proper number of times when there are conflicts
async fn test_commit_rebasing_attempts() -> Result<(), Box<dyn Error>> {
Expand Down
Loading