From 08dcc3d54768fdda6158b1087a32805a5c855e98 Mon Sep 17 00:00:00 2001 From: Christopher Joel <240083+cdata@users.noreply.github.com> Date: Tue, 10 Oct 2023 13:59:37 -0700 Subject: [PATCH] feat: 3P replication fall-back and resilience (#673) --- Cargo.lock | 9 +- Cargo.toml | 3 +- rust/noosphere-common/Cargo.toml | 8 +- rust/noosphere-common/src/helpers/wait.rs | 5 +- rust/noosphere-common/src/latency.rs | 130 ++++++++++++++ rust/noosphere-common/src/lib.rs | 2 + rust/noosphere-common/src/unshared.rs | 6 +- rust/noosphere-core/Cargo.toml | 1 + rust/noosphere-core/src/context/cursor.rs | 114 +++++++++++- .../src/context/replication/mod.rs | 2 + .../src/context/replication/write.rs | 67 +++++++ .../src/context/sync/strategy.rs | 36 +--- rust/noosphere-core/src/helpers/context.rs | 86 ++++----- rust/noosphere-core/src/stream/car.rs | 6 +- rust/noosphere-core/src/stream/ledger.rs | 170 ++++++++++++++++++ rust/noosphere-core/src/stream/mod.rs | 3 + rust/noosphere-core/src/view/sphere.rs | 26 ++- .../src/worker/syndication.rs | 38 +++- rust/noosphere-ipfs/examples/car.rs | 67 +++++-- rust/noosphere-ipfs/src/client/gateway.rs | 9 + rust/noosphere-ipfs/src/client/kubo.rs | 36 +++- rust/noosphere-ipfs/src/client/mod.rs | 17 +- rust/noosphere-storage/Cargo.toml | 2 +- rust/noosphere-storage/src/db.rs | 4 +- rust/noosphere-storage/src/retry.rs | 4 +- rust/noosphere/Cargo.toml | 1 + rust/noosphere/tests/distributed_basic.rs | 87 ++++++++- rust/noosphere/tests/sphere_channel.rs | 2 +- 28 files changed, 797 insertions(+), 144 deletions(-) create mode 100644 rust/noosphere-common/src/latency.rs create mode 100644 rust/noosphere-core/src/context/replication/write.rs create mode 100644 rust/noosphere-core/src/stream/ledger.rs diff --git a/Cargo.lock b/Cargo.lock index de047c774..10f6e4626 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1928,9 +1928,9 @@ dependencies = [ [[package]] name = "gloo-timers" -version = "0.2.6" +version = "0.3.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" dependencies = [ "futures-channel", "futures-core", @@ -3501,11 +3501,15 @@ dependencies = [ "anyhow", "futures", "futures-util", + "gloo-timers", + "instant", "rand", "tokio", + "tokio-stream", "tracing", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-bindgen-test", ] [[package]] @@ -3529,6 +3533,7 @@ dependencies = [ "futures-util", "getrandom 0.2.10", "gloo-net", + "instant", "iroh-car", "js-sys", "libipld-cbor", diff --git a/Cargo.toml b/Cargo.toml index 77b0a4cb5..14ada6e01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,8 +28,9 @@ fastcdc = { version = "3.1" } futures = { version = "0.3" } futures-util = { version = "0.3" } gloo-net = { version = "0.4" } -gloo-timers = { version = "0.2", features = ["futures"] } +gloo-timers = { version = "0.3", features = ["futures"] } ignore = { version = "0.4.20" } +instant = { version = "0.1", features = ["wasm-bindgen"] } iroh-car = { version = "^0.3.0" } js-sys = { version = "^0.3" } libipld = { version = "0.16" } diff --git a/rust/noosphere-common/Cargo.toml b/rust/noosphere-common/Cargo.toml index c15fdf584..bb7cb7944 100644 --- a/rust/noosphere-common/Cargo.toml +++ b/rust/noosphere-common/Cargo.toml @@ -13,16 +13,19 @@ homepage = "https://github.com/subconsciousnetwork/noosphere" readme = "README.md" [features] -helpers = ["rand"] +helpers = ["rand", "gloo-timers"] [dependencies] anyhow = { workspace = true } +gloo-timers = { workspace = true, optional = true } tracing = { workspace = true } rand = { workspace = true, optional = true } futures-util = { workspace = true } +instant = { workspace = true } [dev-dependencies] rand = { workspace = true } +tokio-stream = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { workspace = true, features = ["full"] } @@ -32,3 +35,6 
@@ tokio = { workspace = true, features = ["sync", "macros"] } futures = { workspace = true } wasm-bindgen = { workspace = true } wasm-bindgen-futures = { workspace = true } + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen-test = { workspace = true } diff --git a/rust/noosphere-common/src/helpers/wait.rs b/rust/noosphere-common/src/helpers/wait.rs index 2604fa849..530c0915e 100644 --- a/rust/noosphere-common/src/helpers/wait.rs +++ b/rust/noosphere-common/src/helpers/wait.rs @@ -1,8 +1,11 @@ -use std::time::Duration; +use instant::Duration; /// Wait for the specified number of seconds; uses [tokio::time::sleep], so this /// will yield to the async runtime rather than block until the sleep time is /// elapsed. pub async fn wait(seconds: u64) { + #[cfg(not(target_arch = "wasm32"))] tokio::time::sleep(Duration::from_secs(seconds)).await; + #[cfg(target_arch = "wasm32")] + gloo_timers::future::sleep(Duration::from_secs(seconds)).await } diff --git a/rust/noosphere-common/src/latency.rs b/rust/noosphere-common/src/latency.rs new file mode 100644 index 000000000..d49e9bcb0 --- /dev/null +++ b/rust/noosphere-common/src/latency.rs @@ -0,0 +1,130 @@ +use instant::{Duration, Instant}; + +use futures_util::Stream; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +use crate::ConditionalSend; + +/// A helper for observing when [Stream] throughput appears to have stalled +pub struct StreamLatencyGuard +where + S: Stream + Unpin, + S::Item: ConditionalSend + 'static, +{ + inner: S, + threshold: Duration, + last_ready_time: Instant, + tx: UnboundedSender<()>, +} + +impl StreamLatencyGuard +where + S: Stream + Unpin, + S::Item: ConditionalSend + 'static, +{ + /// Wraps a [Stream] and provides an [UnboundedReceiver<()>] that will receive + /// a message any time the wrapped [Stream] is pending for longer than the provided + /// threshold [Duration]. 
+ pub fn wrap(stream: S, threshold: Duration) -> (Self, UnboundedReceiver<()>) { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); + ( + StreamLatencyGuard { + inner: stream, + threshold, + last_ready_time: Instant::now(), + tx, + }, + rx, + ) + } +} + +impl Stream for StreamLatencyGuard +where + S: Stream + Unpin, + S::Item: ConditionalSend + 'static, +{ + type Item = S::Item; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let result = std::pin::pin!(&mut self.inner).poll_next(cx); + + if result.is_pending() { + if Instant::now() - self.last_ready_time > self.threshold { + let _ = self.tx.send(()); + } + } else if result.is_ready() { + self.last_ready_time = Instant::now(); + } + + result + } +} + +#[cfg(test)] +mod tests { + use anyhow::Result; + use instant::Duration; + use tokio::select; + use tokio_stream::StreamExt; + + use crate::{helpers::wait, StreamLatencyGuard}; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::wasm_bindgen_test; + + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_does_not_impede_the_behavior_of_a_wrapped_stream() -> Result<()> { + let stream = tokio_stream::iter(Vec::from([0u32; 1024])); + + let (guarded_stream, _latency_signal) = + StreamLatencyGuard::wrap(stream, Duration::from_secs(1)); + + tokio::pin!(guarded_stream); + + guarded_stream.collect::>().await; + + Ok(()) + } + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_signals_when_a_stream_encounters_latency() -> Result<()> { + let stream = Box::pin(futures_util::stream::unfold(0, |index| async move { + match index { + 512 => { + for _ in 0..3 { + // Uh oh, latency! 
Note that `tokio::time::sleep` is observed to cooperate + // with the runtime, so we wait multiple times to ensure that the stream is + // actually polled multiple times + wait(1).await; + } + Some((index, index + 1)) + } + _ if index < 1024 => Some((index, index + 1)), + _ => None, + } + })); + + let (guarded_stream, mut latency_guard) = + StreamLatencyGuard::wrap(stream, Duration::from_millis(100)); + + tokio::pin!(guarded_stream); + + select! { + _ = guarded_stream.collect::>() => { + unreachable!("Latency guard should be hit first"); + }, + _ = latency_guard.recv() => () + } + + Ok(()) + } +} diff --git a/rust/noosphere-common/src/lib.rs b/rust/noosphere-common/src/lib.rs index 405d433a7..6697463f1 100644 --- a/rust/noosphere-common/src/lib.rs +++ b/rust/noosphere-common/src/lib.rs @@ -5,10 +5,12 @@ extern crate tracing; pub mod channel; +mod latency; mod sync; mod task; mod unshared; +pub use latency::*; pub use sync::*; pub use task::*; pub use unshared::*; diff --git a/rust/noosphere-common/src/unshared.rs b/rust/noosphere-common/src/unshared.rs index 000b9a1c3..1eb094637 100644 --- a/rust/noosphere-common/src/unshared.rs +++ b/rust/noosphere-common/src/unshared.rs @@ -53,12 +53,12 @@ impl std::fmt::Debug for Unshared { pub struct UnsharedStream(Unshared) where T: Stream + Unpin, - T::Item: ConditionalSend + 'static; + for<'a> T::Item: ConditionalSend + 'a; impl UnsharedStream where T: Stream + Unpin, - T::Item: ConditionalSend + 'static, + for<'a> T::Item: ConditionalSend + 'a, { /// Initialize a new [UnsharedStream] wrapping a provided (presumably `!Sync`) /// [Stream] @@ -70,7 +70,7 @@ where impl Stream for UnsharedStream where T: Stream + Unpin, - T::Item: ConditionalSend + 'static, + for<'a> T::Item: ConditionalSend + 'a, { type Item = T::Item; diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index 727cca801..715c17c2e 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -35,6 +35,7 @@ 
async-stream = { workspace = true } async-once-cell = "~0.4" anyhow = { workspace = true } bytes = { workspace = true } +instant = { workspace = true } iroh-car = { workspace = true } thiserror = { workspace = true } fastcdc = { workspace = true } diff --git a/rust/noosphere-core/src/context/cursor.rs b/rust/noosphere-core/src/context/cursor.rs index 50ac447bd..df0745428 100644 --- a/rust/noosphere-core/src/context/cursor.rs +++ b/rust/noosphere-core/src/context/cursor.rs @@ -4,11 +4,14 @@ use crate::{ data::{Link, MemoIpld}, view::{Sphere, Timeline}, }; -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_trait::async_trait; +use noosphere_common::StreamLatencyGuard; use noosphere_storage::Storage; +use tokio::select; use crate::context::{HasMutableSphereContext, HasSphereContext, SphereContext, SphereReplicaRead}; +use instant::Duration; use std::marker::PhantomData; /// A [SphereCursor] is a structure that enables reading from and writing to a @@ -200,7 +203,17 @@ where let stream = client .replicate(&version, replicate_parameters.as_ref()) .await?; - put_block_stream(db.clone(), stream).await?; + + tokio::pin!(stream); + + let (stream, mut rx) = StreamLatencyGuard::wrap(stream, Duration::from_secs(5)); + + select! { + _ = put_block_stream(db.clone(), stream) => (), + _ = rx.recv() => { + return Err(anyhow!("Block timed out")) + } + } // If this was incremental replication, we have to hydrate... 
if let Some(since) = since { @@ -271,10 +284,10 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; use anyhow::Result; - use noosphere_storage::UcanStore; + use noosphere_storage::{Store, UcanStore}; use tokio::io::AsyncReadExt; #[cfg(target_arch = "wasm32")] @@ -287,7 +300,7 @@ mod tests { authority::Access, context::{ HasMutableSphereContext, HasSphereContext, SphereContentRead, SphereContentWrite, - SpherePetnameRead, SpherePetnameWrite, SphereReplicaRead, + SpherePetnameRead, SpherePetnameWrite, SphereReplicaRead, SphereReplicaWrite, }, data::{ContentType, Header}, helpers::{ @@ -646,6 +659,86 @@ mod tests { Ok(()) } + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_falls_back_to_available_version_when_traversal_target_version_is_not_available( + ) -> Result<()> { + initialize_tracing(None); + + let name_sequence: Vec = vec!["a".into(), "b".into(), "c".into()]; + let (mut origin_sphere_context, mut sphere_contexts) = + make_sphere_context_with_peer_chain(&name_sequence).await?; + + let expected_last_sphere_context_version = + sphere_contexts.last().unwrap().version().await?; + + let mut db = origin_sphere_context.sphere_context().await?.db().clone(); + let mut next_link_record = None; + let mut next_peer_name: Option<&str> = None; + let mut last_context_latest_version = None; + + for (context, name) in sphere_contexts.iter_mut().zip(name_sequence.iter()).rev() { + let current_version = context.version().await?; + context + .write("revision", &ContentType::Text, b"foo".as_ref(), None) + .await?; + + if let Some(peer_name) = next_peer_name { + context + .set_petname_record(peer_name, &next_link_record.unwrap()) + .await?; + } + + let version = context.save(None).await?; + let link_record = context + .create_link_record(Some(Duration::from_secs(130))) + .await?; + + if last_context_latest_version.is_none() { + last_context_latest_version = 
Some(version); + } + + // Force the tip of history back to an old version in order to + // simulate the "needs to replicate" condition (noting that this + // pointer automatically advances for a local save). + db.set_version(&context.identity().await?, &current_version) + .await?; + + next_peer_name = Some(name.as_str()); + next_link_record = Some(link_record); + } + + origin_sphere_context + .set_petname_record("a", &next_link_record.unwrap()) + .await?; + origin_sphere_context.save(None).await?; + + // Remove the memo for this version so that it cannot be found, forcing + // a replication attempt that will fail when we try to traverse to it + // later + origin_sphere_context + .sphere_context_mut() + .await? + .db_mut() + .to_block_store() + .remove(&last_context_latest_version.unwrap().to_bytes()) + .await?; + + let cursor = SphereCursor::latest(origin_sphere_context); + + let target_sphere_context = cursor + .traverse_by_petnames(&name_sequence.into_iter().rev().collect::>()) + .await? + .unwrap(); + + assert_eq!( + target_sphere_context.version().await?, + expected_last_sphere_context_version + ); + + Ok(()) + } + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] async fn it_resolves_none_when_a_petname_is_missing_from_the_sequence() -> Result<()> { @@ -678,14 +771,19 @@ mod tests { #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] - async fn it_correctly_identifies_a_visited_perer() -> Result<()> { + async fn it_correctly_identifies_a_visited_peer() -> Result<()> { initialize_tracing(None); let name_sequence: Vec = vec!["a".into(), "b".into(), "c".into()]; - let (origin_sphere_context, dids) = + let (origin_sphere_context, sphere_contexts) = make_sphere_context_with_peer_chain(&name_sequence).await?; + let mut dids = vec![origin_sphere_context.identity().await?]; + for context in &sphere_contexts { + dids.push(context.identity().await?); + } + let 
cursor = SphereCursor::latest(Arc::new( origin_sphere_context.sphere_context().await?.clone(), )); @@ -701,7 +799,7 @@ mod tests { identities.push(target_sphere_context.identity().await?); } - assert_eq!(identities.into_iter().rev().collect::>(), dids); + assert_eq!(identities, dids); Ok(()) } diff --git a/rust/noosphere-core/src/context/replication/mod.rs b/rust/noosphere-core/src/context/replication/mod.rs index cd2e912fe..c8b54eba1 100644 --- a/rust/noosphere-core/src/context/replication/mod.rs +++ b/rust/noosphere-core/src/context/replication/mod.rs @@ -1,3 +1,5 @@ mod read; +mod write; pub use read::*; +pub use write::*; diff --git a/rust/noosphere-core/src/context/replication/write.rs b/rust/noosphere-core/src/context/replication/write.rs new file mode 100644 index 000000000..ea7626c42 --- /dev/null +++ b/rust/noosphere-core/src/context/replication/write.rs @@ -0,0 +1,67 @@ +use anyhow::Result; +use instant::Duration; + +use async_trait::async_trait; +use noosphere_storage::Storage; +use ucan::{builder::UcanBuilder, store::UcanJwtStore}; + +use crate::{ + authority::{generate_capability, SphereAbility}, + context::{internal::SphereContextInternal, HasMutableSphereContext}, + data::{Jwt, LinkRecord, LINK_RECORD_FACT_NAME}, +}; + +/// Implementors are able to write Noosphere data pertaining to replicating +/// their spheres across the network +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait SphereReplicaWrite +where + S: Storage + 'static, +{ + /// Produce a [LinkRecord] for this sphere pointing to the latest version + /// according to the implementor, optionally valid for the specified + /// lifetime. 
+ async fn create_link_record(&mut self, lifetime: Option) -> Result; +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl SphereReplicaWrite for C +where + C: HasMutableSphereContext, + S: Storage + 'static, +{ + async fn create_link_record(&mut self, lifetime: Option) -> Result { + self.assert_write_access().await?; + + let mut context = self.sphere_context_mut().await?; + let author = context.author(); + let identity = context.identity(); + let version = context.version().await?; + + let mut builder = UcanBuilder::default() + .issued_by(&author.key) + .for_audience(identity) + .claiming_capability(&generate_capability(identity, SphereAbility::Publish)) + .with_fact(LINK_RECORD_FACT_NAME, version.to_string()) + .with_nonce(); + + if let Some(lifetime) = lifetime { + builder = builder.with_lifetime(lifetime.as_secs()); + } + + // An authorization may not be present or required if the issuer is the root credential, + // which may happen in recovery scenarios where the user provides their mnemonic + if let Some(authorization) = &author.authorization { + builder = builder.witnessed_by(&authorization.as_ucan(context.db()).await?, None) + } + + let link_record = LinkRecord::from(builder.build()?.sign().await?); + + let jwt = Jwt(link_record.encode()?); + context.db_mut().write_token(&jwt).await?; + + Ok(link_record) + } +} diff --git a/rust/noosphere-core/src/context/sync/strategy.rs b/rust/noosphere-core/src/context/sync/strategy.rs index 48452db91..a279f7e26 100644 --- a/rust/noosphere-core/src/context/sync/strategy.rs +++ b/rust/noosphere-core/src/context/sync/strategy.rs @@ -1,21 +1,20 @@ -use std::{collections::BTreeMap, marker::PhantomData}; +use std::{collections::BTreeMap, marker::PhantomData, time::Duration}; use crate::{ api::{ v0alpha1::FetchParameters, v0alpha2::{PushBody, PushResponse}, }, + context::SphereReplicaWrite, stream::put_block_stream, }; use crate::{ - 
authority::{generate_capability, SphereAbility}, - data::{Did, IdentityIpld, Jwt, Link, MemoIpld, LINK_RECORD_FACT_NAME}, + data::{Did, IdentityIpld, Jwt, Link, MemoIpld}, view::{Sphere, Timeline}, }; use anyhow::{anyhow, Result}; use noosphere_storage::{KeyValueStore, SphereDb, Storage}; use tokio_stream::StreamExt; -use ucan::builder::UcanBuilder; use crate::context::{ metadata::COUNTERPART, HasMutableSphereContext, SpherePetnameRead, SpherePetnameWrite, @@ -358,6 +357,10 @@ where counterpart_sphere_identity: &Did, counterpart_sphere_tip: &Link, ) -> Result<(), SyncError> { + let link_record = Jwt(context + .create_link_record(Some(Duration::from_secs(120))) + .await? + .encode()?); let mut context = context.sphere_context_mut().await?; let local_sphere_base = Sphere::at(counterpart_sphere_tip, context.db()) @@ -378,26 +381,6 @@ where let client = context.client().await?; let local_sphere_identity = context.identity(); - let authorization = context - .author() - .require_authorization()? - .as_ucan(context.db()) - .await?; - - let name_record = Jwt(UcanBuilder::default() - .issued_by(&context.author().key) - .for_audience(local_sphere_identity) - .witnessed_by(&authorization, None) - .claiming_capability(&generate_capability( - local_sphere_identity, - SphereAbility::Publish, - )) - .with_lifetime(120) - .with_fact(LINK_RECORD_FACT_NAME, local_sphere_tip.to_string()) - .build()? - .sign() - .await? 
- .encode()?); info!( "Pushing new local history to gateway {}...", @@ -410,7 +393,7 @@ where local_base: local_sphere_base, local_tip: *local_sphere_tip, counterpart_tip: Some(*counterpart_sphere_tip), - name_record: Some(name_record), + name_record: Some(link_record), }) .await?; @@ -423,9 +406,6 @@ where info!("Saving updated counterpart sphere history..."); - // TODO: Do this inside the client `push` method - //new_blocks.load_into(context.db_mut()).await?; - debug!( "Hydrating updated counterpart sphere history (from {} back to {})...", counterpart_sphere_tip, counterpart_sphere_updated_tip diff --git a/rust/noosphere-core/src/helpers/context.rs b/rust/noosphere-core/src/helpers/context.rs index 398b420b4..84033ce55 100644 --- a/rust/noosphere-core/src/helpers/context.rs +++ b/rust/noosphere-core/src/helpers/context.rs @@ -1,17 +1,18 @@ //! These helpers are intended for use in documentation examples and tests only. //! They are useful for quickly scaffolding common scenarios that would //! otherwise be verbosely rubber-stamped in a bunch of places. 
-use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use crate::{ - authority::{generate_capability, generate_ed25519_key, Access, Author, SphereAbility}, - data::{ContentType, Did, LinkRecord, Mnemonic, LINK_RECORD_FACT_NAME}, + authority::{generate_ed25519_key, Access, Author}, + context::SphereReplicaWrite, + data::{ContentType, Mnemonic}, view::Sphere, }; use anyhow::Result; use noosphere_storage::{BlockStore, MemoryStorage, SphereDb, Storage, TrackingStorage, UcanStore}; use tokio::{io::AsyncReadExt, sync::Mutex}; -use ucan::{builder::UcanBuilder, crypto::KeyMaterial}; +use ucan::crypto::KeyMaterial; use crate::{ context::{ @@ -126,12 +127,18 @@ where /// A type of [HasMutableSphereContext] that uses [TrackingStorage] internally pub type TrackedHasMutableSphereContext = Arc>>>; -/// Create a series of spheres where each sphere has the next as resolved -/// entry in its address book; return a [HasMutableSphereContext] for the -/// first sphere in the sequence. +/// Create a series of spheres where each sphere has the next as resolved entry +/// in its address book; return a [HasMutableSphereContext] for the first sphere +/// in the sequence. The returned [HasMutableSphereContext] will have a peer in +/// its address book that refers to the first link in the caller-prescribed +/// chain of peers, so if you suggest a chain of three peers (for example), a +/// total of four sphere contexts will be scaffolded. 
pub async fn make_sphere_context_with_peer_chain( peer_chain: &[String], -) -> Result<(TrackedHasMutableSphereContext, Vec)> { +) -> Result<( + TrackedHasMutableSphereContext, + Vec, +)> { let (origin_sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None) .await .unwrap(); @@ -160,47 +167,14 @@ pub async fn make_sphere_context_with_peer_chain( } let mut next_sphere_context: Option = None; - let mut dids = Vec::new(); + let mut sphere_contexts = Vec::new(); for mut sphere_context in contexts.into_iter().rev() { - dids.push(sphere_context.identity().await?); - if let Some(next_sphere_context) = next_sphere_context { - let version = next_sphere_context.version().await.unwrap(); - - let next_author = next_sphere_context - .sphere_context() - .await - .unwrap() - .author() - .clone(); - let next_identity = next_sphere_context.identity().await.unwrap(); - - let link_record = LinkRecord::from( - UcanBuilder::default() - .issued_by(&next_author.key) - .for_audience(&next_identity) - .witnessed_by( - &next_author - .authorization - .as_ref() - .unwrap() - .as_ucan(&db) - .await - .unwrap(), - None, - ) - .claiming_capability(&generate_capability( - &next_identity, - SphereAbility::Publish, - )) - .with_lifetime(120) - .with_fact(LINK_RECORD_FACT_NAME, version.to_string()) - .build() - .unwrap() - .sign() - .await - .unwrap(), - ); + if let Some(mut next_sphere_context) = next_sphere_context { + sphere_contexts.push(next_sphere_context.clone()); + let link_record = next_sphere_context + .create_link_record(Some(Duration::from_secs(120))) + .await?; let mut name = String::new(); let mut file = next_sphere_context.read("my-name").await.unwrap().unwrap(); @@ -208,7 +182,7 @@ pub async fn make_sphere_context_with_peer_chain( debug!("Adopting {name}"); sphere_context - .set_petname(&name, Some(next_identity)) + .set_petname(&name, Some(next_sphere_context.identity().await?)) .await?; sphere_context.save(None).await?; @@ -216,15 +190,21 @@ pub async fn 
make_sphere_context_with_peer_chain( .set_petname_record(&name, &link_record) .await .unwrap(); - let identity = sphere_context.identity().await?; - db.set_version(&identity, &sphere_context.save(None).await.unwrap()) - .await - .unwrap(); + sphere_context.save(None).await?; } + db.set_version( + &sphere_context.identity().await?, + &sphere_context.version().await?.into(), + ) + .await + .unwrap(); + next_sphere_context = Some(sphere_context); } - Ok((origin_sphere_context, dids)) + sphere_contexts.reverse(); + + Ok((origin_sphere_context, sphere_contexts)) } diff --git a/rust/noosphere-core/src/stream/car.rs b/rust/noosphere-core/src/stream/car.rs index d9ed1528b..8f8e77491 100644 --- a/rust/noosphere-core/src/stream/car.rs +++ b/rust/noosphere-core/src/stream/car.rs @@ -42,12 +42,12 @@ where /// Takes a list of roots and a stream of blocks (pairs of [Cid] and /// corresponding [Vec]), and produces an async byte stream that yields a /// valid [CARv1](https://ipld.io/specs/transport/car/carv1/) -pub fn to_car_stream( +pub fn to_car_stream<'a, S>( mut roots: Vec, block_stream: S, -) -> impl Stream> + ConditionalSend +) -> impl Stream> + ConditionalSend + 'a where - S: Stream)>> + ConditionalSend, + S: Stream)>> + ConditionalSend + 'a, { if roots.is_empty() { roots = vec![Cid::default()] diff --git a/rust/noosphere-core/src/stream/ledger.rs b/rust/noosphere-core/src/stream/ledger.rs new file mode 100644 index 000000000..44f51a58c --- /dev/null +++ b/rust/noosphere-core/src/stream/ledger.rs @@ -0,0 +1,170 @@ +use anyhow::Result; +use async_stream::try_stream; +use futures_util::Stream; +use noosphere_common::ConditionalSend; +use std::{collections::BTreeSet, sync::Arc}; +use tokio::sync::Mutex; + +use cid::Cid; +use libipld_cbor::DagCborCodec; +use libipld_core::{codec::Codec, ipld::Ipld, raw::RawCodec}; + +/// A utility to help with tracking the relationship of blocks and references +/// within a series of blocks. 
+#[derive(Default, Clone, Debug)] +pub struct BlockLedger { + references: BTreeSet, + blocks: BTreeSet, +} + +impl BlockLedger { + /// Record a block in the ledger, extracting references from the block bytes + /// and noting the block's own [Cid] + pub fn record(&mut self, cid: &Cid, block: &[u8]) -> Result<()> { + self.blocks.insert(*cid); + + match cid.codec() { + codec if codec == u64::from(DagCborCodec) => { + DagCborCodec.references::(block, &mut self.references)?; + } + codec if codec == u64::from(RawCodec) => { + RawCodec.references::(block, &mut self.references)?; + } + _ => (), + }; + + Ok(()) + } + + /// Get an iterator over the [Cid]s of orphan blocks based on the current + /// state of the [BlockLedger]. + /// + /// Orphan blocks are blocks that are not referenced by any other blocks + /// in the set of blocks recorded by this [BlockLedger]. + pub fn orphans(&self) -> impl IntoIterator { + self.blocks.difference(&self.references) + } + + /// Same as [BlockLedger::orphans], but consumes the [BlockLedger] and + /// yields owned [Cid]s. + pub fn into_orphans(self) -> impl IntoIterator { + self.blocks + .into_iter() + .filter(move |cid| !self.references.contains(cid)) + } + + /// Get an iterator over [Cid]s that are referenced by the recorded blocks + /// but have not been recorded by this [BlockLedger] themselves. + pub fn missing_references(&self) -> impl IntoIterator { + self.references + .iter() + .filter(|cid| !self.blocks.contains(*cid)) + } + + /// Same as [BlockLedger::missing_references], but consumes the + /// [BlockLedger] and yields owned [Cid]s. + pub fn into_missing_references(self) -> impl IntoIterator { + self.references + .into_iter() + .filter(move |cid| !self.blocks.contains(cid)) + } +} + +/// Wraps a block stream (any stream yielding `Result<(Cid, Vec)>`), and +/// records "orphan" blocks to a provided target buffer. 
+/// +/// Orphan blocks are blocks that occurred in the stream but were not referenced +/// by any other blocks in the stream. +pub fn record_stream_orphans( + orphans: Arc>, + block_stream: S, +) -> impl Stream)>> + ConditionalSend +where + E: Extend + ConditionalSend, + S: Stream)>> + ConditionalSend, +{ + try_stream! { + let mut ledger = BlockLedger::default(); + let mut locked_orphans = orphans.lock().await; + + for await item in block_stream { + let (cid, block) = item?; + + ledger.record(&cid, &block)?; + + yield (cid, block); + } + + locked_orphans.extend(ledger.into_orphans()); + } +} + +#[cfg(test)] +mod tests { + use std::{collections::BTreeSet, sync::Arc}; + + use anyhow::Result; + use cid::Cid; + use futures_util::StreamExt; + use tokio::sync::Mutex; + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::wasm_bindgen_test; + + use crate::{ + authority::Access, + context::{HasMutableSphereContext, HasSphereContext, SpherePetnameWrite}, + helpers::{make_valid_link_record, simulated_sphere_context}, + stream::{memo_body_stream, record_stream_orphans}, + }; + + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + async fn it_records_orphans_in_a_stream() -> Result<()> { + let (mut sphere_context, _) = simulated_sphere_context(Access::ReadWrite, None).await?; + let mut db = sphere_context.sphere_context().await?.db().clone(); + let (did, link_record, _) = make_valid_link_record(&mut db).await?; + + sphere_context.set_petname("foo", Some(did)).await?; + sphere_context.save(None).await?; + sphere_context + .set_petname_record("foo", &link_record) + .await?; + + let version_to_stream = sphere_context.save(None).await?; + let orphans: Arc>> = Default::default(); + + let stream = record_stream_orphans( + orphans.clone(), + memo_body_stream(db.clone(), &version_to_stream, true), + ); + + 
tokio::pin!(stream); + + let _ = stream.collect::)>>>().await; + + let orphans = orphans.lock().await; + + assert_eq!(orphans.len(), 2); + + let root = sphere_context.version().await?; + + assert!(orphans.contains(&root)); + + let orphan_link_record = Cid::try_from( + link_record + .proofs() + .as_ref() + .unwrap() + .first() + .unwrap() + .as_str(), + )?; + + assert!(orphans.contains(&orphan_link_record)); + + Ok(()) + } +} diff --git a/rust/noosphere-core/src/stream/mod.rs b/rust/noosphere-core/src/stream/mod.rs index d5cd5e0c4..350d1df99 100644 --- a/rust/noosphere-core/src/stream/mod.rs +++ b/rust/noosphere-core/src/stream/mod.rs @@ -3,11 +3,13 @@ mod block; mod car; +mod ledger; mod memo; mod walk; pub use block::*; pub use car::*; +pub use ledger::*; pub use memo::*; pub use walk::*; @@ -388,6 +390,7 @@ mod tests { ); let store = sphere_context.lock().await.db().clone(); + let last_version = versions.pop().unwrap(); let last_version_parent = versions.pop().unwrap(); diff --git a/rust/noosphere-core/src/view/sphere.rs b/rust/noosphere-core/src/view/sphere.rs index dd47be1df..b7e02b7b8 100644 --- a/rust/noosphere-core/src/view/sphere.rs +++ b/rust/noosphere-core/src/view/sphere.rs @@ -124,7 +124,7 @@ where } }; - debug!("Petname assigned to {:?}", identity); + debug!("Petname is assigned to {:?}", identity); let link_record_version = match identity.link_record(&UcanStore(self.store().clone())).await { @@ -147,12 +147,14 @@ where // Check for version in local sphere DB // If desired version available, check for memo and body blocks + warn!(did = ?identity.did, "Looking for local version"); + let local_version = self .store() .get_version(&identity.did) .await? 
.map(|cid| cid.into()); - let (replication_required, local_version) = + let (replication_required, target_version) = if local_version.as_ref() == Some(&link_record_version) { match self .store() @@ -192,13 +194,25 @@ where // If no version available or memo/body missing, attempt to replicate the needed blocks - if replication_required { + let target_version = if replication_required { debug!("Attempting to replicate from gateway..."); - replicate(link_record_version, local_version).await?; - } + if let Err(error) = replicate(link_record_version, target_version).await { + if let Some(local_version) = local_version { + warn!("Replication failed; falling back to {local_version}"); + local_version + } else { + warn!(identity = ?identity.did, "Replication failed, and no fallback is available"); + return Err(error); + } + } else { + link_record_version + } + } else { + link_record_version + }; - Ok(Some(Sphere::at(&link_record_version, self.store()))) + Ok(Some(Sphere::at(&target_version, self.store()))) } } diff --git a/rust/noosphere-gateway/src/worker/syndication.rs b/rust/noosphere-gateway/src/worker/syndication.rs index e84a878a3..66c138caf 100644 --- a/rust/noosphere-gateway/src/worker/syndication.rs +++ b/rust/noosphere-gateway/src/worker/syndication.rs @@ -2,13 +2,14 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{io::Cursor, sync::Arc}; use anyhow::Result; +use cid::Cid; use libipld_cbor::DagCborCodec; use noosphere_common::UnsharedStream; use noosphere_core::context::{ metadata::COUNTERPART, HasMutableSphereContext, SphereContentRead, SphereContentWrite, SphereCursor, }; -use noosphere_core::stream::{memo_body_stream, to_car_stream}; +use noosphere_core::stream::{memo_body_stream, record_stream_orphans, to_car_stream}; use noosphere_core::{ data::{ContentType, Did, Link, MemoIpld}, view::Timeline, @@ -16,6 +17,7 @@ use noosphere_core::{ use noosphere_ipfs::{IpfsClient, KuboClient}; use noosphere_storage::{block_deserialize, block_serialize, 
KeyValueStore, Storage}; use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; use tokio::{ io::AsyncReadExt, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, @@ -177,14 +179,38 @@ where // For all CIDs since the last historical checkpoint, syndicate a CAR // of blocks that are unique to that revision to the backing IPFS // implementation + for cid in timeline { - let car_stream = to_car_stream(vec![cid.into()], memo_body_stream(db.clone(), &cid, true)); - let car_reader = StreamReader::new(UnsharedStream::new(Box::pin(car_stream))); + let orphans = Arc::new(Mutex::new(Vec::new())); + + let car_stream = to_car_stream( + vec![cid.into()], + record_stream_orphans(orphans.clone(), memo_body_stream(db.clone(), &cid, true)), + ); + let unshared_stream = UnsharedStream::new(Box::pin(car_stream)); + let car_reader = StreamReader::new(unshared_stream); match kubo_client.syndicate_blocks(car_reader).await { Ok(_) => { - syndication_checkpoint.last_syndicated_version = Some(cid); - debug!("Syndicated sphere revision {} to IPFS", cid) + let orphans = orphans.lock().await; + + debug!(?orphans, "Imported blocks to IPFS; pinning orphans...",); + + let root = Cid::from(cid); + + match kubo_client + .pin_blocks(orphans.iter().filter(|orphan| *orphan != &root)) + .await + { + Ok(_) => { + debug!("Syndicated sphere revision {} to IPFS", cid); + syndication_checkpoint.last_syndicated_version = Some(cid); + } + Err(error) => warn!( + "Failed to pin orphans for revision {} to IPFS: {:?}", + cid, error + ), + } } Err(error) => warn!("Failed to syndicate revision {} to IPFS: {:?}", cid, error), }; @@ -286,7 +312,7 @@ mod tests { debug!("Looking for blocks..."); - for _ in 0..3 { + for _ in 0..30 { debug!("Sending request to Kubo..."); select! 
{ diff --git a/rust/noosphere-ipfs/examples/car.rs b/rust/noosphere-ipfs/examples/car.rs index d862b63fa..6dd9aefbe 100644 --- a/rust/noosphere-ipfs/examples/car.rs +++ b/rust/noosphere-ipfs/examples/car.rs @@ -11,6 +11,8 @@ use libipld_cbor::DagCborCodec; use libipld_core::raw::RawCodec; use multihash::MultihashDigest; +use noosphere_core::stream::BlockLedger; + #[cfg(not(target_arch = "wasm32"))] use tokio::fs::File; @@ -40,6 +42,9 @@ pub fn main() {} #[cfg(not(target_arch = "wasm32"))] #[cfg_attr(not(target_arch = "wasm32"), tokio::main)] pub async fn main() -> Result<()> { + use libipld_core::ipld::Ipld; + use noosphere_storage::block_decode; + let file = if let Some(arg) = env::args().nth(1) { println!("Opening {arg}...\n"); File::open(arg).await? @@ -50,7 +55,7 @@ pub async fn main() -> Result<()> { let mut reader = CarReader::new(file).await?; - let header = reader.header(); + let header = reader.header().clone(); println!("=== Header (CARv{}) ===\n", header.version()); @@ -62,22 +67,27 @@ pub async fn main() -> Result<()> { let mut index = 0usize; + let mut block_ledger = BlockLedger::default(); + while let Some((cid, block)) = reader.next_block().await? 
{ println!("=== Block {} ===\n", index); - let verification_sign = if cid.codec() == u64::from(DagCborCodec) { - let hasher = cid::multihash::Code::try_from(cid.hash().code())?; - let multihash = hasher.digest(&block); - let new_cid = Cid::new_v1(cid.codec(), multihash); + block_ledger.record(&cid, &block)?; + + let verification_sign = + if cid.codec() == u64::from(DagCborCodec) || cid.codec() == u64::from(RawCodec) { + let hasher = cid::multihash::Code::try_from(cid.hash().code())?; + let multihash = hasher.digest(&block); + let new_cid = Cid::new_v1(cid.codec(), multihash); - if cid == new_cid { - "✔️" + if cid == new_cid { + "✔️" + } else { + "🚫" + } } else { - "🚫" - } - } else { - "🤷" - }; + "🤷" + }; println!( "{} {} ({:?}, {}, {})\n", @@ -96,8 +106,41 @@ pub async fn main() -> Result<()> { .join(" ") ); + if cid.codec() == u64::from(DagCborCodec) { + let ipld = block_decode::(&block)?; + println!("{:#?}\n", ipld); + } + index += 1; } + let missing_references = block_ledger + .missing_references() + .into_iter() + .map(|cid| cid.to_string()) + .collect::>(); + + let orphaned = block_ledger + .orphans() + .into_iter() + .filter_map(|cid| { + if header.roots().contains(cid) { + None + } else { + Some(cid.to_string()) + } + }) + .collect::>(); + + if !missing_references.is_empty() { + println!("=== References to missing blocks ===\n"); + println!("{}\n", missing_references.join("\n")); + } + + if !orphaned.is_empty() { + println!("=== Orphaned blocks ===\n"); + println!("{}\n", orphaned.join("\n")); + } + Ok(()) } diff --git a/rust/noosphere-ipfs/src/client/gateway.rs b/rust/noosphere-ipfs/src/client/gateway.rs index f8588a04c..71e2073f9 100644 --- a/rust/noosphere-ipfs/src/client/gateway.rs +++ b/rust/noosphere-ipfs/src/client/gateway.rs @@ -2,6 +2,7 @@ use super::{IpfsClient, IpfsClientAsyncReadSendSync}; use anyhow::Result; use async_trait::async_trait; use cid::Cid; +use noosphere_common::ConditionalSend; use reqwest::Client; use reqwest::StatusCode; use 
std::str::FromStr; @@ -56,6 +57,14 @@ impl GatewayClient { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl IpfsClient for GatewayClient { + async fn pin_blocks<'a, I>(&self, _cids: I) -> Result<()> + where + I: IntoIterator + ConditionalSend + std::fmt::Debug, + I::IntoIter: ConditionalSend, + { + unimplemented!("IPFS HTTP Gateway does not have this capability."); + } + async fn block_is_pinned(&self, _cid: &Cid) -> Result { unimplemented!("IPFS HTTP Gateway does not have this capability."); } diff --git a/rust/noosphere-ipfs/src/client/kubo.rs b/rust/noosphere-ipfs/src/client/kubo.rs index 7649afe15..fe9150e8e 100644 --- a/rust/noosphere-ipfs/src/client/kubo.rs +++ b/rust/noosphere-ipfs/src/client/kubo.rs @@ -13,9 +13,10 @@ use hyper::{ }; use hyper_multipart_rfc7578::client::multipart::{Body as MultipartBody, Form}; // TODO(#587): Remove dependency on `ipfs-api` crate -use ipfs_api_prelude::response::PinLsResponse; +use ipfs_api_prelude::response::{PinAddResponse, PinLsResponse}; use libipld_cbor::DagCborCodec; use libipld_core::raw::RawCodec; +use noosphere_common::ConditionalSend; use serde_json::Value; use tokio::select; use url::Url; @@ -35,7 +36,7 @@ fn get_codec(cid: &Cid) -> Result { // that it cannot find locally or on the network, it hangs indefinitely with no // feedback // See: https://github.com/ipfs/kubo/issues/10159 -const KUBO_DAG_IMPORT_TIMEOUT: Duration = Duration::from_secs(10); +const KUBO_DAG_IMPORT_TIMEOUT: Duration = Duration::from_secs(60); /// A high-level HTTP client for accessing IPFS /// [Kubo RPC APIs](https://docs.ipfs.tech/reference/kubo/rpc/) and normalizing @@ -59,6 +60,37 @@ impl KuboClient { #[async_trait] impl IpfsClient for KuboClient { + #[instrument(skip(self), level = "trace")] + async fn pin_blocks<'a, I>(&self, cids: I) -> Result<()> + where + I: IntoIterator + ConditionalSend + std::fmt::Debug, + I::IntoIter: ConditionalSend, + { + let mut api_url = 
self.api_url.clone(); + api_url.set_path("/api/v0/pin/add"); + + for cid in cids { + let cid_base64 = cid.to_string(); + api_url.set_query(Some(&format!("arg={cid_base64}"))); + + let request = Request::builder() + .method("POST") + .uri(&api_url.to_string()) + .body(Body::empty())?; + let response = self.client.request(request).await?; + + let body_bytes = hyper::body::to_bytes(response.into_body()).await?; + match serde_json::from_slice(body_bytes.as_ref()) { + Ok(PinAddResponse { .. }) => (), + _ => { + warn!("Unexpected response when pinning {}", cid); + } + } + } + + Ok(()) + } + #[instrument(skip(self), level = "trace")] async fn block_is_pinned(&self, cid: &Cid) -> Result { let mut api_url = self.api_url.clone(); diff --git a/rust/noosphere-ipfs/src/client/mod.rs b/rust/noosphere-ipfs/src/client/mod.rs index ae55d25bc..8a2cf2c63 100644 --- a/rust/noosphere-ipfs/src/client/mod.rs +++ b/rust/noosphere-ipfs/src/client/mod.rs @@ -12,18 +12,12 @@ pub use kubo::KuboClient; use anyhow::{anyhow, Result}; use async_trait::async_trait; use cid::Cid; +use noosphere_common::{ConditionalSend, ConditionalSync}; use std::fmt::Debug; use tokio::io::AsyncRead; -#[cfg(not(target_arch = "wasm32"))] -pub trait IpfsClientAsyncReadSendSync: AsyncRead + Send + Sync + 'static {} -#[cfg(not(target_arch = "wasm32"))] -impl IpfsClientAsyncReadSendSync for S where S: AsyncRead + Send + Sync + 'static {} - -#[cfg(target_arch = "wasm32")] -pub trait IpfsClientAsyncReadSendSync: AsyncRead {} -#[cfg(target_arch = "wasm32")] -impl IpfsClientAsyncReadSendSync for S where S: AsyncRead {} +pub trait IpfsClientAsyncReadSendSync: AsyncRead + ConditionalSync + 'static {} +impl IpfsClientAsyncReadSendSync for S where S: AsyncRead + ConditionalSync + 'static {} /// A generic interface for interacting with an IPFS-like backend where it may /// be desirable to syndicate sphere data to. 
Although the interface was @@ -32,6 +26,11 @@ impl IpfsClientAsyncReadSendSync for S where S: AsyncRead {} #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait IpfsClient: Clone + Debug { + async fn pin_blocks<'a, I>(&self, cids: I) -> Result<()> + where + I: IntoIterator + ConditionalSend + Debug, + I::IntoIter: ConditionalSend; + /// Returns true if the block (referenced by [Cid]) is pinned by the IPFS /// server async fn block_is_pinned(&self, cid: &Cid) -> Result; diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index 83483021d..b35cc0268 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -38,7 +38,7 @@ wasm-bindgen-test = { workspace = true } rand = { workspace = true } noosphere-core-dev = { path = "../noosphere-core", features = ["helpers"], package = "noosphere-core" } noosphere-common = { path = "../noosphere-common", features = ["helpers"] } -instant = { version = "0.1.12", features = ["wasm-bindgen"] } +instant = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] tempfile = { workspace = true } diff --git a/rust/noosphere-storage/src/db.rs b/rust/noosphere-storage/src/db.rs index e54501032..da62003f1 100644 --- a/rust/noosphere-storage/src/db.rs +++ b/rust/noosphere-storage/src/db.rs @@ -80,9 +80,7 @@ where /// Record the tip of a local sphere lineage as a [Cid] pub async fn set_version(&mut self, identity: &str, version: &Cid) -> Result<()> { - self.version_store - .set_key(identity.to_string(), version) - .await + self.version_store.set_key(identity, version).await } /// Get the most recently recorded tip of a local sphere lineage diff --git a/rust/noosphere-storage/src/retry.rs b/rust/noosphere-storage/src/retry.rs index 0d1260d85..c83a41d26 100644 --- a/rust/noosphere-storage/src/retry.rs +++ b/rust/noosphere-storage/src/retry.rs @@ -7,11 +7,11 @@ use tokio::select; use 
crate::BlockStore; const DEFAULT_MAX_RETRIES: u32 = 2u32; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2); +const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1500); const DEFAULT_MINIMUM_DELAY: Duration = Duration::from_secs(1); const DEFAULT_BACKOFF: Backoff = Backoff::Exponential { exponent: 2f32, - ceiling: Duration::from_secs(10), + ceiling: Duration::from_secs(6), }; /// Backoff configuration used to define how [BlockStoreRetry] should time diff --git a/rust/noosphere/Cargo.toml b/rust/noosphere/Cargo.toml index 3ff62d900..c06e58c15 100644 --- a/rust/noosphere/Cargo.toml +++ b/rust/noosphere/Cargo.toml @@ -33,6 +33,7 @@ async-stream = { workspace = true } tracing = { workspace = true } url = { workspace = true, features = ["serde"] } subtext = { workspace = true } +instant = { workspace = true } itertools = "0.11.0" rand = { workspace = true } tokio-stream = { workspace = true } diff --git a/rust/noosphere/tests/distributed_basic.rs b/rust/noosphere/tests/distributed_basic.rs index 2d0e91cd9..e61a8b49c 100644 --- a/rust/noosphere/tests/distributed_basic.rs +++ b/rust/noosphere/tests/distributed_basic.rs @@ -12,13 +12,14 @@ extern crate noosphere_ns_dev as noosphere_ns; extern crate tracing; use anyhow::Result; +use instant::Duration; use noosphere_cli::helpers::{start_name_system_server, SpherePair}; use noosphere_common::helpers::wait; use noosphere_core::{ context::{ HasMutableSphereContext, HasSphereContext, SphereContentRead, SphereContentWrite, - SphereCursor, SpherePetnameRead, SpherePetnameWrite, SphereReplicaRead, SphereSync, - SphereWalker, + SphereCursor, SpherePetnameRead, SpherePetnameWrite, SphereReplicaRead, SphereReplicaWrite, + SphereSync, SphereWalker, }, data::{ContentType, Did, Link, MemoIpld}, stream::memo_history_stream, @@ -751,3 +752,85 @@ async fn all_of_client_history_is_made_manifest_on_the_gateway_after_sync() -> R Ok(()) } + +#[tokio::test] +async fn fall_back_to_local_content_when_kubo_times_out_on_block() -> 
Result<()> { + initialize_tracing(None); + + let ipfs_url = Url::parse("http://127.0.0.1:5001")?; + let (ns_url, ns_task) = start_name_system_server(&ipfs_url).await?; + + let mut pair_1 = SpherePair::new("ONE", &ipfs_url, &ns_url).await?; + let mut pair_2 = SpherePair::new("TWO", &ipfs_url, &ns_url).await?; + + let sphere_1_identity = pair_1.client.identity.clone(); + + pair_1.start_gateway().await?; + pair_2.start_gateway().await?; + + let fallback_version = pair_1 + .spawn(|mut ctx| async move { + ctx.write("foo", &ContentType::Text, b"bar".as_ref(), None) + .await?; + ctx.save(None).await?; + let fallback_version = ctx.sync().await?; + wait(1).await; + + Ok(fallback_version) + }) + .await?; + + pair_2 + .spawn(|mut ctx| async move { + ctx.set_petname("one", Some(sphere_1_identity)).await?; + ctx.save(None).await?; + ctx.sync().await?; + wait(1).await; + ctx.sync().await?; + + // Traverse to peer at a known-to-be-available version to ensure it + // becomes locally cached + let cursor = SphereCursor::latest(ctx); + let _ = cursor.traverse_by_petnames(&[String::from("one")]).await?; + + Ok(()) + }) + .await?; + + let unavailable_link_record = pair_1 + .spawn(|mut ctx| async move { + // Make some changes but don't sync to simulate unavailable blocks + ctx.write("baz", &ContentType::Text, b"foo".as_ref(), None) + .await?; + ctx.save(None).await?; + + Ok(ctx + .create_link_record(Some(Duration::from_secs(120))) + .await?) + }) + .await?; + + pair_2 + .spawn(move |mut ctx| async move { + ctx.set_petname_record("one", &unavailable_link_record) + .await?; + + ctx.save(None).await?; + + let cursor = SphereCursor::latest(ctx); + + let peer = cursor + .traverse_by_petnames(&[String::from("one")]) + .await? 
+ .unwrap(); + + assert_eq!(peer.version().await?, fallback_version); + + Ok(()) + }) + .await?; + + ns_task.abort(); + + Ok(()) +} diff --git a/rust/noosphere/tests/sphere_channel.rs b/rust/noosphere/tests/sphere_channel.rs index 6631e601b..5e8222c71 100644 --- a/rust/noosphere/tests/sphere_channel.rs +++ b/rust/noosphere/tests/sphere_channel.rs @@ -6,7 +6,7 @@ use std::pin::Pin; #[cfg(target_arch = "wasm32")] use instant::Duration; #[cfg(not(target_arch = "wasm32"))] -use std::time::Duration; +use instant::Duration; use anyhow::Result; use async_stream::try_stream;