Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1651,6 +1651,20 @@ class PyRepository:
max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
max_concurrent_manifest_fetches: int = 500,
) -> GCSummary: ...
def chunk_storage_stats(
    self,
    *,
    max_snapshots_in_memory: int = 50,
    max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
    max_concurrent_manifest_fetches: int = 500,
) -> ChunkStorageStats:
    """Calculate the storage used by chunks, broken down by chunk type.

    Parameters
    ----------
    max_snapshots_in_memory : int
        Don't prefetch more than this many snapshots to memory.
    max_compressed_manifest_mem_bytes : int
        Don't use more than this memory to store compressed in-flight manifests.
    max_concurrent_manifest_fetches : int
        Don't run more than this many concurrent manifest fetches.

    Returns
    -------
    ChunkStorageStats
        Per-chunk-type byte counts (native, virtual, inlined, and totals).
    """
    ...
async def chunk_storage_stats_async(
    self,
    *,
    max_snapshots_in_memory: int = 50,
    max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
    max_concurrent_manifest_fetches: int = 500,
) -> ChunkStorageStats:
    """Calculate the storage used by chunks, broken down by chunk type (async version).

    Parameters
    ----------
    max_snapshots_in_memory : int
        Don't prefetch more than this many snapshots to memory.
    max_compressed_manifest_mem_bytes : int
        Don't use more than this memory to store compressed in-flight manifests.
    max_concurrent_manifest_fetches : int
        Don't run more than this many concurrent manifest fetches.

    Returns
    -------
    ChunkStorageStats
        Per-chunk-type byte counts (native, virtual, inlined, and totals).
    """
    ...
def total_chunks_storage(
self,
*,
Expand Down Expand Up @@ -2404,3 +2418,52 @@ def spec_version() -> int:
int: The version of the Icechunk specification that the library is compatible with
"""
...

class ChunkStorageStats:
    """Statistics about chunk storage across different chunk types."""

    @property
    def native_bytes(self) -> int:
        """Total bytes stored in native chunks (stored in icechunk's chunk storage)"""
        ...

    @property
    def virtual_bytes(self) -> int:
        """Total bytes stored in virtual chunks (references to external data)"""
        ...

    @property
    def inlined_bytes(self) -> int:
        """Total bytes stored in inline chunks (stored directly in manifests)"""
        ...

    @property
    def non_virtual_bytes(self) -> int:
        """
        Total bytes excluding virtual chunks.

        This represents the approximate size of all objects stored in the
        icechunk repository itself (native chunks plus inline chunks).
        Virtual chunks are not included since they reference external data.

        Returns:
            int: The sum of native_bytes and inlined_bytes
        """
        ...

    @property
    def total_bytes(self) -> int:
        """
        Total bytes across all chunk types.

        Returns the sum of native_bytes, virtual_bytes, and inlined_bytes.
        This represents the total size of all data referenced by the repository,
        including both data stored in icechunk and external virtual references.

        Returns:
            int: The sum of all chunk storage bytes
        """
        ...

    def __repr__(self) -> str:
        """Return a human-readable representation of the stats."""
        ...

    def __add__(self, other: ChunkStorageStats) -> ChunkStorageStats:
        """Combine two stats objects into a new one.

        NOTE(review): presumably a field-wise sum of the byte counters —
        confirm against the native (Rust) implementation.
        """
        ...
94 changes: 91 additions & 3 deletions icechunk-python/python/icechunk/repository.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import datetime
import warnings
from collections.abc import AsyncIterator, Iterator
from contextlib import contextmanager
from typing import Any, Self, cast

from icechunk import ConflictSolver
from icechunk._icechunk_python import (
ChunkStorageStats,
Diff,
GCSummary,
PyRepository,
Expand Down Expand Up @@ -1373,14 +1375,80 @@ async def garbage_collect_async(
max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
)

def chunk_storage_stats(
    self,
    *,
    max_snapshots_in_memory: int = 50,
    max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
    max_concurrent_manifest_fetches: int = 500,
) -> ChunkStorageStats:
    """Report the storage consumed by the repository's chunks, in bytes.

    Only snapshots reachable from some branch or tag are counted. Snapshots
    can become unreachable via `reset_branch` or `expire_snapshots`; chunks
    belonging to those snapshots are excluded here and should probably be
    reclaimed with `garbage_collection`.

    The returned dataclass breaks the total down by chunk type (e.g.
    `native_bytes`, `virtual_bytes`, `total_bytes`).

    Parameters
    ----------
    max_snapshots_in_memory: int
        Prefetch at most this many Snapshots into memory.
    max_compressed_manifest_mem_bytes : int
        Upper bound on memory used for compressed in-flight manifests.
    max_concurrent_manifest_fetches : int
        Upper bound on the number of concurrent manifest fetches.
    """
    # Delegate to the native repository implementation.
    stats = self._repository.chunk_storage_stats(
        max_snapshots_in_memory=max_snapshots_in_memory,
        max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes,
        max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
    )
    return stats

async def chunk_storage_stats_async(
    self,
    *,
    max_snapshots_in_memory: int = 50,
    max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
    max_concurrent_manifest_fetches: int = 500,
) -> ChunkStorageStats:
    """Report the storage consumed by the repository's chunks, in bytes (async version).

    Only snapshots reachable from some branch or tag are counted. Snapshots
    can become unreachable via `reset_branch` or `expire_snapshots`; chunks
    belonging to those snapshots are excluded here and should probably be
    reclaimed with `garbage_collection`.

    The returned dataclass breaks the total down by chunk type (e.g.
    `native_bytes`, `virtual_bytes`, `total_bytes`).

    Parameters
    ----------
    max_snapshots_in_memory: int
        Prefetch at most this many Snapshots into memory.
    max_compressed_manifest_mem_bytes : int
        Upper bound on memory used for compressed in-flight manifests.
    max_concurrent_manifest_fetches : int
        Upper bound on the number of concurrent manifest fetches.
    """
    # Delegate to the native repository implementation.
    stats = await self._repository.chunk_storage_stats_async(
        max_snapshots_in_memory=max_snapshots_in_memory,
        max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes,
        max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
    )
    return stats

def total_chunks_storage(
self,
*,
max_snapshots_in_memory: int = 50,
max_compressed_manifest_mem_bytes: int = 512 * 1024 * 1024,
max_concurrent_manifest_fetches: int = 500,
) -> int:
"""Calculate the total storage used for chunks, in bytes .
"""Calculate the total storage used for chunks, in bytes.

It reports the storage needed to store all snapshots in the repository that
are reachable from any branches or tags. Unreachable snapshots can be generated
Expand All @@ -1400,11 +1468,21 @@ def total_chunks_storage(
Don't run more than this many concurrent manifest fetches.
"""

return self._repository.total_chunks_storage(
warnings.warn(
"The ``total_chunks_storage`` method has been deprecated in favour of the ``chunk_storage_stats`` method. "
"The new method is superior, as it actually calculates storage size occupied by inlined and virtual chunks in addition to native chunks. "
"You can still access just the total native bytes: to keep your existing behaviour using API that will not be removed in a future version, "
"please replace your existing ``.total_chunks_storage(**kwargs)`` method call with ``.chunk_storage_stats(**same_kwargs).native_bytes``.",
DeprecationWarning,
stacklevel=2,
)

stats = self._repository.chunk_storage_stats(
max_snapshots_in_memory=max_snapshots_in_memory,
max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes,
max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
)
return stats.native_bytes

async def total_chunks_storage_async(
self,
Expand Down Expand Up @@ -1433,11 +1511,21 @@ async def total_chunks_storage_async(
Don't run more than this many concurrent manifest fetches.
"""

return await self._repository.total_chunks_storage_async(
warnings.warn(
"The ``total_chunks_storage_async`` method has been deprecated in favour of the ``chunk_storage_stats_async`` method. "
"The new method is superior, as it actually calculates storage size occupied by inlined and virtual chunks in addition to native chunks. "
"You can still access just the total native bytes: to keep your existing behaviour using API that will not be removed in a future version, "
"please replace your existing ``.total_chunks_storage_async(**kwargs)`` method call with ``.chunk_storage_stats_async(**same_kwargs).native_bytes``.",
DeprecationWarning,
stacklevel=2,
)

stats = await self._repository.chunk_storage_stats_async(
max_snapshots_in_memory=max_snapshots_in_memory,
max_compressed_manifest_mem_bytes=max_compressed_manifest_mem_bytes,
max_concurrent_manifest_fetches=max_concurrent_manifest_fetches,
)
return stats.native_bytes

def inspect_snapshot(self, snapshot_id: str, *, pretty: bool = True) -> str:
    """Return a textual description of the snapshot identified by `snapshot_id`.

    Delegates to the underlying native repository; the exact output format
    (and what `pretty=True` changes) is determined by that implementation —
    NOTE(review): confirm the format contract against the Rust side.
    """
    return self._repository.inspect_snapshot(snapshot_id, pretty=pretty)
Expand Down
3 changes: 3 additions & 0 deletions icechunk-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod errors;
mod pickle;
mod repository;
mod session;
mod stats;
mod store;
mod streams;

Expand Down Expand Up @@ -32,6 +33,7 @@ use pyo3::types::PyMapping;
use pyo3::wrap_pyfunction;
use repository::{PyDiff, PyGCSummary, PyManifestFileInfo, PyRepository, PySnapshotInfo};
use session::PySession;
use stats::PyChunkStorageStats;
use store::{PyStore, VirtualChunkSpec};

#[cfg(feature = "cli")]
Expand Down Expand Up @@ -149,6 +151,7 @@ fn _icechunk_python(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyStore>()?;
m.add_class::<PySnapshotInfo>()?;
m.add_class::<PyManifestFileInfo>()?;
m.add_class::<PyChunkStorageStats>()?;
m.add_class::<PyConflictSolver>()?;
m.add_class::<PyBasicConflictSolver>()?;
m.add_class::<PyConflictDetector>()?;
Expand Down
58 changes: 31 additions & 27 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
errors::PyIcechunkStoreError,
impl_pickle,
session::PySession,
stats::PyChunkStorageStats,
streams::PyAsyncGenerator,
};

Expand Down Expand Up @@ -1600,16 +1601,16 @@ impl PyRepository {
})
}

pub(crate) fn total_chunks_storage(
pub(crate) fn chunk_storage_stats(
&self,
py: Python<'_>,
max_snapshots_in_memory: NonZeroU16,
max_compressed_manifest_mem_bytes: NonZeroUsize,
max_concurrent_manifest_fetches: NonZeroU16,
) -> PyResult<u64> {
) -> PyResult<PyChunkStorageStats> {
// This function calls block_on, so we need to allow other thread python to make progress
py.detach(move || {
let result =
let stats =
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
let (storage, storage_settings, asset_manager) = {
let lock = self.0.read().await;
Expand All @@ -1619,7 +1620,7 @@ impl PyRepository {
Arc::clone(lock.asset_manager()),
)
};
let result = repo_chunks_storage(
let stats = repo_chunks_storage(
storage.as_ref(),
&storage_settings,
asset_manager,
Expand All @@ -1629,42 +1630,45 @@ impl PyRepository {
)
.await
.map_err(PyIcechunkStoreError::RepositoryError)?;
Ok::<_, PyIcechunkStoreError>(result)
Ok::<_, PyIcechunkStoreError>(stats)
})?;

Ok(result)
Ok(stats.into())
})
}

fn total_chunks_storage_async<'py>(
pub(crate) fn chunk_storage_stats_async<'py>(
&'py self,
py: Python<'py>,
max_snapshots_in_memory: NonZeroU16,
max_compressed_manifest_mem_bytes: NonZeroUsize,
max_concurrent_manifest_fetches: NonZeroU16,
) -> PyResult<Bound<'py, PyAny>> {
let repository = self.0.clone();
pyo3_async_runtimes::tokio::future_into_py::<_, u64>(py, async move {
let (storage, storage_settings, asset_manager) = {
let lock = repository.read().await;
(
Arc::clone(lock.storage()),
lock.storage_settings().clone(),
Arc::clone(lock.asset_manager()),
pyo3_async_runtimes::tokio::future_into_py::<_, PyChunkStorageStats>(
py,
async move {
let (storage, storage_settings, asset_manager) = {
let lock = repository.read().await;
(
Arc::clone(lock.storage()),
lock.storage_settings().clone(),
Arc::clone(lock.asset_manager()),
)
};
let stats = repo_chunks_storage(
storage.as_ref(),
&storage_settings,
asset_manager,
max_snapshots_in_memory,
max_compressed_manifest_mem_bytes,
max_concurrent_manifest_fetches,
)
};
let result = repo_chunks_storage(
storage.as_ref(),
&storage_settings,
asset_manager,
max_snapshots_in_memory,
max_compressed_manifest_mem_bytes,
max_concurrent_manifest_fetches,
)
.await
.map_err(PyIcechunkStoreError::RepositoryError)?;
Ok(result)
})
.await
.map_err(PyIcechunkStoreError::RepositoryError)?;
Ok(stats.into())
},
)
}

#[pyo3(signature = (snapshot_id, *, pretty = true))]
Expand Down
Loading