From 4630288edb6b03732aca5c2b9c56d9b2034ad8d3 Mon Sep 17 00:00:00 2001 From: Jordan Santell Date: Thu, 21 Sep 2023 15:52:56 -0700 Subject: [PATCH] feat: Introduce EphemeralStorage, allowing storages to create temporary Stores. --- Cargo.lock | 1 + rust/noosphere-common/src/task.rs | 14 + rust/noosphere-core/src/authority/author.rs | 8 +- rust/noosphere-core/src/context/context.rs | 2 +- rust/noosphere-core/src/data/address.rs | 8 +- rust/noosphere-core/src/data/sphere.rs | 4 +- rust/noosphere-core/src/helpers/context.rs | 2 +- rust/noosphere-core/src/helpers/link.rs | 2 +- .../examples/notes-to-html/implementation.rs | 2 +- rust/noosphere-ipfs/src/storage.rs | 17 +- rust/noosphere-ns/src/builder.rs | 4 +- rust/noosphere-ns/src/dht_client.rs | 2 +- rust/noosphere-ns/src/name_system.rs | 6 +- rust/noosphere-ns/src/server/client.rs | 4 +- rust/noosphere-ns/tests/ns_test.rs | 4 +- rust/noosphere-storage/Cargo.toml | 9 +- rust/noosphere-storage/examples/bench/main.rs | 2 +- .../examples/bench/performance.rs | 15 +- rust/noosphere-storage/src/db.rs | 35 ++- rust/noosphere-storage/src/ephemeral.rs | 294 ++++++++++++++++++ .../src/implementation/indexed_db.rs | 292 +++++++++++++++-- .../src/implementation/memory.rs | 19 +- .../src/implementation/rocks_db.rs | 144 +++++++-- .../src/implementation/sled.rs | 134 +++++++- .../src/implementation/tracking.rs | 13 + rust/noosphere-storage/src/lib.rs | 38 ++- rust/noosphere-storage/src/non_persistent.rs | 88 ++++++ rust/noosphere-storage/src/ops.rs | 13 + rust/noosphere-storage/src/partitioned.rs | 106 +++++++ rust/noosphere-storage/src/storage.rs | 5 +- rust/noosphere-storage/src/store.rs | 52 +++- rust/noosphere-storage/src/ucan.rs | 2 +- rust/noosphere/src/sphere/builder/mod.rs | 2 +- 33 files changed, 1224 insertions(+), 119 deletions(-) create mode 100644 rust/noosphere-storage/src/ephemeral.rs create mode 100644 rust/noosphere-storage/src/non_persistent.rs create mode 100644 rust/noosphere-storage/src/ops.rs create mode 100644 rust/noosphere-storage/src/partitioned.rs diff --git a/Cargo.lock b/Cargo.lock index d2cc1e7db..a1b6898b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3710,6 +3710,7 @@ dependencies = [ "async-trait", "base64 0.21.4", "cid", + "futures", "instant", "js-sys", "libipld-cbor", diff --git a/rust/noosphere-common/src/task.rs b/rust/noosphere-common/src/task.rs index 6fc481f4e..3a0048119 100644 --- a/rust/noosphere-common/src/task.rs +++ b/rust/noosphere-common/src/task.rs @@ -13,6 +13,8 @@ use futures::future::join_all; #[cfg(not(target_arch = "wasm32"))] use tokio::task::JoinSet; +use crate::ConditionalSend; + #[cfg(target_arch = "wasm32")] /// Spawn a future by scheduling it with the local executor. The returned /// future will be pending until the spawned future completes. @@ -43,6 +45,18 @@ where Ok(tokio::spawn(future).await?) } +/// Spawns a [ConditionalSend] future without requiring `await`. +/// The future will immediately start processing. +pub fn spawn_no_wait(future: F) +where + F: Future + ConditionalSend + 'static, +{ + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(future); + #[cfg(not(target_arch = "wasm32"))] + tokio::task::spawn(future); +} + /// An aggregator of async work that can be used to observe the moment when all /// the aggregated work is completed. It is similar to tokio's [JoinSet], but is /// relatively constrained and also works on `wasm32-unknown-unknown`. Unlike diff --git a/rust/noosphere-core/src/authority/author.rs b/rust/noosphere-core/src/authority/author.rs index c307caa3c..a7ab4b06c 100644 --- a/rust/noosphere-core/src/authority/author.rs +++ b/rust/noosphere-core/src/authority/author.rs @@ -179,7 +179,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] async fn it_gives_read_only_access_when_there_is_no_authorization() -> Result<()> { let author = Author::anonymous(); - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let (sphere, _, _) = Sphere::generate("did:key:foo", &mut db).await?; @@ -201,7 +201,7 @@ mod tests { async fn it_gives_read_write_access_if_the_key_is_authorized() -> Result<()> { let owner_key = generate_ed25519_key(); let owner_did = Did(owner_key.get_did().await.unwrap()); - let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap(); let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut db).await.unwrap(); @@ -227,7 +227,7 @@ mod tests { async fn it_gives_read_write_access_to_the_root_sphere_credential() -> Result<()> { let owner_key = generate_ed25519_key(); let owner_did = Did(owner_key.get_did().await.unwrap()); - let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap(); let (sphere, authorization, mnemonic) = Sphere::generate(&owner_did, &mut db).await.unwrap(); @@ -256,7 +256,7 @@ mod tests { async fn it_can_find_itself_in_an_authorization_lineage() -> Result<()> { let owner_key = generate_ed25519_key(); let owner_did = Did(owner_key.get_did().await.unwrap()); - let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap(); let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut db).await.unwrap(); diff --git a/rust/noosphere-core/src/context/context.rs b/rust/noosphere-core/src/context/context.rs index c8f62e1f6..54c5300db 100644 --- a/rust/noosphere-core/src/context/context.rs +++ b/rust/noosphere-core/src/context/context.rs @@ -393,7 +393,7 @@ mod tests { let mut records: Vec = vec![]; let owner_key = generate_ed25519_key(); let owner_did = owner_key.get_did().await?; - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?; let ucan_proof = proof.as_ucan(&db).await?; let sphere_identity = sphere.get_identity().await?; diff --git a/rust/noosphere-core/src/data/address.rs b/rust/noosphere-core/src/data/address.rs index e833886bd..e8fd27310 100644 --- a/rust/noosphere-core/src/data/address.rs +++ b/rust/noosphere-core/src/data/address.rs @@ -403,7 +403,7 @@ mod tests { let sphere_identity = Did::from(sphere_key.get_did().await?); let link = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i"; let cid_link: Link = link.parse()?; - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let record = from_issuer(&sphere_key, &sphere_identity, &cid_link, None).await?; @@ -422,7 +422,7 @@ mod tests { let sphere_identity = Did::from(sphere_key.get_did().await?); let link = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i"; let cid_link: Cid = link.parse()?; - let mut store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let mut store = SphereDb::new(MemoryStorage::default()).await.unwrap(); // First verify that `owner` cannot publish for `sphere` // without delegation. @@ -486,7 +486,7 @@ mod tests { let sphere_key = generate_ed25519_key(); let sphere_identity = Did::from(sphere_key.get_did().await?); let cid_address = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i"; - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); expect_failure( "fails when expect `fact` is missing", @@ -676,7 +676,7 @@ mod tests { let delegatee_key = generate_ed25519_key(); let delegatee_did = delegatee_key.get_did().await?; - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let mut ucan_store = UcanStore(db.clone()); let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?; diff --git a/rust/noosphere-core/src/data/sphere.rs b/rust/noosphere-core/src/data/sphere.rs index df89c0f6d..4073bebcc 100644 --- a/rust/noosphere-core/src/data/sphere.rs +++ b/rust/noosphere-core/src/data/sphere.rs @@ -78,7 +78,7 @@ mod tests { let identity_credential = generate_ed25519_key(); let identity = Did(identity_credential.get_did().await?); - let mut store = SphereDb::new(&MemoryStorage::default()).await?; + let mut store = SphereDb::new(MemoryStorage::default()).await?; let sphere = SphereIpld::new(&identity, &mut store).await?; @@ -127,7 +127,7 @@ mod tests { let identity = Did(identity_credential.get_did().await?); let authorized = Did(authorized_credential.get_did().await?); - let mut store = SphereDb::new(&MemoryStorage::default()).await?; + let mut store = SphereDb::new(MemoryStorage::default()).await?; let sphere = SphereIpld::new(&identity, &mut store).await?; diff --git a/rust/noosphere-core/src/helpers/context.rs b/rust/noosphere-core/src/helpers/context.rs index 84033ce55..aead0fc5e 100644 --- a/rust/noosphere-core/src/helpers/context.rs +++ b/rust/noosphere-core/src/helpers/context.rs @@ -39,7 +39,7 @@ pub async fn simulated_sphere_context( Some(db) => db, None => { let storage_provider = TrackingStorage::wrap(MemoryStorage::default()); - SphereDb::new(&storage_provider).await? + SphereDb::new(storage_provider).await? } }; diff --git a/rust/noosphere-core/src/helpers/link.rs b/rust/noosphere-core/src/helpers/link.rs index f8290f4e4..f381d3eee 100644 --- a/rust/noosphere-core/src/helpers/link.rs +++ b/rust/noosphere-core/src/helpers/link.rs @@ -16,7 +16,7 @@ where { let owner_key = generate_ed25519_key(); let owner_did = owner_key.get_did().await?; - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?; let ucan_proof = proof.as_ucan(&db).await?; diff --git a/rust/noosphere-into/examples/notes-to-html/implementation.rs b/rust/noosphere-into/examples/notes-to-html/implementation.rs index 0f323d073..55e4cbbba 100644 --- a/rust/noosphere-into/examples/notes-to-html/implementation.rs +++ b/rust/noosphere-into/examples/notes-to-html/implementation.rs @@ -27,7 +27,7 @@ use ucan::crypto::KeyMaterial; pub async fn main() -> Result<()> { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let owner_key: SphereContextKey = Arc::new(Box::new(generate_ed25519_key())); let owner_did = owner_key.get_did().await?; diff --git a/rust/noosphere-ipfs/src/storage.rs b/rust/noosphere-ipfs/src/storage.rs index f79e9dc1f..19c7d45d3 100644 --- a/rust/noosphere-ipfs/src/storage.rs +++ b/rust/noosphere-ipfs/src/storage.rs @@ -3,7 +3,7 @@ use anyhow::Result; use async_trait::async_trait; use cid::Cid; use noosphere_common::ConditionalSync; -use noosphere_storage::{BlockStore, Storage}; +use noosphere_storage::{BlockStore, EphemeralStorage, EphemeralStore, Storage}; use std::sync::Arc; use tokio::sync::RwLock; @@ -45,7 +45,6 @@ where C: IpfsClient + ConditionalSync, { type BlockStore = IpfsStore; - type KeyValueStore = S::KeyValueStore; async fn get_block_store(&self, name: &str) -> Result { @@ -58,6 +57,20 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl EphemeralStorage for IpfsStorage +where + S: Storage + EphemeralStorage + ConditionalSync, + C: IpfsClient + ConditionalSync, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.local_storage.get_ephemeral_store().await + } +} + /// An implementation of [BlockStore] that wraps some other implementation of /// same. It forwards most behavior to its wrapped implementation, except when /// reading blocks. In that case, if a block cannot be found locally, it will diff --git a/rust/noosphere-ns/src/builder.rs b/rust/noosphere-ns/src/builder.rs index 9c9ab72e2..7288d34a3 100644 --- a/rust/noosphere-ns/src/builder.rs +++ b/rust/noosphere-ns/src/builder.rs @@ -23,7 +23,7 @@ use libp2p::kad::KademliaConfig; /// #[tokio::main(flavor = "multi_thread")] /// async fn main() { /// let key_material = generate_ed25519_key(); -/// let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); +/// let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); /// /// let ns = NameSystemBuilder::default() /// .ucan_store(store) @@ -190,7 +190,7 @@ mod tests { let keypair = key_material.to_dht_keypair()?; PeerId::from(keypair.public()) }; - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let bootstrap_peers: Vec = vec![ "/ip4/127.0.0.50/tcp/33333/p2p/12D3KooWH8WgH9mgbMXrKX4veokUznvEn6Ycwg4qaGNi83nLkoUK" .parse()?, diff --git a/rust/noosphere-ns/src/dht_client.rs b/rust/noosphere-ns/src/dht_client.rs index 9dcd0729a..4aa585671 100644 --- a/rust/noosphere-ns/src/dht_client.rs +++ b/rust/noosphere-ns/src/dht_client.rs @@ -133,7 +133,7 @@ pub mod test { // Now test another node connecting. let (_other_ns, other_peer_id) = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) diff --git a/rust/noosphere-ns/src/name_system.rs b/rust/noosphere-ns/src/name_system.rs index 6e9f8e4f2..e1a0b11bc 100644 --- a/rust/noosphere-ns/src/name_system.rs +++ b/rust/noosphere-ns/src/name_system.rs @@ -172,7 +172,7 @@ mod test { async fn before_name_resolver_tests() -> Result { let ns = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) @@ -205,7 +205,7 @@ mod test { async fn before_each() -> Result<(DataPlaceholder, Arc>)> { let (bootstrap, bootstrap_address) = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) @@ -221,7 +221,7 @@ mod test { let ns = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) diff --git a/rust/noosphere-ns/src/server/client.rs b/rust/noosphere-ns/src/server/client.rs index 4cc703f34..519b76085 100644 --- a/rust/noosphere-ns/src/server/client.rs +++ b/rust/noosphere-ns/src/server/client.rs @@ -146,7 +146,7 @@ mod test { async fn before_each() -> Result<(DataPlaceholder, Arc>)> { let (bootstrap, bootstrap_address) = { let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) .key_material(&key_material) @@ -164,7 +164,7 @@ mod test { let api_port = api_listener.local_addr().unwrap().port(); let api_url = Url::parse(&format!("http://127.0.0.1:{}", api_port))?; let key_material = generate_ed25519_key(); - let store = SphereDb::new(&MemoryStorage::default()).await.unwrap(); + let store = SphereDb::new(MemoryStorage::default()).await.unwrap(); let ns = NameSystemBuilder::default() .ucan_store(store) diff --git a/rust/noosphere-ns/tests/ns_test.rs b/rust/noosphere-ns/tests/ns_test.rs index c95fdb9ab..b612ae486 100644 --- a/rust/noosphere-ns/tests/ns_test.rs +++ b/rust/noosphere-ns/tests/ns_test.rs @@ -75,7 +75,7 @@ async fn test_name_system_peer_propagation() -> Result<()> { initialize_tracing(None); // Create two NameSystems, where `ns_1` is publishing for `sphere_1` // and `ns_2` is publishing for `sphere_2`. - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let network = NameSystemNetwork::generate(3, Some(db.clone())).await?; let sphere_1_cid_1 = derive_cid::(b"00000000"); let sphere_1_cid_2 = derive_cid::(b"11111111"); @@ -172,7 +172,7 @@ async fn test_name_system_peer_propagation() -> Result<()> { #[tokio::test] async fn test_name_system_validation() -> Result<()> { initialize_tracing(None); - let mut db = SphereDb::new(&MemoryStorage::default()).await?; + let mut db = SphereDb::new(MemoryStorage::default()).await?; let network = NameSystemNetwork::generate(2, Some(db.clone())).await?; let ns_1 = network.get(1).unwrap(); diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index b47f66c7f..8fa7ba232 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -21,6 +21,7 @@ readme = "README.md" anyhow = { workspace = true } async-trait = "~0.1" async-stream = { workspace = true } +futures = { workspace = true } tokio-stream = { workspace = true } cid = { workspace = true } noosphere-common = { version = "0.1.2", path = "../noosphere-common" } @@ -28,22 +29,19 @@ tracing = "~0.1" ucan = { workspace = true } libipld-core = { workspace = true } libipld-cbor = { workspace = true } +rand = { workspace = true } serde = { workspace = true } base64 = "=0.21.4" url = { version = "^2" } [dev-dependencies] -witty-phrase-generator = "~0.2" wasm-bindgen-test = { workspace = true } -rand = { workspace = true } noosphere-core-dev = { path = "../noosphere-core", features = ["helpers"], package = "noosphere-core" } noosphere-common = { path = "../noosphere-common", features = ["helpers"] } instant = { workspace = true } -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -tempfile = { workspace = true } - [target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tempfile = { workspace = true } sled = "~0.34" tokio = { workspace = true, features = ["full"] } rocksdb = { version = "0.21.0", optional = true } @@ -52,6 +50,7 @@ rocksdb = { version = "0.21.0", optional = true } tokio = { workspace = true, features = ["sync", "macros"] } wasm-bindgen = { workspace = true, features = ["serde-serialize"] } wasm-bindgen-futures = { workspace = true } +witty-phrase-generator = "~0.2" serde-wasm-bindgen = { workspace = true } js-sys = { workspace = true } rexie = { version = "~0.5" } diff --git a/rust/noosphere-storage/examples/bench/main.rs b/rust/noosphere-storage/examples/bench/main.rs index 36484568e..98041bfa9 100644 --- a/rust/noosphere-storage/examples/bench/main.rs +++ b/rust/noosphere-storage/examples/bench/main.rs @@ -179,7 +179,7 @@ impl BenchmarkStorage { } pub async fn sphere_db(&self) -> Result> { - SphereDb::new(&self.storage).await + SphereDb::new(self.storage.clone()).await } pub async fn as_stats(&mut self) -> Result { diff --git a/rust/noosphere-storage/examples/bench/performance.rs b/rust/noosphere-storage/examples/bench/performance.rs index 02a88f8ce..9de3942b8 100644 --- a/rust/noosphere-storage/examples/bench/performance.rs +++ b/rust/noosphere-storage/examples/bench/performance.rs @@ -1,7 +1,7 @@ use anyhow::{Error, Result}; use async_trait::async_trait; use instant::{Duration, Instant}; -use noosphere_storage::{Space, Storage, Store}; +use noosphere_storage::{EphemeralStorage, EphemeralStore, Space, Storage, Store}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; @@ -208,3 +208,16 @@ impl Store for PerformanceStore { result } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl EphemeralStorage for PerformanceStorage +where + S: Storage, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.storage.get_ephemeral_store().await + } +} diff --git a/rust/noosphere-storage/src/db.rs b/rust/noosphere-storage/src/db.rs index da62003f1..733279af5 100644 --- a/rust/noosphere-storage/src/db.rs +++ b/rust/noosphere-storage/src/db.rs @@ -14,7 +14,7 @@ use std::{collections::BTreeSet, fmt::Debug}; use tokio_stream::Stream; use ucan::store::UcanStore; -use crate::{BlockStore, KeyValueStore, MemoryStore, Storage}; +use crate::{BlockStore, EphemeralStorage, EphemeralStore, KeyValueStore, MemoryStore, Storage}; use async_stream::try_stream; @@ -22,9 +22,15 @@ pub const BLOCK_STORE: &str = "blocks"; pub const LINK_STORE: &str = "links"; pub const VERSION_STORE: &str = "versions"; pub const METADATA_STORE: &str = "metadata"; +pub const EPHEMERAL_STORE: &str = "ephemeral"; -pub const SPHERE_DB_STORE_NAMES: &[&str] = - &[BLOCK_STORE, LINK_STORE, VERSION_STORE, METADATA_STORE]; +pub const SPHERE_DB_STORE_NAMES: &[&str] = &[ + BLOCK_STORE, + LINK_STORE, + VERSION_STORE, + METADATA_STORE, + EPHEMERAL_STORE, +]; /// A [SphereDb] is a high-level storage primitive for Noosphere's APIs. It /// takes a [Storage] and implements [BlockStore] and [KeyValueStore], @@ -40,18 +46,20 @@ where link_store: S::KeyValueStore, version_store: S::KeyValueStore, metadata_store: S::KeyValueStore, + storage: S, } impl SphereDb where S: Storage, { - pub async fn new(storage: &S) -> Result> { + pub async fn new(storage: S) -> Result> { Ok(SphereDb { block_store: storage.get_block_store(BLOCK_STORE).await?, link_store: storage.get_key_value_store(LINK_STORE).await?, version_store: storage.get_key_value_store(VERSION_STORE).await?, metadata_store: storage.get_key_value_store(METADATA_STORE).await?, + storage, }) } @@ -275,6 +283,19 @@ where } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl EphemeralStorage for SphereDb +where + S: Storage, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.storage.get_ephemeral_store().await + } +} + #[cfg(test)] mod tests { @@ -295,7 +316,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] pub async fn it_stores_links_when_a_block_is_saved() { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let list1 = vec!["cats", "dogs", "pigeons"]; let list2 = vec!["apples", "oranges", "starfruit"]; @@ -316,7 +337,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] pub async fn it_can_stream_all_blocks_in_a_dag() { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let list1 = vec!["cats", "dogs", "pigeons"]; let list2 = vec!["apples", "oranges", "starfruit"]; @@ -351,7 +372,7 @@ mod tests { #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] pub async fn it_can_put_a_raw_block_and_read_it_as_a_token() { let storage_provider = MemoryStorage::default(); - let mut db = SphereDb::new(&storage_provider).await.unwrap(); + let mut db = SphereDb::new(storage_provider).await.unwrap(); let (cid, block) = block_encode::(&Ipld::Bytes(b"foobar".to_vec())).unwrap(); diff --git a/rust/noosphere-storage/src/ephemeral.rs b/rust/noosphere-storage/src/ephemeral.rs new file mode 100644 index 000000000..cf7e81ffa --- /dev/null +++ b/rust/noosphere-storage/src/ephemeral.rs @@ -0,0 +1,294 @@ +use crate::{KeyValueStore, Store}; +use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use async_trait::async_trait; +use noosphere_common::ConditionalSync; +use std::sync::Arc; +use tokio::sync::{Mutex, OnceCell}; + +/// Provides an [EphemeralStore] that does not persist after dropping. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait EphemeralStorage: ConditionalSync { + type EphemeralStoreType: KeyValueStore + Disposable; + + async fn get_ephemeral_store(&self) -> Result>; +} + +/// A [Store] that can clear its data after dropping as an [EphemeralStore]. +/// +/// A [Disposable] store is only cleared when used as an [EphemeralStore]. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait Disposable: Store { + async fn dispose(&mut self) -> Result<()>; +} + +/// Wrapper [Store], ensuring underlying data does not persist. +/// +/// As [Store] must be [Sync] and [Clone], [EphemeralStore] clears +/// its data after all references have been dropped. +#[derive(Clone)] +pub struct EphemeralStore(OnceCell>>) +where + S: KeyValueStore + Disposable + 'static; + +impl EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + pub(crate) fn new(store: S) -> Self { + Self(OnceCell::from(Arc::new(Mutex::new(store)))) + } + + async fn store(&self) -> Result> { + Ok(self + .0 + .get() + .ok_or_else(|| anyhow!("Inner store not set."))? + .lock() + .await) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Store for EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + async fn read(&self, key: &[u8]) -> Result>> { + let store = self.store().await?; + store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + let mut store = self.store().await?; + store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + let mut store = self.store().await?; + store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + let store = self.store().await?; + Store::flush(std::ops::Deref::deref(&store)).await + } +} + +impl From for EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + fn from(value: S) -> Self { + EphemeralStore::new(value) + } +} + +impl crate::IterableStore for EphemeralStore +where + S: crate::IterableStore + KeyValueStore + Disposable + 'static, +{ + fn get_all_entries(&self) -> std::pin::Pin>> { + use tokio_stream::StreamExt; + Box::pin(try_stream! { + let store = self.store().await?; + let mut stream = store.get_all_entries(); + while let Some((key, value)) = stream.try_next().await? { + yield (key, value); + } + }) + } +} + +impl Drop for EphemeralStore +where + S: KeyValueStore + Disposable + 'static, +{ + fn drop(&mut self) { + if let Some(store_arc) = self.0.take() { + if let Some(store_mutex) = Arc::into_inner(store_arc) { + let mut store = store_mutex.into_inner(); + noosphere_common::spawn_no_wait(async move { + if let Err(e) = store.dispose().await { + error!("Error disposing EphemeralStore: {}", e); + } + }); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + EphemeralStorage, IterableStore, NonPersistentStorage, OpenStorage, + PreferredPlatformStorage, Storage, EPHEMERAL_STORE, + }; + use noosphere_core_dev::tracing::initialize_tracing; + use std::path::Path; + use tokio_stream::StreamExt; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_clears_ephemeral_storage_on_store_drop() -> Result<()> { + initialize_tracing(None); + + let storage = NonPersistentStorage::::new().await?; + for _ in 0..3 { + let mut ephemeral_store = storage.get_ephemeral_store().await?; + for n in 0..10 { + ephemeral_store + .write(format!("{}", n).as_ref(), &vec![2; 100]) + .await?; + } + } + + // Wait for destructors to complete asynchronously. + noosphere_common::helpers::wait(1).await; + + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + { + // Sled's storage space still grows on a new DB when removing trees. + // Otherwise we could also test deletion via [crate::Space]. + let db = storage.inner(); + assert_eq!(db.tree_names().len(), 1, "only default tree persists."); + } + + { + // Ensure there's no extra data in the ephemeral space + // (IndexedDbStorage, RocksDbStorage) + let store = storage.get_key_value_store(EPHEMERAL_STORE).await?; + let mut stream = store.get_all_entries(); + assert!( + stream.try_next().await?.is_none(), + "ephemeral store should have no entries." + ); + } + Ok(()) + } + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn test_ephemeral_store_isolation() -> Result<()> { + initialize_tracing(None); + + let storage = NonPersistentStorage::::new().await?; + + let mut ephemeral_stores: Vec< + EphemeralStore<::EphemeralStoreType>, + > = vec![]; + for _ in 0..2 { + let mut ephemeral_store = storage.get_ephemeral_store().await?; + for n in 0..10 { + ephemeral_store + .write(format!("{}", n).as_ref(), &vec![1; 100]) + .await?; + } + + { + let mut stream = ephemeral_store.get_all_entries(); + let mut entries = vec![]; + while let Some(entry) = stream.try_next().await? { + entries.push(entry); + } + assert_eq!( + entries.len(), + 10, + "get_all_entries() should be scoped to this store." + ); + } + ephemeral_stores.push(ephemeral_store); + } + + ephemeral_stores.pop(); + let store = ephemeral_stores.pop().unwrap(); + assert_eq!( + store.read(format!("0").as_ref()).await?.unwrap(), + vec![1; 100], + "ephemeral stores can be dropped without affecting other ephemeral stores." + ); + Ok(()) + } + + /// Circumvent using [EphemeralStore] to ensure ephemeral data is wiped + /// on storage drop/init in the event [EphemeralStore] cleanups do not occur. + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_clears_ephemeral_storage_on_startup() -> Result<()> { + #[cfg(target_arch = "wasm32")] + let db_path: String = witty_phrase_generator::WPGen::new() + .with_words(3) + .unwrap() + .into_iter() + .map(|word| String::from(word)) + .collect(); + + #[cfg(not(target_arch = "wasm32"))] + let _temp_dir = tempfile::TempDir::new()?; + #[cfg(not(target_arch = "wasm32"))] + let db_path = _temp_dir.path(); + + #[cfg(any(target_arch = "wasm32", feature = "rocksdb"))] + let result = test_partition_based_ephemeral_storage_startup(db_path).await?; + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + let result = test_tree_based_ephemeral_storage_startup(db_path).await?; + + Ok(result) + } + + /// IndexedDbStorage and RocksDbStorage write to the shared ephemeral space. + /// Write directly to the ephemeral store to ensure it is cleaned up + /// on storage drop/init. + #[cfg(any(target_arch = "wasm32", feature = "rocksdb"))] + async fn test_partition_based_ephemeral_storage_startup>( + db_path: P, + ) -> Result<()> { + { + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let mut store = storage.get_key_value_store(EPHEMERAL_STORE).await?; + store.write(&[0], &[11]).await?; + assert_eq!(store.read(&vec![0]).await?, Some(vec![11])); + } + + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let store = storage.get_key_value_store(EPHEMERAL_STORE).await?; + let mut stream = store.get_all_entries(); + assert!( + stream.try_next().await?.is_none(), + "ephemeral store should have no entries." + ); + Ok(()) + } + + // SledStorage uses ephemeral trees. Ensure the trees do not exist after + // reinitialization. + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + async fn test_tree_based_ephemeral_storage_startup>(db_path: P) -> Result<()> { + let mut store_name = String::from(crate::implementation::EPHEMERAL_SLED_PREFIX); + store_name.push_str("-1234567890"); + { + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let mut store = storage.get_key_value_store(&store_name).await?; + store.write(&[1], &[11]).await?; + assert_eq!(store.read(&vec![1]).await?, Some(vec![11])); + } + + let storage = PreferredPlatformStorage::open(db_path.as_ref()).await?; + let store = storage.get_key_value_store(&store_name).await?; + let mut stream = store.get_all_entries(); + assert!( + stream.try_next().await?.is_none(), + "ephemeral store should have no entries." + ); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/implementation/indexed_db.rs b/rust/noosphere-storage/src/implementation/indexed_db.rs index dfec3e215..ade51860b 100644 --- a/rust/noosphere-storage/src/implementation/indexed_db.rs +++ b/rust/noosphere-storage/src/implementation/indexed_db.rs @@ -1,16 +1,24 @@ use crate::store::Store; -use crate::{db::SPHERE_DB_STORE_NAMES, storage::Storage}; -use anyhow::{anyhow, Error, Result}; +use crate::{ + db::{EPHEMERAL_STORE, SPHERE_DB_STORE_NAMES}, + storage::Storage, + PartitionedStore, +}; +use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; use js_sys::Uint8Array; +use noosphere_common::ConditionalSend; use rexie::{ KeyRange, ObjectStore, Rexie, RexieBuilder, Store as IdbStore, Transaction, TransactionMode, }; use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, rc::Rc}; +use std::{fmt::Debug, path::Path, rc::Rc}; use wasm_bindgen::{JsCast, JsValue}; -pub const INDEXEDDB_STORAGE_VERSION: u32 = 1; +use js_utils::*; + +pub const INDEXEDDB_STORAGE_VERSION: u32 = 2; #[derive(Clone)] pub struct IndexedDbStorage { @@ -28,7 +36,10 @@ impl Debug for IndexedDbStorage { impl IndexedDbStorage { pub async fn new(db_name: &str) -> Result { - Self::configure(INDEXEDDB_STORAGE_VERSION, db_name, SPHERE_DB_STORE_NAMES).await + let storage = + Self::configure(INDEXEDDB_STORAGE_VERSION, db_name, SPHERE_DB_STORE_NAMES).await?; + storage.clear_ephemeral().await?; + Ok(storage) } async fn configure(version: u32, db_name: &str, store_names: &[&str]) -> Result { @@ -76,6 +87,12 @@ impl IndexedDbStorage { .await .map_err(|error| anyhow!("{:?}", error)) } + + /// Wipes the "ephemeral" column family. + async fn clear_ephemeral(&self) -> Result<()> { + let ephemeral_store = self.get_store(EPHEMERAL_STORE).await?; + ephemeral_store.clear().await + } } #[async_trait(?Send)] @@ -93,6 +110,31 @@ impl Storage for IndexedDbStorage { } } +#[async_trait(?Send)] +impl crate::ops::OpenStorage for IndexedDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + IndexedDbStorage::new( + path.as_ref() + .to_str() + .ok_or_else(|| anyhow!("Could not stringify path."))?, + ) + .await + } +} + +#[async_trait(?Send)] +impl crate::EphemeralStorage for IndexedDbStorage { + type EphemeralStoreType = EphemeralIndexedDbStore; + + async fn get_ephemeral_store(&self) -> Result> { + let store = PartitionedStore::new(IndexedDbStore { + db: self.db.clone(), + store_name: EPHEMERAL_STORE.to_owned(), + }); + Ok(crate::EphemeralIndexedDbStore::new(store).into()) + } +} + #[derive(Clone)] pub struct IndexedDbStore { db: Rc, @@ -100,7 +142,10 @@ pub struct IndexedDbStore { } impl IndexedDbStore { - fn start_transaction(&self, mode: TransactionMode) -> Result<(IdbStore, Transaction)> { + pub(crate) fn start_transaction( + &self, + mode: TransactionMode, + ) -> Result<(IdbStore, Transaction)> { let tx = self .db .transaction(&[&self.store_name], mode) @@ -112,15 +157,36 @@ impl IndexedDbStore { Ok((store, tx)) } - async fn finish_transaction(tx: Transaction) -> Result<()> { + pub(crate) async fn finish_transaction(tx: Transaction) -> Result<()> { tx.done().await.map_err(|error| anyhow!("{:?}", error))?; Ok(()) } - fn bytes_to_typed_array(bytes: &[u8]) -> Result { - let array = Uint8Array::new_with_length(bytes.len() as u32); - array.copy_from(&bytes); - Ok(JsValue::from(array)) + async fn clear(&self) -> Result<()> { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + store + .clear() + .await + .map_err(|error| anyhow!("{:?}", error))?; + IndexedDbStore::finish_transaction(tx).await?; + Ok(()) + } + + async fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<()> { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + + let lower = bytes_to_typed_array(from)?; + let upper = bytes_to_typed_array(to)?; + let key_range = + KeyRange::bound(&lower, &upper, false, false).map_err(|e| anyhow!("{:?}", e))?; + + store + .delete(key_range.as_ref()) + .await + .map_err(|error| anyhow!("{:?}", error))?; + + IndexedDbStore::finish_transaction(tx).await?; + Ok(()) } async fn contains(key: &JsValue, store: &IdbStore) -> Result { @@ -153,7 +219,7 @@ impl IndexedDbStore { impl Store for IndexedDbStore { async fn read(&self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadOnly)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; + let key = bytes_to_typed_array(key)?; let maybe_dag = IndexedDbStore::read(&key, &store).await?; @@ -165,8 +231,8 @@ impl Store for IndexedDbStore { async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; - let value = IndexedDbStore::bytes_to_typed_array(bytes)?; + let key = bytes_to_typed_array(key)?; + let value = bytes_to_typed_array(bytes)?; let old_bytes = IndexedDbStore::read(&key, &store).await?; @@ -183,7 +249,7 @@ impl Store for IndexedDbStore { async fn remove(&mut self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; + let key = bytes_to_typed_array(key)?; let old_value = IndexedDbStore::read(&key, &store).await?; @@ -198,28 +264,78 @@ impl Store for IndexedDbStore { } } -struct JsError(Error); +impl crate::IterableStore for IndexedDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + Box::pin(try_stream! { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + let limit = 100; + let mut offset = 0; + loop { + let results = store.get_all(None, Some(limit), Some(offset), None).await + .map_err(|error| anyhow!("{:?}", error))?; + let count = results.len(); + if count == 0 { + IndexedDbStore::finish_transaction(tx).await?; + break; + } + + offset += count as u32; + + for (key_js, value_js) in results { + yield ( + typed_array_to_bytes(JsValue::from(Uint8Array::new(&key_js)))?, + typed_array_to_bytes(value_js)? + ); + } + } + }) + } +} -impl From for JsError { - fn from(value: JsValue) -> JsError { - if let Ok(js_string) = js_sys::JSON::stringify(&value) { - JsError(anyhow!("{}", js_string.as_string().unwrap())) - } else { - JsError(anyhow!("Could not parse JsValue error as string.")) - } +/// A [IndexedDbStore] that does not persist data after dropping. +/// Can be created from [IndexedDbStorage::get_ephemeral_store]. +#[derive(Clone)] +pub struct EphemeralIndexedDbStore { + store: PartitionedStore, +} + +impl EphemeralIndexedDbStore { + pub(crate) fn new(store: PartitionedStore) -> Self { + EphemeralIndexedDbStore { store } + } +} + +#[async_trait(?Send)] +impl Store for EphemeralIndexedDbStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await } } -impl From for JsError { - fn from(value: serde_wasm_bindgen::Error) -> JsError { - let js_value: JsValue = value.into(); - js_value.into() +#[async_trait(?Send)] +impl crate::Disposable for EphemeralIndexedDbStore { + async fn dispose(&mut self) -> Result<()> { + let (start_key, end_key) = self.store.get_key_range(); + self.store.inner().remove_range(start_key, end_key).await } } -impl From for Error { - fn from(value: JsError) -> Self { - value.0 +#[async_trait(?Send)] +impl crate::IterableStore for EphemeralIndexedDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + self.store.get_all_entries() } } @@ -265,3 +381,119 @@ impl crate::Space for IndexedDbStorage { } } } + +mod js_utils { + use anyhow::{anyhow, Error, Result}; + use js_sys::Uint8Array; + use wasm_bindgen::{JsCast, JsValue}; + + pub struct JsError(Error); + + impl From for JsError { + fn from(value: JsValue) -> JsError { + if let Ok(js_string) = js_sys::JSON::stringify(&value) { + JsError(anyhow!("{}", js_string.as_string().unwrap())) + } else { + JsError(anyhow!("Could not parse JsValue error as string.")) + } + } + } + + impl From for JsError { + fn from(value: serde_wasm_bindgen::Error) -> JsError { + let js_value: JsValue = value.into(); + js_value.into() + } + } + + impl From for Error { + fn from(value: JsError) -> Self { + value.0 + } + } + + pub fn bytes_to_typed_array(bytes: &[u8]) -> Result { + let array = Uint8Array::new_with_length(bytes.len() as u32); + array.copy_from(&bytes); + Ok(JsValue::from(array)) + } + + pub fn typed_array_to_bytes(js_value: JsValue) -> Result> { + Ok(js_value + .dyn_into::() + .map_err(|error| anyhow!("{:?}", error))? + .to_vec()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{key_value::KeyValueStore, LINK_STORE}; + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + wasm_bindgen_test_configure!(run_in_browser); + + #[derive(Clone)] + pub struct IndexedDbStorageV1 { + db: Rc, + } + + impl IndexedDbStorageV1 { + pub async fn new(db_name: &str) -> Result { + const V1_STORES: [&str; 4] = ["blocks", "links", "versions", "metadata"]; + let mut builder = RexieBuilder::new(db_name).version(1); + + for name in V1_STORES { + builder = builder.add_object_store(ObjectStore::new(name).auto_increment(false)); + } + + let db = builder + .build() + .await + .map_err(|error| anyhow!("{:?}", error))?; + + Ok(IndexedDbStorageV1 { db: Rc::new(db) }) + } + + async fn get_store(&self, name: &str) -> Result { + if self + .db + .store_names() + .iter() + .find(|val| val.as_str() == name) + .is_none() + { + return Err(anyhow!("No such store named {}", name)); + } + + Ok(IndexedDbStore { + db: self.db.clone(), + store_name: name.to_string(), + }) + } + } + + /* + /// Triggering a migration from version upgrade can take several + /// seconds (8s on average, can be as low as 2 seconds, or as high as 30s). + #[wasm_bindgen_test] + async fn it_can_upgrade_from_v1() -> Result<()> { + // noosphere_core_dev::tracing::initialize_tracing(None); + let db_name = format!("{}_v1_test", rand::random::()); + let key = String::from("foo"); + let value = String::from("bar"); + { + let storage_v1 = IndexedDbStorageV1::new(&db_name).await?; + let mut store_v1 = storage_v1.get_store(LINK_STORE).await?; + store_v1.set_key(&key, &value).await?; + } + + let start = instant::Instant::now(); + let storage_v2 = IndexedDbStorage::new(&db_name).await?; + info!("Store migrated (t={}ms)", start.elapsed().as_millis()); + let store_v2 = storage_v2.get_store(LINK_STORE).await?; + assert_eq!(store_v2.get_key::<_, String>(&key).await?.unwrap(), value); + Ok(()) + } + */ +} diff --git a/rust/noosphere-storage/src/implementation/memory.rs b/rust/noosphere-storage/src/implementation/memory.rs index 8fd92e692..0361a33c2 100644 --- a/rust/noosphere-storage/src/implementation/memory.rs +++ b/rust/noosphere-storage/src/implementation/memory.rs @@ -131,7 +131,24 @@ impl Store for MemoryStore { } } -#[cfg(feature = "performance")] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::EphemeralStorage for MemoryStorage { + type EphemeralStoreType = MemoryStore; + + async fn get_ephemeral_store(&self) -> Result> { + Ok(MemoryStore::default().into()) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::Disposable for MemoryStore { + async fn dispose(&mut self) -> Result<()> { + Ok(()) + } +} + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl crate::Space for MemoryStorage { diff --git a/rust/noosphere-storage/src/implementation/rocks_db.rs b/rust/noosphere-storage/src/implementation/rocks_db.rs index d5441e875..2d88307f9 100644 --- a/rust/noosphere-storage/src/implementation/rocks_db.rs +++ b/rust/noosphere-storage/src/implementation/rocks_db.rs @@ -1,10 +1,12 @@ use crate::{ - storage::Storage, store::Store, ConfigurableStorage, StorageConfig, SPHERE_DB_STORE_NAMES, + storage::Storage, store::Store, ConfigurableStorage, PartitionedStore, StorageConfig, + EPHEMERAL_STORE, SPHERE_DB_STORE_NAMES, }; use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; use noosphere_common::ConditionalSend; -use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, Options}; +use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, IteratorMode, Options}; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -60,22 +62,27 @@ impl RocksDbStorage { Arc::new(DbInner::open_cf_descriptors(&db_opts, path, cfs)?) }; - Ok(RocksDbStorage { + let storage = RocksDbStorage { db, debug_data: Arc::new((db_path, storage_config)), - }) + }; + storage.clear_ephemeral()?; + Ok(storage) } - async fn get_store(&self, name: &str) -> Result { - if SPHERE_DB_STORE_NAMES - .iter() - .find(|val| **val == name) - .is_none() - { - return Err(anyhow!("No such store named {}", name)); + fn get_store(&self, name: &str) -> Result { + for store_name in SPHERE_DB_STORE_NAMES { + if name == *store_name { + return RocksDbStore::new(self.db.clone(), String::from(name)); + } } + return Err(anyhow!("No such store named {}", name)); + } - RocksDbStore::new(self.db.clone(), name.to_owned()) + /// Wipes the "ephemeral" column family. + fn clear_ephemeral(&self) -> Result<()> { + let ephemeral_store = self.get_store(EPHEMERAL_STORE)?; + ephemeral_store.remove_range(&[0], &[u8::MAX]) } } @@ -85,11 +92,11 @@ impl Storage for RocksDbStorage { type KeyValueStore = RocksDbStore; async fn get_block_store(&self, name: &str) -> Result { - self.get_store(name).await + self.get_store(name) } async fn get_key_value_store(&self, name: &str) -> Result { - self.get_store(name).await + self.get_store(name) } } @@ -112,15 +119,50 @@ impl std::fmt::Debug for RocksDbStorage { } } +#[async_trait] +impl crate::EphemeralStorage for RocksDbStorage { + type EphemeralStoreType = EphemeralRocksDbStore; + + async fn get_ephemeral_store(&self) -> Result> { + let inner = self.get_store(crate::EPHEMERAL_STORE)?; + let mapped = crate::PartitionedStore::new(inner); + Ok(EphemeralRocksDbStore::new(mapped).into()) + } +} + +#[async_trait] +impl crate::OpenStorage for RocksDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + RocksDbStorage::new(path) + } +} + +#[async_trait] +impl crate::Space for RocksDbStorage { + async fn get_space_usage(&self) -> Result { + crate::get_dir_size(&self.debug_data.0).await + } +} + #[derive(Clone)] pub struct RocksDbStore { - name: String, + name: Arc, db: Arc, } impl RocksDbStore { - pub fn new(db: Arc, name: String) -> Result { - Ok(RocksDbStore { db, name }) + pub(crate) fn new(db: Arc, name: String) -> Result { + Ok(RocksDbStore { + db, + name: Arc::from(name), + }) + } + + fn remove_range(&self, from: &[u8], to: &[u8]) -> Result<()> { + let cf = self.cf_handle()?; + #[cfg(feature = "rocksdb-multi-thread")] + let cf = &cf; + self.db.delete_range_cf(cf, from, to).map_err(|e| e.into()) } /// Returns the column family handle. Unfortunately generated on every call @@ -169,8 +211,70 @@ impl Store for RocksDbStore { } #[async_trait] -impl crate::Space for RocksDbStorage { - async fn get_space_usage(&self) -> Result { - crate::get_dir_size(&self.debug_data.0).await +impl crate::IterableStore for RocksDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + // handle is not Sync; generate the iterator before + // async stream work. + let cf_option = self.db.cf_handle(&self.name); + let iter = if let Some(cf) = cf_option { + #[cfg(feature = "rocksdb-multi-thread")] + let cf = &cf; + Some(self.db.iterator_cf(cf, IteratorMode::Start)) + } else { + None + }; + Box::pin(try_stream! { + let iter = iter.ok_or_else(|| anyhow!("Could not get cf handle."))?; + for entry in iter { + let (key, value) = entry?; + yield (Vec::from(key.as_ref()), Vec::from(value.as_ref())); + } + }) + } +} + +/// A [RocksDbStore] that does not persist data after dropping. +/// Can be created from [IndexedDbStorage::get_ephemeral_store]. +#[derive(Clone)] +pub struct EphemeralRocksDbStore { + store: PartitionedStore, +} + +impl EphemeralRocksDbStore { + pub(crate) fn new(store: PartitionedStore) -> Self { + EphemeralRocksDbStore { store } + } +} + +#[async_trait] +impl Store for EphemeralRocksDbStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +#[async_trait] +impl crate::Disposable for EphemeralRocksDbStore { + async fn dispose(&mut self) -> Result<()> { + let (start_key, end_key) = self.store.get_key_range(); + self.store.inner().remove_range(start_key, end_key) + } +} + +impl crate::IterableStore for EphemeralRocksDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + self.store.get_all_entries() } } diff --git a/rust/noosphere-storage/src/implementation/sled.rs b/rust/noosphere-storage/src/implementation/sled.rs index 4418fb928..02c70ba11 100644 --- a/rust/noosphere-storage/src/implementation/sled.rs +++ b/rust/noosphere-storage/src/implementation/sled.rs @@ -1,14 +1,15 @@ -use std::path::{Path, PathBuf}; -use std::sync::Arc; - use crate::store::Store; use crate::StorageConfig; use crate::{storage::Storage, ConfigurableStorage}; - -use anyhow::Result; +use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; use noosphere_common::ConditionalSend; use sled::{Db, Tree}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +pub(crate) const EPHEMERAL_SLED_PREFIX: &str = "EPHEMERAL-SLED-STORAGE"; #[derive(Clone)] pub struct SledStorage { @@ -35,12 +36,37 @@ impl SledStorage { let db = sled_config.open()?; let debug_data = Arc::new((db_path, config)); - Ok(SledStorage { db, debug_data }) + let storage = SledStorage { db, debug_data }; + storage.clear_ephemeral()?; + Ok(storage) } async fn get_store(&self, name: &str) -> Result { Ok(SledStore::new(&self.db.open_tree(name)?)) } + + #[cfg(test)] + #[allow(unused)] + pub(crate) fn inner(&self) -> &Db { + &self.db + } + + /// Wipes all "ephemeral" trees. + fn clear_ephemeral(&self) -> Result<()> { + for name in self.db.tree_names() { + let tree_name = String::from_utf8(Vec::from(name.as_ref()))?; + if tree_name.starts_with(EPHEMERAL_SLED_PREFIX) { + match self.db.drop_tree(tree_name.as_bytes())? { + true => continue, + false => { + warn!("Could not drop ephemeral tree {}", tree_name); + continue; + } + } + } + } + Ok(()) + } } #[async_trait] @@ -77,13 +103,42 @@ impl std::fmt::Debug for SledStorage { } } +impl Drop for SledStorage { + fn drop(&mut self) { + let _ = self.db.flush(); + } +} + +#[async_trait] +impl crate::EphemeralStorage for SledStorage { + type EphemeralStoreType = EphemeralSledStore; + + async fn get_ephemeral_store(&self) -> Result> { + Ok(EphemeralSledStore::new(self.db.clone())?.into()) + } +} + +#[async_trait] +impl crate::OpenStorage for SledStorage { + async fn open + ConditionalSend>(path: P) -> Result { + SledStorage::new(path) + } +} + +#[async_trait] +impl crate::Space for SledStorage { + async fn get_space_usage(&self) -> Result { + self.db.size_on_disk().map_err(|e| e.into()) + } +} + #[derive(Clone)] pub struct SledStore { db: Tree, } impl SledStore { - pub fn new(db: &Tree) -> Self { + pub(crate) fn new(db: &Tree) -> Self { SledStore { db: db.clone() } } } @@ -121,15 +176,68 @@ impl Store for SledStore { } } -impl Drop for SledStorage { - fn drop(&mut self) { - let _ = self.db.flush(); +/// A [SledStore] that does not persist data after dropping. +/// Can be created from [SledStorage]'s [crate::EphemeralStorage] implementation. +#[derive(Clone)] +pub struct EphemeralSledStore { + db: Db, + name: String, + store: SledStore, +} + +impl EphemeralSledStore { + pub(crate) fn new(db: Db) -> Result { + let name = format!("{}-{}", EPHEMERAL_SLED_PREFIX, rand::random::()); + let store = SledStore::new(&db.open_tree(&name)?); + Ok(EphemeralSledStore { db, store, name }) } } #[async_trait] -impl crate::Space for SledStorage { - async fn get_space_usage(&self) -> Result { - self.db.size_on_disk().map_err(|e| e.into()) +impl Store for EphemeralSledStore { + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(key).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(key, bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(key).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +impl crate::IterableStore for SledStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + Box::pin(try_stream! { + for entry in self.db.iter() { + let (key, value) = entry?; + yield (Vec::from(key.as_ref()), Vec::from(value.as_ref())); + } + }) + } +} + +#[async_trait] +impl crate::Disposable for EphemeralSledStore { + async fn dispose(&mut self) -> Result<()> { + self.db.drop_tree(&self.name).map_or_else( + |e| Err(e.into()), + |bool_state| match bool_state { + true => Ok(()), + false => Err(anyhow!("Could not clear temporary tree.")), + }, + ) + } +} + +impl crate::IterableStore for EphemeralSledStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + self.store.get_all_entries() } } diff --git a/rust/noosphere-storage/src/implementation/tracking.rs b/rust/noosphere-storage/src/implementation/tracking.rs index ba690d755..32d24fb9e 100644 --- a/rust/noosphere-storage/src/implementation/tracking.rs +++ b/rust/noosphere-storage/src/implementation/tracking.rs @@ -103,3 +103,16 @@ impl Storage for TrackingStorage { Ok(key_value_store) } } + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::EphemeralStorage for TrackingStorage +where + S: Storage + crate::EphemeralStorage, +{ + type EphemeralStoreType = ::EphemeralStoreType; + + async fn get_ephemeral_store(&self) -> Result> { + self.storage.get_ephemeral_store().await + } +} diff --git a/rust/noosphere-storage/src/lib.rs b/rust/noosphere-storage/src/lib.rs index d526447b7..de49140e6 100644 --- a/rust/noosphere-storage/src/lib.rs +++ b/rust/noosphere-storage/src/lib.rs @@ -10,9 +10,14 @@ mod block; mod config; mod db; mod encoding; +mod ephemeral; mod implementation; mod key_value; +mod non_persistent; +mod ops; +mod partitioned; mod retry; +mod space; mod storage; mod store; mod tap; @@ -23,22 +28,35 @@ pub use block::*; pub use config::*; pub use db::*; pub use encoding::*; +pub use ephemeral::*; pub use implementation::*; pub use key_value::*; +pub use non_persistent::*; +pub use ops::*; +pub use partitioned::*; pub use retry::*; +pub use space::*; pub use storage::*; pub use store::*; pub use tap::*; -mod space; -pub use space::*; - #[cfg(test)] -pub mod helpers; +mod inner { + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + pub type PreferredPlatformStorage = crate::SledStorage; + #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] + pub type PreferredPlatformStorage = crate::RocksDbStorage; + #[cfg(target_arch = "wasm32")] + pub type PreferredPlatformStorage = crate::IndexedDbStorage; +} +#[cfg(test)] +pub use inner::*; #[cfg(test)] mod tests { - use crate::{block::BlockStore, helpers::make_disposable_store}; + use crate::{ + block::BlockStore, NonPersistentStorage, PreferredPlatformStorage, Storage, BLOCK_STORE, + }; use libipld_cbor::DagCborCodec; #[cfg(target_arch = "wasm32")] @@ -48,13 +66,15 @@ mod tests { #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] - async fn it_can_store_and_retrieve_bytes() { - let mut storage = make_disposable_store().await.unwrap(); + async fn it_can_store_and_retrieve_bytes() -> anyhow::Result<()> { + let storage = NonPersistentStorage::::new().await?; + let mut store = storage.get_block_store(BLOCK_STORE).await?; let bytes = b"I love every kind of cat"; - let cid = storage.save::(bytes).await.unwrap(); - let retrieved = storage.load::>(&cid).await.unwrap(); + let cid = store.save::(bytes).await.unwrap(); + let retrieved = store.load::>(&cid).await.unwrap(); assert_eq!(retrieved, bytes); + Ok(()) } } diff --git a/rust/noosphere-storage/src/non_persistent.rs b/rust/noosphere-storage/src/non_persistent.rs new file mode 100644 index 000000000..a55f06842 --- /dev/null +++ b/rust/noosphere-storage/src/non_persistent.rs @@ -0,0 +1,88 @@ +use crate::{ops::OpenStorage, storage::Storage}; +use anyhow::Result; +use std::ops::{Deref, DerefMut}; + +#[cfg(doc)] +use crate::EphemeralStorage; +#[cfg(doc)] +use crate::MemoryStorage; + +/// [Storage] provider wrapper that does not persist after dropping. +/// +/// Whereas [EphemeralStorage] can provide a slice of a storage system +/// as non-persistent storage space, the entirety of [NonPersistentStorage] +/// is wiped after dropping. +/// +/// Currently, native builds create a temp dir syncing lifetimes, and web +/// builds use a randomly generated database name. +/// In the future, we may have web builds that use +/// a file-system backed Storage, or native builds that do not use +/// the file-system (currently the case with [MemoryStorage]), where +/// a more complex configuration is needed. Mostly used in tests. +pub struct NonPersistentStorage +where + S: Storage + OpenStorage, +{ + inner: S, + #[cfg(not(target_arch = "wasm32"))] + _temp_dir: tempfile::TempDir, +} + +impl NonPersistentStorage +where + S: Storage + OpenStorage, +{ + /// Create a new [NonPersistentStorage], wrapping a new [Storage] + /// that will be cleared after dropping. + pub async fn new() -> Result { + #[cfg(target_arch = "wasm32")] + let key: String = witty_phrase_generator::WPGen::new() + .with_words(3) + .unwrap() + .into_iter() + .map(|word| String::from(word)) + .collect(); + #[cfg(target_arch = "wasm32")] + let inner = S::open(&key).await?; + #[cfg(target_arch = "wasm32")] + let out = Self { inner }; + + #[cfg(not(target_arch = "wasm32"))] + let _temp_dir = tempfile::TempDir::new()?; + #[cfg(not(target_arch = "wasm32"))] + let inner = S::open(_temp_dir.path()).await?; + #[cfg(not(target_arch = "wasm32"))] + let out = Self { _temp_dir, inner }; + + Ok(out) + } +} + +impl Deref for NonPersistentStorage +where + S: Storage + OpenStorage, +{ + type Target = S; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for NonPersistentStorage +where + S: Storage + OpenStorage, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AsRef for NonPersistentStorage +where + S: Storage + OpenStorage, +{ + fn as_ref(&self) -> &S { + &self.inner + } +} diff --git a/rust/noosphere-storage/src/ops.rs b/rust/noosphere-storage/src/ops.rs new file mode 100644 index 000000000..51e5e52f7 --- /dev/null +++ b/rust/noosphere-storage/src/ops.rs @@ -0,0 +1,13 @@ +use crate::storage::Storage; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSend; +use std::path::Path; + +/// [Storage] that can be opened via [Path] reference. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait OpenStorage: Storage + Sized { + /// Open [Storage] at `path`. + async fn open + ConditionalSend>(path: P) -> Result; +} diff --git a/rust/noosphere-storage/src/partitioned.rs b/rust/noosphere-storage/src/partitioned.rs new file mode 100644 index 000000000..5e4448d76 --- /dev/null +++ b/rust/noosphere-storage/src/partitioned.rs @@ -0,0 +1,106 @@ +use crate::{Disposable, Store}; +use anyhow::Result; +use async_stream::try_stream; +use async_trait::async_trait; + +/// Maps all [Store] method calls with a key to a prefixed form. +#[derive(Clone)] +pub struct PartitionedStore +where + S: Store, +{ + store: S, + partition_key: Vec, + end_partition_key: Vec, +} + +impl PartitionedStore +where + S: Store, +{ + pub fn new(store: S) -> Self { + let prefix: Vec = format!("{:0<10}-", rand::random::()).into(); + Self::with_partition_key(store, prefix) + } + + pub fn with_partition_key(store: S, partition_key: Vec) -> Self { + let mut end_partition_key = partition_key.clone(); + end_partition_key.push(u8::MAX); + Self { + store, + partition_key, + end_partition_key, + } + } + + /// Returns `bool` indicating whether `key` is within + /// the partition key space. + fn partition_key_contains(&self, key: &[u8]) -> bool { + key.starts_with(self.partition_key.as_slice()) + } + + pub fn get_key_range(&self) -> (&Vec, &Vec) { + (&self.partition_key, &self.end_partition_key) + } + + pub fn inner(&self) -> &S { + &self.store + } + + fn map_key(&self, key: &[u8]) -> Vec { + let mut new_key = self.partition_key.clone(); + new_key.extend(key); + new_key + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Store for PartitionedStore +where + S: Store, +{ + async fn read(&self, key: &[u8]) -> Result>> { + self.store.read(&self.map_key(key)).await + } + + async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { + self.store.write(&self.map_key(key), bytes).await + } + + async fn remove(&mut self, key: &[u8]) -> Result>> { + self.store.remove(&self.map_key(key)).await + } + + async fn flush(&self) -> Result<()> { + self.store.flush().await + } +} + +impl crate::IterableStore for PartitionedStore +where + S: Store + crate::IterableStore, +{ + fn get_all_entries(&self) -> std::pin::Pin>> { + use tokio_stream::StreamExt; + Box::pin(try_stream! { + let mut stream = self.store.get_all_entries(); + while let Some((key, value)) = stream.try_next().await? { + if self.partition_key_contains(&key) { + yield (key, value); + } + } + }) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl Disposable for PartitionedStore +where + S: Store + Disposable, +{ + async fn dispose(&mut self) -> Result<()> { + self.store.dispose().await + } +} diff --git a/rust/noosphere-storage/src/storage.rs b/rust/noosphere-storage/src/storage.rs index 86a585596..cdd7ccaf5 100644 --- a/rust/noosphere-storage/src/storage.rs +++ b/rust/noosphere-storage/src/storage.rs @@ -1,5 +1,4 @@ -use crate::block::BlockStore; -use crate::key_value::KeyValueStore; +use crate::{block::BlockStore, ephemeral::EphemeralStorage, key_value::KeyValueStore}; use anyhow::Result; use async_trait::async_trait; use noosphere_common::ConditionalSync; @@ -13,7 +12,7 @@ use std::fmt::Debug; /// other Noosphere constructs. #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait Storage: Clone + ConditionalSync + Debug { +pub trait Storage: EphemeralStorage + Clone + ConditionalSync + Debug { type BlockStore: BlockStore; type KeyValueStore: KeyValueStore; diff --git a/rust/noosphere-storage/src/store.rs b/rust/noosphere-storage/src/store.rs index 2cef23a5c..51c607101 100644 --- a/rust/noosphere-storage/src/store.rs +++ b/rust/noosphere-storage/src/store.rs @@ -1,9 +1,10 @@ -use std::io::Cursor; +use std::{io::Cursor, pin::Pin}; use crate::{block::BlockStore, key_value::KeyValueStore}; use anyhow::Result; use async_trait::async_trait; use cid::Cid; +use futures::Stream; use libipld_cbor::DagCborCodec; use libipld_core::{ codec::{Codec, Decode}, @@ -37,6 +38,21 @@ pub trait Store: Clone + ConditionalSync { } } +/// An async stream of key/value pairs from an [IterableStore]. +#[cfg(not(target_arch = "wasm32"))] +pub type IterableStoreStream<'a> = dyn Stream, Vec)>> + Send + 'a; +/// An async stream of key/value pairs from an [IterableStore]. +#[cfg(target_arch = "wasm32")] +pub type IterableStoreStream<'a> = dyn Stream, Vec)>> + 'a; + +/// A store that can iterate over all of its entries. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait IterableStore { + /// Retrieve all key/value pairs from this store as an async stream. + fn get_all_entries(&self) -> Pin>>; +} + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl BlockStore for S @@ -104,3 +120,37 @@ where Store::flush(self).await } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{NonPersistentStorage, PreferredPlatformStorage, Storage, LINK_STORE}; + use std::collections::HashMap; + use tokio_stream::StreamExt; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + pub async fn iterable_stores_get_all_entries() -> Result<()> { + let storage = NonPersistentStorage::::new().await?; + let mut store = storage.get_key_value_store(LINK_STORE).await?; + store.write(&[1], &[11]).await?; + store.write(&[2], &[22]).await?; + store.write(&[3], &[33]).await?; + let mut stream = store.get_all_entries(); + + let mut results = HashMap::new(); + while let Some((key, value)) = stream.try_next().await? { + results.insert(key, value); + } + assert_eq!(results.len(), 3); + assert_eq!(results.get(&vec![1]), Some(&vec![11u8])); + assert_eq!(results.get(&vec![2]), Some(&vec![22u8])); + assert_eq!(results.get(&vec![3]), Some(&vec![33u8])); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/ucan.rs b/rust/noosphere-storage/src/ucan.rs index 0b238f57f..25b11e792 100644 --- a/rust/noosphere-storage/src/ucan.rs +++ b/rust/noosphere-storage/src/ucan.rs @@ -27,7 +27,7 @@ impl UcanStoreTrait for UcanStore { } } -impl Clone for UcanStore { +impl Clone for UcanStore { fn clone(&self) -> Self { UcanStore(self.0.clone()) } diff --git a/rust/noosphere/src/sphere/builder/mod.rs b/rust/noosphere/src/sphere/builder/mod.rs index fc2bb586f..1420dfaf5 100644 --- a/rust/noosphere/src/sphere/builder/mod.rs +++ b/rust/noosphere/src/sphere/builder/mod.rs @@ -279,7 +279,7 @@ pub(crate) async fn generate_db( (storage_path, scoped_storage_layout, sphere_identity).try_into()?; let storage = create_platform_storage(storage_layout, ipfs_gateway_url, storage_config).await?; - SphereDb::new(&storage).await + SphereDb::new(storage).await } #[cfg(test)]