Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 44 additions & 146 deletions icechunk/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use serde::{Deserialize, Serialize};
use crate::{
format::{
ChunkIndices, NodeId, Path,
manifest::{ChunkInfo, ChunkPayload, ManifestExtents, ManifestSplits, Overlap},
manifest::{ChunkInfo, ChunkPayload},
snapshot::{ArrayShape, DimensionName, NodeData, NodeSnapshot},
},
session::{SessionErrorKind, SessionResult, find_coord},
session::{SessionErrorKind, SessionResult},
};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -25,15 +25,15 @@ pub struct ArrayData {
pub user_data: Bytes,
}

type SplitManifest = BTreeMap<ChunkIndices, Option<ChunkPayload>>;
type ChunkTable = BTreeMap<ChunkIndices, Option<ChunkPayload>>;

#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct EditChanges {
new_groups: HashMap<Path, (NodeId, Bytes)>,
new_arrays: HashMap<Path, (NodeId, ArrayData)>,
updated_arrays: HashMap<NodeId, ArrayData>,
updated_groups: HashMap<NodeId, Bytes>,
set_chunks: BTreeMap<NodeId, HashMap<ManifestExtents, SplitManifest>>,
set_chunks: BTreeMap<NodeId, ChunkTable>,
// This map keeps track of any chunk deletes that are
// outside the domain of the current array shape. This is needed to handle
// the very unlikely case of multiple resizes in the same session.
Expand All @@ -60,15 +60,13 @@ impl EditChanges {
// FIXME: do we even test this?
self.deleted_chunks_outside_bounds.extend(other.deleted_chunks_outside_bounds);

other.set_chunks.into_iter().for_each(|(node, other_splits)| {
let manifests = self.set_chunks.entry(node).or_insert_with(|| {
HashMap::<ManifestExtents, SplitManifest>::with_capacity(
other_splits.len(),
)
});
other_splits.into_iter().for_each(|(extent, their_manifest)| {
manifests.entry(extent).or_default().extend(their_manifest)
})
other.set_chunks.into_iter().for_each(|(node, other_manifest)| {
match self.set_chunks.get_mut(&node) {
Some(manifest) => manifest.extend(other_manifest),
None => {
self.set_chunks.insert(node, other_manifest);
}
}
});
}
}
Expand Down Expand Up @@ -225,9 +223,20 @@ impl ChangeSet {
pub fn changed_chunks(
&self,
) -> impl Iterator<Item = (&NodeId, impl Iterator<Item = &ChunkIndices>)> {
self.edits().set_chunks.iter().map(|(node_id, split_map)| {
(node_id, split_map.values().flat_map(|x| x.keys()))
})
self.edits()
.set_chunks
.iter()
.map(|(node_id, manifest)| (node_id, manifest.keys()))
}

/// Iterates over the chunk coordinates that have an entry in this change
/// set's `set_chunks` table for `node_id`.
///
/// Yields nothing when the node has no entry in `set_chunks`.
pub fn changed_node_chunks(
    &self,
    node_id: &NodeId,
) -> impl Iterator<Item = &ChunkIndices> {
    // Guard clause: a node without recorded chunk edits gets an empty iterator.
    let Some(table) = self.edits().set_chunks.get(node_id) else {
        return Either::Right(iter::empty());
    };
    Either::Left(table.keys())
}

pub fn is_updated_array(&self, node: &NodeId) -> bool {
Expand Down Expand Up @@ -308,7 +317,6 @@ impl ChangeSet {
node_id: &NodeId,
path: &Path,
array_data: ArrayData,
new_splits: &ManifestSplits,
) -> SessionResult<()> {
let edits = self.edits_mut()?;
match edits.new_arrays.get(path) {
Expand All @@ -321,65 +329,6 @@ impl ChangeSet {
}
}

// update existing splits
let mut to_remove = HashSet::<ChunkIndices>::new();
if let Some(manifests) = edits.set_chunks.remove(node_id) {
let mut new_deleted_chunks = HashSet::<ChunkIndices>::new();
let mut new_manifests =
HashMap::<ManifestExtents, SplitManifest>::with_capacity(
new_splits.len(),
);
for (old_extents, mut chunks) in manifests.into_iter() {
for new_extents in new_splits.iter() {
if old_extents.overlap_with(new_extents) == Overlap::None {
continue;
}

// TODO: replace with `BTreeMap.drain_filter` after it is stable.
let mut extracted =
BTreeMap::<ChunkIndices, Option<ChunkPayload>>::new();
chunks.retain(|coord, payload| {
let cond = new_extents.contains(coord.0.as_slice());
if cond {
extracted.insert(coord.clone(), payload.clone());
}
!cond
});
new_manifests
.entry(new_extents.clone())
.or_default()
.extend(extracted);
}
new_deleted_chunks.extend(
chunks.into_iter().filter_map(|(coord, payload)| {
payload.is_none().then_some(coord)
}),
);
}

// bring back any previously tracked deletes
if let Some(deletes) = edits.deleted_chunks_outside_bounds.get_mut(node_id) {
for coord in deletes.iter() {
if let Some(extents) = new_splits.find(coord) {
new_manifests
.entry(extents.clone())
.or_default()
.insert(coord.clone(), None);
to_remove.insert(coord.clone());
};
}
deletes.retain(|item| !to_remove.contains(item));
to_remove.drain();
};
edits.set_chunks.insert(node_id.clone(), new_manifests);

// keep track of any deletes not inserted in to set_chunks
edits
.deleted_chunks_outside_bounds
.entry(node_id.clone())
.or_default()
.extend(new_deleted_chunks);
}
Ok(())
}

Expand Down Expand Up @@ -443,24 +392,10 @@ impl ChangeSet {
node_id: NodeId,
coord: ChunkIndices,
data: Option<ChunkPayload>,
splits: &ManifestSplits,
) -> SessionResult<()> {
#[allow(clippy::expect_used)]
let extent = splits.find(&coord).expect("logic bug. Trying to set chunk ref but can't find the appropriate split manifest.");
// this implementation makes delete idempotent
// it allows deleting a deleted chunk by repeatedly setting None.
self.edits_mut()?
.set_chunks
.entry(node_id)
.or_insert_with(|| {
HashMap::<
ManifestExtents,
BTreeMap<ChunkIndices, Option<ChunkPayload>>,
>::with_capacity(splits.len())
})
.entry(extent.clone())
.or_default()
.insert(coord, data);
self.edits_mut()?.set_chunks.entry(node_id).or_default().insert(coord, data);
Ok(())
}

Expand All @@ -469,11 +404,10 @@ impl ChangeSet {
node_id: &NodeId,
coords: &ChunkIndices,
) -> Option<&Option<ChunkPayload>> {
self.edits().set_chunks.get(node_id).and_then(|node_chunks| {
find_coord(node_chunks.keys(), coords).and_then(|(_, extent)| {
node_chunks.get(extent).and_then(|s| s.get(coords))
})
})
self.edits()
.set_chunks
.get(node_id)
.and_then(|node_chunks| node_chunks.get(coords))
}

/// Drop the updated chunk references for the node.
Expand All @@ -484,9 +418,7 @@ impl ChangeSet {
predicate: impl Fn(&ChunkIndices) -> bool,
) -> SessionResult<()> {
if let Some(changes) = self.edits_mut()?.set_chunks.get_mut(node_id) {
for split in changes.values_mut() {
split.retain(|coord, _| !predicate(coord));
}
changes.retain(|coord, _| !predicate(coord));
}
Ok(())
}
Expand All @@ -505,37 +437,30 @@ impl ChangeSet {
&self,
node_id: &NodeId,
node_path: &Path,
extent: ManifestExtents,
) -> impl Iterator<Item = (&ChunkIndices, &Option<ChunkPayload>)> + use<'_> {
if self.is_deleted(node_path, node_id) {
return Either::Left(iter::empty());
}
match self.edits().set_chunks.get(node_id) {
None => Either::Left(iter::empty()),
Some(h) => Either::Right(
h.iter()
.filter(move |(manifest_extent, _)| extent.matches(manifest_extent))
.flat_map(|(_, manifest)| manifest.iter()),
),
Some(manifest) => Either::Right(manifest.iter()),
}
}

pub fn new_arrays_chunk_iterator(
&self,
) -> impl Iterator<Item = (Path, ChunkInfo)> + use<'_> {
self.edits().new_arrays.iter().flat_map(|(path, (node_id, _))| {
self.new_array_chunk_iterator(node_id, path, ManifestExtents::ALL)
.map(|ci| (path.clone(), ci))
self.new_array_chunk_iterator(node_id, path).map(|ci| (path.clone(), ci))
})
}

pub fn new_array_chunk_iterator<'a>(
&'a self,
node_id: &'a NodeId,
node_path: &Path,
extent: ManifestExtents,
) -> impl Iterator<Item = ChunkInfo> + use<'a> {
self.array_chunks_iterator(node_id, node_path, extent).filter_map(
self.array_chunks_iterator(node_id, node_path).filter_map(
move |(coords, payload)| {
payload.as_ref().map(|p| ChunkInfo {
node: node_id.clone(),
Expand All @@ -546,38 +471,23 @@ impl ChangeSet {
)
}

/// Iterates over the keys of the `set_chunks` entry for `node_id`.
///
/// Yields nothing when `is_deleted(node_path, node_id)` is true or when the
/// node has no entry in `set_chunks`.
pub fn modified_manifest_extents_iterator(
    &self,
    node_id: &NodeId,
    node_path: &Path,
) -> impl Iterator<Item = &ManifestExtents> + use<'_> {
    // Deleted nodes are treated the same as nodes with no recorded edits.
    let per_extent = if self.is_deleted(node_path, node_id) {
        None
    } else {
        self.edits().set_chunks.get(node_id)
    };
    match per_extent {
        Some(splits) => Either::Right(splits.keys()),
        None => Either::Left(iter::empty()),
    }
}

pub fn array_manifest(
&self,
node_id: &NodeId,
extent: &ManifestExtents,
) -> Option<&SplitManifest> {
self.edits().set_chunks.get(node_id).and_then(|x| x.get(extent))
pub fn array_manifest(&self, node_id: &NodeId) -> Option<&ChunkTable> {
self.edits().set_chunks.get(node_id)
}

pub fn new_nodes(&self) -> impl Iterator<Item = (&Path, &NodeId)> {
self.new_groups().chain(self.new_arrays())
self.new_groups().chain(self.new_arrays().map(|(path, id, _)| (path, id)))
}

/// Iterates over the entries of the `new_groups` edit map, yielding each
/// group's path together with its node id.
pub fn new_groups(&self) -> impl Iterator<Item = (&Path, &NodeId)> {
    let groups = &self.edits().new_groups;
    // Each value is a `(NodeId, Bytes)` pair; only the id is exposed here.
    groups.iter().map(|(group_path, entry)| (group_path, &entry.0))
}

pub fn new_arrays(&self) -> impl Iterator<Item = (&Path, &NodeId)> {
self.edits().new_arrays.iter().map(|(path, (node_id, _))| (path, node_id))
pub fn new_arrays(&self) -> impl Iterator<Item = (&Path, &NodeId, &ArrayData)> {
self.edits()
.new_arrays
.iter()
.map(|(path, (node_id, node_data))| (path, node_id, node_data))
}

/// Merge this ChangeSet with `other`.
Expand Down Expand Up @@ -719,7 +629,7 @@ mod tests {
change_set::{ArrayData, MoveTracker},
format::{
ChunkIndices, NodeId, Path,
manifest::{ChunkInfo, ChunkPayload, ManifestSplits},
manifest::{ChunkInfo, ChunkPayload},
snapshot::ArrayShape,
},
};
Expand Down Expand Up @@ -751,41 +661,29 @@ mod tests {
)?;
assert_eq!(None, change_set.new_arrays_chunk_iterator().next());

let splits1 = ManifestSplits::from_edges(vec![vec![0, 10], vec![0, 10]]);

change_set.set_chunk_ref(
node_id1.clone(),
ChunkIndices(vec![0, 1]),
None,
&splits1,
)?;
change_set.set_chunk_ref(node_id1.clone(), ChunkIndices(vec![0, 1]), None)?;
assert_eq!(None, change_set.new_arrays_chunk_iterator().next());

change_set.set_chunk_ref(
node_id1.clone(),
ChunkIndices(vec![1, 0]),
Some(ChunkPayload::Inline("bar1".into())),
&splits1,
)?;
change_set.set_chunk_ref(
node_id1.clone(),
ChunkIndices(vec![1, 1]),
Some(ChunkPayload::Inline("bar2".into())),
&splits1,
)?;

let splits2 = ManifestSplits::from_edges(vec![vec![0, 10]]);
change_set.set_chunk_ref(
node_id2.clone(),
ChunkIndices(vec![0]),
Some(ChunkPayload::Inline("baz1".into())),
&splits2,
)?;
change_set.set_chunk_ref(
node_id2.clone(),
ChunkIndices(vec![1]),
Some(ChunkPayload::Inline("baz2".into())),
&splits2,
)?;

{
Expand Down
Loading