Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Introduce EphemeralStorage, allowing storages to create temporary
Browse files Browse the repository at this point in the history
Stores.
  • Loading branch information
jsantell committed Oct 20, 2023
1 parent 9e5c83b commit 869abef
Show file tree
Hide file tree
Showing 34 changed files with 1,236 additions and 120 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/run_test_suite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,18 @@ jobs:
ipfs_version: v0.17.0
run_daemon: true
- name: 'Run Rust native target tests'
run: cargo test --features test-kubo,helpers
# Increase stack size on Windows tests; (non-main) threads are spawned with 2MB
# default stack size, which `orb_can_render_peers_in_the_sphere_address_book`
# uses (at time of writing) slightly more than 2MB. While we could set the thread
# stack size at runtime (via tokio's `thread_stack_size`), it appears to not solve the
# problem, possibly due to the harness thread overflowing (e.g. a non-main thread that
# we can't configure within the test). In lieu of that, set RUST_MIN_STACK to increase
# the stack sizes of threads created by tokio within tests, as well as the test harness
# threads themselves.
#
# While our main thread isn't under fire here, notating this for future use:
# https://users.rust-lang.org/t/stack-overflow-when-compiling-on-windows-10/50818/8
run: $env:RUST_MIN_STACK = '4000000'; cargo test --features test-kubo,helpers
env:
NOOSPHERE_LOG: deafening

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions rust/noosphere-common/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use futures::future::join_all;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task::JoinSet;

use crate::ConditionalSend;

#[cfg(target_arch = "wasm32")]
/// Spawn a future by scheduling it with the local executor. The returned
/// future will be pending until the spawned future completes.
Expand Down Expand Up @@ -43,6 +45,18 @@ where
Ok(tokio::spawn(future).await?)
}

/// Spawns a [ConditionalSend] future without requiring `await`.
/// The future will immediately start processing.
pub fn spawn_no_wait<F>(future: F)
where
F: Future<Output = ()> + ConditionalSend + 'static,
{
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(future);
#[cfg(not(target_arch = "wasm32"))]
tokio::task::spawn(future);
}

/// An aggregator of async work that can be used to observe the moment when all
/// the aggregated work is completed. It is similar to tokio's [JoinSet], but is
/// relatively constrained and also works on `wasm32-unknown-unknown`. Unlike
Expand Down
8 changes: 4 additions & 4 deletions rust/noosphere-core/src/authority/author.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ mod tests {
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn it_gives_read_only_access_when_there_is_no_authorization() -> Result<()> {
let author = Author::anonymous();
let mut db = SphereDb::new(&MemoryStorage::default()).await?;
let mut db = SphereDb::new(MemoryStorage::default()).await?;

let (sphere, _, _) = Sphere::generate("did:key:foo", &mut db).await?;

Expand All @@ -201,7 +201,7 @@ mod tests {
async fn it_gives_read_write_access_if_the_key_is_authorized() -> Result<()> {
let owner_key = generate_ed25519_key();
let owner_did = Did(owner_key.get_did().await.unwrap());
let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap();

let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut db).await.unwrap();

Expand All @@ -227,7 +227,7 @@ mod tests {
async fn it_gives_read_write_access_to_the_root_sphere_credential() -> Result<()> {
let owner_key = generate_ed25519_key();
let owner_did = Did(owner_key.get_did().await.unwrap());
let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap();

let (sphere, authorization, mnemonic) =
Sphere::generate(&owner_did, &mut db).await.unwrap();
Expand Down Expand Up @@ -256,7 +256,7 @@ mod tests {
async fn it_can_find_itself_in_an_authorization_lineage() -> Result<()> {
let owner_key = generate_ed25519_key();
let owner_did = Did(owner_key.get_did().await.unwrap());
let mut db = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let mut db = SphereDb::new(MemoryStorage::default()).await.unwrap();

let (sphere, authorization, _) = Sphere::generate(&owner_did, &mut db).await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/context/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ mod tests {
let mut records: Vec<LinkRecord> = vec![];
let owner_key = generate_ed25519_key();
let owner_did = owner_key.get_did().await?;
let mut db = SphereDb::new(&MemoryStorage::default()).await?;
let mut db = SphereDb::new(MemoryStorage::default()).await?;
let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?;
let ucan_proof = proof.as_ucan(&db).await?;
let sphere_identity = sphere.get_identity().await?;
Expand Down
8 changes: 4 additions & 4 deletions rust/noosphere-core/src/data/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ mod tests {
let sphere_identity = Did::from(sphere_key.get_did().await?);
let link = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i";
let cid_link: Link<MemoIpld> = link.parse()?;
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();

let record = from_issuer(&sphere_key, &sphere_identity, &cid_link, None).await?;

Expand All @@ -422,7 +422,7 @@ mod tests {
let sphere_identity = Did::from(sphere_key.get_did().await?);
let link = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i";
let cid_link: Cid = link.parse()?;
let mut store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let mut store = SphereDb::new(MemoryStorage::default()).await.unwrap();

// First verify that `owner` cannot publish for `sphere`
// without delegation.
Expand Down Expand Up @@ -486,7 +486,7 @@ mod tests {
let sphere_key = generate_ed25519_key();
let sphere_identity = Did::from(sphere_key.get_did().await?);
let cid_address = "bafyr4iagi6t6khdrtbhmyjpjgvdlwv6pzylxhuhstxhkdp52rju7er325i";
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();

expect_failure(
"fails when expect `fact` is missing",
Expand Down Expand Up @@ -676,7 +676,7 @@ mod tests {
let delegatee_key = generate_ed25519_key();
let delegatee_did = delegatee_key.get_did().await?;

let mut db = SphereDb::new(&MemoryStorage::default()).await?;
let mut db = SphereDb::new(MemoryStorage::default()).await?;
let mut ucan_store = UcanStore(db.clone());

let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?;
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-core/src/data/sphere.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ mod tests {
let identity_credential = generate_ed25519_key();
let identity = Did(identity_credential.get_did().await?);

let mut store = SphereDb::new(&MemoryStorage::default()).await?;
let mut store = SphereDb::new(MemoryStorage::default()).await?;

let sphere = SphereIpld::new(&identity, &mut store).await?;

Expand Down Expand Up @@ -127,7 +127,7 @@ mod tests {
let identity = Did(identity_credential.get_did().await?);
let authorized = Did(authorized_credential.get_did().await?);

let mut store = SphereDb::new(&MemoryStorage::default()).await?;
let mut store = SphereDb::new(MemoryStorage::default()).await?;

let sphere = SphereIpld::new(&identity, &mut store).await?;

Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/helpers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub async fn simulated_sphere_context(
Some(db) => db,
None => {
let storage_provider = TrackingStorage::wrap(MemoryStorage::default());
SphereDb::new(&storage_provider).await?
SphereDb::new(storage_provider).await?
}
};

Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/helpers/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ where
{
let owner_key = generate_ed25519_key();
let owner_did = owner_key.get_did().await?;
let mut db = SphereDb::new(&MemoryStorage::default()).await?;
let mut db = SphereDb::new(MemoryStorage::default()).await?;

let (sphere, proof, _) = Sphere::generate(&owner_did, &mut db).await?;
let ucan_proof = proof.as_ucan(&db).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use ucan::crypto::KeyMaterial;

pub async fn main() -> Result<()> {
let storage_provider = MemoryStorage::default();
let mut db = SphereDb::new(&storage_provider).await.unwrap();
let mut db = SphereDb::new(storage_provider).await.unwrap();

let owner_key: SphereContextKey = Arc::new(Box::new(generate_ed25519_key()));
let owner_did = owner_key.get_did().await?;
Expand Down
17 changes: 15 additions & 2 deletions rust/noosphere-ipfs/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Result;
use async_trait::async_trait;
use cid::Cid;
use noosphere_common::ConditionalSync;
use noosphere_storage::{BlockStore, Storage};
use noosphere_storage::{BlockStore, EphemeralStorage, EphemeralStore, Storage};
use std::sync::Arc;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -45,7 +45,6 @@ where
C: IpfsClient + ConditionalSync,
{
type BlockStore = IpfsStore<S::BlockStore, C>;

type KeyValueStore = S::KeyValueStore;

async fn get_block_store(&self, name: &str) -> Result<Self::BlockStore> {
Expand All @@ -58,6 +57,20 @@ where
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<S, C> EphemeralStorage for IpfsStorage<S, C>
where
S: Storage + EphemeralStorage + ConditionalSync,
C: IpfsClient + ConditionalSync,
{
type EphemeralStoreType = <S as EphemeralStorage>::EphemeralStoreType;

async fn get_ephemeral_store(&self) -> Result<EphemeralStore<Self::EphemeralStoreType>> {
self.local_storage.get_ephemeral_store().await
}
}

/// An implementation of [BlockStore] that wraps some other implementation of
/// same. It forwards most behavior to its wrapped implementation, except when
/// reading blocks. In that case, if a block cannot be found locally, it will
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-ns/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use libp2p::kad::KademliaConfig;
/// #[tokio::main(flavor = "multi_thread")]
/// async fn main() {
/// let key_material = generate_ed25519_key();
/// let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
/// let store = SphereDb::new(MemoryStorage::default()).await.unwrap();
///
/// let ns = NameSystemBuilder::default()
/// .ucan_store(store)
Expand Down Expand Up @@ -190,7 +190,7 @@ mod tests {
let keypair = key_material.to_dht_keypair()?;
PeerId::from(keypair.public())
};
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();
let bootstrap_peers: Vec<Multiaddr> = vec![
"/ip4/127.0.0.50/tcp/33333/p2p/12D3KooWH8WgH9mgbMXrKX4veokUznvEn6Ycwg4qaGNi83nLkoUK"
.parse()?,
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-ns/src/dht_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub mod test {
// Now test another node connecting.
let (_other_ns, other_peer_id) = {
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();
let ns = NameSystemBuilder::default()
.ucan_store(store)
.key_material(&key_material)
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-ns/src/name_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ mod test {
async fn before_name_resolver_tests() -> Result<NameSystem> {
let ns = {
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();
let ns = NameSystemBuilder::default()
.ucan_store(store)
.key_material(&key_material)
Expand Down Expand Up @@ -205,7 +205,7 @@ mod test {
async fn before_each() -> Result<(DataPlaceholder, Arc<Mutex<NameSystem>>)> {
let (bootstrap, bootstrap_address) = {
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();
let ns = NameSystemBuilder::default()
.ucan_store(store)
.key_material(&key_material)
Expand All @@ -221,7 +221,7 @@ mod test {

let ns = {
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();
let ns = NameSystemBuilder::default()
.ucan_store(store)
.key_material(&key_material)
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-ns/src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ mod test {
async fn before_each() -> Result<(DataPlaceholder, Arc<Mutex<HttpClient>>)> {
let (bootstrap, bootstrap_address) = {
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();
let ns = NameSystemBuilder::default()
.ucan_store(store)
.key_material(&key_material)
Expand All @@ -164,7 +164,7 @@ mod test {
let api_port = api_listener.local_addr().unwrap().port();
let api_url = Url::parse(&format!("http://127.0.0.1:{}", api_port))?;
let key_material = generate_ed25519_key();
let store = SphereDb::new(&MemoryStorage::default()).await.unwrap();
let store = SphereDb::new(MemoryStorage::default()).await.unwrap();

let ns = NameSystemBuilder::default()
.ucan_store(store)
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-ns/tests/ns_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn test_name_system_peer_propagation() -> Result<()> {
initialize_tracing(None);
// Create two NameSystems, where `ns_1` is publishing for `sphere_1`
// and `ns_2` is publishing for `sphere_2`.
let mut db = SphereDb::new(&MemoryStorage::default()).await?;
let mut db = SphereDb::new(MemoryStorage::default()).await?;
let network = NameSystemNetwork::generate(3, Some(db.clone())).await?;
let sphere_1_cid_1 = derive_cid::<DagCborCodec>(b"00000000");
let sphere_1_cid_2 = derive_cid::<DagCborCodec>(b"11111111");
Expand Down Expand Up @@ -172,7 +172,7 @@ async fn test_name_system_peer_propagation() -> Result<()> {
#[tokio::test]
async fn test_name_system_validation() -> Result<()> {
initialize_tracing(None);
let mut db = SphereDb::new(&MemoryStorage::default()).await?;
let mut db = SphereDb::new(MemoryStorage::default()).await?;
let network = NameSystemNetwork::generate(2, Some(db.clone())).await?;

let ns_1 = network.get(1).unwrap();
Expand Down
9 changes: 4 additions & 5 deletions rust/noosphere-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,27 @@ readme = "README.md"
anyhow = { workspace = true }
async-trait = "~0.1"
async-stream = { workspace = true }
futures = { workspace = true }
tokio-stream = { workspace = true }
cid = { workspace = true }
noosphere-common = { version = "0.1.2", path = "../noosphere-common" }
tracing = "~0.1"
ucan = { workspace = true }
libipld-core = { workspace = true }
libipld-cbor = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
base64 = "=0.21.4"
url = { version = "^2" }

[dev-dependencies]
witty-phrase-generator = "~0.2"
wasm-bindgen-test = { workspace = true }
rand = { workspace = true }
noosphere-core-dev = { path = "../noosphere-core", features = ["helpers"], package = "noosphere-core" }
noosphere-common = { path = "../noosphere-common", features = ["helpers"] }
instant = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tempfile = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tempfile = { workspace = true }
sled = "~0.34"
tokio = { workspace = true, features = ["full"] }
rocksdb = { version = "0.21.0", optional = true }
Expand All @@ -52,6 +50,7 @@ rocksdb = { version = "0.21.0", optional = true }
tokio = { workspace = true, features = ["sync", "macros"] }
wasm-bindgen = { workspace = true, features = ["serde-serialize"] }
wasm-bindgen-futures = { workspace = true }
witty-phrase-generator = "~0.2"
serde-wasm-bindgen = { workspace = true }
js-sys = { workspace = true }
rexie = { version = "~0.5" }
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-storage/examples/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl BenchmarkStorage {
}

pub async fn sphere_db(&self) -> Result<SphereDb<ActiveStorageType>> {
SphereDb::new(&self.storage).await
SphereDb::new(self.storage.clone()).await
}

pub async fn as_stats(&mut self) -> Result<PerformanceStats> {
Expand Down
15 changes: 14 additions & 1 deletion rust/noosphere-storage/examples/bench/performance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use instant::{Duration, Instant};
use noosphere_storage::{Space, Storage, Store};
use noosphere_storage::{EphemeralStorage, EphemeralStore, Space, Storage, Store};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

Expand Down Expand Up @@ -208,3 +208,16 @@ impl<S: Store> Store for PerformanceStore<S> {
result
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<S> EphemeralStorage for PerformanceStorage<S>
where
S: Storage,
{
type EphemeralStoreType = <S as EphemeralStorage>::EphemeralStoreType;

async fn get_ephemeral_store(&self) -> Result<EphemeralStore<Self::EphemeralStoreType>> {
self.storage.get_ephemeral_store().await
}
}
Loading

0 comments on commit 869abef

Please sign in to comment.