Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,7 @@ class PyRepository:
max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
max_concurrent_manifest_fetches: int = 500,
) -> ChunkStorageStats: ...
async def chunk_storage_stats_by_prefix_async(self) -> ChunkStorageStats: ...
def total_chunks_storage(
self,
*,
Expand Down
18 changes: 18 additions & 0 deletions icechunk-python/python/icechunk/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,24 @@ async def total_chunks_storage_async(
)
return stats.native_bytes

async def chunk_storage_stats_by_prefix_async(self) -> ChunkStorageStats:
    """Compute chunk storage statistics from a storage listing (async version).

    Instead of fetching and parsing manifests the way
    chunk_storage_stats_async does, this delegates to the native repository,
    which lists the objects stored under the chunks prefix and adds up
    their sizes.

    Note: only native_bytes is computed this way. Virtual and inline chunk
    sizes cannot be derived from a storage listing, so both are reported as 0.

    Returns
    -------
    ChunkStorageStats
        Statistics whose native_bytes comes from the storage listing;
        virtual_bytes and inlined_bytes are 0.
    """
    native_repo = self._repository
    stats = await native_repo.chunk_storage_stats_by_prefix_async()
    return stats

def inspect_snapshot(self, snapshot_id: str, *, pretty: bool = True) -> str:
    """Return the underlying repository's textual inspection of a snapshot.

    Parameters
    ----------
    snapshot_id : str
        Identifier of the snapshot to inspect.
    pretty : bool, optional
        Forwarded unchanged to the underlying repository (default True).
        NOTE(review): presumably toggles pretty-printed output - confirm.

    Returns
    -------
    str
        Whatever string the underlying repository produces.
    """
    inspection = self._repository.inspect_snapshot(snapshot_id, pretty=pretty)
    return inspection

Expand Down
22 changes: 21 additions & 1 deletion icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use icechunk::{
ops::{
gc::{ExpiredRefAction, GCConfig, GCSummary, expire, garbage_collect},
manifests::rewrite_manifests,
stats::repo_chunks_storage,
stats::{repo_chunks_storage, repo_chunks_storage_by_prefix},
},
repository::{RepositoryError, RepositoryErrorKind, VersionInfo},
};
Expand Down Expand Up @@ -2274,6 +2274,26 @@ impl PyRepository {
)
}

// Async Python entry point for the listing-based chunk storage stats.
//
// Hands a future to `future_into_py`, so the caller receives a Python
// awaitable that resolves to a `PyChunkStorageStats`. Errors returned by
// `repo_chunks_storage_by_prefix` are wrapped in
// `PyIcechunkStoreError::RepositoryError` before being propagated.
pub(crate) fn chunk_storage_stats_by_prefix_async<'py>(
    &'py self,
    py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
    let repo_handle = self.0.clone();
    pyo3_async_runtimes::tokio::future_into_py::<_, PyChunkStorageStats>(py, async move {
        // The read guard is a temporary of this statement, so it is released
        // before the storage listing below starts.
        let manager = Arc::clone(repo_handle.read().await.asset_manager());
        let totals = repo_chunks_storage_by_prefix(manager)
            .await
            .map_err(PyIcechunkStoreError::RepositoryError)?;
        Ok(totals.into())
    })
}

#[pyo3(signature = (snapshot_id, *, pretty = true))]
fn inspect_snapshot(&self, snapshot_id: String, pretty: bool) -> PyResult<String> {
let result = pyo3_async_runtimes::tokio::get_runtime()
Expand Down
49 changes: 49 additions & 0 deletions icechunk-python/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,52 @@ def test_chunk_storage_on_filesystem(dir: str) -> None:
f.stat().st_size for f in (Path(dir) / "chunks").glob("*") if f.is_file()
)
assert actual == expected


@pytest.mark.filterwarnings("ignore:datetime.datetime.utcnow")
async def test_chunk_storage_by_prefix_async(any_spec_version: int | None) -> None:
    """Check the listing-based (by_prefix) async storage stats on a fresh repo.

    Writes 100 uncompressed single-element ``i4`` chunks to an in-memory
    repository and verifies that the reported native size is 100 * 4 bytes,
    while the virtual and inlined sizes are reported as zero.
    """

    repo = await ic.Repository.create_async(
        storage=ic.in_memory_storage(),
        # NOTE(review): a zero threshold presumably forces every chunk to be
        # stored as a native object instead of being inlined - confirm.
        config=ic.RepositoryConfig(inline_chunk_threshold_bytes=0),
        spec_version=any_spec_version,
    )
    session = await repo.writable_session_async("main")
    store = session.store

    group = zarr.group(store=store, overwrite=True)
    array = group.create_array(
        "array",
        # A real 1-tuple, consistent with ``chunks`` below; ``(100)`` is just
        # the integer 100 wrapped in parentheses.
        shape=(100,),
        chunks=(1,),
        dtype="i4",
        compressors=None,
    )

    array[:] = 42
    await session.commit_async("commit 1")

    # Test the by_prefix method: 100 chunks of one 4-byte integer each.
    stats = await repo.chunk_storage_stats_by_prefix_async()
    assert stats.native_bytes == 100 * 4
    # Virtual and inline should be 0 since we're only listing storage
    assert stats.virtual_bytes == 0
    assert stats.inlined_bytes == 0


@pytest.mark.parametrize(
    "dir", ["./tests/data/test-repo-v2", "./tests/data/test-repo-v1"]
)
async def test_chunk_storage_by_prefix_on_filesystem(dir: str) -> None:
    """The by_prefix stats must equal the byte total of files under ``chunks``."""
    storage = ic.local_filesystem_storage(dir)
    repo = await ic.Repository.open_async(storage=storage)
    stats = await repo.chunk_storage_stats_by_prefix_async()

    # Independently total the size of every regular file directly inside
    # the repository's "chunks" directory.
    on_disk_bytes = 0
    for entry in (Path(dir) / "chunks").glob("*"):
        if entry.is_file():
            on_disk_bytes += entry.stat().st_size

    assert stats.native_bytes == on_disk_bytes
    assert stats.virtual_bytes == 0
    assert stats.inlined_bytes == 0
32 changes: 31 additions & 1 deletion icechunk/src/ops/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::trace;
use crate::{
asset_manager::AssetManager,
format::{
ChunkId, ChunkLength, ChunkOffset, SnapshotId,
ChunkId, ChunkLength, ChunkOffset, SnapshotId, CHUNKS_FILE_PATH,
manifest::{ChunkPayload, Manifest, VirtualChunkLocation},
snapshot::ManifestFileInfo,
},
Expand Down Expand Up @@ -238,3 +238,33 @@ pub async fn repo_chunks_storage(

Ok(res)
}

/// Total the stored size of native chunks by listing storage objects directly.
///
/// Unlike `repo_chunks_storage`, this does not load any manifests: it lists
/// the objects under `CHUNKS_FILE_PATH` and adds up the `size_bytes` reported
/// for each one. The sum saturates at `u64::MAX` instead of overflowing.
///
/// Only `native_bytes` is computed. `virtual_bytes` and `inlined_bytes` are
/// always returned as zero, because they are not derived from the listing.
///
/// NOTE(review): every listed object is counted, so objects under the prefix
/// that no snapshot references any more are presumably included as well -
/// confirm whether that is the intended meaning of `native_bytes` here.
///
/// # Errors
///
/// Propagates a failure to start the listing, as well as the first error
/// yielded by the listing stream.
pub async fn repo_chunks_storage_by_prefix(
    asset_manager: Arc<AssetManager>,
) -> RepositoryResult<ChunkStorageStats> {
    let store = asset_manager.storage();
    let store_settings = asset_manager.storage_settings();

    // Stream of listing entries for everything under the chunks prefix.
    let listing = store.list_objects(store_settings, CHUNKS_FILE_PATH).await?;

    // Reduce each entry to its size, then accumulate; stops at the first error.
    let total_native = listing
        .map_ok(|entry| entry.size_bytes)
        .try_fold(0u64, |acc, size| ready(Ok(acc.saturating_add(size))))
        .await?;

    Ok(ChunkStorageStats { native_bytes: total_native, virtual_bytes: 0, inlined_bytes: 0 })
}
Loading