-
Notifications
You must be signed in to change notification settings - Fork 61
Extend storage stats calculation to include virtual and inline chunks #1483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
824fec5
e63ab97
cc78013
772d4e5
72cbd45
7cc2da6
f8fadcf
1669a5b
7264f32
710f0e0
ce7cdd5
6e55996
39d382f
3b910e2
68589c6
6fa763f
03fbac5
262567c
1c71151
76226be
57cbc69
c532eee
b71c195
3d3ece4
f7b0578
641f80a
1049a69
d53554c
138b025
14b8ad3
e3bc18c
67849f6
c9e7284
794b560
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| use pyo3::{pyclass, pymethods}; | ||
|
|
||
| use icechunk::ops::stats::ChunkStorageStats; | ||
|
|
||
| /// Statistics about chunk storage across different chunk types. | ||
| #[pyclass(name = "ChunkStorageStats")] | ||
| #[derive(Clone, Debug)] | ||
| pub struct PyChunkStorageStats { | ||
| /// Total bytes stored in native chunks (stored in icechunk's chunk storage) | ||
| #[pyo3(get)] | ||
| pub native_bytes: u64, | ||
| /// Total bytes stored in virtual chunks (references to external data) | ||
| #[pyo3(get)] | ||
| pub virtual_bytes: u64, | ||
| /// Total bytes stored in inline chunks (stored directly in manifests) | ||
| #[pyo3(get)] | ||
| pub inlined_bytes: u64, | ||
| } | ||
|
|
||
| impl From<ChunkStorageStats> for PyChunkStorageStats { | ||
| fn from(stats: ChunkStorageStats) -> Self { | ||
| Self { | ||
| native_bytes: stats.native_bytes, | ||
| virtual_bytes: stats.virtual_bytes, | ||
| inlined_bytes: stats.inlined_bytes, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[pymethods] | ||
| impl PyChunkStorageStats { | ||
| /// Total bytes excluding virtual chunks. | ||
| /// | ||
| /// This represents the approximate size of all objects stored in the | ||
| /// icechunk repository itself (native chunks plus inline chunks). | ||
| /// Virtual chunks are not included since they reference external data. | ||
| /// | ||
| /// Returns: | ||
| /// int: The sum of native_bytes and inlined_bytes | ||
| pub fn non_virtual_bytes(&self) -> u64 { | ||
| self.native_bytes | ||
| .saturating_add(self.native_bytes) | ||
| .saturating_add(self.inlined_bytes) | ||
| } | ||
|
|
||
| /// Total bytes across all chunk types. | ||
| /// | ||
| /// Returns the sum of native_bytes, virtual_bytes, and inlined_bytes. | ||
| /// This represents the total size of all data referenced by the repository, | ||
| /// including both data stored in icechunk and external virtual references. | ||
| /// | ||
| /// Returns: | ||
| /// int: The sum of all chunk storage bytes | ||
| pub fn total_bytes(&self) -> u64 { | ||
| self.native_bytes | ||
| .saturating_add(self.virtual_bytes) | ||
| .saturating_add(self.inlined_bytes) | ||
| } | ||
|
|
||
| pub fn __repr__(&self) -> String { | ||
| format!( | ||
| "ChunkStorageStats(native_bytes={}, virtual_bytes={}, inlined_bytes={})", | ||
| self.native_bytes, self.virtual_bytes, self.inlined_bytes | ||
| ) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ use futures::{StreamExt, TryStream, TryStreamExt, future::ready, stream}; | |
| use std::{ | ||
| collections::HashSet, | ||
| num::{NonZeroU16, NonZeroUsize}, | ||
| ops::Add, | ||
| sync::{Arc, Mutex}, | ||
| }; | ||
| use tokio::task; | ||
|
|
@@ -10,37 +11,138 @@ use tracing::trace; | |
| use crate::{ | ||
| asset_manager::AssetManager, | ||
| format::{ | ||
| ChunkId, SnapshotId, | ||
| manifest::{ChunkPayload, Manifest}, | ||
| ChunkId, ChunkLength, ChunkOffset, SnapshotId, | ||
| manifest::{Checksum, ChunkPayload, Manifest, VirtualChunkLocation}, | ||
| snapshot::ManifestFileInfo, | ||
| }, | ||
| ops::pointed_snapshots, | ||
| repository::{RepositoryError, RepositoryErrorKind, RepositoryResult}, | ||
| stream_utils::{StreamLimiter, try_unique_stream}, | ||
| }; | ||
|
|
||
| /// Statistics about chunk storage across different chunk types | ||
| #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] | ||
| pub struct ChunkStorageStats { | ||
| /// Total bytes stored in native chunks (stored in icechunk's chunk storage) | ||
| pub native_bytes: u64, | ||
| /// Total bytes stored in virtual chunks (references to external data) | ||
| pub virtual_bytes: u64, | ||
| /// Total bytes stored in inline chunks (stored directly in manifests) | ||
| pub inlined_bytes: u64, | ||
| } | ||
|
|
||
| impl ChunkStorageStats { | ||
| /// Create a new ChunkStorageStats with the specified byte counts | ||
| pub fn new(native_bytes: u64, virtual_bytes: u64, inlined_bytes: u64) -> Self { | ||
| Self { native_bytes, virtual_bytes, inlined_bytes } | ||
| } | ||
|
|
||
| /// Get the total bytes excluding virtual chunks (this is ~= to the size of all objects in the icechunk repo) | ||
| pub fn non_virtual_bytes(&self) -> u64 { | ||
| self.native_bytes | ||
| .saturating_add(self.native_bytes) | ||
| .saturating_add(self.inlined_bytes) | ||
| } | ||
|
|
||
| /// Get the total bytes across all chunk types | ||
| pub fn total_bytes(&self) -> u64 { | ||
| self.native_bytes | ||
| .saturating_add(self.virtual_bytes) | ||
| .saturating_add(self.inlined_bytes) | ||
| } | ||
| } | ||
|
|
||
| impl Add for ChunkStorageStats { | ||
| type Output = Self; | ||
|
|
||
| fn add(self, other: Self) -> Self { | ||
| Self { | ||
| native_bytes: self.native_bytes.saturating_add(other.native_bytes), | ||
| virtual_bytes: self.virtual_bytes.saturating_add(other.virtual_bytes), | ||
| inlined_bytes: self.inlined_bytes.saturating_add(other.inlined_bytes), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Helper function to deduplicate chunks by inserting into a HashSet and counting if new | ||
| fn insert_and_increment_size_if_new<T: Eq + std::hash::Hash>( | ||
| seen: &Arc<Mutex<HashSet<T>>>, | ||
| key: T, | ||
| size_increment: u64, | ||
| size_counter: &mut u64, | ||
| ) -> RepositoryResult<()> { | ||
| if seen | ||
| .lock() | ||
| .map_err(|e| { | ||
| RepositoryErrorKind::Other(format!( | ||
| "Thread panic during manifest_chunk_storage: {e}" | ||
| )) | ||
| })? | ||
| .insert(key) | ||
| { | ||
| *size_counter += size_increment; | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn calculate_manifest_storage( | ||
| manifest: Arc<Manifest>, | ||
| seen_chunks: Arc<Mutex<HashSet<ChunkId>>>, | ||
| ) -> RepositoryResult<u64> { | ||
| // Different types of chunks require using different types of ids to de-duplicate them when counting. | ||
| seen_native_chunks: Arc<Mutex<HashSet<ChunkId>>>, | ||
| seen_virtual_checksums: Arc<Mutex<HashSet<Checksum>>>, | ||
| // Virtual chunks don't necessarily have checksums. For those which don't we instead use the (url, offset, length) tuple as an identifier. | ||
| // This is more expensive, but should still work to de-duplicate because the only way that this identifier could be the same for different chunks | ||
| // is if the data were entirely overwritten at that exact storage location. In this scenario it makes sense not to count both chunks towards the storage total, | ||
| // as the overwritten data is no longer accessible anyway. | ||
| seen_virtual_identifiers: Arc< | ||
| Mutex<HashSet<(VirtualChunkLocation, ChunkOffset, ChunkLength)>>, | ||
| >, | ||
| ) -> RepositoryResult<ChunkStorageStats> { | ||
| trace!(manifest_id = %manifest.id(), "Processing manifest"); | ||
| let mut size = 0; | ||
| let mut native_bytes: u64 = 0; | ||
| let mut virtual_bytes: u64 = 0; | ||
| let mut inlined_bytes: u64 = 0; | ||
| for payload in manifest.chunk_payloads() { | ||
| match payload { | ||
| Ok(ChunkPayload::Ref(chunk_ref)) => { | ||
| if seen_chunks | ||
| .lock() | ||
| .map_err(|e| { | ||
| RepositoryErrorKind::Other(format!( | ||
| "Thread panic during manifest_chunk_storage: {e}" | ||
| )) | ||
| })? | ||
| .insert(chunk_ref.id) | ||
| { | ||
| size += chunk_ref.length; | ||
| // Deduplicate native chunks by ChunkId | ||
| insert_and_increment_size_if_new( | ||
| &seen_native_chunks, | ||
| chunk_ref.id, | ||
| chunk_ref.length, | ||
| &mut native_bytes, | ||
| )?; | ||
| } | ||
| Ok(ChunkPayload::Virtual(virtual_ref)) => { | ||
| // Deduplicate by checksum if available, otherwise by (location, offset, length) | ||
| if let Some(checksum) = &virtual_ref.checksum { | ||
TomNicholas marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // Has checksum: deduplicate by checksum | ||
| insert_and_increment_size_if_new( | ||
| &seen_virtual_checksums, | ||
| checksum.clone(), | ||
| virtual_ref.length, | ||
| &mut virtual_bytes, | ||
| )?; | ||
| } else { | ||
| // No checksum: deduplicate by (location, offset, length) | ||
| let virtual_identifier = ( | ||
| virtual_ref.location.clone(), | ||
|
||
| virtual_ref.offset, | ||
| virtual_ref.length, | ||
| ); | ||
| insert_and_increment_size_if_new( | ||
| &seen_virtual_identifiers, | ||
| virtual_identifier, | ||
| virtual_ref.length, | ||
| &mut virtual_bytes, | ||
| )?; | ||
| } | ||
| } | ||
| Ok(_) => {} | ||
| Ok(ChunkPayload::Inline(bytes)) => { | ||
| // Inline chunks are stored in the manifest itself, | ||
| // so count each occurrence since they're actually stored repeatedly across different manifests | ||
| inlined_bytes += bytes.len() as u64; | ||
| } | ||
| // TODO: don't skip errors | ||
| Err(err) => { | ||
| tracing::error!( | ||
|
|
@@ -51,7 +153,9 @@ fn calculate_manifest_storage( | |
| } | ||
| } | ||
| trace!(manifest_id = %manifest.id(), "Manifest done"); | ||
| Ok(size) | ||
|
|
||
| let stats = ChunkStorageStats::new(native_bytes, virtual_bytes, inlined_bytes); | ||
| Ok(stats) | ||
| } | ||
|
|
||
| async fn unique_manifest_infos<'a>( | ||
|
|
@@ -84,7 +188,7 @@ pub async fn repo_chunks_storage( | |
| max_snapshots_in_memory: NonZeroU16, | ||
| max_compressed_manifest_mem_bytes: NonZeroUsize, | ||
| max_concurrent_manifest_fetches: NonZeroU16, | ||
| ) -> RepositoryResult<u64> { | ||
| ) -> RepositoryResult<ChunkStorageStats> { | ||
| let extra_roots = Default::default(); | ||
| let manifest_infos = unique_manifest_infos( | ||
| asset_manager.clone(), | ||
|
|
@@ -104,7 +208,9 @@ pub async fn repo_chunks_storage( | |
| let rate_limited_manifests = | ||
| limiter.limit_stream(manifest_infos, |minfo| minfo.size_bytes as usize); | ||
|
|
||
| let seen_chunks = &Arc::new(Mutex::new(HashSet::new())); | ||
| let seen_native_chunks = &Arc::new(Mutex::new(HashSet::new())); | ||
| let seen_virtual_checksums = &Arc::new(Mutex::new(HashSet::new())); | ||
| let seen_virtual_composites = &Arc::new(Mutex::new(HashSet::new())); | ||
| let asset_manager = &asset_manager; | ||
|
|
||
| let compute_stream = rate_limited_manifests | ||
|
|
@@ -117,19 +223,29 @@ pub async fn repo_chunks_storage( | |
| // StreamLimiter we know memory is not going to blow up | ||
| .try_buffer_unordered(max_concurrent_manifest_fetches.get() as usize) | ||
| .and_then(|(manifest, minfo)| async move { | ||
| let seen_chunks = Arc::clone(seen_chunks); | ||
| let size = task::spawn_blocking(|| { | ||
| calculate_manifest_storage(manifest, seen_chunks) | ||
| let seen_native_chunks = Arc::clone(seen_native_chunks); | ||
| let seen_virtual_checksums = Arc::clone(seen_virtual_checksums); | ||
| let seen_virtual_composites = Arc::clone(seen_virtual_composites); | ||
| let stats = task::spawn_blocking(|| { | ||
| calculate_manifest_storage( | ||
| manifest, | ||
| seen_native_chunks, | ||
| seen_virtual_checksums, | ||
| seen_virtual_composites, | ||
| ) | ||
| }) | ||
| .await??; | ||
| Ok((size, minfo)) | ||
| Ok((stats, minfo)) | ||
| }); | ||
| let (_, res) = limiter | ||
| .unlimit_stream(compute_stream, |(_, minfo)| minfo.size_bytes as usize) | ||
| .try_fold((0u64, 0), |(processed, total_size), (partial, _)| { | ||
| //info!("Processed {processed} manifests"); | ||
| ready(Ok((processed + 1, total_size + partial))) | ||
| }) | ||
| .try_fold( | ||
| (0u64, ChunkStorageStats::default()), | ||
| |(processed, total_stats), (partial, _)| { | ||
| //info!("Processed {processed} manifests"); | ||
| ready(Ok((processed + 1, total_stats + partial))) | ||
| }, | ||
| ) | ||
| .await?; | ||
|
|
||
| debug_assert_eq!(limiter.current_usage(), 0); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.