diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi
index d7cba78e2..318914976 100644
--- a/icechunk-python/python/icechunk/_icechunk_python.pyi
+++ b/icechunk-python/python/icechunk/_icechunk_python.pyi
@@ -1796,6 +1796,20 @@ class PyRepository:
         max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
         max_concurrent_manifest_fetches: int = 500,
     ) -> GCSummary: ...
+    def chunk_storage_stats(
+        self,
+        *,
+        max_snapshots_in_memory: int = 50,
+        max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
+        max_concurrent_manifest_fetches: int = 500,
+    ) -> ChunkStorageStats: ...
+    async def chunk_storage_stats_async(
+        self,
+        *,
+        max_snapshots_in_memory: int = 50,
+        max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
+        max_concurrent_manifest_fetches: int = 500,
+    ) -> ChunkStorageStats: ...
     def total_chunks_storage(
         self,
         *,
@@ -2635,3 +2649,52 @@ def _upgrade_icechunk_repository(
         large version history (thousands of snapshots).
     """
     ...
+
+class ChunkStorageStats:
+    """Statistics about chunk storage across different chunk types."""
+
+    @property
+    def native_bytes(self) -> int:
+        """Total bytes stored in native chunks (stored in icechunk's chunk storage)"""
+        ...
+
+    @property
+    def virtual_bytes(self) -> int:
+        """Total bytes stored in virtual chunks (references to external data)"""
+        ...
+
+    @property
+    def inlined_bytes(self) -> int:
+        """Total bytes stored in inline chunks (stored directly in manifests)"""
+        ...
+
+    @property
+    def non_virtual_bytes(self) -> int:
+        """
+        Total bytes excluding virtual chunks.
+
+        This represents the approximate size of all objects stored in the
+        icechunk repository itself (native chunks plus inline chunks).
+        Virtual chunks are not included since they reference external data.
+
+        Returns:
+            int: The sum of native_bytes and inlined_bytes
+        """
+        ...
+
+    @property
+    def total_bytes(self) -> int:
+        """
+        Total bytes across all chunk types.
+
+        Returns the sum of native_bytes, virtual_bytes, and inlined_bytes.
+        This represents the total size of all data referenced by the repository,
+        including both data stored in icechunk and external virtual references.
+
+        Returns:
+            int: The sum of all chunk storage bytes
+        """
+        ...
+
+    def __repr__(self) -> str: ...
+    def __add__(self, other: ChunkStorageStats) -> ChunkStorageStats: ...
diff --git a/icechunk-python/python/icechunk/repository.py b/icechunk-python/python/icechunk/repository.py
index 9d0c02456..a408e0326 100644
--- a/icechunk-python/python/icechunk/repository.py
+++ b/icechunk-python/python/icechunk/repository.py
@@ -1,10 +1,12 @@
 import datetime
+import warnings
 from collections.abc import AsyncIterator, Iterator
 from contextlib import contextmanager
 from typing import Any, Self, cast
 
 from icechunk import ConflictSolver
 from icechunk._icechunk_python import (
+    ChunkStorageStats,
     Diff,
     GCSummary,
     ManifestFileInfo,
@@ -1582,6 +1584,72 @@ async def garbage_collect_async(
             max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
         )
 
+    def chunk_storage_stats(
+        self,
+        *,
+        max_snapshots_in_memory: int = 50,
+        max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
+        max_concurrent_manifest_fetches: int = 500,
+    ) -> ChunkStorageStats:
+        """Calculate the total storage used for chunks, in bytes.
+
+        It reports the storage needed to store all snapshots in the repository that
+        are reachable from any branches or tags. Unreachable snapshots can be generated
+        by using `reset_branch` or `expire_snapshots`. The chunks for these snapshots
+        are not included in the result, and they should probably be deleted using
+        `garbage_collect`.
+
+        The result is a `ChunkStorageStats` object with attributes for the storage
+        consumed by different types of chunks (e.g. `native_bytes`, `virtual_bytes`, `total_bytes`).
+
+        Parameters
+        ----------
+        max_snapshots_in_memory: int
+            Don't prefetch more than this many Snapshots to memory.
+        max_compressed_manifest_mem_bytes : int
+            Don't use more than this memory to store compressed in-flight manifests.
+        max_concurrent_manifest_fetches : int
+            Don't run more than this many concurrent manifest fetches.
+        """
+        return self._repository.chunk_storage_stats(
+            max_snapshots_in_memory=max_snapshots_in_memory,
+            max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes,
+            max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
+        )
+
+    async def chunk_storage_stats_async(
+        self,
+        *,
+        max_snapshots_in_memory: int = 50,
+        max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
+        max_concurrent_manifest_fetches: int = 500,
+    ) -> ChunkStorageStats:
+        """Calculate the total storage used for chunks, in bytes (async version).
+
+        It reports the storage needed to store all snapshots in the repository that
+        are reachable from any branches or tags. Unreachable snapshots can be generated
+        by using `reset_branch` or `expire_snapshots`. The chunks for these snapshots
+        are not included in the result, and they should probably be deleted using
+        `garbage_collect`.
+
+        The result is a `ChunkStorageStats` object with attributes for the storage
+        consumed by different types of chunks (e.g. `native_bytes`, `virtual_bytes`, `total_bytes`).
+
+        Parameters
+        ----------
+        max_snapshots_in_memory: int
+            Don't prefetch more than this many Snapshots to memory.
+        max_compressed_manifest_mem_bytes : int
+            Don't use more than this memory to store compressed in-flight manifests.
+        max_concurrent_manifest_fetches : int
+            Don't run more than this many concurrent manifest fetches.
+        """
+        return await self._repository.chunk_storage_stats_async(
+            max_snapshots_in_memory=max_snapshots_in_memory,
+            max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes,
+            max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
+        )
+
     def total_chunks_storage(
         self,
         *,
@@ -1589,7 +1657,7 @@ def total_chunks_storage(
         max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
         max_concurrent_manifest_fetches: int = 500,
     ) -> int:
-        """Calculate the total storage used for chunks, in bytes .
+        """Calculate the total storage used for chunks, in bytes.
 
         It reports the storage needed to store all snapshots in the repository that
         are reachable from any branches or tags. Unreachable snapshots can be generated
@@ -1609,11 +1677,21 @@ def total_chunks_storage(
             Don't run more than this many concurrent manifest fetches.
         """
-        return self._repository.total_chunks_storage(
+        warnings.warn(
+            "The ``total_chunks_storage`` method has been deprecated in favour of the ``chunk_storage_stats`` method. "
+            "The new method also reports the storage occupied by inlined and virtual chunks, in addition to native chunks. "
" + "You can still access just the total native bytes: to keep your existing behaviour using API that will not be removed in a future version, " + "please replace your existing ``.total_chunks_storage(**kwargs)`` method call with ``.chunk_storage_stats(**same_kwargs).native_bytes``.", + DeprecationWarning, + stacklevel=2, + ) + + stats = self._repository.chunk_storage_stats( max_snapshots_in_memory=max_snapshots_in_memory, max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes, max_concurrent_manifest_fetches=max_concurrent_manifest_fetches, ) + return stats.native_bytes async def total_chunks_storage_async( self, @@ -1642,11 +1720,21 @@ async def total_chunks_storage_async( Don't run more than this many concurrent manifest fetches. """ - return await self._repository.total_chunks_storage_async( + warnings.warn( + "The ``total_chunks_storage_async`` method has been deprecated in favour of the ``chunk_storage_stats_async`` method. " + "The new method is superior, as it actually calculates storage size occupied by inlined and virtual chunks in addition to native chunks. " + "You can still access just the total native bytes: to keep your existing behaviour using API that will not be removed in a future version, " + "please replace your existing ``.total_chunks_storage_async(**kwargs)`` method call with ``.chunk_storage_stats_async(**same_kwargs).native_bytes``.", + DeprecationWarning, + stacklevel=2, + ) + + stats = await self._repository.chunk_storage_stats_async( max_snapshots_in_memory=max_snapshots_in_memory, max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes, max_concurrent_manifest_fetches=max_concurrent_manifest_fetches, ) + return stats.native_bytes def inspect_snapshot(self, snapshot_id: str, *, pretty: bool = True) -> str: return self._repository.inspect_snapshot(snapshot_id, pretty=pretty) diff --git a/icechunk-python/src/lib.rs b/icechunk-python/src/lib.rs index 9db085d9c..d2f031de9 100644 --- a/icechunk-python/src/lib.rs +++ b/icechunk-python/src/lib.rs @@ -4,6 +4,7 @@ mod errors; mod pickle; mod repository; mod session; +mod stats; mod store; mod streams; @@ -39,6 +40,7 @@ use repository::{ PyTagDeletedUpdate, PyUpdateType, }; use session::PySession; +use stats::PyChunkStorageStats; use store::{PyStore, VirtualChunkSpec}; #[cfg(feature = "cli")] @@ -167,6 +169,7 @@ fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index 1f4f1b2ca..e50b1a6b2 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -45,6 +45,7 @@ use crate::{ errors::PyIcechunkStoreError, impl_pickle, session::PySession, + stats::PyChunkStorageStats, streams::PyAsyncGenerator, }; @@ -2215,22 +2216,22 @@ impl PyRepository { }) } - pub(crate) fn total_chunks_storage( + pub(crate) fn chunk_storage_stats( &self, py: Python<'_>, max_snapshots_in_memory: NonZeroU16, max_compressed_manifest_mem_bytes: NonZeroUsize, max_concurrent_manifest_fetches: NonZeroU16, - ) -> PyResult { + ) -> PyResult { // This function calls block_on, so we need to allow other thread python to make progress py.detach(move || { - let result = + let stats = pyo3_async_runtimes::tokio::get_runtime().block_on(async move { let asset_manager = { let lock = self.0.read().await; Arc::clone(lock.asset_manager()) }; - let result = repo_chunks_storage( + let 
                         asset_manager,
                         max_snapshots_in_memory,
                         max_compressed_manifest_mem_bytes,
@@ -2238,14 +2239,14 @@
                     )
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
-                    Ok::<_, PyIcechunkStoreError>(result)
+                    Ok::<_, PyIcechunkStoreError>(stats)
                 })?;
-            Ok(result)
+            Ok(stats.into())
         })
     }
 
-    fn total_chunks_storage_async<'py>(
+    pub(crate) fn chunk_storage_stats_async<'py>(
        &'py self,
        py: Python<'py>,
        max_snapshots_in_memory: NonZeroU16,
@@ -2253,21 +2254,24 @@
         max_compressed_manifest_mem_bytes: NonZeroUsize,
         max_concurrent_manifest_fetches: NonZeroU16,
     ) -> PyResult<Bound<'py, PyAny>> {
         let repository = self.0.clone();
-        pyo3_async_runtimes::tokio::future_into_py::<_, u64>(py, async move {
-            let asset_manager = {
-                let lock = repository.read().await;
-                Arc::clone(lock.asset_manager())
-            };
-            let result = repo_chunks_storage(
-                asset_manager,
-                max_snapshots_in_memory,
-                max_compressed_manifest_mem_bytes,
-                max_concurrent_manifest_fetches,
-            )
-            .await
-            .map_err(PyIcechunkStoreError::RepositoryError)?;
-            Ok(result)
-        })
+        pyo3_async_runtimes::tokio::future_into_py::<_, PyChunkStorageStats>(
+            py,
+            async move {
+                let asset_manager = {
+                    let lock = repository.read().await;
+                    Arc::clone(lock.asset_manager())
+                };
+                let stats = repo_chunks_storage(
+                    asset_manager,
+                    max_snapshots_in_memory,
+                    max_compressed_manifest_mem_bytes,
+                    max_concurrent_manifest_fetches,
+                )
+                .await
+                .map_err(PyIcechunkStoreError::RepositoryError)?;
+                Ok(stats.into())
+            },
+        )
     }
 
     #[pyo3(signature = (snapshot_id, *, pretty = true))]
diff --git a/icechunk-python/src/stats.rs b/icechunk-python/src/stats.rs
new file mode 100644
index 000000000..144674574
--- /dev/null
+++ b/icechunk-python/src/stats.rs
@@ -0,0 +1,72 @@
+use pyo3::{pyclass, pymethods};
+
+use icechunk::ops::stats::ChunkStorageStats;
+
+/// Statistics about chunk storage across different chunk types.
+#[pyclass(name = "ChunkStorageStats")]
+#[derive(Clone, Debug)]
+pub(crate) struct PyChunkStorageStats {
+    inner: ChunkStorageStats,
+}
+
+impl From<ChunkStorageStats> for PyChunkStorageStats {
+    fn from(stats: ChunkStorageStats) -> Self {
+        Self { inner: stats }
+    }
+}
+
+#[pymethods]
+impl PyChunkStorageStats {
+    /// Total bytes stored in native chunks (stored in icechunk's chunk storage)
+    #[getter]
+    pub(crate) fn native_bytes(&self) -> u64 {
+        self.inner.native_bytes
+    }
+
+    /// Total bytes stored in virtual chunks (references to external data)
+    #[getter]
+    pub(crate) fn virtual_bytes(&self) -> u64 {
+        self.inner.virtual_bytes
+    }
+
+    /// Total bytes stored in inline chunks (stored directly in manifests)
+    #[getter]
+    pub(crate) fn inlined_bytes(&self) -> u64 {
+        self.inner.inlined_bytes
+    }
+
+    /// Total bytes excluding virtual chunks.
+    ///
+    /// This represents the approximate size of all objects stored in the
+    /// icechunk repository itself (native chunks plus inline chunks).
+    /// Virtual chunks are not included since they reference external data.
+    ///
+    /// Returns:
+    ///     int: The sum of native_bytes and inlined_bytes
+    pub(crate) fn non_virtual_bytes(&self) -> u64 {
+        self.inner.non_virtual_bytes()
+    }
+
+    /// Total bytes across all chunk types.
+    ///
+    /// Returns the sum of native_bytes, virtual_bytes, and inlined_bytes.
+    /// This represents the total size of all data referenced by the repository,
+    /// including both data stored in icechunk and external virtual references.
+    ///
+    /// Returns:
+    ///     int: The sum of all chunk storage bytes
+    pub(crate) fn total_bytes(&self) -> u64 {
+        self.inner.total_bytes()
+    }
+
+    pub(crate) fn __repr__(&self) -> String {
+        format!(
+            "ChunkStorageStats(native_bytes={}, virtual_bytes={}, inlined_bytes={})",
+            self.inner.native_bytes, self.inner.virtual_bytes, self.inner.inlined_bytes
+        )
+    }
+
+    pub(crate) fn __add__(&self, other: &PyChunkStorageStats) -> PyChunkStorageStats {
+        PyChunkStorageStats { inner: self.inner + other.inner }
+    }
+}
diff --git a/icechunk-python/tests/test_stats.py b/icechunk-python/tests/test_stats.py
index 29ae17b6f..9af9cf92b 100644
--- a/icechunk-python/tests/test_stats.py
+++ b/icechunk-python/tests/test_stats.py
@@ -30,7 +30,8 @@ def test_total_chunks_storage(any_spec_version: int | None) -> None:
     array[:] = 42
     session.commit("commit 1")
 
-    assert repo.total_chunks_storage() == 100 * 4
+    with pytest.warns(DeprecationWarning, match="total_chunks_storage"):
+        assert repo.total_chunks_storage() == 100 * 4
 
 
 @pytest.mark.filterwarnings("ignore:datetime.datetime.utcnow")
@@ -57,7 +58,8 @@ async def test_total_chunks_storage_async(any_spec_version: int | None) -> None:
     array[:] = 42
     await session.commit_async("commit 1")
 
-    assert await repo.total_chunks_storage_async() == 100 * 4
+    with pytest.warns(DeprecationWarning, match="total_chunks_storage_async"):
+        assert await repo.total_chunks_storage_async() == 100 * 4
 
 
 @pytest.mark.parametrize(
@@ -67,7 +69,8 @@ def test_chunk_storage_on_filesystem(dir: str) -> None:
     repo = ic.Repository.open(
         storage=ic.local_filesystem_storage(dir),
     )
-    actual = repo.total_chunks_storage()
+    with pytest.warns(DeprecationWarning, match="total_chunks_storage"):
+        actual = repo.total_chunks_storage()
     expected = sum(
         f.stat().st_size for f in (Path(dir) / "chunks").glob("*") if f.is_file()
     )
diff --git a/icechunk/src/ops/stats.rs b/icechunk/src/ops/stats.rs
index c04b2ddc1..b613c650e 100644
--- a/icechunk/src/ops/stats.rs
+++ b/icechunk/src/ops/stats.rs
@@ -2,6 +2,7 @@ use futures::{StreamExt, TryStream, TryStreamExt, future::ready, stream};
 use std::{
     collections::HashSet,
     num::{NonZeroU16, NonZeroUsize},
+    ops::Add,
     sync::{Arc, Mutex},
 };
 use tokio::task;
@@ -10,8 +11,8 @@ use tracing::trace;
 use crate::{
     asset_manager::AssetManager,
     format::{
-        ChunkId, SnapshotId,
-        manifest::{ChunkPayload, Manifest},
+        ChunkId, ChunkLength, ChunkOffset, SnapshotId,
+        manifest::{ChunkPayload, Manifest, VirtualChunkLocation},
         snapshot::ManifestFileInfo,
     },
     ops::pointed_snapshots,
@@ -19,28 +20,118 @@ use crate::{
     stream_utils::{StreamLimiter, try_unique_stream},
 };
 
+/// Statistics about chunk storage across different chunk types
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct ChunkStorageStats {
+    /// Total bytes stored in native chunks (stored in icechunk's chunk storage)
+    pub native_bytes: u64,
+    /// Total bytes stored in virtual chunks (references to external data)
+    pub virtual_bytes: u64,
+    /// Total bytes stored in inline chunks (stored directly in manifests)
+    pub inlined_bytes: u64,
+}
+
+impl ChunkStorageStats {
+    /// Create a new ChunkStorageStats with the specified byte counts
+    pub fn new(native_bytes: u64, virtual_bytes: u64, inlined_bytes: u64) -> Self {
+        Self { native_bytes, virtual_bytes, inlined_bytes }
+    }
+
+    /// Get the total bytes excluding virtual chunks (this is ~= to the size of all objects in the icechunk repo)
+    pub fn non_virtual_bytes(&self) -> u64 {
+        self.native_bytes.saturating_add(self.inlined_bytes)
+    }
+
+    /// Get the total bytes across all chunk types
+    pub fn total_bytes(&self) -> u64 {
+        self.native_bytes
+            .saturating_add(self.virtual_bytes)
+            .saturating_add(self.inlined_bytes)
+    }
+}
+
+impl Add for ChunkStorageStats {
+    type Output = Self;
+
+    fn add(self, other: Self) -> Self {
+        Self {
+            native_bytes: self.native_bytes.saturating_add(other.native_bytes),
+            virtual_bytes: self.virtual_bytes.saturating_add(other.virtual_bytes),
+            inlined_bytes: self.inlined_bytes.saturating_add(other.inlined_bytes),
+        }
+    }
+}
+
+/// Helper function to deduplicate chunks by inserting into a HashSet and counting if new
+fn insert_and_increment_size_if_new<T: Eq + std::hash::Hash>(
+    seen: &Arc<Mutex<HashSet<T>>>,
+    key: T,
+    size_increment: u64,
+    size_counter: &mut u64,
+) -> RepositoryResult<()> {
+    if seen
+        .lock()
+        .map_err(|e| {
+            RepositoryErrorKind::Other(format!(
+                "Thread panic during manifest_chunk_storage: {e}"
+            ))
+        })?
+        .insert(key)
+    {
+        *size_counter += size_increment;
+    }
+    Ok(())
+}
+
 fn calculate_manifest_storage(
     manifest: Arc<Manifest>,
-    seen_chunks: Arc<Mutex<HashSet<ChunkId>>>,
-) -> RepositoryResult<u64> {
+    // Different types of chunks require using different types of ids to de-duplicate them when counting.
+    seen_native_chunks: Arc<Mutex<HashSet<ChunkId>>>,
+    // Virtual chunks don't necessarily have checksums, so we instead use the (url, offset, length) tuple as an identifier.
+    // This is more expensive, but should work to de-duplicate because the only way that this identifier could be the same
+    // for different chunks is if the data were entirely overwritten at that exact storage location.
+    // In that scenario it makes sense not to count both chunks towards the storage total,
+    // as the overwritten data is no longer accessible anyway.
+    seen_virtual_chunks: Arc<
+        Mutex<HashSet<(VirtualChunkLocation, ChunkOffset, ChunkLength)>>,
+    >,
+) -> RepositoryResult<ChunkStorageStats> {
     trace!(manifest_id = %manifest.id(), "Processing manifest");
-    let mut size = 0;
+    let mut native_bytes: u64 = 0;
+    let mut virtual_bytes: u64 = 0;
+    let mut inlined_bytes: u64 = 0;
     for payload in manifest.chunk_payloads() {
         match payload {
             Ok(ChunkPayload::Ref(chunk_ref)) => {
-                if seen_chunks
-                    .lock()
-                    .map_err(|e| {
-                        RepositoryErrorKind::Other(format!(
-                            "Thread panic during manifest_chunk_storage: {e}"
-                        ))
-                    })?
-                    .insert(chunk_ref.id)
-                {
-                    size += chunk_ref.length;
-                }
+                // Deduplicate native chunks by ChunkId
+                insert_and_increment_size_if_new(
+                    &seen_native_chunks,
+                    chunk_ref.id,
+                    chunk_ref.length,
+                    &mut native_bytes,
+                )?;
+            }
+            Ok(ChunkPayload::Virtual(virtual_ref)) => {
+                // Deduplicate by (location, offset, length)
+                let virtual_chunk_identifier = (
+                    // TODO: Remove the need for this clone somehow?
+                    // It could potentially save a lot of memory usage for large virtual stores with long urls...
+                    virtual_ref.location.clone(),
+                    virtual_ref.offset,
+                    virtual_ref.length,
+                );
+                insert_and_increment_size_if_new(
+                    &seen_virtual_chunks,
+                    virtual_chunk_identifier,
+                    virtual_ref.length,
+                    &mut virtual_bytes,
+                )?;
+            }
+            Ok(ChunkPayload::Inline(bytes)) => {
+                // Inline chunks are stored in the manifest itself,
+                // so count each occurrence since they're actually stored repeatedly across different manifests
+                inlined_bytes += bytes.len() as u64;
             }
-            Ok(_) => {}
             // TODO: don't skip errors
             Err(err) => {
                 tracing::error!(
@@ -51,7 +142,9 @@ fn calculate_manifest_storage(
         }
     }
     trace!(manifest_id = %manifest.id(), "Manifest done");
-    Ok(size)
+
+    let stats = ChunkStorageStats::new(native_bytes, virtual_bytes, inlined_bytes);
+    Ok(stats)
 }
 
 async fn unique_manifest_infos<'a>(
@@ -78,13 +171,13 @@ async fn unique_manifest_infos<'a>(
 }
 
 /// Compute the total size in bytes of all committed repo chunks.
-/// It doesn't include inline or virtual chunks.
+/// The total for each type of chunk is computed separately.
 pub async fn repo_chunks_storage(
     asset_manager: Arc<AssetManager>,
     max_snapshots_in_memory: NonZeroU16,
     max_compressed_manifest_mem_bytes: NonZeroUsize,
     max_concurrent_manifest_fetches: NonZeroU16,
-) -> RepositoryResult<u64> {
+) -> RepositoryResult<ChunkStorageStats> {
     let extra_roots = Default::default();
     let manifest_infos = unique_manifest_infos(
         asset_manager.clone(),
@@ -104,7 +197,8 @@ pub async fn repo_chunks_storage(
     let rate_limited_manifests =
         limiter.limit_stream(manifest_infos, |minfo| minfo.size_bytes as usize);
 
-    let seen_chunks = &Arc::new(Mutex::new(HashSet::new()));
+    let seen_native_chunks = &Arc::new(Mutex::new(HashSet::new()));
+    let seen_virtual_chunks = &Arc::new(Mutex::new(HashSet::new()));
     let asset_manager = &asset_manager;
 
     let compute_stream = rate_limited_manifests
@@ -117,19 +211,27 @@
         // StreamLimiter we know memory is not going to blow up
         .try_buffer_unordered(max_concurrent_manifest_fetches.get() as usize)
         .and_then(|(manifest, minfo)| async move {
-            let seen_chunks = Arc::clone(seen_chunks);
-            let size = task::spawn_blocking(|| {
-                calculate_manifest_storage(manifest, seen_chunks)
+            let seen_native_chunks = Arc::clone(seen_native_chunks);
+            let seen_virtual_chunks = Arc::clone(seen_virtual_chunks);
+            let stats = task::spawn_blocking(|| {
+                calculate_manifest_storage(
+                    manifest,
+                    seen_native_chunks,
+                    seen_virtual_chunks,
+                )
             })
             .await??;
-            Ok((size, minfo))
+            Ok((stats, minfo))
         });
 
     let (_, res) = limiter
         .unlimit_stream(compute_stream, |(_, minfo)| minfo.size_bytes as usize)
-        .try_fold((0u64, 0), |(processed, total_size), (partial, _)| {
-            //info!("Processed {processed} manifests");
-            ready(Ok((processed + 1, total_size + partial)))
-        })
+        .try_fold(
+            (0u64, ChunkStorageStats::default()),
+            |(processed, total_stats), (partial, _)| {
+                //info!("Processed {processed} manifests");
+                ready(Ok((processed + 1, total_stats + partial)))
+            },
+        )
         .await?;
 
     debug_assert_eq!(limiter.current_usage(), 0);
diff --git a/icechunk/tests/test_stats.rs b/icechunk/tests/test_stats.rs
index 883b8c8c4..34a8a0cda 100644
--- a/icechunk/tests/test_stats.rs
+++ b/icechunk/tests/test_stats.rs
@@ -97,7 +97,7 @@ pub async fn do_test_repo_chunks_storage(
     let array_path: Path = "/array".try_into().unwrap();
     session.add_array(array_path.clone(), shape, None, user_data.clone()).await?;
 
-    // we write 50 native chunks 6 bytes each
+    // we write 50 native chunks, 6 bytes each
     for idx in 0..50 {
         let bytes = Bytes::copy_from_slice(&[0, 1, 2, 3, 4, 5]);
         let payload = session.get_chunk_writer()?(bytes.clone()).await?;
@@ -106,7 +106,7 @@ pub async fn do_test_repo_chunks_storage(
             .await?;
     }
 
-    // we write a few inline chunks
+    // we write 10 inline chunks, 1 byte each
     for idx in 50..60 {
         let bytes = Bytes::copy_from_slice(&[0]);
         let payload = session.get_chunk_writer()?(bytes.clone()).await?;
@@ -115,7 +115,7 @@ pub async fn do_test_repo_chunks_storage(
             .await?;
     }
 
-    // we write a few virtual chunks
+    // we write 10 virtual chunks, 100 bytes each
     for idx in 60..70 {
         let payload = ChunkPayload::Virtual(VirtualChunkRef {
             location: VirtualChunkLocation::from_absolute_path("s3://foo/bar").unwrap(),
@@ -128,7 +128,7 @@ pub async fn do_test_repo_chunks_storage(
             .await?;
     }
 
-    let size = repo_chunks_storage(
+    let stats = repo_chunks_storage(
         Arc::clone(&asset_manager),
         NonZeroU16::new(5).unwrap(),
         NonZeroUsize::MIN,
@@ -137,10 +137,12 @@ pub async fn do_test_repo_chunks_storage(
     )
     .await
     .unwrap();
     // no commits
-    assert_eq!(size, 0);
+    assert_eq!(stats.native_bytes, 0);
+    assert_eq!(stats.virtual_bytes, 0);
+    assert_eq!(stats.inlined_bytes, 0);
 
     let _ = session.commit("first", None).await?;
-    let size = repo_chunks_storage(
+    let stats = repo_chunks_storage(
         Arc::clone(&asset_manager),
         NonZeroU16::new(5).unwrap(),
         NonZeroUsize::MAX,
@@ -149,12 +151,14 @@ pub async fn do_test_repo_chunks_storage(
     )
     .await
     .unwrap();
 
-    // 50 native chunks 60 bytes each
-    assert_eq!(size, 50 * 6);
+    // 50 native chunks 6 bytes each, 10 inline chunks 1 byte each, 10 virtual chunks 100 bytes each
+    assert_eq!(stats.native_bytes, 50 * 6);
+    assert_eq!(stats.inlined_bytes, 10 * 1);
+    assert_eq!(stats.virtual_bytes, 10 * 100);
 
     // we do a new commit
     let mut session = repo.writable_session("main").await?;
-    // we write 10 native chunks 6 bytes each
+    // we write 10 more native chunks, 6 bytes each
     for idx in 0..10 {
         let bytes = Bytes::copy_from_slice(&[0, 1, 2, 3, 4, 5]);
         let payload = session.get_chunk_writer()?(bytes.clone()).await?;
@@ -164,7 +168,7 @@ pub async fn do_test_repo_chunks_storage(
     }
     let second_commit = session.commit("second", None).await?;
 
-    let size = repo_chunks_storage(
+    let stats = repo_chunks_storage(
         Arc::clone(&asset_manager),
         NonZeroU16::new(5).unwrap(),
         NonZeroUsize::MIN,
@@ -173,7 +177,11 @@ pub async fn do_test_repo_chunks_storage(
     )
     .await
     .unwrap();
     // 50 native chunks from first commit, 10 from second
-    assert_eq!(size, 50 * 6 + 10 * 6);
+    assert_eq!(stats.native_bytes, (50 + 10) * 6);
+    // Inline chunks are NOT deduplicated - they're stored in each manifest
+    // First manifest has 10, second manifest inherits 10 from parent = 20 total
+    assert_eq!(stats.inlined_bytes, (10 + 10) * 1);
+    assert_eq!(stats.virtual_bytes, 10 * 100);
 
     // add more chunks in a different branch
     repo.create_branch("dev", &second_commit).await.unwrap();
@@ -195,7 +203,7 @@ pub async fn do_test_repo_chunks_storage(
             .await?;
     }
     let _ = session.commit("third", None).await?;
-    let size = repo_chunks_storage(
+    let stats = repo_chunks_storage(
         Arc::clone(&asset_manager),
         NonZeroU16::new(5).unwrap(),
         NonZeroUsize::MAX,
@@ -204,6 +212,89 @@ pub async fn do_test_repo_chunks_storage(
     )
     .await
     .unwrap();
     // 50 native chunks from first commit, 10 from second, 5 from third
-    assert_eq!(size, 50 * 6 + 10 * 6 + 5 * 6);
+    assert_eq!(stats.native_bytes, 50 * 6 + 10 * 6 + 5 * 6);
+    // Inline chunks are stored in each manifest, so NOT deduplicated:
+    // Manifest 1: 10 bytes, Manifest 2: 10 bytes (inherited), Manifest 3: 10 bytes (rewritten) = 30 total
+    assert_eq!(stats.inlined_bytes, (10 + 10 + 10) * 1);
+    assert_eq!(stats.virtual_bytes, 10 * 100);
+
     Ok(())
 }
+
+#[tokio_test]
+pub async fn test_virtual_chunk_deduplication() -> Result<(), Box<dyn std::error::Error>>
+{
+    let storage = new_in_memory_storage().await?;
+    let storage_settings = storage.default_settings().await?;
+    let asset_manager = Arc::new(AssetManager::new_no_cache(
+        storage.clone(),
+        storage_settings.clone(),
+        SpecVersionBin::current(),
+        1,
+        DEFAULT_MAX_CONCURRENT_REQUESTS,
+    ));
+
+    let repo = Repository::create(
+        Some(RepositoryConfig {
+            inline_chunk_threshold_bytes: Some(5),
+            ..Default::default()
+        }),
+        storage,
+        Default::default(),
+        None,
+    )
+    .await?;
+
+    let mut session = repo.writable_session("main").await?;
+    let user_data = Bytes::new();
+    session.add_group(Path::root(), user_data.clone()).await?;
+    let shape = ArrayShape::new(vec![(100, 1)]).unwrap();
+    let array_path: Path = "/array".try_into().unwrap();
+    session.add_array(array_path.clone(), shape, None, user_data.clone()).await?;
+
+    // Write virtual chunks with the same location but different offsets
+    for idx in 0u32..10 {
+        let payload = ChunkPayload::Virtual(VirtualChunkRef {
+            location: VirtualChunkLocation::from_absolute_path("s3://bucket/file.dat")
+                .unwrap(),
+            offset: (idx * 100) as u64,
+            length: 100,
+            checksum: None,
+        });
+        session
+            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
+            .await?;
+    }
+
+    // Write duplicate virtual chunks (same location AND offset AND length)
+    for idx in 10u32..15 {
+        let payload = ChunkPayload::Virtual(VirtualChunkRef {
+            location: VirtualChunkLocation::from_absolute_path("s3://bucket/file.dat")
+                .unwrap(),
+            offset: 0, // Same offset as idx=0
+            length: 100,
+            checksum: None,
+        });
+        session
+            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
+            .await?;
+    }
+
+    session.commit("first", None).await?;
+
+    let stats = repo_chunks_storage(
+        Arc::clone(&asset_manager),
+        NonZeroU16::new(5).unwrap(),
+        NonZeroUsize::MAX,
+        NonZeroU16::try_from(10).unwrap(),
+    )
+    .await
+    .unwrap();
+
+    // Should have 10 unique virtual chunks (indices 0-9 with different offsets)
+    // Indices 10-14 are duplicates of index 0, so they shouldn't add to the count
+    assert_eq!(stats.virtual_bytes, 10 * 100);
+    assert_eq!(stats.native_bytes, 0);
+    assert_eq!(stats.inlined_bytes, 0);
+    Ok(())
+}
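
Reviewer note, not part of the diff: a short usage sketch of the Python API this change introduces, based only on the type stubs and deprecation message above. The repository path is hypothetical, and the numbers in comments are illustrative; `total_bytes` and `non_virtual_bytes` are used as properties per the `.pyi` stub.

# Usage sketch (assumes a repository already exists at the hypothetical path below).
import icechunk as ic

repo = ic.Repository.open(storage=ic.local_filesystem_storage("/tmp/example-repo"))

stats = repo.chunk_storage_stats()
print(stats)  # e.g. ChunkStorageStats(native_bytes=1200, virtual_bytes=0, inlined_bytes=30)

# Equivalent of the deprecated repo.total_chunks_storage():
native = stats.native_bytes

# Per the stub, stats objects support `+`, so results can be aggregated across calls or repos:
combined = stats + repo.chunk_storage_stats()
print(combined.total_bytes, combined.non_virtual_bytes)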