diff --git a/Cargo.lock b/Cargo.lock index c403d039c..9b6c0e85e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,8 +1374,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1735,6 +1737,7 @@ dependencies = [ "err-into", "flatbuffers", "futures", + "getrandom 0.2.15", "icechunk-macros", "itertools", "object_store", diff --git a/icechunk/Cargo.toml b/icechunk/Cargo.toml index 0c12f36d8..714ef0316 100644 --- a/icechunk/Cargo.toml +++ b/icechunk/Cargo.toml @@ -17,21 +17,13 @@ async-trait = "0.1.88" bytes = { version = "1.10.1", features = ["serde"] } base64 = "0.22.1" futures = "0.3.31" +getrandom = {version = "0.2.15", features = ["js"]} itertools = "0.14.0" -object_store = { version = "0.12.3", features = [ - "aws", - "gcp", - "azure", - "http", -] } rand = "0.9.2" thiserror = "2.0.12" serde_json = "1.0.142" serde = { version = "1.0.219", features = ["derive", "rc"] } serde_with = { version = "3.14.0", features = ["hex"] } -tokio = { version = "1.47.1", features = ["rt-multi-thread", "macros"] } -test-strategy = "0.4.3" -proptest = "1.7.0" quick_cache = "0.6.16" base32 = "0.5.1" chrono = { version = "0.4.41", features = ["serde"] } @@ -40,17 +32,9 @@ rmp-serde = "1.3.0" url = "2.5.4" async-stream = "0.3.6" rmpv = { version = "1.3.0", features = ["serde", "with-serde"] } -aws-sdk-s3 = "=1.78.0" -aws-config = "=1.5.18" -aws-credential-types = "1.2.4" typed-path = "0.11.0" -aws-smithy-types-convert = { version = "0.60.9", features = [ - "convert-chrono", - "convert-streams", -] } typetag = "0.2.20" zstd = "0.13.3" -tokio-util = { version = "0.7.16", features = ["compat", "io-util"] } serde_bytes = "0.11.17" regex = "1.11.1" tracing-error = "0.2.1" @@ -67,9 +51,31 @@ dirs = { version = "6.0.0", optional = true } assert_fs = { version = "1.1.3", optional = true } flatbuffers = "25.2.10" +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +aws-sdk-s3 = "=1.78.0" +aws-config = "=1.5.18" +aws-credential-types = "1.2.4" +aws-smithy-types-convert = { version = "0.60.9", features = [ + "convert-chrono", + "convert-streams", +] } +object_store = { version = "0.12.3", features = [ + "aws", + "gcp", + "azure", + "http", +] } +tokio = { version = "1.47.1", features = ["rt-multi-thread", "macros"] } +tokio-util = { version = "0.7.16", features = ["compat", "io-util"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { version = "1.47.1", features = ["io-util", "rt", "macros", "sync"] } + [dev-dependencies] icechunk-macros = { path = "../icechunk-macros", version = "0.1.0" } pretty_assertions = "1.4.1" +test-strategy = "0.4.3" +proptest = "1.7.0" proptest-state-machine = "0.4.0" tempfile = "3.20.0" test-log = { version = "0.2.18", default-features = false, features = [ diff --git a/icechunk/src/config.rs b/icechunk/src/config.rs index 89e3d00f2..61eba8376 100644 --- a/icechunk/src/config.rs +++ b/icechunk/src/config.rs @@ -9,6 +9,7 @@ use std::{ use async_trait::async_trait; use chrono::{DateTime, Utc}; use itertools::Either; +#[cfg(not(target_arch = "wasm32"))] pub use object_store::gcp::GcpCredential; use regex::bytes::Regex; use serde::{Deserialize, Serialize}; @@ -519,6 +520,7 @@ pub struct GcsBearerCredential { pub expires_after: Option>, } +#[cfg(not(target_arch = "wasm32"))] impl From<&GcsBearerCredential> for GcpCredential { fn from(value: &GcsBearerCredential) -> Self { GcpCredential { bearer: value.bearer.clone() } diff --git a/icechunk/src/lib.rs b/icechunk/src/lib.rs index 31aef15f6..0d58be597 100644 --- a/icechunk/src/lib.rs +++ b/icechunk/src/lib.rs @@ -38,10 +38,10 @@ pub mod virtual_chunks; pub use config::{ObjectStoreConfig, RepositoryConfig}; pub use repository::Repository; -pub use storage::{ - ObjectStorage, Storage, StorageError, new_in_memory_storage, - new_local_filesystem_storage, new_s3_storage, -}; +pub use storage::{Storage, StorageError, new_in_memory_storage}; + +#[cfg(not(target_arch = "wasm32"))] +pub use storage::{ObjectStorage, new_local_filesystem_storage, new_s3_storage}; pub use store::Store; mod private { diff --git a/icechunk/src/storage/backends/mod.rs b/icechunk/src/storage/backends/mod.rs new file mode 100644 index 000000000..ed9a59f17 --- /dev/null +++ b/icechunk/src/storage/backends/mod.rs @@ -0,0 +1,10 @@ +#[cfg(not(target_arch = "wasm32"))] +pub mod object_store; +#[cfg(not(target_arch = "wasm32"))] +pub mod s3; + +// Re-export implementations conditionally +#[cfg(not(target_arch = "wasm32"))] +pub use object_store::ObjectStorage; +#[cfg(not(target_arch = "wasm32"))] +pub use s3::S3Storage; diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/backends/object_store.rs similarity index 99% rename from icechunk/src/storage/object_store.rs rename to icechunk/src/storage/backends/object_store.rs index 7a5033979..7e3d6a231 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/backends/object_store.rs @@ -43,7 +43,7 @@ use tokio::{ use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::instrument; -use super::{ +use crate::storage::{ CHUNK_PREFIX, CONFIG_PATH, ConcurrencySettings, DeleteObjectsResult, ETag, FetchConfigResult, Generation, GetRefResult, ListInfo, MANIFEST_PREFIX, REF_PREFIX, Reader, RetriesSettings, SNAPSHOT_PREFIX, Settings, Storage, StorageError, diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/backends/s3.rs similarity index 99% rename from icechunk/src/storage/s3.rs rename to icechunk/src/storage/backends/s3.rs index 86fe0c20d..9f6f3cc75 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/backends/s3.rs @@ -8,10 +8,10 @@ use std::{ }; use crate::{ - Storage, StorageError, config::{S3Credentials, S3CredentialsFetcher, S3Options}, format::{ChunkId, ChunkOffset, FileTypeTag, ManifestId, ObjectId, SnapshotId}, private, + storage::{Storage, StorageError}, }; use async_trait::async_trait; use aws_config::{ @@ -42,7 +42,7 @@ use serde::{Deserialize, Serialize}; use tokio::{io::AsyncRead, sync::OnceCell}; use tracing::{error, instrument}; -use super::{ +use crate::storage::{ CHUNK_PREFIX, CONFIG_PATH, DeleteObjectsResult, FetchConfigResult, GetRefResult, ListInfo, MANIFEST_PREFIX, REF_PREFIX, Reader, SNAPSHOT_PREFIX, Settings, StorageErrorKind, StorageResult, TRANSACTION_PREFIX, UpdateConfigResult, VersionInfo, diff --git a/icechunk/src/storage/errors.rs b/icechunk/src/storage/errors.rs new file mode 100644 index 000000000..256ac4c2d --- /dev/null +++ b/icechunk/src/storage/errors.rs @@ -0,0 +1,79 @@ +use std::ffi::OsString; +use thiserror::Error; + +use crate::error::ICError; + +#[cfg(not(target_arch = "wasm32"))] +use aws_sdk_s3::{ + config::http::HttpResponse, + error::SdkError, + operation::{ + complete_multipart_upload::CompleteMultipartUploadError, + create_multipart_upload::CreateMultipartUploadError, + delete_objects::DeleteObjectsError, get_object::GetObjectError, + head_object::HeadObjectError, list_objects_v2::ListObjectsV2Error, + put_object::PutObjectError, upload_part::UploadPartError, + }, + primitives::ByteStreamError, +}; + +#[derive(Debug, Error)] +pub enum StorageErrorKind { + #[cfg(not(target_arch = "wasm32"))] + #[error("object store error {0}")] + ObjectStore(#[from] Box<::object_store::Error>), + #[error("bad object store prefix {0:?}")] + BadPrefix(OsString), + #[cfg(not(target_arch = "wasm32"))] + #[error("error getting object from object store {0}")] + S3GetObjectError(#[from] Box>), + #[cfg(not(target_arch = "wasm32"))] + #[error("error writing object to object store {0}")] + S3PutObjectError(#[from] Box>), + #[cfg(not(target_arch = "wasm32"))] + #[error("error creating multipart upload {0}")] + S3CreateMultipartUploadError( + #[from] Box>, + ), + #[cfg(not(target_arch = "wasm32"))] + #[error("error uploading multipart part {0}")] + S3UploadPartError(#[from] Box>), + #[cfg(not(target_arch = "wasm32"))] + #[error("error completing multipart upload {0}")] + S3CompleteMultipartUploadError( + #[from] Box>, + ), + #[cfg(not(target_arch = "wasm32"))] + #[error("error getting object metadata from object store {0}")] + S3HeadObjectError(#[from] Box>), + #[cfg(not(target_arch = "wasm32"))] + #[error("error listing objects in object store {0}")] + S3ListObjectError(#[from] Box>), + #[cfg(not(target_arch = "wasm32"))] + #[error("error deleting objects in object store {0}")] + S3DeleteObjectError(#[from] Box>), + #[cfg(not(target_arch = "wasm32"))] + #[error("error streaming bytes from object store {0}")] + S3StreamError(#[from] Box), + #[error("I/O error: {0}")] + IOError(#[from] std::io::Error), + #[error("storage configuration error: {0}")] + R2ConfigurationError(String), + #[error("storage error: {0}")] + Other(String), +} + +pub type StorageError = ICError; + +// it would be great to define this impl in error.rs, but it conflicts with the blanket +// `impl From for T` +impl From for StorageError +where + E: Into, +{ + fn from(value: E) -> Self { + Self::new(value.into()) + } +} + +pub type StorageResult = Result; diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index 1c82625c3..3d86e905f 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -1,756 +1,35 @@ -use ::object_store::{azure::AzureConfigKey, gcp::GoogleConfigKey}; -use aws_sdk_s3::{ - config::http::HttpResponse, - error::SdkError, - operation::{ - complete_multipart_upload::CompleteMultipartUploadError, - create_multipart_upload::CreateMultipartUploadError, - delete_objects::DeleteObjectsError, get_object::GetObjectError, - head_object::HeadObjectError, list_objects_v2::ListObjectsV2Error, - put_object::PutObjectError, upload_part::UploadPartError, - }, - primitives::ByteStreamError, -}; -use chrono::{DateTime, Utc}; -use core::fmt; -use futures::{ - Stream, StreamExt, TryStreamExt, - stream::{BoxStream, FuturesOrdered}, -}; +use futures::{Stream, StreamExt, TryStreamExt, stream::BoxStream}; use itertools::Itertools; -use s3::S3Storage; -use serde::{Deserialize, Serialize}; use std::{ cmp::{max, min}, - collections::HashMap, - ffi::OsString, - io::Read, iter, - num::{NonZeroU16, NonZeroU64}, ops::Range, - path::Path, - sync::{Arc, Mutex, OnceLock}, -}; -use tokio::io::AsyncRead; -use tokio_util::io::SyncIoBridge; -use tracing::{instrument, warn}; - -use async_trait::async_trait; -use bytes::{Buf, Bytes}; -use thiserror::Error; - -#[cfg(test)] -pub mod logging; - -pub mod object_store; -pub mod s3; - -pub use object_store::ObjectStorage; - -use crate::{ - config::{AzureCredentials, GcsCredentials, S3Credentials, S3Options}, - error::ICError, - format::{ChunkId, ChunkOffset, ManifestId, SnapshotId}, - private, + sync::Arc, }; -#[derive(Debug, Error)] -pub enum StorageErrorKind { - #[error("object store error {0}")] - ObjectStore(#[from] Box<::object_store::Error>), - #[error("bad object store prefix {0:?}")] - BadPrefix(OsString), - #[error("error getting object from object store {0}")] - S3GetObjectError(#[from] Box>), - #[error("error writing object to object store {0}")] - S3PutObjectError(#[from] Box>), - #[error("error creating multipart upload {0}")] - S3CreateMultipartUploadError( - #[from] Box>, - ), - #[error("error uploading multipart part {0}")] - S3UploadPartError(#[from] Box>), - #[error("error completing multipart upload {0}")] - S3CompleteMultipartUploadError( - #[from] Box>, - ), - #[error("error getting object metadata from object store {0}")] - S3HeadObjectError(#[from] Box>), - #[error("error listing objects in object store {0}")] - S3ListObjectError(#[from] Box>), - #[error("error deleting objects in object store {0}")] - S3DeleteObjectError(#[from] Box>), - #[error("error streaming bytes from object store {0}")] - S3StreamError(#[from] Box), - #[error("I/O error: {0}")] - IOError(#[from] std::io::Error), - #[error("storage configuration error: {0}")] - R2ConfigurationError(String), - #[error("storage error: {0}")] - Other(String), -} -pub type StorageError = ICError; - -// it would be great to define this impl in error.rs, but it conflicts with the blanket -// `impl From for T` -impl From for StorageError -where - E: Into, -{ - fn from(value: E) -> Self { - Self::new(value.into()) - } -} - -pub type StorageResult = Result; - -#[derive(Debug)] -pub struct ListInfo { - pub id: Id, - pub created_at: DateTime, - pub size_bytes: u64, -} - -const SNAPSHOT_PREFIX: &str = "snapshots/"; -const MANIFEST_PREFIX: &str = "manifests/"; -// const ATTRIBUTES_PREFIX: &str = "attributes/"; -const CHUNK_PREFIX: &str = "chunks/"; -const REF_PREFIX: &str = "refs"; -const TRANSACTION_PREFIX: &str = "transactions/"; -const CONFIG_PATH: &str = "config.yaml"; - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Hash, PartialOrd, Ord)] -pub struct ETag(pub String); -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] -pub struct Generation(pub String); - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] -pub struct VersionInfo { - pub etag: Option, - pub generation: Option, -} - -impl VersionInfo { - pub fn for_creation() -> Self { - Self { etag: None, generation: None } - } - - pub fn from_etag_only(etag: String) -> Self { - Self { etag: Some(ETag(etag)), generation: None } - } - - pub fn is_create(&self) -> bool { - self.etag.is_none() && self.generation.is_none() - } - - pub fn etag(&self) -> Option<&String> { - self.etag.as_ref().map(|e| &e.0) - } - - pub fn generation(&self) -> Option<&String> { - self.generation.as_ref().map(|e| &e.0) - } -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] -pub struct RetriesSettings { - pub max_tries: Option, - pub initial_backoff_ms: Option, - pub max_backoff_ms: Option, -} - -impl RetriesSettings { - pub fn max_tries(&self) -> NonZeroU16 { - self.max_tries.unwrap_or_else(|| NonZeroU16::new(10).unwrap_or(NonZeroU16::MIN)) - } - - pub fn initial_backoff_ms(&self) -> u32 { - self.initial_backoff_ms.unwrap_or(100) - } - - pub fn max_backoff_ms(&self) -> u32 { - self.max_backoff_ms.unwrap_or(3 * 60 * 1000) - } - - pub fn merge(&self, other: Self) -> Self { - Self { - max_tries: other.max_tries.or(self.max_tries), - initial_backoff_ms: other.initial_backoff_ms.or(self.initial_backoff_ms), - max_backoff_ms: other.max_backoff_ms.or(self.max_backoff_ms), - } - } -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] -pub struct ConcurrencySettings { - pub max_concurrent_requests_for_object: Option, - pub ideal_concurrent_request_size: Option, -} - -impl ConcurrencySettings { - // AWS recommendations: https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/horizontal-scaling-and-request-parallelization-for-high-throughput.html - // 8-16 MB requests - // 85-90 MB/s per request - // these numbers would saturate a 12.5 Gbps network - - pub fn max_concurrent_requests_for_object(&self) -> NonZeroU16 { - self.max_concurrent_requests_for_object - .unwrap_or_else(|| NonZeroU16::new(18).unwrap_or(NonZeroU16::MIN)) - } - pub fn ideal_concurrent_request_size(&self) -> NonZeroU64 { - self.ideal_concurrent_request_size.unwrap_or_else(|| { - NonZeroU64::new(12 * 1024 * 1024).unwrap_or(NonZeroU64::MIN) - }) - } - - pub fn merge(&self, other: Self) -> Self { - Self { - max_concurrent_requests_for_object: other - .max_concurrent_requests_for_object - .or(self.max_concurrent_requests_for_object), - ideal_concurrent_request_size: other - .ideal_concurrent_request_size - .or(self.ideal_concurrent_request_size), - } - } -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] -pub struct Settings { - pub concurrency: Option, - pub retries: Option, - pub unsafe_use_conditional_update: Option, - pub unsafe_use_conditional_create: Option, - pub unsafe_use_metadata: Option, - #[serde(default)] - pub storage_class: Option, - #[serde(default)] - pub metadata_storage_class: Option, - #[serde(default)] - pub chunks_storage_class: Option, - #[serde(default)] - pub minimum_size_for_multipart_upload: Option, -} - -static DEFAULT_CONCURRENCY: OnceLock = OnceLock::new(); -static DEFAULT_RETRIES: OnceLock = OnceLock::new(); - -impl Settings { - pub fn concurrency(&self) -> &ConcurrencySettings { - self.concurrency - .as_ref() - .unwrap_or_else(|| DEFAULT_CONCURRENCY.get_or_init(Default::default)) - } - - pub fn retries(&self) -> &RetriesSettings { - self.retries - .as_ref() - .unwrap_or_else(|| DEFAULT_RETRIES.get_or_init(Default::default)) - } - - pub fn unsafe_use_conditional_create(&self) -> bool { - self.unsafe_use_conditional_create.unwrap_or(true) - } - - pub fn unsafe_use_conditional_update(&self) -> bool { - self.unsafe_use_conditional_update.unwrap_or(true) - } - - pub fn unsafe_use_metadata(&self) -> bool { - self.unsafe_use_metadata.unwrap_or(true) - } - - pub fn metadata_storage_class(&self) -> Option<&String> { - self.metadata_storage_class.as_ref().or(self.storage_class.as_ref()) - } - - pub fn chunks_storage_class(&self) -> Option<&String> { - self.chunks_storage_class.as_ref().or(self.storage_class.as_ref()) - } - - pub fn minimum_size_for_multipart_upload(&self) -> u64 { - // per AWS recommendation: 100 MB - self.minimum_size_for_multipart_upload.unwrap_or(100 * 1024 * 1024) - } - - pub fn merge(&self, other: Self) -> Self { - Self { - concurrency: match (&self.concurrency, other.concurrency) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(c.clone()), - (Some(mine), Some(theirs)) => Some(mine.merge(theirs)), - }, - retries: match (&self.retries, other.retries) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(c.clone()), - (Some(mine), Some(theirs)) => Some(mine.merge(theirs)), - }, - unsafe_use_conditional_create: match ( - &self.unsafe_use_conditional_create, - other.unsafe_use_conditional_create, - ) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(*c), - (Some(_), Some(theirs)) => Some(theirs), - }, - unsafe_use_conditional_update: match ( - &self.unsafe_use_conditional_update, - other.unsafe_use_conditional_update, - ) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(*c), - (Some(_), Some(theirs)) => Some(theirs), - }, - unsafe_use_metadata: match ( - &self.unsafe_use_metadata, - other.unsafe_use_metadata, - ) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(*c), - (Some(_), Some(theirs)) => Some(theirs), - }, - storage_class: match (&self.storage_class, other.storage_class) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(c.clone()), - (Some(_), Some(theirs)) => Some(theirs), - }, - metadata_storage_class: match ( - &self.metadata_storage_class, - other.metadata_storage_class, - ) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(c.clone()), - (Some(_), Some(theirs)) => Some(theirs), - }, - chunks_storage_class: match ( - &self.chunks_storage_class, - other.chunks_storage_class, - ) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(c.clone()), - (Some(_), Some(theirs)) => Some(theirs), - }, - minimum_size_for_multipart_upload: match ( - &self.minimum_size_for_multipart_upload, - other.minimum_size_for_multipart_upload, - ) { - (None, None) => None, - (None, Some(c)) => Some(c), - (Some(c), None) => Some(*c), - (Some(_), Some(theirs)) => Some(theirs), - }, - } - } -} - -pub enum Reader { - Asynchronous(Box), - Synchronous(Box), -} - -impl Reader { - pub async fn to_bytes(self, expected_size: usize) -> StorageResult { - match self { - Reader::Asynchronous(mut read) => { - // add some extra space to the buffer to optimize conversion to bytes - let mut buffer = Vec::with_capacity(expected_size + 16); - tokio::io::copy(&mut read, &mut buffer) - .await - .map_err(StorageErrorKind::IOError)?; - Ok(buffer.into()) - } - Reader::Synchronous(mut buf) => Ok(buf.copy_to_bytes(buf.remaining())), - } - } - - /// Notice this Read can only be used in non async contexts, for example, calling tokio::task::spawn_blocking - pub fn into_read(self) -> Box { - match self { - Reader::Asynchronous(read) => Box::new(SyncIoBridge::new(read)), - Reader::Synchronous(buf) => Box::new(buf.reader()), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FetchConfigResult { - Found { bytes: Bytes, version: VersionInfo }, - NotFound, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum UpdateConfigResult { - Updated { new_version: VersionInfo }, - NotOnLatestVersion, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum GetRefResult { - Found { bytes: Bytes, version: VersionInfo }, - NotFound, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum WriteRefResult { - Written, - WontOverwrite, -} - -#[derive(Debug, Clone, PartialEq, Eq, Default)] -pub struct DeleteObjectsResult { - pub deleted_objects: u64, - pub deleted_bytes: u64, -} - -impl DeleteObjectsResult { - pub fn merge(&mut self, other: &Self) { - self.deleted_objects += other.deleted_objects; - self.deleted_bytes += other.deleted_bytes; - } -} - -/// Fetch and write the parquet files that represent the repository in object store -/// -/// Different implementation can cache the files differently, or not at all. -/// Implementations are free to assume files are never overwritten. -#[async_trait] -#[typetag::serde(tag = "type")] -pub trait Storage: fmt::Debug + fmt::Display + private::Sealed + Sync + Send { - fn default_settings(&self) -> Settings { - Default::default() - } - - fn can_write(&self) -> bool; - - async fn fetch_config(&self, settings: &Settings) - -> StorageResult; - async fn update_config( - &self, - settings: &Settings, - config: Bytes, - previous_version: &VersionInfo, - ) -> StorageResult; - async fn fetch_snapshot( - &self, - settings: &Settings, - id: &SnapshotId, - ) -> StorageResult>; - /// Returns whatever reader is more efficient. - /// - /// For example, if processed with multiple requests, it will return a synchronous `Buf` - /// instance pointing the different parts. If it was executed in a single request, it's more - /// efficient to return the network `AsyncRead` directly - async fn fetch_manifest_known_size( - &self, - settings: &Settings, - id: &ManifestId, - size: u64, - ) -> StorageResult; - async fn fetch_manifest_unknown_size( - &self, - settings: &Settings, - id: &ManifestId, - ) -> StorageResult>; - async fn fetch_chunk( - &self, - settings: &Settings, - id: &ChunkId, - range: &Range, - ) -> StorageResult; // FIXME: format flags - async fn fetch_transaction_log( - &self, - settings: &Settings, - id: &SnapshotId, - ) -> StorageResult>; - - async fn write_snapshot( - &self, - settings: &Settings, - id: SnapshotId, - metadata: Vec<(String, String)>, - bytes: Bytes, - ) -> StorageResult<()>; - async fn write_manifest( - &self, - settings: &Settings, - id: ManifestId, - metadata: Vec<(String, String)>, - bytes: Bytes, - ) -> StorageResult<()>; - async fn write_chunk( - &self, - settings: &Settings, - id: ChunkId, - bytes: Bytes, - ) -> StorageResult<()>; - async fn write_transaction_log( - &self, - settings: &Settings, - id: SnapshotId, - metadata: Vec<(String, String)>, - bytes: Bytes, - ) -> StorageResult<()>; +#[cfg(not(target_arch = "wasm32"))] +use crate::config::{AzureCredentials, GcsCredentials, S3Credentials, S3Options}; - async fn get_ref( - &self, - settings: &Settings, - ref_key: &str, - ) -> StorageResult; - async fn ref_names(&self, settings: &Settings) -> StorageResult>; - async fn write_ref( - &self, - settings: &Settings, - ref_key: &str, - bytes: Bytes, - previous_version: &VersionInfo, - ) -> StorageResult; +// Core modules - always available +pub mod errors; +pub mod traits; +pub mod types; - async fn list_objects<'a>( - &'a self, - settings: &Settings, - prefix: &str, - ) -> StorageResult>>>; +// Implementation modules - conditionally compiled +pub mod backends; - async fn delete_batch( - &self, - settings: &Settings, - prefix: &str, - batch: Vec<(String, u64)>, - ) -> StorageResult; - - /// Delete a stream of objects, by their id string representations - /// Input stream includes sizes to get as result the total number of bytes deleted - #[instrument(skip(self, settings, ids))] - async fn delete_objects( - &self, - settings: &Settings, - prefix: &str, - ids: BoxStream<'_, (String, u64)>, - ) -> StorageResult { - let res = Arc::new(Mutex::new(DeleteObjectsResult::default())); - ids.chunks(1_000) - // FIXME: configurable concurrency - .for_each_concurrent(10, |batch| { - let res = Arc::clone(&res); - async move { - let new_deletes = self - .delete_batch(settings, prefix, batch) - .await - .unwrap_or_else(|_| { - // FIXME: handle error instead of skipping - warn!("ignoring error in Storage::delete_batch"); - Default::default() - }); - #[allow(clippy::expect_used)] - res.lock().expect("Bug in delete objects").merge(&new_deletes); - } - }) - .await; - #[allow(clippy::expect_used)] - let res = res.lock().expect("Bug in delete objects"); - Ok(res.clone()) - } - - async fn get_snapshot_last_modified( - &self, - settings: &Settings, - snapshot: &SnapshotId, - ) -> StorageResult>; - - async fn root_is_clean(&self) -> StorageResult { - match self.list_objects(&Settings::default(), "").await?.next().await { - None => Ok(true), - Some(Ok(_)) => Ok(false), - Some(Err(err)) => Err(err), - } - } - - async fn list_chunks( - &self, - settings: &Settings, - ) -> StorageResult>>> { - Ok(translate_list_infos(self.list_objects(settings, CHUNK_PREFIX).await?)) - } - - async fn list_manifests( - &self, - settings: &Settings, - ) -> StorageResult>>> { - Ok(translate_list_infos(self.list_objects(settings, MANIFEST_PREFIX).await?)) - } - - async fn list_snapshots( - &self, - settings: &Settings, - ) -> StorageResult>>> { - Ok(translate_list_infos(self.list_objects(settings, SNAPSHOT_PREFIX).await?)) - } - - async fn list_transaction_logs( - &self, - settings: &Settings, - ) -> StorageResult>>> { - Ok(translate_list_infos(self.list_objects(settings, TRANSACTION_PREFIX).await?)) - } - - async fn delete_chunks( - &self, - settings: &Settings, - chunks: BoxStream<'_, (ChunkId, u64)>, - ) -> StorageResult { - self.delete_objects( - settings, - CHUNK_PREFIX, - chunks.map(|(id, size)| (id.to_string(), size)).boxed(), - ) - .await - } - - async fn delete_manifests( - &self, - settings: &Settings, - manifests: BoxStream<'_, (ManifestId, u64)>, - ) -> StorageResult { - self.delete_objects( - settings, - MANIFEST_PREFIX, - manifests.map(|(id, size)| (id.to_string(), size)).boxed(), - ) - .await - } - - async fn delete_snapshots( - &self, - settings: &Settings, - snapshots: BoxStream<'_, (SnapshotId, u64)>, - ) -> StorageResult { - self.delete_objects( - settings, - SNAPSHOT_PREFIX, - snapshots.map(|(id, size)| (id.to_string(), size)).boxed(), - ) - .await - } - - async fn delete_transaction_logs( - &self, - settings: &Settings, - transaction_logs: BoxStream<'_, (SnapshotId, u64)>, - ) -> StorageResult { - self.delete_objects( - settings, - TRANSACTION_PREFIX, - transaction_logs.map(|(id, size)| (id.to_string(), size)).boxed(), - ) - .await - } - - async fn delete_refs( - &self, - settings: &Settings, - refs: BoxStream<'_, String>, - ) -> StorageResult { - let refs = refs.map(|s| (s, 0)).boxed(); - Ok(self.delete_objects(settings, REF_PREFIX, refs).await?.deleted_objects) - } - - async fn get_object_range_buf( - &self, - settings: &Settings, - key: &str, - range: &Range, - ) -> StorageResult>; - - async fn get_object_range_read( - &self, - settings: &Settings, - key: &str, - range: &Range, - ) -> StorageResult>; - - async fn get_object_concurrently_multiple( - &self, - settings: &Settings, - key: &str, - parts: Vec>, - ) -> StorageResult> { - let results = - parts - .into_iter() - .map(|range| async move { - self.get_object_range_buf(settings, key, &range).await - }) - .collect::>(); - - let init: Box = Box::new(&[][..]); - let buf = results - .try_fold(init, |prev, buf| async { - let res: Box = Box::new(prev.chain(buf)); - Ok(res) - }) - .await?; - - Ok(Box::new(buf)) - } - - async fn get_object_concurrently( - &self, - settings: &Settings, - key: &str, - range: &Range, - ) -> StorageResult { - let parts = split_in_multiple_requests( - range, - settings.concurrency().ideal_concurrent_request_size().get(), - settings.concurrency().max_concurrent_requests_for_object().get(), - ) - .collect::>(); - - let res = match parts.len() { - 0 => Reader::Asynchronous(Box::new(tokio::io::empty())), - 1 => Reader::Asynchronous( - self.get_object_range_read(settings, key, range).await?, - ), - _ => Reader::Synchronous( - self.get_object_concurrently_multiple(settings, key, parts).await?, - ), - }; - Ok(res) - } -} +// Test module +#[cfg(test)] +pub mod logging; -fn convert_list_item(item: ListInfo) -> Option> -where - Id: for<'b> TryFrom<&'b str>, -{ - let id = Id::try_from(item.id.as_str()).ok()?; - let created_at = item.created_at; - Some(ListInfo { created_at, id, size_bytes: item.size_bytes }) -} +// Re-export core types and traits +pub use errors::*; +pub use traits::Storage; +pub use types::*; -fn translate_list_infos<'a, Id>( - s: impl Stream>> + Send + 'a, -) -> BoxStream<'a, StorageResult>> -where - Id: for<'b> TryFrom<&'b str> + Send + std::fmt::Debug + 'a, -{ - s.try_filter_map(|info| async move { - let info = convert_list_item(info); - if info.is_none() { - tracing::error!(list_info=?info, "Error processing list item metadata"); - } - Ok(info) - }) - .boxed() -} +// Conditional re-exports for implementations +#[cfg(not(target_arch = "wasm32"))] +pub use backends::{ObjectStorage, S3Storage}; /// Split an object request into multiple byte range requests /// @@ -820,12 +99,41 @@ pub fn split_in_multiple_equal_requests( .map(|(_, range)| range) } +fn convert_list_item(item: ListInfo) -> Option> +where + Id: for<'b> TryFrom<&'b str>, +{ + let id = Id::try_from(item.id.as_str()).ok()?; + let created_at = item.created_at; + Some(ListInfo { created_at, id, size_bytes: item.size_bytes }) +} + +fn translate_list_infos<'a, Id>( + s: impl Stream>> + Send + 'a, +) -> BoxStream<'a, StorageResult>> +where + Id: for<'b> TryFrom<&'b str> + Send + std::fmt::Debug + 'a, +{ + s.try_filter_map(|info| async move { + let info = convert_list_item(info); + if info.is_none() { + tracing::error!(list_info=?info, "Error processing list item metadata"); + } + Ok(info) + }) + .boxed() +} + +// Constructor functions - conditionally compiled for non-WASM targets +#[cfg(not(target_arch = "wasm32"))] pub fn new_s3_storage( config: S3Options, bucket: String, prefix: Option, credentials: Option, ) -> StorageResult> { + use backends::s3::S3Storage; + if let Some(endpoint) = &config.endpoint_url { if endpoint.contains("fly.storage.tigris.dev") { return Err(StorageError::from(StorageErrorKind::Other("Tigris Storage is not S3 compatible, use the Tigris specific constructor instead".to_string()))); @@ -844,6 +152,7 @@ pub fn new_s3_storage( Ok(Arc::new(st)) } +#[cfg(not(target_arch = "wasm32"))] pub fn new_r2_storage( config: S3Options, bucket: Option, @@ -851,6 +160,8 @@ pub fn new_r2_storage( account_id: Option, credentials: Option, ) -> StorageResult> { + use backends::s3::S3Storage; + let (bucket, prefix) = match (bucket, prefix) { (Some(bucket), Some(prefix)) => (bucket, Some(prefix)), (None, Some(prefix)) => match prefix.split_once("/") { @@ -893,6 +204,7 @@ pub fn new_r2_storage( Ok(Arc::new(st)) } +#[cfg(not(target_arch = "wasm32"))] pub fn new_tigris_storage( config: S3Options, bucket: String, @@ -900,6 +212,8 @@ pub fn new_tigris_storage( credentials: Option, use_weak_consistency: bool, ) -> StorageResult> { + use backends::s3::S3Storage; + let config = S3Options { endpoint_url: Some( config.endpoint_url.unwrap_or("https://fly.storage.tigris.dev".to_string()), @@ -934,18 +248,32 @@ pub fn new_tigris_storage( Ok(Arc::new(st)) } +// WASM-compatible constructors - always available pub async fn new_in_memory_storage() -> StorageResult> { - let st = ObjectStorage::new_in_memory().await?; - Ok(Arc::new(st)) + #[cfg(not(target_arch = "wasm32"))] + { + let st = backends::ObjectStorage::new_in_memory().await?; + Ok(Arc::new(st)) + } + #[cfg(target_arch = "wasm32")] + { + // For WASM, we'll need a different in-memory implementation + // This is a placeholder that will be replaced by WASM-specific implementation + Err(StorageError::from(StorageErrorKind::Other( + "In-memory storage not yet implemented for WASM".to_string(), + ))) + } } +#[cfg(not(target_arch = "wasm32"))] pub async fn new_local_filesystem_storage( - path: &Path, + path: &std::path::Path, ) -> StorageResult> { - let st = ObjectStorage::new_local_filesystem(path).await?; + let st = backends::ObjectStorage::new_local_filesystem(path).await?; Ok(Arc::new(st)) } +#[cfg(not(target_arch = "wasm32"))] pub async fn new_s3_object_store_storage( config: S3Options, bucket: String, @@ -958,34 +286,46 @@ pub async fn new_s3_object_store_storage( } } let storage = - ObjectStorage::new_s3(bucket, prefix, credentials, Some(config)).await?; + backends::ObjectStorage::new_s3(bucket, prefix, credentials, Some(config)) + .await?; Ok(Arc::new(storage)) } +#[cfg(not(target_arch = "wasm32"))] pub async fn new_azure_blob_storage( account: String, container: String, prefix: Option, credentials: Option, - config: Option>, + config: Option>, ) -> StorageResult> { + use object_store::azure::AzureConfigKey; + let config = config .unwrap_or_default() .into_iter() .filter_map(|(key, value)| key.parse::().map(|k| (k, value)).ok()) .collect(); - let storage = - ObjectStorage::new_azure(account, container, prefix, credentials, Some(config)) - .await?; + let storage = backends::ObjectStorage::new_azure( + account, + container, + prefix, + credentials, + Some(config), + ) + .await?; Ok(Arc::new(storage)) } +#[cfg(not(target_arch = "wasm32"))] pub async fn new_gcs_storage( bucket: String, prefix: Option, credentials: Option, - config: Option>, + config: Option>, ) -> StorageResult> { + use object_store::gcp::GoogleConfigKey; + let config = config .unwrap_or_default() .into_iter() @@ -994,14 +334,14 @@ pub async fn new_gcs_storage( }) .collect(); let storage = - ObjectStorage::new_gcs(bucket, prefix, credentials, Some(config)).await?; + backends::ObjectStorage::new_gcs(bucket, prefix, credentials, Some(config)) + .await?; Ok(Arc::new(storage)) } #[cfg(test)] #[allow(clippy::unwrap_used, clippy::panic)] mod tests { - use std::{collections::HashSet, fs::File, io::Write, path::PathBuf}; use crate::config::{GcsBearerCredential, GcsStaticCredentials}; @@ -1011,6 +351,7 @@ mod tests { use proptest::prelude::*; use tempfile::TempDir; + #[cfg(not(target_arch = "wasm32"))] #[tokio_test] async fn test_is_clean() { let repo_dir = TempDir::new().unwrap(); @@ -1027,6 +368,7 @@ mod tests { assert!(s.root_is_clean().await.unwrap()); } + #[cfg(not(target_arch = "wasm32"))] #[tokio_test] /// Regression test: we can deserialize a GCS credential with token async fn test_gcs_session_serialization() { diff --git a/icechunk/src/storage/traits.rs b/icechunk/src/storage/traits.rs new file mode 100644 index 000000000..684e448f4 --- /dev/null +++ b/icechunk/src/storage/traits.rs @@ -0,0 +1,352 @@ +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use core::fmt; +use futures::{StreamExt, stream::BoxStream}; +use std::ops::Range; +use tokio::io::AsyncRead; + +use crate::{ + format::{ChunkId, ChunkOffset, ManifestId, SnapshotId}, + private, +}; + +use super::{ + DeleteObjectsResult, FetchConfigResult, GetRefResult, ListInfo, Reader, Settings, + StorageResult, UpdateConfigResult, VersionInfo, WriteRefResult, +}; + +/// Fetch and write the parquet files that represent the repository in object store +/// +/// Different implementation can cache the files differently, or not at all. +/// Implementations are free to assume files are never overwritten. +#[async_trait] +#[typetag::serde(tag = "type")] +pub trait Storage: fmt::Debug + fmt::Display + private::Sealed + Sync + Send { + fn default_settings(&self) -> Settings { + Default::default() + } + + fn can_write(&self) -> bool; + + async fn fetch_config(&self, settings: &Settings) + -> StorageResult; + async fn update_config( + &self, + settings: &Settings, + config: Bytes, + previous_version: &VersionInfo, + ) -> StorageResult; + async fn fetch_snapshot( + &self, + settings: &Settings, + id: &SnapshotId, + ) -> StorageResult>; + /// Returns whatever reader is more efficient. + /// + /// For example, if processed with multiple requests, it will return a synchronous `Buf` + /// instance pointing the different parts. If it was executed in a single request, it's more + /// efficient to return the network `AsyncRead` directly + async fn fetch_manifest_known_size( + &self, + settings: &Settings, + id: &ManifestId, + size: u64, + ) -> StorageResult; + async fn fetch_manifest_unknown_size( + &self, + settings: &Settings, + id: &ManifestId, + ) -> StorageResult>; + async fn fetch_chunk( + &self, + settings: &Settings, + id: &ChunkId, + range: &Range, + ) -> StorageResult; // FIXME: format flags + async fn fetch_transaction_log( + &self, + settings: &Settings, + id: &SnapshotId, + ) -> StorageResult>; + + async fn write_snapshot( + &self, + settings: &Settings, + id: SnapshotId, + metadata: Vec<(String, String)>, + bytes: Bytes, + ) -> StorageResult<()>; + async fn write_manifest( + &self, + settings: &Settings, + id: ManifestId, + metadata: Vec<(String, String)>, + bytes: Bytes, + ) -> StorageResult<()>; + async fn write_chunk( + &self, + settings: &Settings, + id: ChunkId, + bytes: Bytes, + ) -> StorageResult<()>; + async fn write_transaction_log( + &self, + settings: &Settings, + id: SnapshotId, + metadata: Vec<(String, String)>, + bytes: Bytes, + ) -> StorageResult<()>; + + async fn get_ref( + &self, + settings: &Settings, + ref_key: &str, + ) -> StorageResult; + async fn ref_names(&self, settings: &Settings) -> StorageResult>; + async fn write_ref( + &self, + settings: &Settings, + ref_key: &str, + bytes: Bytes, + previous_version: &VersionInfo, + ) -> StorageResult; + + async fn list_objects<'a>( + &'a self, + settings: &Settings, + prefix: &str, + ) -> StorageResult>>>; + + async fn delete_batch( + &self, + settings: &Settings, + prefix: &str, + batch: Vec<(String, u64)>, + ) -> StorageResult; + + /// Delete a stream of objects, by their id string representations + /// Input stream includes sizes to get as result the total number of bytes deleted + /// Delete a stream of objects, by their id string representations + /// Input stream includes sizes to get as result the total number of bytes deleted + async fn delete_objects( + &self, + settings: &Settings, + prefix: &str, + ids: BoxStream<'_, (String, u64)>, + ) -> StorageResult { + use futures::StreamExt; + use std::sync::{Arc, Mutex}; + use tracing::warn; + + let res = Arc::new(Mutex::new(DeleteObjectsResult::default())); + ids.chunks(1_000) + // FIXME: configurable concurrency + .for_each_concurrent(10, |batch| { + let res = Arc::clone(&res); + async move { + let new_deletes = self + .delete_batch(settings, prefix, batch) + .await + .unwrap_or_else(|_| { + // FIXME: handle error instead of skipping + warn!("ignoring error in Storage::delete_batch"); + Default::default() + }); + #[allow(clippy::expect_used)] + res.lock().expect("Bug in delete objects").merge(&new_deletes); + } + }) + .await; + #[allow(clippy::expect_used)] + let res = res.lock().expect("Bug in delete objects"); + Ok(res.clone()) + } + + async fn get_snapshot_last_modified( + &self, + settings: &Settings, + snapshot: &SnapshotId, + ) -> StorageResult>; + + async fn root_is_clean(&self) -> StorageResult { + match self.list_objects(&Settings::default(), "").await?.next().await { + None => Ok(true), + Some(Ok(_)) => Ok(false), + Some(Err(err)) => Err(err), + } + } + + async fn list_chunks( + &self, + settings: &Settings, + ) -> StorageResult>>> { + use super::{CHUNK_PREFIX, translate_list_infos}; + Ok(translate_list_infos(self.list_objects(settings, CHUNK_PREFIX).await?)) + } + + async fn list_manifests( + &self, + settings: &Settings, + ) -> StorageResult>>> { + use super::{MANIFEST_PREFIX, translate_list_infos}; + Ok(translate_list_infos(self.list_objects(settings, MANIFEST_PREFIX).await?)) + } + + async fn list_snapshots( + &self, + settings: &Settings, + ) -> StorageResult>>> { + use super::{SNAPSHOT_PREFIX, translate_list_infos}; + Ok(translate_list_infos(self.list_objects(settings, SNAPSHOT_PREFIX).await?)) + } + + async fn list_transaction_logs( + &self, + settings: &Settings, + ) -> StorageResult>>> { + use super::{TRANSACTION_PREFIX, translate_list_infos}; + Ok(translate_list_infos(self.list_objects(settings, TRANSACTION_PREFIX).await?)) + } + + async fn delete_chunks( + &self, + settings: &Settings, + chunks: BoxStream<'_, (ChunkId, u64)>, + ) -> StorageResult { + use super::CHUNK_PREFIX; + use futures::StreamExt; + self.delete_objects( + settings, + CHUNK_PREFIX, + chunks.map(|(id, size)| (id.to_string(), size)).boxed(), + ) + .await + } + + async fn delete_manifests( + &self, + settings: &Settings, + manifests: BoxStream<'_, (ManifestId, u64)>, + ) -> StorageResult { + use super::MANIFEST_PREFIX; + use futures::StreamExt; + self.delete_objects( + settings, + MANIFEST_PREFIX, + manifests.map(|(id, size)| (id.to_string(), size)).boxed(), + ) + .await + } + + async fn delete_snapshots( + &self, + settings: &Settings, + snapshots: BoxStream<'_, (SnapshotId, u64)>, + ) -> StorageResult { + use super::SNAPSHOT_PREFIX; + use futures::StreamExt; + self.delete_objects( + settings, + SNAPSHOT_PREFIX, + snapshots.map(|(id, size)| (id.to_string(), size)).boxed(), + ) + .await + } + + async fn delete_transaction_logs( + &self, + settings: &Settings, + transaction_logs: BoxStream<'_, (SnapshotId, u64)>, + ) -> StorageResult { + use super::TRANSACTION_PREFIX; + use futures::StreamExt; + self.delete_objects( + settings, + TRANSACTION_PREFIX, + transaction_logs.map(|(id, size)| (id.to_string(), size)).boxed(), + ) + .await + } + + async fn delete_refs( + &self, + settings: &Settings, + refs: BoxStream<'_, String>, + ) -> StorageResult { + use super::REF_PREFIX; + use futures::StreamExt; + let refs = refs.map(|s| (s, 0)).boxed(); + Ok(self.delete_objects(settings, REF_PREFIX, refs).await?.deleted_objects) + } + + async fn get_object_range_buf( + &self, + settings: &Settings, + key: &str, + range: &Range, + ) -> StorageResult>; + + async fn get_object_range_read( + &self, + settings: &Settings, + key: &str, + range: &Range, + ) -> StorageResult>; + + async fn get_object_concurrently_multiple( + &self, + settings: &Settings, + key: &str, + parts: Vec>, + ) -> StorageResult> { + use bytes::Buf; + use futures::TryStreamExt; + use futures::stream::FuturesOrdered; + + let results = + parts + .into_iter() + .map(|range| async move { + self.get_object_range_buf(settings, key, &range).await + }) + .collect::>(); + + let init: Box = Box::new(&[][..]); + let buf = results + .try_fold(init, |prev, buf| async { + let res: Box = Box::new(prev.chain(buf)); + Ok(res) + }) + .await?; + + Ok(Box::new(buf)) + } + + async fn get_object_concurrently( + &self, + settings: &Settings, + key: &str, + range: &Range, + ) -> StorageResult { + use super::split_in_multiple_requests; + + let parts = split_in_multiple_requests( + range, + settings.concurrency().ideal_concurrent_request_size().get(), + settings.concurrency().max_concurrent_requests_for_object().get(), + ) + .collect::>(); + + let res = match parts.len() { + 0 => Reader::Asynchronous(Box::new(tokio::io::empty())), + 1 => Reader::Asynchronous( + self.get_object_range_read(settings, key, range).await?, + ), + _ => Reader::Synchronous( + self.get_object_concurrently_multiple(settings, key, parts).await?, + ), + }; + Ok(res) + } +} diff --git a/icechunk/src/storage/types.rs b/icechunk/src/storage/types.rs new file mode 100644 index 000000000..0584b462a --- /dev/null +++ b/icechunk/src/storage/types.rs @@ -0,0 +1,329 @@ +use bytes::{Buf, Bytes}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::{ + io::Read, + num::{NonZeroU16, NonZeroU64}, + sync::OnceLock, +}; +use tokio::io::AsyncRead; +#[cfg(not(target_arch = "wasm32"))] +use tokio_util::io::SyncIoBridge; + +use super::StorageResult; + +pub const SNAPSHOT_PREFIX: &str = "snapshots/"; +pub const MANIFEST_PREFIX: &str = "manifests/"; +pub const CHUNK_PREFIX: &str = "chunks/"; +pub const REF_PREFIX: &str = "refs"; +pub const TRANSACTION_PREFIX: &str = "transactions/"; +pub const CONFIG_PATH: &str = "config.yaml"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Hash, PartialOrd, Ord)] +pub struct ETag(pub String); + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] +pub struct Generation(pub String); + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +pub struct VersionInfo { + pub etag: Option, + pub generation: Option, +} + +impl VersionInfo { + pub fn for_creation() -> Self { + Self { etag: None, generation: None } + } + + pub fn from_etag_only(etag: String) -> Self { + Self { etag: Some(ETag(etag)), generation: None } + } + + pub fn is_create(&self) -> bool { + self.etag.is_none() && self.generation.is_none() + } + + pub fn etag(&self) -> Option<&String> { + self.etag.as_ref().map(|e| &e.0) + } + + pub fn generation(&self) -> Option<&String> { + self.generation.as_ref().map(|e| &e.0) + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] +pub struct RetriesSettings { + pub max_tries: Option, + pub initial_backoff_ms: Option, + pub max_backoff_ms: Option, +} + +impl RetriesSettings { + pub fn max_tries(&self) -> NonZeroU16 { + self.max_tries.unwrap_or_else(|| NonZeroU16::new(10).unwrap_or(NonZeroU16::MIN)) + } + + pub fn initial_backoff_ms(&self) -> u32 { + self.initial_backoff_ms.unwrap_or(100) + } + + pub fn max_backoff_ms(&self) -> u32 { + self.max_backoff_ms.unwrap_or(3 * 60 * 1000) + } + + pub fn merge(&self, other: Self) -> Self { + Self { + max_tries: other.max_tries.or(self.max_tries), + initial_backoff_ms: other.initial_backoff_ms.or(self.initial_backoff_ms), + max_backoff_ms: other.max_backoff_ms.or(self.max_backoff_ms), + } + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] +pub struct ConcurrencySettings { + pub max_concurrent_requests_for_object: Option, + pub ideal_concurrent_request_size: Option, +} + +impl ConcurrencySettings { + // AWS recommendations: https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/horizontal-scaling-and-request-parallelization-for-high-throughput.html + // 8-16 MB requests + // 85-90 MB/s per request + // these numbers would saturate a 12.5 Gbps network + + pub fn max_concurrent_requests_for_object(&self) -> NonZeroU16 { + self.max_concurrent_requests_for_object + .unwrap_or_else(|| NonZeroU16::new(18).unwrap_or(NonZeroU16::MIN)) + } + pub fn ideal_concurrent_request_size(&self) -> NonZeroU64 { + self.ideal_concurrent_request_size.unwrap_or_else(|| { + NonZeroU64::new(12 * 1024 * 1024).unwrap_or(NonZeroU64::MIN) + }) + } + + pub fn merge(&self, other: Self) -> Self { + Self { + max_concurrent_requests_for_object: other + .max_concurrent_requests_for_object + .or(self.max_concurrent_requests_for_object), + ideal_concurrent_request_size: other + .ideal_concurrent_request_size + .or(self.ideal_concurrent_request_size), + } + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] +pub struct Settings { + pub concurrency: Option, + pub retries: Option, + pub unsafe_use_conditional_update: Option, + pub unsafe_use_conditional_create: Option, + pub unsafe_use_metadata: Option, + #[serde(default)] + pub storage_class: Option, + #[serde(default)] + pub metadata_storage_class: Option, + #[serde(default)] + pub chunks_storage_class: Option, + #[serde(default)] + pub minimum_size_for_multipart_upload: Option, +} + +static DEFAULT_CONCURRENCY: OnceLock = OnceLock::new(); +static DEFAULT_RETRIES: OnceLock = OnceLock::new(); + +impl Settings { + pub fn concurrency(&self) -> &ConcurrencySettings { + self.concurrency + .as_ref() + .unwrap_or_else(|| DEFAULT_CONCURRENCY.get_or_init(Default::default)) + } + + pub fn retries(&self) -> &RetriesSettings { + self.retries + .as_ref() + .unwrap_or_else(|| DEFAULT_RETRIES.get_or_init(Default::default)) + } + + pub fn unsafe_use_conditional_create(&self) -> bool { + self.unsafe_use_conditional_create.unwrap_or(true) + } + + pub fn unsafe_use_conditional_update(&self) -> bool { + self.unsafe_use_conditional_update.unwrap_or(true) + } + + pub fn unsafe_use_metadata(&self) -> bool { + self.unsafe_use_metadata.unwrap_or(true) + } + + pub fn metadata_storage_class(&self) -> Option<&String> { + self.metadata_storage_class.as_ref().or(self.storage_class.as_ref()) + } + + pub fn chunks_storage_class(&self) -> Option<&String> { + self.chunks_storage_class.as_ref().or(self.storage_class.as_ref()) + } + + pub fn minimum_size_for_multipart_upload(&self) -> u64 { + // per AWS recommendation: 100 MB + self.minimum_size_for_multipart_upload.unwrap_or(100 * 1024 * 1024) + } + + pub fn merge(&self, other: Self) -> Self { + Self { + concurrency: match (&self.concurrency, other.concurrency) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(c.clone()), + (Some(mine), Some(theirs)) => Some(mine.merge(theirs)), + }, + retries: match (&self.retries, other.retries) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(c.clone()), + (Some(mine), Some(theirs)) => Some(mine.merge(theirs)), + }, + unsafe_use_conditional_create: match ( + &self.unsafe_use_conditional_create, + other.unsafe_use_conditional_create, + ) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(*c), + (Some(_), Some(theirs)) => Some(theirs), + }, + unsafe_use_conditional_update: match ( + &self.unsafe_use_conditional_update, + other.unsafe_use_conditional_update, + ) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(*c), + (Some(_), Some(theirs)) => Some(theirs), + }, + unsafe_use_metadata: match ( + &self.unsafe_use_metadata, + other.unsafe_use_metadata, + ) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(*c), + (Some(_), Some(theirs)) => Some(theirs), + }, + storage_class: match (&self.storage_class, other.storage_class) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(c.clone()), + (Some(_), Some(theirs)) => Some(theirs), + }, + metadata_storage_class: match ( + &self.metadata_storage_class, + other.metadata_storage_class, + ) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(c.clone()), + (Some(_), Some(theirs)) => Some(theirs), + }, + chunks_storage_class: match ( + &self.chunks_storage_class, + other.chunks_storage_class, + ) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(c.clone()), + (Some(_), Some(theirs)) => Some(theirs), + }, + minimum_size_for_multipart_upload: match ( + &self.minimum_size_for_multipart_upload, + other.minimum_size_for_multipart_upload, + ) { + (None, None) => None, + (None, Some(c)) => Some(c), + (Some(c), None) => Some(*c), + (Some(_), Some(theirs)) => Some(theirs), + }, + } + } +} + +pub enum Reader { + Asynchronous(Box), + Synchronous(Box), +} + +impl Reader { + pub async fn to_bytes(self, expected_size: usize) -> StorageResult { + match self { + Reader::Asynchronous(mut read) => { + // add some extra space to the buffer to optimize conversion to bytes + let mut buffer = Vec::with_capacity(expected_size + 16); + tokio::io::copy(&mut read, &mut buffer) + .await + .map_err(super::StorageErrorKind::IOError)?; + Ok(buffer.into()) + } + Reader::Synchronous(mut buf) => Ok(buf.copy_to_bytes(buf.remaining())), + } + } + + /// Notice this Read can only be used in non async contexts, for example, calling tokio::task::spawn_blocking + pub fn into_read(self) -> Box { + match self { + #[cfg(not(target_arch = "wasm32"))] + Reader::Asynchronous(read) => Box::new(SyncIoBridge::new(read)), + #[cfg(target_arch = "wasm32")] + Reader::Asynchronous(_) => panic!("SyncIoBridge not available on WASM"), + Reader::Synchronous(buf) => Box::new(buf.reader()), + } + } +} + +#[derive(Debug)] +pub struct ListInfo { + pub id: Id, + pub created_at: DateTime, + pub size_bytes: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FetchConfigResult { + Found { bytes: Bytes, version: VersionInfo }, + NotFound, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum UpdateConfigResult { + Updated { new_version: VersionInfo }, + NotOnLatestVersion, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum GetRefResult { + Found { bytes: Bytes, version: VersionInfo }, + NotFound, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum WriteRefResult { + Written, + WontOverwrite, +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct DeleteObjectsResult { + pub deleted_objects: u64, + pub deleted_bytes: u64, +} + +impl DeleteObjectsResult { + pub fn merge(&mut self, other: &Self) { + self.deleted_objects += other.deleted_objects; + self.deleted_bytes += other.deleted_bytes; + } +} diff --git a/icechunk/src/virtual_chunks.rs b/icechunk/src/virtual_chunks.rs index 2a463d79c..b89eb30e7 100644 --- a/icechunk/src/virtual_chunks.rs +++ b/icechunk/src/virtual_chunks.rs @@ -2,40 +2,49 @@ use std::{ collections::HashMap, num::{NonZeroU16, NonZeroU64}, ops::Range, - str::FromStr, sync::Arc, }; use async_trait::async_trait; -use aws_sdk_s3::{Client, error::SdkError, operation::get_object::GetObjectError}; use bytes::{Buf, Bytes}; -use futures::{TryStreamExt, stream::FuturesOrdered}; -use object_store::{ - ClientConfigKey, GetOptions, ObjectStore, gcp::GoogleConfigKey, - local::LocalFileSystem, path::Path, -}; use quick_cache::sync::Cache; use serde::{Deserialize, Serialize}; use url::Url; +#[cfg(not(target_arch = "wasm32"))] +use { + aws_sdk_s3::{Client, error::SdkError, operation::get_object::GetObjectError}, + futures::{TryStreamExt, stream::FuturesOrdered}, + object_store::{ + ClientConfigKey, GetOptions, ObjectStore, gcp::GoogleConfigKey, + local::LocalFileSystem, path::Path, + }, + std::str::FromStr, +}; + +#[cfg(target_arch = "wasm32")] +use futures::{TryStreamExt, stream::FuturesOrdered}; + use crate::{ ObjectStoreConfig, config::{Credentials, GcsCredentials, S3Credentials, S3Options}, format::{ ChunkOffset, - manifest::{ - Checksum, SecondsSinceEpoch, VirtualReferenceError, VirtualReferenceErrorKind, - }, + manifest::{Checksum, VirtualReferenceError, VirtualReferenceErrorKind}, }, private, - storage::{ - self, - object_store::{ - GcsObjectStoreBackend, HttpObjectStoreBackend, ObjectStoreBackend as _, - }, - s3::{mk_client, range_to_header}, - split_in_multiple_requests, + storage::{self, split_in_multiple_requests}, +}; + +#[cfg(not(target_arch = "wasm32"))] +use crate::format::manifest::SecondsSinceEpoch; + +#[cfg(not(target_arch = "wasm32"))] +use crate::storage::backends::{ + object_store::{ + GcsObjectStoreBackend, HttpObjectStoreBackend, ObjectStoreBackend as _, }, + s3::{mk_client, range_to_header}, }; pub type ContainerName = String; @@ -256,7 +265,7 @@ impl VirtualChunkResolver { settings: storage::Settings, ) -> Self { fn add_trailing(s: String) -> String { - if s.ends_with('/') { s } else { format!("{s}/") } + if s.ends_with('/') { s } else { format!("{}/", s) } } // we need to validate the containers because they can come from persisted config @@ -487,12 +496,14 @@ fn fetcher_cache_key( } } +#[cfg(not(target_arch = "wasm32"))] #[derive(Debug)] pub struct S3Fetcher { client: Arc, settings: storage::Settings, } +#[cfg(not(target_arch = "wasm32"))] impl S3Fetcher { pub async fn new( opts: &S3Options, @@ -505,8 +516,10 @@ impl S3Fetcher { } } +#[cfg(not(target_arch = "wasm32"))] impl private::Sealed for S3Fetcher {} +#[cfg(not(target_arch = "wasm32"))] #[async_trait] impl ChunkFetcher for S3Fetcher { fn ideal_concurrent_request_size(&self) -> NonZeroU64 { @@ -600,13 +613,17 @@ impl ChunkFetcher for S3Fetcher { } } +#[cfg(not(target_arch = "wasm32"))] #[derive(Debug)] pub struct ObjectStoreFetcher { client: Arc, settings: storage::Settings, } + +#[cfg(not(target_arch = "wasm32"))] impl private::Sealed for ObjectStoreFetcher {} +#[cfg(not(target_arch = "wasm32"))] impl ObjectStoreFetcher { fn new_local() -> Self { ObjectStoreFetcher { @@ -669,6 +686,7 @@ impl ObjectStoreFetcher { } } +#[cfg(not(target_arch = "wasm32"))] #[async_trait] impl ChunkFetcher for ObjectStoreFetcher { fn ideal_concurrent_request_size(&self) -> NonZeroU64 { @@ -721,6 +739,117 @@ impl ChunkFetcher for ObjectStoreFetcher { } } +// WASM-compatible stubs +#[cfg(target_arch = "wasm32")] +#[derive(Debug)] +pub struct S3Fetcher; + +#[cfg(target_arch = "wasm32")] +impl S3Fetcher { + pub async fn new( + _opts: &crate::config::S3Options, + _credentials: &crate::config::S3Credentials, + _settings: storage::Settings, + ) -> Self { + Self + } +} + +#[cfg(target_arch = "wasm32")] +impl private::Sealed for S3Fetcher {} + +#[cfg(target_arch = "wasm32")] +#[async_trait] +impl ChunkFetcher for S3Fetcher { + fn ideal_concurrent_request_size(&self) -> NonZeroU64 { + NonZeroU64::new(1024).unwrap_or(NonZeroU64::MIN) + } + + fn max_concurrent_requests_for_object(&self) -> NonZeroU16 { + NonZeroU16::new(1).unwrap_or(NonZeroU16::MIN) + } + + async fn fetch_part( + &self, + _chunk_location: &Url, + _range: Range, + _checksum: Option<&Checksum>, + ) -> Result, VirtualReferenceError> { + Err(VirtualReferenceError::from(VirtualReferenceErrorKind::OtherError(Box::new( + std::io::Error::new( + std::io::ErrorKind::Unsupported, + "S3Fetcher not supported on WASM", + ), + )))) + } +} + +#[cfg(target_arch = "wasm32")] +#[derive(Debug)] +pub struct ObjectStoreFetcher; + +#[cfg(target_arch = "wasm32")] +impl ObjectStoreFetcher { + fn new_local() -> Self { + Self + } + + pub async fn new_http( + _url: &str, + _opts: &HashMap, + ) -> Result { + Err(VirtualReferenceError::from(VirtualReferenceErrorKind::OtherError(Box::new( + std::io::Error::new( + std::io::ErrorKind::Unsupported, + "ObjectStoreFetcher::new_http not supported on WASM", + ), + )))) + } + + pub async fn new_gcs( + _bucket: String, + _prefix: Option, + _credentials: Option, + _config: HashMap, + ) -> Result { + Err(VirtualReferenceError::from(VirtualReferenceErrorKind::OtherError(Box::new( + std::io::Error::new( + std::io::ErrorKind::Unsupported, + "ObjectStoreFetcher::new_gcs not supported on WASM", + ), + )))) + } +} + +#[cfg(target_arch = "wasm32")] +impl private::Sealed for ObjectStoreFetcher {} + +#[cfg(target_arch = "wasm32")] +#[async_trait] +impl ChunkFetcher for ObjectStoreFetcher { + fn ideal_concurrent_request_size(&self) -> NonZeroU64 { + NonZeroU64::new(1024).unwrap_or(NonZeroU64::MIN) + } + + fn max_concurrent_requests_for_object(&self) -> NonZeroU16 { + NonZeroU16::new(1).unwrap_or(NonZeroU16::MIN) + } + + async fn fetch_part( + &self, + _chunk_location: &Url, + _range: Range, + _checksum: Option<&Checksum>, + ) -> Result, VirtualReferenceError> { + Err(VirtualReferenceError::from(VirtualReferenceErrorKind::OtherError(Box::new( + std::io::Error::new( + std::io::ErrorKind::Unsupported, + "ObjectStoreFetcher not supported on WASM", + ), + )))) + } +} + #[cfg(test)] #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)] mod tests { diff --git a/icechunk/tests/test_storage.rs b/icechunk/tests/test_storage.rs index beb027434..edd81aee1 100644 --- a/icechunk/tests/test_storage.rs +++ b/icechunk/tests/test_storage.rs @@ -17,7 +17,7 @@ use icechunk::{ }, storage::{ self, ETag, FetchConfigResult, Generation, StorageResult, UpdateConfigResult, - VersionInfo, new_in_memory_storage, new_s3_storage, s3::mk_client, + VersionInfo, backends::s3::mk_client, new_in_memory_storage, new_s3_storage, }, }; use icechunk_macros::tokio_test; diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs index a267a2801..70c316086 100644 --- a/icechunk/tests/test_virtual_refs.rs +++ b/icechunk/tests/test_virtual_refs.rs @@ -21,7 +21,8 @@ use icechunk::{ repository::VersionInfo, session::{SessionErrorKind, get_chunk}, storage::{ - self, ConcurrencySettings, ETag, ObjectStorage, new_s3_storage, s3::mk_client, + self, ConcurrencySettings, ETag, ObjectStorage, backends::s3::mk_client, + new_s3_storage, }, store::{StoreError, StoreErrorKind}, virtual_chunks::VirtualChunkContainer,