diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi
index f713dee54..02975c6ff 100644
--- a/icechunk-python/python/icechunk/_icechunk_python.pyi
+++ b/icechunk-python/python/icechunk/_icechunk_python.pyi
@@ -964,10 +964,8 @@ class PyRepository:
     def save_config(self) -> None: ...
     def config(self) -> RepositoryConfig: ...
     def storage(self) -> Storage: ...
-    def set_default_commit_metadata(
-        self, metadata: dict[str, Any] | None = None
-    ) -> None: ...
-    def default_commit_metadata(self) -> dict[str, Any] | None: ...
+    def set_default_commit_metadata(self, metadata: dict[str, Any]) -> None: ...
+    def default_commit_metadata(self) -> dict[str, Any]: ...
     def async_ancestry(
         self,
         *,
diff --git a/icechunk-python/python/icechunk/repository.py b/icechunk-python/python/icechunk/repository.py
index 01e9e06d6..7cb95cb18 100644
--- a/icechunk-python/python/icechunk/repository.py
+++ b/icechunk-python/python/icechunk/repository.py
@@ -216,7 +216,7 @@ def storage(self) -> Storage:
         """
         return self._repository.storage()

-    def set_default_commit_metadata(self, metadata: dict[str, Any] | None = None) -> None:
+    def set_default_commit_metadata(self, metadata: dict[str, Any]) -> None:
         """
         Set the default commit metadata for the repository. This is useful for providing
         additional static system context metadata to all commits.
@@ -230,18 +230,18 @@ def set_default_commit_metadata(self, metadata: dict[str, Any] | None = None) ->

         Parameters
         ----------
-        metadata : dict[str, Any], optional
-            The default commit metadata.
+        metadata : dict[str, Any]
+            The default commit metadata. Pass an empty dict to clear the default metadata.
         """
         return self._repository.set_default_commit_metadata(metadata)

-    def default_commit_metadata(self) -> dict[str, Any] | None:
+    def default_commit_metadata(self) -> dict[str, Any]:
         """
         Get the current configured default commit metadata for the repository.

         Returns
         -------
-        dict[str, Any] | None
+        dict[str, Any]
             The default commit metadata.
""" return self._repository.default_commit_metadata() diff --git a/icechunk-python/src/repository.rs b/icechunk-python/src/repository.rs index d45547198..441962d2d 100644 --- a/icechunk-python/src/repository.rs +++ b/icechunk-python/src/repository.rs @@ -362,7 +362,7 @@ impl PyGCSummary { } #[pyclass] -pub struct PyRepository(Arc); +pub struct PyRepository(Arc>); #[pymethods] /// Most functions in this class call `Runtime.block_on` so they need to `allow_threads` so other @@ -390,7 +390,7 @@ impl PyRepository { .map_err(PyIcechunkStoreError::RepositoryError) })?; - Ok(Self(Arc::new(repository))) + Ok(Self(Arc::new(RwLock::new(repository)))) }) } @@ -416,7 +416,7 @@ impl PyRepository { .map_err(PyIcechunkStoreError::RepositoryError) })?; - Ok(Self(Arc::new(repository))) + Ok(Self(Arc::new(RwLock::new(repository)))) }) } @@ -444,7 +444,7 @@ impl PyRepository { ) })?; - Ok(Self(Arc::new(repository))) + Ok(Self(Arc::new(RwLock::new(repository)))) }) } @@ -474,14 +474,15 @@ impl PyRepository { virtual_chunk_credentials: Option>>, ) -> PyResult { py.allow_threads(move || { - Ok(Self(Arc::new( + Ok(Self(Arc::new(RwLock::new( self.0 + .blocking_read() .reopen( config.map(|c| c.into()), virtual_chunk_credentials.map(map_credentials), ) .map_err(PyIcechunkStoreError::RepositoryError)?, - ))) + )))) }) } @@ -495,15 +496,18 @@ impl PyRepository { py.allow_threads(move || { let repository = Repository::from_bytes(bytes) .map_err(PyIcechunkStoreError::RepositoryError)?; - Ok(Self(Arc::new(repository))) + Ok(Self(Arc::new(RwLock::new(repository)))) }) } fn as_bytes(&self, py: Python<'_>) -> PyResult> { // This is a compute intensive task, we need to release the Gil py.allow_threads(move || { - let bytes = - self.0.as_bytes().map_err(PyIcechunkStoreError::RepositoryError)?; + let bytes = self + .0 + .blocking_read() + .as_bytes() + .map_err(PyIcechunkStoreError::RepositoryError)?; Ok(Cow::Owned(bytes)) }) } @@ -530,6 +534,8 @@ impl PyRepository { pyo3_async_runtimes::tokio::get_runtime().block_on(async move { let _etag = self .0 + .read() + .await .save_config() .await .map_err(PyIcechunkStoreError::RepositoryError)?; @@ -539,33 +545,32 @@ impl PyRepository { } pub fn config(&self) -> PyRepositoryConfig { - self.0.config().clone().into() + self.0.blocking_read().config().clone().into() } pub fn storage_settings(&self) -> PyStorageSettings { - self.0.storage_settings().clone().into() + self.0.blocking_read().storage_settings().clone().into() } pub fn storage(&self) -> PyStorage { - PyStorage(Arc::clone(self.0.storage())) + PyStorage(Arc::clone(self.0.blocking_read().storage())) } - #[pyo3(signature = (metadata))] pub fn set_default_commit_metadata( &self, - metadata: Option, - ) -> PyResult<()> { - let metadata = metadata.map(|m| m.into()); - pyo3_async_runtimes::tokio::get_runtime().block_on(async move { - self.0.set_default_commit_metadata(metadata).await; - Ok(()) + py: Python<'_>, + metadata: PySnapshotProperties, + ) { + py.allow_threads(move || { + let metadata = metadata.into(); + self.0.blocking_write().set_default_commit_metadata(metadata); }) } - pub fn default_commit_metadata(&self) -> PyResult> { - pyo3_async_runtimes::tokio::get_runtime().block_on(async move { - let metadata = self.0.default_commit_metadata().await; - Ok(metadata.map(|m| m.into())) + pub fn default_commit_metadata(&self, py: Python<'_>) -> PySnapshotProperties { + py.allow_threads(move || { + let metadata = self.0.blocking_read().default_commit_metadata().clone(); + metadata.into() }) } @@ -578,12 +583,20 @@ impl PyRepository 
         tag: Option<String>,
         snapshot_id: Option<String>,
     ) -> PyResult {
-        let repo = Arc::clone(&self.0);
         // This function calls block_on, so we need to allow other thread python to make progress
         py.allow_threads(move || {
             let version = args_to_version_info(branch, tag, snapshot_id, None)?;
             let ancestry = pyo3_async_runtimes::tokio::get_runtime()
-                .block_on(async move { repo.ancestry_arc(&version).await })
+                .block_on(async move {
+                    let (snapshot_id, asset_manager) = {
+                        let lock = self.0.read().await;
+                        (
+                            lock.resolve_version(&version).await?,
+                            Arc::clone(lock.asset_manager()),
+                        )
+                    };
+                    asset_manager.snapshot_ancestry(&snapshot_id).await
+                })
                 .map_err(PyIcechunkStoreError::RepositoryError)?
                 .map_err(PyIcechunkStoreError::RepositoryError);

@@ -615,6 +628,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 self.0
+                    .read()
+                    .await
                     .create_branch(branch_name, &snapshot_id)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -629,6 +644,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 let branches = self
                     .0
+                    .read()
+                    .await
                     .list_branches()
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -643,6 +660,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 let tip = self
                     .0
+                    .read()
+                    .await
                     .lookup_branch(branch_name)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -667,6 +686,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 self.0
+                    .read()
+                    .await
                     .reset_branch(branch_name, &snapshot_id)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -680,6 +701,8 @@ impl PyRepository {
         py.allow_threads(move || {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 self.0
+                    .read()
+                    .await
                     .delete_branch(branch)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -693,6 +716,8 @@ impl PyRepository {
         py.allow_threads(move || {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 self.0
+                    .read()
+                    .await
                     .delete_tag(tag)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -717,6 +742,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 self.0
+                    .read()
+                    .await
                     .create_tag(tag_name, &snapshot_id)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -731,6 +758,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 let tags = self
                     .0
+                    .read()
+                    .await
                     .list_tags()
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -745,6 +774,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 let tag = self
                     .0
+                    .read()
+                    .await
                     .lookup_tag(tag)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
@@ -773,6 +804,8 @@ impl PyRepository {
             pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 let diff = self
                     .0
+                    .read()
+                    .await
                     .diff(&from, &to)
                     .await
                     .map_err(PyIcechunkStoreError::SessionError)?;
@@ -796,6 +829,8 @@ impl PyRepository {
             let session = pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 self.0
+                    .read()
+                    .await
                     .readonly_session(&version)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)
@@ -811,6 +846,8 @@ impl PyRepository {
             let session = pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
                 self.0
+                    .read()
+                    .await
                     .writable_session(branch)
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)
@@ -832,10 +869,19 @@ impl PyRepository {
         py.allow_threads(move || {
             let result =
                 pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
+                    let (storage, storage_settings, asset_manager) = {
+                        let lock = self.0.read().await;
+                        (
+                            Arc::clone(lock.storage()),
+                            lock.storage_settings().clone(),
+                            Arc::clone(lock.asset_manager()),
+                        )
+                    };
+
                     let result = expire(
-                        self.0.storage().as_ref(),
-                        self.0.storage_settings(),
-                        self.0.asset_manager().clone(),
+                        storage.as_ref(),
+                        &storage_settings,
+                        asset_manager,
                         older_than,
                         if delete_expired_branches {
                             ExpiredRefAction::Delete
@@ -877,10 +923,18 @@ impl PyRepository {
                         delete_object_older_than,
                         Default::default(),
                     );
+                    let (storage, storage_settings, asset_manager) = {
+                        let lock = self.0.read().await;
+                        (
+                            Arc::clone(lock.storage()),
+                            lock.storage_settings().clone(),
+                            Arc::clone(lock.asset_manager()),
+                        )
+                    };
                     let result = garbage_collect(
-                        self.0.storage().as_ref(),
-                        self.0.storage_settings(),
-                        self.0.asset_manager().clone(),
+                        storage.as_ref(),
+                        &storage_settings,
+                        asset_manager,
                         &gc_config,
                     )
                     .await
@@ -897,10 +951,18 @@ impl PyRepository {
         py.allow_threads(move || {
             let result =
                 pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
+                    let (storage, storage_settings, asset_manager) = {
+                        let lock = self.0.read().await;
+                        (
+                            Arc::clone(lock.storage()),
+                            lock.storage_settings().clone(),
+                            Arc::clone(lock.asset_manager()),
+                        )
+                    };
                     let result = repo_chunks_storage(
-                        self.0.storage().as_ref(),
-                        self.0.storage_settings(),
-                        self.0.asset_manager().clone(),
+                        storage.as_ref(),
+                        &storage_settings,
+                        asset_manager,
                     )
                     .await
                     .map_err(PyIcechunkStoreError::RepositoryError)?;
diff --git a/icechunk-python/tests/test_pickle.py b/icechunk-python/tests/test_pickle.py
index 1a71bd92b..239bd6e12 100644
--- a/icechunk-python/tests/test_pickle.py
+++ b/icechunk-python/tests/test_pickle.py
@@ -15,7 +15,9 @@


 def create_local_repo(path: str) -> Repository:
-    return Repository.create(storage=local_filesystem_storage(path))
+    repo = Repository.create(storage=local_filesystem_storage(path))
+    repo.set_default_commit_metadata({"author": "test"})
+    return repo


 @pytest.fixture(scope="function")
@@ -29,6 +31,8 @@ def test_pickle_repository(tmpdir: Path, tmp_repo: Repository) -> None:
     pickled = pickle.dumps(tmp_repo)
     roundtripped = pickle.loads(pickled)
     assert tmp_repo.list_branches() == roundtripped.list_branches()
+    assert tmp_repo.default_commit_metadata() == roundtripped.default_commit_metadata()
+    assert tmp_repo.default_commit_metadata() == {"author": "test"}

     storage = tmp_repo.storage
     assert (
diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs
index d52d5de2b..039a82624 100644
--- a/icechunk/src/repository.rs
+++ b/icechunk/src/repository.rs
@@ -16,7 +16,6 @@ use futures::{
 use regex::bytes::Regex;
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
-use tokio::sync::RwLock;
 use tokio::task::JoinError;
 use tracing::{Instrument, debug, error, instrument, trace};

@@ -60,7 +59,6 @@ pub enum RepositoryErrorKind {
     FormatError(IcechunkFormatErrorKind),
     #[error(transparent)]
     Ref(RefErrorKind),
-
     #[error("snapshot not found: `{id}`")]
     SnapshotNotFound { id: SnapshotId },
     #[error("branch {branch} does not have a snapshots before or at {at}")]
@@ -139,8 +137,7 @@ pub struct Repository {
     asset_manager: Arc<AssetManager>,
     virtual_resolver: Arc<VirtualChunkResolver>,
     virtual_chunk_credentials: HashMap<String, Credentials>,
-    #[serde(skip)]
-    default_commit_metadata: Arc<RwLock<Option<SnapshotProperties>>>,
+    default_commit_metadata: SnapshotProperties,
 }

 impl Repository {
@@ -312,7 +309,7 @@ impl Repository {
             virtual_resolver,
             asset_manager,
             virtual_chunk_credentials,
-            default_commit_metadata: Arc::new(RwLock::new(None)),
+            default_commit_metadata: SnapshotProperties::default(),
         })
     }

@@ -382,17 +379,13 @@ impl Repository {
     }

     #[instrument(skip_all)]
-    pub async fn set_default_commit_metadata(
-        &self,
-        metadata: Option<SnapshotProperties>,
-    ) {
-        let mut guard = self.default_commit_metadata.write().await;
-        *guard = metadata;
+    pub fn set_default_commit_metadata(&mut self, metadata: SnapshotProperties) {
+        self.default_commit_metadata = metadata;
     }

     #[instrument(skip_all)]
-    pub async fn default_commit_metadata(&self) -> Option<SnapshotProperties> {
-        self.default_commit_metadata.read().await.clone()
+    pub fn default_commit_metadata(&self) -> &SnapshotProperties {
+        &self.default_commit_metadata
     }

     #[instrument(skip(storage, config))]
@@ -629,7 +622,7 @@ impl Repository {
     }

     #[instrument(skip(self))]
-    async fn resolve_version(
+    pub async fn resolve_version(
         &self,
         version: &VersionInfo,
     ) -> RepositoryResult<SnapshotId> {
@@ -774,7 +767,7 @@ impl Repository {
             self.virtual_resolver.clone(),
             branch.to_string(),
             ref_data.snapshot.clone(),
-            self.default_commit_metadata.read().await.clone(),
+            self.default_commit_metadata.clone(),
         );

         self.preload_manifests(ref_data.snapshot);
diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs
index 2364797bc..ce5293180 100644
--- a/icechunk/src/session.rs
+++ b/icechunk/src/session.rs
@@ -166,7 +166,7 @@ pub struct Session {
     branch_name: Option<String>,
     snapshot_id: SnapshotId,
     change_set: ChangeSet,
-    default_commit_metadata: Option<SnapshotProperties>,
+    default_commit_metadata: SnapshotProperties,
 }

 impl Session {
@@ -187,7 +187,7 @@ impl Session {
             branch_name: None,
             snapshot_id,
             change_set: ChangeSet::default(),
-            default_commit_metadata: None,
+            default_commit_metadata: SnapshotProperties::default(),
         }
     }

@@ -200,7 +200,7 @@ impl Session {
         virtual_resolver: Arc<VirtualChunkResolver>,
         branch_name: String,
         snapshot_id: SnapshotId,
-        default_commit_metadata: Option<SnapshotProperties>,
+        default_commit_metadata: SnapshotProperties,
     ) -> Self {
         Self {
             config,
@@ -811,16 +811,14 @@ impl Session {
             return Err(SessionErrorKind::ReadOnlySession.into());
         };

-        let properties = match (properties, self.default_commit_metadata.as_ref()) {
-            (Some(p), None) => Some(p),
-            (None, Some(d)) => Some(d.clone()),
-            (Some(p), Some(d)) => {
-                let mut merged = d.clone();
+        let default_metadata = self.default_commit_metadata.clone();
+        let properties = properties
+            .map(|p| {
+                let mut merged = default_metadata.clone();
                 merged.extend(p.into_iter());
-                Some(merged)
-            }
-            (None, None) => None,
-        };
+                merged
+            })
+            .unwrap_or(default_metadata);

         let current = fetch_branch_tip(
             self.storage.as_ref(),
@@ -839,7 +837,7 @@ impl Session {
             &self.snapshot_id,
             &self.change_set,
             message,
-            properties,
+            Some(properties),
         )
         .await
     }
@@ -861,7 +859,7 @@ impl Session {
             &self.snapshot_id,
             &self.change_set,
             message,
-            properties,
+            Some(properties),
         )
         .await
     }
@@ -1969,7 +1967,7 @@ mod tests {

     #[tokio::test(flavor = "multi_thread")]
     async fn test_repository_with_default_commit_metadata() -> Result<(), Box<dyn std::error::Error>> {
-        let repo = create_memory_store_repository().await;
+        let mut repo = create_memory_store_repository().await;
         let mut ds = repo.writable_session("main").await?;
         ds.add_group(Path::root(), Bytes::new()).await?;
         let snapshot = ds.commit("commit", None).await?;
@@ -1983,7 +1981,7 @@
         let mut default_metadata = SnapshotProperties::default();
         default_metadata.insert("author".to_string(), "John Doe".to_string().into());
         default_metadata.insert("project".to_string(), "My Project".to_string().into());
-        repo.set_default_commit_metadata(Some(default_metadata.clone())).await;
+        repo.set_default_commit_metadata(default_metadata.clone());

         let mut ds = repo.writable_session("main").await?;
         ds.add_group("/group".try_into().unwrap(), Bytes::new()).await?;
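For reference, here is a minimal sketch of how the reworked Python API might be used after this change. It assumes the public `icechunk` and `zarr` APIs (`Repository.create`, `local_filesystem_storage`, `Repository.writable_session`, and `Session.commit` with a `metadata` argument) behave as in the current release; the storage path and metadata keys are illustrative, not part of this PR.

```python
# Sketch only: assumes icechunk's public Python API and zarr-python v3.
# The path and metadata keys below are illustrative.
import icechunk
import zarr

repo = icechunk.Repository.create(
    storage=icechunk.local_filesystem_storage("/tmp/default-metadata-demo")
)

# Defaults are now a plain dict (never None) and apply to every later commit.
repo.set_default_commit_metadata({"author": "pipeline", "cluster": "staging"})
assert repo.default_commit_metadata() == {"author": "pipeline", "cluster": "staging"}

session = repo.writable_session("main")
zarr.group(store=session.store)  # make a change so the commit is non-empty
# Per-commit metadata is merged over the defaults; on a key collision the
# per-commit value wins (Session::commit extends the default map with it).
snapshot_id = session.commit("initial structure", metadata={"author": "alice"})

# Clearing the defaults is now done with an empty dict instead of None.
repo.set_default_commit_metadata({})
assert repo.default_commit_metadata() == {}
```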
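Continuing the sketch above, the merge result and the new serialization behaviour could be checked roughly as follows. This assumes `Repository.ancestry` yields `SnapshotInfo` objects that expose the commit metadata as a `metadata` attribute, which is outside this diff.

```python
# Sketch only, continuing from the block above.
import pickle

latest = next(iter(repo.ancestry(branch="main")))
assert latest.metadata["author"] == "alice"      # per-commit value won the merge
assert latest.metadata["cluster"] == "staging"   # default filled the missing key

# default_commit_metadata is no longer #[serde(skip)], so it survives a
# pickle round-trip of the repository (see test_pickle.py above).
restored = pickle.loads(pickle.dumps(repo))
assert restored.default_commit_metadata() == repo.default_commit_metadata()
```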