From a4befedb8d3b6e5d51fa3a6d69cfd6ee5e6f31cd Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 9 Nov 2024 22:47:34 -0500 Subject: [PATCH 01/26] chore: Add boilerplate of discovery refactor --- extensions/warp-ipfs/src/store/discovery.rs | 318 +++++++++++++++++++- 1 file changed, 315 insertions(+), 3 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 5522b25ba..f68652171 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -1,9 +1,22 @@ +use futures::{FutureExt, StreamExt}; +use rust_ipfs::{ + libp2p::swarm::dial_opts::DialOpts, ConnectionEvents, Ipfs, Multiaddr, PeerConnectionEvents, + PeerId, +}; +use std::cmp::Ordering; use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc, time::Duration}; -use futures::StreamExt; -use rust_ipfs::{libp2p::swarm::dial_opts::DialOpts, Ipfs, Multiaddr, PeerId}; - +use futures::channel::oneshot; +use futures::future::BoxFuture; +use futures::stream::BoxStream; +use indexmap::IndexSet; +use pollable_map::futures::FutureMap; +use rust_ipfs::libp2p::swarm::ConnectionId; use std::collections::{hash_map::Entry, HashMap}; +use std::future::Future; +use std::hash::Hasher; +use std::pin::Pin; +use std::task::{Context, Poll, Waker}; use tokio::sync::{broadcast, RwLock}; use rust_ipfs::p2p::MultiaddrExt; @@ -313,6 +326,56 @@ impl Discovery { } } +enum DiscoveryCommand { + Insert { + peer_id: PeerId, + response: oneshot::Sender>, + }, + Remove { + peer_id: PeerId, + response: oneshot::Sender>, + }, + Contains { + peer_id: PeerId, + response: oneshot::Sender, + }, + List { + response: oneshot::Sender>, + }, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct DiscoveryRecord { + peer_id: PeerId, + addresses: HashSet, +} + +impl Hash for DiscoveryRecord { + fn hash(&self, state: &mut H) { + self.peer_id.hash(state) + } +} + +impl PartialOrd for DiscoveryRecord { + fn partial_cmp(&self, other: &Self) -> Option { + self.peer_id.partial_cmp(&other.peer_id) + } +} + +impl DiscoveryRecord { + pub fn peer_id(&self) -> &PeerId { + &self.peer_id + } + + pub fn did_key(&self) -> DID { + self.peer_id.to_did().unwrap() + } + + pub fn addresses(&self) -> &HashSet { + &self.addresses + } +} + #[derive(Clone)] pub struct DiscoveryEntry { ipfs: Ipfs, @@ -336,6 +399,255 @@ impl Hash for DiscoveryEntry { } } +enum DiscoveryPeerStatus { + Initial { + fut: BoxFuture<'static, Result, anyhow::Error>>, + }, + Status { + stream: BoxStream<'static, PeerConnectionEvents>, + }, +} + +struct DiscoveryPeerTask { + peer_id: PeerId, + ipfs: Ipfs, + addresses: HashSet, + connections: HashMap, + status: DiscoveryPeerStatus, + dialing_task: Option>>, + waker: Option, +} + +impl DiscoveryPeerTask { + pub fn new(ipfs: &Ipfs, peer_id: PeerId) -> Self { + let status = DiscoveryPeerStatus::Initial { + fut: { + let ipfs = ipfs.clone(); + async move { ipfs.peer_connection_events(peer_id).await }.boxed() + }, + }; + + Self { + peer_id, + ipfs: ipfs.clone(), + addresses: HashSet::new(), + connections: HashMap::new(), + status, + dialing_task: None, + waker: None, + } + } + + pub fn set_connection(mut self, connection_id: ConnectionId, addr: Multiaddr) -> Self { + self.addresses.insert(addr.clone()); + self.connections.insert(connection_id, addr); + self + } +} + +impl DiscoveryPeerTask { + pub fn addresses(&self) -> &HashSet { + &self.addresses + } + + pub fn is_connected(&self) -> bool { + !self.connections.is_empty() + } + + pub fn dial(&mut self) { + if !self.connections.is_empty() { + return; + } + + if self.dialing_task.is_some() { + return; + } + + let peer_id = self.peer_id; + let ipfs = self.ipfs.clone(); + + let opt = match self.addresses.is_empty() { + true => DialOpts::peer_id(peer_id).build(), + false => DialOpts::peer_id(peer_id) + .addresses(Vec::from_iter(self.addresses.clone())) + .build(), + }; + + let fut = async move { ipfs.connect(opt).await.map_err(Error::from) }; + + self.dialing_task = Some(Box::pin(fut)); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +impl Future for DiscoveryPeerTask { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + if let Some(fut) = self.dialing_task.as_mut() { + match fut.poll_unpin(cx) { + Poll::Ready(result) => { + if let Err(e) = result { + tracing::error!(error = %e, "dialing failed"); + } + self.dialing_task.take(); + } + Poll::Pending => {} + } + } + + loop { + match self.status { + DiscoveryPeerStatus::Initial { ref mut fut } => match fut.poll_unpin(cx) { + Poll::Ready(result) => { + let stream = result.expect("instance is valid"); + self.status = DiscoveryPeerStatus::Status { stream }; + } + Poll::Pending => break, + }, + DiscoveryPeerStatus::Status { ref mut stream } => { + match stream.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => match event { + PeerConnectionEvents::IncomingConnection { + connection_id, + addr, + } + | PeerConnectionEvents::OutgoingConnection { + connection_id, + addr, + } => { + self.addresses.insert(addr.clone()); + self.connections.insert(connection_id, addr); + } + PeerConnectionEvents::ClosedConnection { connection_id } => { + self.connections.remove(&connection_id); + } + }, + Poll::Ready(None) => unreachable!(), + Poll::Pending => break, + } + } + } + } + self.waker = Some(cx.waker().clone()); + Poll::Pending + } +} + +pub struct DiscoveryTask { + ipfs: Ipfs, + config: DiscoveryConfig, + relays: Vec, + peers: FutureMap, + command_rx: futures::channel::mpsc::Receiver, + connection_event: BoxStream<'static, ConnectionEvents>, + + discovery_fut: Option<()>, + + waker: Option, +} + +enum DiscoverySelect {} + +#[derive(Clone, Copy, Eq, PartialEq)] +pub enum DiscoveryType { + RzPoint, + Shuttle, + DHT, +} + +impl DiscoveryTask { + pub async fn new( + ipfs: &Ipfs, + command_rx: futures::channel::mpsc::Receiver, + config: DiscoveryConfig, + relays: Vec, + ) -> Self { + let connection_event = ipfs.connection_events().await.expect("should not fail"); + + Self { + ipfs: ipfs.clone(), + config, + relays, + peers: FutureMap::new(), + command_rx, + connection_event, + discovery_fut: None, + waker: None, + } + } +} + +impl DiscoveryTask { + pub fn dht_discovery(&self, namespace: String) { + let ipfs = self.ipfs.clone(); + let _fut = async move { + let bytes = namespace.as_bytes(); + let stream = ipfs.dht_get_providers(bytes.to_vec()).await?; + // We collect instead of passing the stream through and polling there is to try to maintain compatibility in discovery + // Note: This may change in the future where we would focus on a single discovery method + let peers = stream.collect::>().await; + Ok(peers) + }; + } +} + +impl Future for DiscoveryTask { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + loop { + match self.command_rx.poll_next_unpin(cx) { + Poll::Ready(Some(command)) => match command { + DiscoveryCommand::Insert { .. } => {} + DiscoveryCommand::Remove { .. } => {} + DiscoveryCommand::Contains { .. } => {} + DiscoveryCommand::List { .. } => {} + }, + Poll::Ready(None) => unreachable!(), + Poll::Pending => break, + } + } + + loop { + match self.connection_event.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => match event { + ConnectionEvents::IncomingConnection { + peer_id, + connection_id, + addr, + } + | ConnectionEvents::OutgoingConnection { + peer_id, + connection_id, + addr, + } => { + if self.peers.contains_key(&peer_id) { + continue; + } + + let task = DiscoveryPeerTask::new(&self.ipfs, peer_id) + .set_connection(connection_id, addr); + + self.peers.insert(peer_id, task); + } + ConnectionEvents::ClosedConnection { .. } => { + // Note: Since we are handling individual peers connection tracking, we can ignore this event + } + }, + Poll::Ready(None) => unreachable!(), + Poll::Pending => break, + } + } + + let _ = self.peers.poll_next_unpin(cx); + self.waker = Some(cx.waker().clone()); + + Poll::Pending + } +} + impl Eq for DiscoveryEntry {} impl DiscoveryEntry { From 5cacc2757230eb6932861fbe95edb0057912a684 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 10 Nov 2024 11:42:08 -0500 Subject: [PATCH 02/26] chore: Additional logic to future --- extensions/warp-ipfs/src/lib.rs | 2 +- extensions/warp-ipfs/src/store/discovery.rs | 811 ++++++++++---------- extensions/warp-ipfs/src/store/identity.rs | 2 +- 3 files changed, 401 insertions(+), 414 deletions(-) diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index a06400b0c..513033225 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -597,7 +597,7 @@ impl WarpIpfs { } let discovery = - Discovery::new(&ipfs, &self.inner.config.store_setting().discovery, &relays); + Discovery::new(&ipfs, &self.inner.config.store_setting().discovery, &relays).await; let phonebook = PhoneBook::new(discovery.clone(), pb_tx); diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index f68652171..9127562d2 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -1,34 +1,29 @@ -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, SinkExt, Stream, StreamExt}; use rust_ipfs::{ libp2p::swarm::dial_opts::DialOpts, ConnectionEvents, Ipfs, Multiaddr, PeerConnectionEvents, PeerId, }; use std::cmp::Ordering; -use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc, time::Duration}; +use std::{collections::HashSet, fmt::Debug, hash::Hash}; use futures::channel::oneshot; use futures::future::BoxFuture; use futures::stream::BoxStream; use indexmap::IndexSet; -use pollable_map::futures::FutureMap; +use pollable_map::stream::StreamMap; use rust_ipfs::libp2p::swarm::ConnectionId; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::hash::Hasher; use std::pin::Pin; use std::task::{Context, Poll, Waker}; -use tokio::sync::{broadcast, RwLock}; - -use rust_ipfs::p2p::MultiaddrExt; +use tokio::sync::broadcast; use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; use warp::{crypto::DID, error::Error}; -use crate::{ - config::{Discovery as DiscoveryConfig, DiscoveryType}, - store::topics::PeerTopic, -}; +use crate::config::Discovery as DiscoveryConfig; use super::{DidExt, PeerIdExt, PeerType}; @@ -39,200 +34,209 @@ use super::{DidExt, PeerIdExt, PeerType}; pub struct Discovery { ipfs: Ipfs, config: DiscoveryConfig, - entries: Arc>>, - task: Arc>>>, - events: broadcast::Sender, - relays: Vec, + command_tx: futures::channel::mpsc::Sender, + broadcast_tx: broadcast::Sender, + handle: AbortableJoinHandle<()>, executor: LocalExecutor, } impl Discovery { - pub fn new(ipfs: &Ipfs, config: &DiscoveryConfig, relays: &[Multiaddr]) -> Self { - let (events, _) = tokio::sync::broadcast::channel(2048); + pub async fn new(ipfs: &Ipfs, config: &DiscoveryConfig, relays: &[Multiaddr]) -> Self { + let executor = LocalExecutor::default(); + let (command_tx, command_rx) = futures::channel::mpsc::channel(0); + let (broadcast_tx, _) = broadcast::channel(2048); + let task = DiscoveryTask::new( + ipfs, + command_rx, + broadcast_tx.clone(), + config, + relays.to_vec(), + ) + .await; + let handle = executor.spawn_abortable(task); Self { + command_tx, ipfs: ipfs.clone(), config: config.clone(), - entries: Arc::default(), - task: Arc::default(), - events, - relays: relays.to_vec(), - executor: LocalExecutor, + broadcast_tx, + handle, + executor, } } /// Start discovery task /// Note: This starting will only work across a provided namespace - pub async fn start(&self) -> Result<(), Error> { - match &self.config { - DiscoveryConfig::Namespace { - discovery_type: DiscoveryType::DHT, - namespace, - } => { - let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); - let cid = self.ipfs.put_dag(format!("discovery:{namespace}")).await?; - - let task = self.executor.spawn_abortable({ - let discovery = self.clone(); - async move { - let mut cached = HashSet::new(); - - if let Err(e) = discovery.ipfs.provide(cid).await { - //Maybe panic? - tracing::error!("Error providing key: {e}"); - return; - } - - loop { - if let Ok(mut stream) = discovery.ipfs.get_providers(cid).await { - while let Some(peer_id) = stream.next().await { - if discovery - .ipfs - .is_connected(peer_id) - .await - .unwrap_or_default() - && cached.insert(peer_id) - && !discovery.contains(peer_id).await - { - let entry = DiscoveryEntry::new( - &discovery.ipfs, - peer_id, - discovery.config.clone(), - discovery.events.clone(), - discovery.relays.clone(), - ) - .await; - if discovery.entries.write().await.insert(entry.clone()) { - entry.start().await; - } - } - } - } - futures_timer::Delay::new(Duration::from_secs(1)).await; - } - } - }); - - *self.task.write().await = Some(task); - } - DiscoveryConfig::Namespace { - discovery_type: DiscoveryType::RzPoint { addresses }, - namespace, - } => { - let mut peers = vec![]; - for mut addr in addresses.iter().cloned() { - let Some(peer_id) = addr.extract_peer_id() else { - continue; - }; - - if let Err(e) = self.ipfs.add_peer((peer_id, addr)).await { - tracing::error!("Error adding peer to address book {e}"); - continue; - } - - peers.push(peer_id); - } - - let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); - let mut register_id = vec![]; - - for peer_id in &peers { - if let Err(e) = self - .ipfs - .rendezvous_register_namespace(namespace.clone(), None, *peer_id) - .await - { - tracing::error!("Error registering to namespace: {e}"); - continue; - } - - register_id.push(*peer_id); - } - - if register_id.is_empty() { - return Err(Error::OtherWithContext( - "Unable to register to any external nodes".into(), - )); - } - - let task = self.executor.spawn_abortable({ - let discovery = self.clone(); - let register_id = register_id; - async move { - let mut meshed_map: HashMap> = - HashMap::new(); - - loop { - for peer_id in ®ister_id { - let map = match discovery - .ipfs - .rendezvous_namespace_discovery( - namespace.clone(), - None, - *peer_id, - ) - .await - { - Ok(map) => map, - Err(e) => { - tracing::error!(namespace = %namespace, error = %e, "failed to perform discovery over given namespace"); - continue; - } - }; - - for (peer_id, addrs) in map { - match meshed_map.entry(peer_id) { - Entry::Occupied(mut entry) => { - entry.get_mut().extend(addrs); - } - Entry::Vacant(entry) => { - entry.insert(HashSet::from_iter( - addrs.iter().cloned(), - )); - if !discovery - .ipfs - .is_connected(peer_id) - .await - .unwrap_or_default() - && discovery.ipfs.connect(peer_id).await.is_ok() - && !discovery.contains(peer_id).await - { - let entry = DiscoveryEntry::new( - &discovery.ipfs, - peer_id, - discovery.config.clone(), - discovery.events.clone(), - discovery.relays.clone(), - ) - .await; - - if discovery - .entries - .write() - .await - .insert(entry.clone()) - { - entry.start().await; - } - } - } - } - } - } - futures_timer::Delay::new(Duration::from_secs(5)).await; - } - } - }); - - *self.task.write().await = Some(task); - } - DiscoveryConfig::Shuttle { addresses: _ } => {} - _ => {} - } - Ok(()) - } + // pub async fn start(&self) -> Result<(), Error> { + // match &self.config { + // DiscoveryConfig::Namespace { + // discovery_type: DiscoveryType::DHT, + // namespace, + // } => { + // let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); + // let cid = self.ipfs.put_dag(format!("discovery:{namespace}")).await?; + // + // let task = self.executor.spawn_abortable({ + // let discovery = self.clone(); + // async move { + // let mut cached = HashSet::new(); + // + // if let Err(e) = discovery.ipfs.provide(cid).await { + // //Maybe panic? + // tracing::error!("Error providing key: {e}"); + // return; + // } + // + // loop { + // if let Ok(mut stream) = discovery.ipfs.get_providers(cid).await { + // while let Some(peer_id) = stream.next().await { + // if discovery + // .ipfs + // .is_connected(peer_id) + // .await + // .unwrap_or_default() + // && cached.insert(peer_id) + // && !discovery.contains(peer_id).await + // { + // let entry = DiscoveryEntry::new( + // &discovery.ipfs, + // peer_id, + // discovery.config.clone(), + // discovery.events.clone(), + // discovery.relays.clone(), + // ) + // .await; + // if discovery.entries.write().await.insert(entry.clone()) { + // entry.start().await; + // } + // } + // } + // } + // futures_timer::Delay::new(Duration::from_secs(1)).await; + // } + // } + // }); + // + // *self.task.write().await = Some(task); + // } + // DiscoveryConfig::Namespace { + // discovery_type: DiscoveryType::RzPoint { addresses }, + // namespace, + // } => { + // let mut peers = vec![]; + // for mut addr in addresses.iter().cloned() { + // let Some(peer_id) = addr.extract_peer_id() else { + // continue; + // }; + // + // if let Err(e) = self.ipfs.add_peer((peer_id, addr)).await { + // tracing::error!("Error adding peer to address book {e}"); + // continue; + // } + // + // peers.push(peer_id); + // } + // + // let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); + // let mut register_id = vec![]; + // + // for peer_id in &peers { + // if let Err(e) = self + // .ipfs + // .rendezvous_register_namespace(namespace.clone(), None, *peer_id) + // .await + // { + // tracing::error!("Error registering to namespace: {e}"); + // continue; + // } + // + // register_id.push(*peer_id); + // } + // + // if register_id.is_empty() { + // return Err(Error::OtherWithContext( + // "Unable to register to any external nodes".into(), + // )); + // } + // + // let task = self.executor.spawn_abortable({ + // let discovery = self.clone(); + // let register_id = register_id; + // async move { + // let mut meshed_map: HashMap> = + // HashMap::new(); + // + // loop { + // for peer_id in ®ister_id { + // let map = match discovery + // .ipfs + // .rendezvous_namespace_discovery( + // namespace.clone(), + // None, + // *peer_id, + // ) + // .await + // { + // Ok(map) => map, + // Err(e) => { + // tracing::error!(namespace = %namespace, error = %e, "failed to perform discovery over given namespace"); + // continue; + // } + // }; + // + // for (peer_id, addrs) in map { + // match meshed_map.entry(peer_id) { + // Entry::Occupied(mut entry) => { + // entry.get_mut().extend(addrs); + // } + // Entry::Vacant(entry) => { + // entry.insert(HashSet::from_iter( + // addrs.iter().cloned(), + // )); + // if !discovery + // .ipfs + // .is_connected(peer_id) + // .await + // .unwrap_or_default() + // && discovery.ipfs.connect(peer_id).await.is_ok() + // && !discovery.contains(peer_id).await + // { + // let entry = DiscoveryEntry::new( + // &discovery.ipfs, + // peer_id, + // discovery.config.clone(), + // discovery.events.clone(), + // discovery.relays.clone(), + // ) + // .await; + // + // if discovery + // .entries + // .write() + // .await + // .insert(entry.clone()) + // { + // entry.start().await; + // } + // } + // } + // } + // } + // } + // futures_timer::Delay::new(Duration::from_secs(5)).await; + // } + // } + // }); + // + // *self.task.write().await = Some(task); + // } + // DiscoveryConfig::Shuttle { addresses: _ } => {} + // _ => {} + // } + // Ok(()) + // } pub fn events(&self) -> broadcast::Receiver { - self.events.subscribe() + self.broadcast_tx.subscribe() } pub fn discovery_config(&self) -> &DiscoveryConfig { @@ -246,62 +250,55 @@ impl Discovery { PeerType::DID(did_key) => did_key.to_peer_id()?, }; - if self.get(peer_id).await.is_ok() { - return Ok(()); - } - - let own_peer_id = self.ipfs.keypair().public().to_peer_id(); - - if peer_id == own_peer_id { - return Ok(()); - } - - let entry = DiscoveryEntry::new( - &self.ipfs, - peer_id, - self.config.clone(), - self.events.clone(), - self.relays.clone(), - ) - .await; - entry.start().await; - let prev = self.entries.write().await.replace(entry); - if let Some(entry) = prev { - entry.cancel().await; - } - - Ok(()) + let (tx, rx) = futures::channel::oneshot::channel(); + let _ = self + .command_tx + .clone() + .send(DiscoveryCommand::Insert { + peer_id, + response: tx, + }) + .await; + + rx.await.map_err(anyhow::Error::from)? } pub async fn remove>(&self, peer_type: P) -> Result<(), Error> { - let entry = self.get(peer_type).await?; - - let removed = self.entries.write().await.remove(&entry); - if removed { - entry.cancel().await; - return Ok(()); - } + let peer_id = match &peer_type.into() { + PeerType::PeerId(peer_id) => *peer_id, + PeerType::DID(did_key) => did_key.to_peer_id()?, + }; - Err(Error::ObjectNotFound) + let (tx, rx) = futures::channel::oneshot::channel(); + let _ = self + .command_tx + .clone() + .send(DiscoveryCommand::Remove { + peer_id, + response: tx, + }) + .await; + + rx.await.map_err(anyhow::Error::from)? } - pub async fn get>(&self, peer_type: P) -> Result { + pub async fn get>(&self, peer_type: P) -> Result { let peer_id = match &peer_type.into() { PeerType::PeerId(peer_id) => *peer_id, PeerType::DID(did_key) => did_key.to_peer_id()?, }; - if !self.contains(peer_id).await { - return Err(Error::ObjectNotFound); - } - - self.entries - .read() - .await - .iter() - .find(|entry| entry.peer_id() == peer_id) - .cloned() - .ok_or(Error::ObjectNotFound) + let (tx, rx) = futures::channel::oneshot::channel(); + let _ = self + .command_tx + .clone() + .send(DiscoveryCommand::Get { + peer_id, + response: tx, + }) + .await; + + rx.await.map_err(anyhow::Error::from)? } pub async fn contains>(&self, peer_type: P) -> bool { @@ -315,14 +312,35 @@ impl Discovery { } }; - self.list() - .await - .iter() - .any(|entry| entry.peer_id().eq(&peer_id)) + let (tx, rx) = futures::channel::oneshot::channel(); + let _ = self + .command_tx + .clone() + .send(DiscoveryCommand::Contains { + peer_id, + response: tx, + }) + .await; + + match rx.await { + Ok(value) => value, + _ => false, + } } - pub async fn list(&self) -> HashSet { - self.entries.read().await.clone() + pub async fn list(&self) -> HashSet { + let (tx, rx) = futures::channel::oneshot::channel(); + let _ = self + .command_tx + .clone() + .send(DiscoveryCommand::List { response: tx }) + .await; + + let Ok(list) = rx.await else { + return HashSet::new(); + }; + + HashSet::from_iter(list) } } @@ -331,6 +349,10 @@ enum DiscoveryCommand { peer_id: PeerId, response: oneshot::Sender>, }, + Get { + peer_id: PeerId, + response: oneshot::Sender>, + }, Remove { peer_id: PeerId, response: oneshot::Sender>, @@ -340,7 +362,7 @@ enum DiscoveryCommand { response: oneshot::Sender, }, List { - response: oneshot::Sender>, + response: oneshot::Sender>, }, } @@ -376,29 +398,6 @@ impl DiscoveryRecord { } } -#[derive(Clone)] -pub struct DiscoveryEntry { - ipfs: Ipfs, - peer_id: PeerId, - config: DiscoveryConfig, - drop_guard: Arc>>>, - sender: broadcast::Sender, - relays: Vec, - executor: LocalExecutor, -} - -impl PartialEq for DiscoveryEntry { - fn eq(&self, other: &Self) -> bool { - self.peer_id.eq(&other.peer_id) - } -} - -impl Hash for DiscoveryEntry { - fn hash(&self, state: &mut H) { - self.peer_id.hash(state); - } -} - enum DiscoveryPeerStatus { Initial { fut: BoxFuture<'static, Result, anyhow::Error>>, @@ -413,8 +412,10 @@ struct DiscoveryPeerTask { ipfs: Ipfs, addresses: HashSet, connections: HashMap, + connected: bool, status: DiscoveryPeerStatus, dialing_task: Option>>, + is_connected_fut: Option>, waker: Option, } @@ -427,12 +428,19 @@ impl DiscoveryPeerTask { }, }; + let is_connected_fut = Some({ + let ipfs = ipfs.clone(); + async move { ipfs.is_connected(peer_id).await.unwrap_or_default() }.boxed() + }); + Self { peer_id, ipfs: ipfs.clone(), addresses: HashSet::new(), connections: HashMap::new(), + connected: false, status, + is_connected_fut, dialing_task: None, waker: None, } @@ -443,6 +451,11 @@ impl DiscoveryPeerTask { self.connections.insert(connection_id, addr); self } + + pub fn set_addresses(mut self, addresses: impl IntoIterator) -> Self { + self.addresses.extend(addresses); + self + } } impl DiscoveryPeerTask { @@ -450,8 +463,9 @@ impl DiscoveryPeerTask { &self.addresses } + #[allow(dead_code)] pub fn is_connected(&self) -> bool { - !self.connections.is_empty() + !self.connections.is_empty() || self.connected } pub fn dial(&mut self) { @@ -483,9 +497,29 @@ impl DiscoveryPeerTask { } } -impl Future for DiscoveryPeerTask { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { +enum DiscoveryPeerEvent { + Connected, + Disconnected, +} + +impl Stream for DiscoveryPeerTask { + type Item = DiscoveryPeerEvent; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(fut) = self.is_connected_fut.as_mut() { + match fut.poll_unpin(cx) { + Poll::Ready(connected) => { + self.connected = connected; + self.is_connected_fut.take(); + let event = match connected { + true => DiscoveryPeerEvent::Connected, + false => DiscoveryPeerEvent::Disconnected, + }; + return Poll::Ready(Some(event)); + } + Poll::Pending => {} + } + } + if let Some(fut) = self.dialing_task.as_mut() { match fut.poll_unpin(cx) { Poll::Ready(result) => { @@ -518,11 +552,21 @@ impl Future for DiscoveryPeerTask { connection_id, addr, } => { + let currently_connected = + !self.connections.is_empty() | self.connected; self.addresses.insert(addr.clone()); self.connections.insert(connection_id, addr); + self.connected = true; + if !currently_connected { + return Poll::Ready(Some(DiscoveryPeerEvent::Connected)); + } } PeerConnectionEvents::ClosedConnection { connection_id } => { self.connections.remove(&connection_id); + if self.connections.is_empty() { + self.connected = false; + return Poll::Ready(Some(DiscoveryPeerEvent::Disconnected)); + } } }, Poll::Ready(None) => unreachable!(), @@ -536,42 +580,39 @@ impl Future for DiscoveryPeerTask { } } -pub struct DiscoveryTask { +#[allow(dead_code)] +struct DiscoveryTask { ipfs: Ipfs, + pending_broadcast: VecDeque, config: DiscoveryConfig, relays: Vec, - peers: FutureMap, + peers: StreamMap, command_rx: futures::channel::mpsc::Receiver, connection_event: BoxStream<'static, ConnectionEvents>, discovery_fut: Option<()>, - waker: Option, -} + broadcast_tx: broadcast::Sender, -enum DiscoverySelect {} - -#[derive(Clone, Copy, Eq, PartialEq)] -pub enum DiscoveryType { - RzPoint, - Shuttle, - DHT, + waker: Option, } impl DiscoveryTask { pub async fn new( ipfs: &Ipfs, command_rx: futures::channel::mpsc::Receiver, - config: DiscoveryConfig, + broadcast_tx: broadcast::Sender, + config: &DiscoveryConfig, relays: Vec, ) -> Self { let connection_event = ipfs.connection_events().await.expect("should not fail"); - Self { ipfs: ipfs.clone(), - config, + pending_broadcast: VecDeque::new(), + config: config.clone(), relays, - peers: FutureMap::new(), + peers: StreamMap::new(), + broadcast_tx, command_rx, connection_event, discovery_fut: None, @@ -581,6 +622,7 @@ impl DiscoveryTask { } impl DiscoveryTask { + #[allow(dead_code)] pub fn dht_discovery(&self, namespace: String) { let ipfs = self.ipfs.clone(); let _fut = async move { @@ -589,7 +631,7 @@ impl DiscoveryTask { // We collect instead of passing the stream through and polling there is to try to maintain compatibility in discovery // Note: This may change in the future where we would focus on a single discovery method let peers = stream.collect::>().await; - Ok(peers) + Ok::<_, Error>(peers) }; } } @@ -598,12 +640,64 @@ impl Future for DiscoveryTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { + if let Some(peer_id) = self.pending_broadcast.pop_front() { + if let Ok(did) = peer_id.to_did() { + let _ = self.broadcast_tx.send(did); + } + continue; + } + match self.command_rx.poll_next_unpin(cx) { Poll::Ready(Some(command)) => match command { - DiscoveryCommand::Insert { .. } => {} - DiscoveryCommand::Remove { .. } => {} - DiscoveryCommand::Contains { .. } => {} - DiscoveryCommand::List { .. } => {} + DiscoveryCommand::Insert { peer_id, response } => { + if self.peers.contains_key(&peer_id) { + let _ = response.send(Err(Error::IdentityExist)); + continue; + } + + let mut task = DiscoveryPeerTask::new(&self.ipfs, peer_id) + .set_addresses(self.relays.clone()); + + task.dial(); + self.peers.insert(peer_id, task); + let _ = response.send(Ok(())); + } + DiscoveryCommand::Remove { peer_id, response } => { + if !self.peers.contains_key(&peer_id) { + let _ = response.send(Err(Error::IdentityDoesntExist)); + continue; + } + let _ = self.peers.remove(&peer_id); + let _ = response.send(Ok(())); + } + DiscoveryCommand::Contains { peer_id, response } => { + let _ = response.send(self.peers.contains_key(&peer_id)); + } + DiscoveryCommand::Get { peer_id, response } => { + let Some(task) = self.peers.get(&peer_id) else { + let _ = response.send(Err(Error::IdentityDoesntExist)); + continue; + }; + + let record = DiscoveryRecord { + peer_id, + addresses: task.addresses().clone(), + }; + + let _ = response.send(Ok(record)); + } + DiscoveryCommand::List { response } => { + let list = self + .peers + .iter() + .map(|(peer_id, task)| DiscoveryRecord { + peer_id: *peer_id, + addresses: task.addresses().clone(), + }) + .collect::>(); + + let _ = response.send(list); + } }, Poll::Ready(None) => unreachable!(), Poll::Pending => break, @@ -641,133 +735,26 @@ impl Future for DiscoveryTask { } } - let _ = self.peers.poll_next_unpin(cx); - self.waker = Some(cx.waker().clone()); - - Poll::Pending - } -} - -impl Eq for DiscoveryEntry {} - -impl DiscoveryEntry { - pub async fn new( - ipfs: &Ipfs, - peer_id: PeerId, - config: DiscoveryConfig, - sender: broadcast::Sender, - relays: Vec, - ) -> Self { - Self { - ipfs: ipfs.clone(), - peer_id, - config, - drop_guard: Arc::default(), - sender, - relays, - executor: LocalExecutor, - } - } - - pub async fn start(&self) { - let holder = &mut *self.drop_guard.write().await; - - if holder.is_some() { - return; - } - - let fut = { - let entry = self.clone(); - let ipfs = self.ipfs.clone(); - let peer_id = self.peer_id; - async move { - let mut sent_initial_push = false; - if !entry.relays.is_empty() { - //Adding relay for peer to address book in case we are connected over common relays - for addr in entry.relays.clone() { - let _ = ipfs.add_peer((entry.peer_id, addr)).await; - } - } - loop { - if ipfs.is_connected(entry.peer_id).await.unwrap_or_default() { - if !sent_initial_push { - if let Ok(did) = peer_id.to_did() { - futures_timer::Delay::new(Duration::from_millis(500)).await; - tracing::info!("Connected to {did}. Emitting initial event"); - let topic = did.events(); - let subscribed = ipfs - .pubsub_peers(Some(topic)) - .await - .unwrap_or_default() - .contains(&entry.peer_id); - - if subscribed { - let _ = entry.sender.send(did); - sent_initial_push = true; - } - } + loop { + match self.peers.poll_next_unpin(cx) { + Poll::Ready(Some((peer_id, event))) => match event { + DiscoveryPeerEvent::Connected => { + if !self.pending_broadcast.contains(&peer_id) { + self.pending_broadcast.push_back(peer_id); } - futures_timer::Delay::new(Duration::from_secs(10)).await; - continue; } - - match entry.config { - // Used for provider. Doesnt do anything right now - // TODO: Maybe have separate provider query in case - // Discovery task isnt enabled? - DiscoveryConfig::Namespace { - discovery_type: DiscoveryType::DHT, - .. - } => {} - DiscoveryConfig::Namespace { - discovery_type: DiscoveryType::RzPoint { .. }, - .. - } => { - tracing::debug!("Dialing {peer_id}"); - - if let Err(_e) = ipfs.connect(peer_id).await { - tracing::error!("Error connecting to {peer_id}: {_e}"); - futures_timer::Delay::new(Duration::from_secs(10)).await; - continue; - } - } - //TODO: Possibly obtain peer records from external node - // of any connected peer, otherwise await on a response for - // those records to establish a connection - DiscoveryConfig::Shuttle { .. } | DiscoveryConfig::None => { - let opts = DialOpts::peer_id(peer_id) - .addresses(entry.relays.clone()) - .build(); - - tracing::debug!("Dialing {peer_id}"); - - if let Err(_e) = ipfs.connect(opts).await { - tracing::error!("Error connecting to {peer_id}: {_e}"); - futures_timer::Delay::new(Duration::from_secs(10)).await; - continue; - } - } + DiscoveryPeerEvent::Disconnected => { + self.pending_broadcast.retain(|p| *p == peer_id); } - - futures_timer::Delay::new(Duration::from_secs(10)).await; - } + }, + Poll::Ready(None) => unreachable!(), + Poll::Pending => break, } - }; - - let guard = self.executor.spawn_abortable(fut); - - *holder = Some(guard); - } + } - /// Returns a peer id - pub fn peer_id(&self) -> PeerId { - self.peer_id - } + let _ = self.peers.poll_next_unpin(cx); + self.waker = Some(cx.waker().clone()); - pub async fn cancel(&self) { - let task = std::mem::take(&mut *self.drop_guard.write().await); - if let Some(guard) = task { - guard.abort(); - } + Poll::Pending } } diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index b653d5d8f..7548ea920 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -439,7 +439,7 @@ impl IdentityStore { } }); - store.discovery.start().await?; + // store.discovery.start().await?; let mut discovery_rx = store.discovery.events(); From e20018249ca033858b0972a81dd5e0d86321d364 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 10 Nov 2024 16:07:01 -0500 Subject: [PATCH 03/26] chore: dial if address us set ti peer task --- extensions/warp-ipfs/src/store/discovery.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 9127562d2..6ae5a7dc7 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -655,10 +655,12 @@ impl Future for DiscoveryTask { continue; } - let mut task = DiscoveryPeerTask::new(&self.ipfs, peer_id) - .set_addresses(self.relays.clone()); + let mut task = DiscoveryPeerTask::new(&self.ipfs, peer_id); + if !self.relays.is_empty() { + task = task.set_addresses(self.relays.clone()); + task.dial(); + } - task.dial(); self.peers.insert(peer_id, task); let _ = response.send(Ok(())); } From 0cf2320e70af0707c6e95a2ab3cc7a1988cd1732 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 10 Nov 2024 18:00:24 -0500 Subject: [PATCH 04/26] chore: cleanup linting --- extensions/warp-ipfs/src/store/discovery.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 6ae5a7dc7..cc5034671 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -42,7 +42,7 @@ pub struct Discovery { impl Discovery { pub async fn new(ipfs: &Ipfs, config: &DiscoveryConfig, relays: &[Multiaddr]) -> Self { - let executor = LocalExecutor::default(); + let executor = LocalExecutor; let (command_tx, command_rx) = futures::channel::mpsc::channel(0); let (broadcast_tx, _) = broadcast::channel(2048); let task = DiscoveryTask::new( @@ -322,10 +322,7 @@ impl Discovery { }) .await; - match rx.await { - Ok(value) => value, - _ => false, - } + rx.await.unwrap_or_default() } pub async fn list(&self) -> HashSet { From 889754f1da497ef3157a847c4dd3e8e4c3b8cf86 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 17 Nov 2024 22:32:48 -0600 Subject: [PATCH 05/26] chore: misc cleanup --- extensions/warp-ipfs/src/store/discovery.rs | 121 ++++++++++---------- 1 file changed, 58 insertions(+), 63 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index cc5034671..84f446d49 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -503,29 +503,23 @@ impl Stream for DiscoveryPeerTask { type Item = DiscoveryPeerEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(fut) = self.is_connected_fut.as_mut() { - match fut.poll_unpin(cx) { - Poll::Ready(connected) => { - self.connected = connected; - self.is_connected_fut.take(); - let event = match connected { - true => DiscoveryPeerEvent::Connected, - false => DiscoveryPeerEvent::Disconnected, - }; - return Poll::Ready(Some(event)); - } - Poll::Pending => {} + if let Poll::Ready(connected) = fut.poll_unpin(cx) { + self.connected = connected; + self.is_connected_fut.take(); + let event = match connected { + true => DiscoveryPeerEvent::Connected, + false => DiscoveryPeerEvent::Disconnected, + }; + return Poll::Ready(Some(event)); } } if let Some(fut) = self.dialing_task.as_mut() { - match fut.poll_unpin(cx) { - Poll::Ready(result) => { - if let Err(e) = result { - tracing::error!(error = %e, "dialing failed"); - } - self.dialing_task.take(); + if let Poll::Ready(result) = fut.poll_unpin(cx) { + if let Err(e) = result { + tracing::error!(error = %e, "dialing failed"); } - Poll::Pending => {} + self.dialing_task.take(); } } @@ -540,32 +534,34 @@ impl Stream for DiscoveryPeerTask { }, DiscoveryPeerStatus::Status { ref mut stream } => { match stream.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - PeerConnectionEvents::IncomingConnection { - connection_id, - addr, - } - | PeerConnectionEvents::OutgoingConnection { - connection_id, - addr, - } => { - let currently_connected = - !self.connections.is_empty() | self.connected; - self.addresses.insert(addr.clone()); - self.connections.insert(connection_id, addr); - self.connected = true; - if !currently_connected { - return Poll::Ready(Some(DiscoveryPeerEvent::Connected)); + Poll::Ready(Some(event)) => { + let (id, addr) = match event { + PeerConnectionEvents::IncomingConnection { + connection_id, + addr, + } => (connection_id, addr), + PeerConnectionEvents::OutgoingConnection { + connection_id, + addr, + } => (connection_id, addr), + PeerConnectionEvents::ClosedConnection { connection_id } => { + self.connections.remove(&connection_id); + if self.connections.is_empty() { + self.connected = false; + return Poll::Ready(Some(DiscoveryPeerEvent::Disconnected)); + } + continue; } + }; + + let currently_connected = !self.connections.is_empty() | self.connected; + self.addresses.insert(addr.clone()); + self.connections.insert(id, addr); + self.connected = true; + if !currently_connected { + return Poll::Ready(Some(DiscoveryPeerEvent::Connected)); } - PeerConnectionEvents::ClosedConnection { connection_id } => { - self.connections.remove(&connection_id); - if self.connections.is_empty() { - self.connected = false; - return Poll::Ready(Some(DiscoveryPeerEvent::Disconnected)); - } - } - }, + } Poll::Ready(None) => unreachable!(), Poll::Pending => break, } @@ -705,30 +701,29 @@ impl Future for DiscoveryTask { loop { match self.connection_event.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - ConnectionEvents::IncomingConnection { - peer_id, - connection_id, - addr, + Poll::Ready(Some(event)) => { + let (peer_id, id, addr) = match event { + ConnectionEvents::IncomingConnection { + peer_id, + connection_id, + addr, + } => (peer_id, connection_id, addr), + ConnectionEvents::OutgoingConnection { + peer_id, + connection_id, + addr, + } => (peer_id, connection_id, addr), + _ => continue, + }; + + if self.peers.contains_key(&peer_id) { + continue; } - | ConnectionEvents::OutgoingConnection { - peer_id, - connection_id, - addr, - } => { - if self.peers.contains_key(&peer_id) { - continue; - } - let task = DiscoveryPeerTask::new(&self.ipfs, peer_id) - .set_connection(connection_id, addr); + let task = DiscoveryPeerTask::new(&self.ipfs, peer_id).set_connection(id, addr); - self.peers.insert(peer_id, task); - } - ConnectionEvents::ClosedConnection { .. } => { - // Note: Since we are handling individual peers connection tracking, we can ignore this event - } - }, + self.peers.insert(peer_id, task); + } Poll::Ready(None) => unreachable!(), Poll::Pending => break, } From f1a601dfbc240f820ac8ac37a6536b2a60623836 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 17 Nov 2024 23:31:10 -0600 Subject: [PATCH 06/26] chore: remove redundant call to poll --- extensions/warp-ipfs/src/store/discovery.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 84f446d49..d4922ba6a 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -746,7 +746,6 @@ impl Future for DiscoveryTask { } } - let _ = self.peers.poll_next_unpin(cx); self.waker = Some(cx.waker().clone()); Poll::Pending From ec01cc06b4152fe072d9b2800d3c699d793d5b3e Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 29 Nov 2024 06:55:18 -0500 Subject: [PATCH 07/26] chore: add connectionid to result --- extensions/warp-ipfs/src/store/discovery.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index d4922ba6a..89d479429 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -411,7 +411,7 @@ struct DiscoveryPeerTask { connections: HashMap, connected: bool, status: DiscoveryPeerStatus, - dialing_task: Option>>, + dialing_task: Option>>, is_connected_fut: Option>, waker: Option, } @@ -501,6 +501,7 @@ enum DiscoveryPeerEvent { impl Stream for DiscoveryPeerTask { type Item = DiscoveryPeerEvent; + #[tracing::instrument(name = "DiscoveryPeerTask::poll_next", skip(self), fields(peer_id = ?self.peer_id, connected = self.connected))] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(fut) = self.is_connected_fut.as_mut() { if let Poll::Ready(connected) = fut.poll_unpin(cx) { @@ -516,10 +517,15 @@ impl Stream for DiscoveryPeerTask { if let Some(fut) = self.dialing_task.as_mut() { if let Poll::Ready(result) = fut.poll_unpin(cx) { - if let Err(e) = result { - tracing::error!(error = %e, "dialing failed"); - } self.dialing_task.take(); + match result { + Ok(id) => { + tracing::info!(%id, "successful connection."); + } + Err(e) => { + tracing::error!(error = %e, "dialing failed"); + } + } } } From 98cb2f2b8a08621ca642bc2dff9209f154abc123 Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 29 Nov 2024 17:41:05 -0500 Subject: [PATCH 08/26] chore: supply keypair to discovery and send a ping request directly --- extensions/warp-ipfs/src/store/discovery.rs | 218 ++++++++++++++++++-- 1 file changed, 203 insertions(+), 15 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 89d479429..0babeaf37 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -1,31 +1,36 @@ use futures::{FutureExt, SinkExt, Stream, StreamExt}; use rust_ipfs::{ - libp2p::swarm::dial_opts::DialOpts, ConnectionEvents, Ipfs, Multiaddr, PeerConnectionEvents, - PeerId, + libp2p::swarm::dial_opts::DialOpts, ConnectionEvents, Ipfs, Keypair, Multiaddr, + PeerConnectionEvents, PeerId, }; use std::cmp::Ordering; use std::{collections::HashSet, fmt::Debug, hash::Hash}; +use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; +use bytes::Bytes; use futures::channel::oneshot; use futures::future::BoxFuture; use futures::stream::BoxStream; +use futures_timeout::TimeoutExt; use indexmap::IndexSet; use pollable_map::stream::StreamMap; use rust_ipfs::libp2p::swarm::ConnectionId; +use rust_ipfs::p2p::MultiaddrExt; +use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::hash::Hasher; use std::pin::Pin; use std::task::{Context, Poll, Waker}; +use std::time::Duration; use tokio::sync::broadcast; - -use crate::rt::{AbortableJoinHandle, Executor, LocalExecutor}; +use tokio::time::Instant; use warp::{crypto::DID, error::Error}; -use crate::config::Discovery as DiscoveryConfig; - -use super::{DidExt, PeerIdExt, PeerType}; +use super::{protocols, DidExt, PeerIdExt, PeerType}; +use crate::config::{Discovery as DiscoveryConfig, DiscoveryType}; +use crate::store::payload::{PayloadBuilder, PayloadMessage}; //TODO: Deprecate for separate discovery service @@ -41,12 +46,18 @@ pub struct Discovery { } impl Discovery { - pub async fn new(ipfs: &Ipfs, config: &DiscoveryConfig, relays: &[Multiaddr]) -> Self { + pub async fn new( + ipfs: &Ipfs, + keypair: &Keypair, + config: &DiscoveryConfig, + relays: &[Multiaddr], + ) -> Self { let executor = LocalExecutor; let (command_tx, command_rx) = futures::channel::mpsc::channel(0); let (broadcast_tx, _) = broadcast::channel(2048); let task = DiscoveryTask::new( ipfs, + keypair, command_rx, broadcast_tx.clone(), config, @@ -406,6 +417,7 @@ enum DiscoveryPeerStatus { struct DiscoveryPeerTask { peer_id: PeerId, + local_keypair: Keypair, ipfs: Ipfs, addresses: HashSet, connections: HashMap, @@ -413,11 +425,12 @@ struct DiscoveryPeerTask { status: DiscoveryPeerStatus, dialing_task: Option>>, is_connected_fut: Option>, + ping_fut: Option>>, waker: Option, } impl DiscoveryPeerTask { - pub fn new(ipfs: &Ipfs, peer_id: PeerId) -> Self { + pub fn new(ipfs: &Ipfs, peer_id: PeerId, keypair: &Keypair) -> Self { let status = DiscoveryPeerStatus::Initial { fut: { let ipfs = ipfs.clone(); @@ -433,12 +446,14 @@ impl DiscoveryPeerTask { Self { peer_id, ipfs: ipfs.clone(), + local_keypair: keypair.clone(), addresses: HashSet::new(), connections: HashMap::new(), connected: false, status, is_connected_fut, dialing_task: None, + ping_fut: None, waker: None, } } @@ -492,6 +507,35 @@ impl DiscoveryPeerTask { waker.wake(); } } + + pub fn ping(&mut self) { + if self.ping_fut.is_some() { + return; + } + let ipfs = self.ipfs.clone(); + let peer_id = self.peer_id; + let keypair = self.local_keypair.clone(); + let fut = async move { + let pl = PayloadBuilder::new(&keypair, DiscoveryRequest::Ping).build()?; + let bytes = pl.to_bytes()?; + + let start = Instant::now(); + let response = ipfs + .send_request(peer_id, (protocols::DISCOVERY_PROTOCOL, bytes)) + .await?; + let end = start.elapsed(); + + let payload: PayloadMessage = PayloadMessage::from_bytes(&response)?; + + match payload.message() { + DiscoveryResponse::Pong => {} + _ => return Err(Error::Other), + } + Ok(end) + }; + + self.ping_fut = Some(Box::pin(fut)); + } } enum DiscoveryPeerEvent { @@ -529,6 +573,20 @@ impl Stream for DiscoveryPeerTask { } } + if let Some(fut) = self.ping_fut.as_mut() { + if let Poll::Ready(result) = fut.poll_unpin(cx) { + self.ping_fut.take(); + match result { + Ok(duration) => { + tracing::info!(duration = duration.as_millis(), peer_id = ?self.peer_id, "peer responded to ping"); + } + Err(e) => { + tracing::error!(error = %e, peer_id = ?self.peer_id, "pinging failed"); + } + } + } + } + loop { match self.status { DiscoveryPeerStatus::Initial { ref mut fut } => match fut.poll_unpin(cx) { @@ -582,6 +640,7 @@ impl Stream for DiscoveryPeerTask { #[allow(dead_code)] struct DiscoveryTask { ipfs: Ipfs, + keypair: Keypair, pending_broadcast: VecDeque, config: DiscoveryConfig, relays: Vec, @@ -589,7 +648,9 @@ struct DiscoveryTask { command_rx: futures::channel::mpsc::Receiver, connection_event: BoxStream<'static, ConnectionEvents>, - discovery_fut: Option<()>, + discovery_request_st: BoxStream<'static, (PeerId, Bytes, oneshot::Sender)>, + + discovery_fut: Option, Error>>>, broadcast_tx: broadcast::Sender, @@ -599,14 +660,21 @@ struct DiscoveryTask { impl DiscoveryTask { pub async fn new( ipfs: &Ipfs, + keypair: &Keypair, command_rx: futures::channel::mpsc::Receiver, broadcast_tx: broadcast::Sender, config: &DiscoveryConfig, relays: Vec, ) -> Self { let connection_event = ipfs.connection_events().await.expect("should not fail"); + let discovery_request_st = ipfs + .requests_subscribe(protocols::DISCOVERY_PROTOCOL) + .await + .expect("should not fail"); + Self { ipfs: ipfs.clone(), + keypair: keypair.clone(), pending_broadcast: VecDeque::new(), config: config.clone(), relays, @@ -614,24 +682,91 @@ impl DiscoveryTask { broadcast_tx, command_rx, connection_event, + discovery_request_st, discovery_fut: None, waker: None, } } } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum DiscoveryRequest { + Ping, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum DiscoveryResponse { + Pong, + InvalidRequest, +} + impl DiscoveryTask { + pub fn discover_initial_peers(&mut self, ns: Option) { + match self.config { + DiscoveryConfig::Shuttle { .. } => {} + DiscoveryConfig::Namespace { + ref namespace, + ref discovery_type, + } => { + let namespace = namespace + .clone() + .or(ns) + .unwrap_or("satellite-warp".to_string()); + match discovery_type { + DiscoveryType::DHT => { + self.dht_discovery(namespace); + } + DiscoveryType::RzPoint { addresses } => { + let peers = addresses + .iter() + .filter_map(|addr| addr.clone().extract_peer_id()) + .collect::>(); + + if !peers.is_empty() { + let peer_id = peers.get_index(0).copied().expect("should not fail"); + self.rz_discovery(namespace, peer_id); + } + } + } + } + DiscoveryConfig::None => {} + } + } + #[allow(dead_code)] - pub fn dht_discovery(&self, namespace: String) { + pub fn dht_discovery(&mut self, namespace: String) { + if self.discovery_fut.is_some() { + return; + } let ipfs = self.ipfs.clone(); - let _fut = async move { + let fut = async move { let bytes = namespace.as_bytes(); let stream = ipfs.dht_get_providers(bytes.to_vec()).await?; // We collect instead of passing the stream through and polling there is to try to maintain compatibility in discovery // Note: This may change in the future where we would focus on a single discovery method - let peers = stream.collect::>().await; + let peers = stream.collect::>().await; + Ok::<_, Error>(Vec::from_iter(peers)) + }; + + self.discovery_fut.replace(Box::pin(fut)); + } + + #[allow(dead_code)] + pub fn rz_discovery(&mut self, namespace: String, rz_peer_id: PeerId) { + if self.discovery_fut.is_some() { + return; + } + let ipfs = self.ipfs.clone(); + let fut = async move { + let peers = ipfs + .rendezvous_namespace_discovery(namespace, None, rz_peer_id) + .await?; + let peers = peers.keys().copied().collect::>(); Ok::<_, Error>(peers) }; + self.discovery_fut.replace(Box::pin(fut)); } } @@ -654,7 +789,7 @@ impl Future for DiscoveryTask { continue; } - let mut task = DiscoveryPeerTask::new(&self.ipfs, peer_id); + let mut task = DiscoveryPeerTask::new(&self.ipfs, peer_id, &self.keypair); if !self.relays.is_empty() { task = task.set_addresses(self.relays.clone()); task.dial(); @@ -705,6 +840,58 @@ impl Future for DiscoveryTask { } } + while let Poll::Ready(Some((peer_id, request, response))) = + self.discovery_request_st.poll_next_unpin(cx) + { + let Ok(payload) = PayloadMessage::from_bytes(&request) else { + let pl = PayloadBuilder::new(&self.keypair, DiscoveryResponse::InvalidRequest) + .build() + .expect("valid payload"); + let bytes = pl.to_bytes().expect("valid payload"); + _ = response.send(bytes); + continue; + }; + + if peer_id.ne(payload.sender()) { + // TODO: When adding cosigner into the payload, we will check both fields before not only rejecting the connection + // + continue; + } + + let bytes = match payload.message() { + DiscoveryRequest::Ping => { + let pl = PayloadBuilder::new(&self.keypair, DiscoveryResponse::Pong) + .build() + .expect("valid payload"); + pl.to_bytes().expect("valid payload") + } + }; + + _ = response.send(bytes); + } + + if let Some(fut) = self.discovery_fut.as_mut() { + if let Poll::Ready(result) = fut.poll_unpin(cx) { + self.discovery_fut.take(); + match result { + Ok(peers) => { + for peer_id in peers { + if self.peers.contains_key(&peer_id) { + continue; + } + + let task = DiscoveryPeerTask::new(&self.ipfs, peer_id, &self.keypair); + + self.peers.insert(peer_id, task); + } + } + Err(e) => { + tracing::error!(error = %e, "error discovering peers in given namespace"); + } + } + } + } + loop { match self.connection_event.poll_next_unpin(cx) { Poll::Ready(Some(event)) => { @@ -726,7 +913,8 @@ impl Future for DiscoveryTask { continue; } - let task = DiscoveryPeerTask::new(&self.ipfs, peer_id).set_connection(id, addr); + let task = DiscoveryPeerTask::new(&self.ipfs, peer_id, &self.keypair) + .set_connection(id, addr); self.peers.insert(peer_id, task); } From e0d791eb2df48c7e4dda5b2d44293796d149805e Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 30 Nov 2024 01:48:10 -0500 Subject: [PATCH 09/26] chore: cleanup and ping peer --- extensions/warp-ipfs/src/lib.rs | 9 +++- extensions/warp-ipfs/src/store/discovery.rs | 56 +++++++++++++-------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/extensions/warp-ipfs/src/lib.rs b/extensions/warp-ipfs/src/lib.rs index d64fda436..47b429ce2 100644 --- a/extensions/warp-ipfs/src/lib.rs +++ b/extensions/warp-ipfs/src/lib.rs @@ -618,8 +618,13 @@ impl WarpIpfs { } } - let discovery = - Discovery::new(&ipfs, &self.inner.config.store_setting().discovery, &relays).await; + let discovery = Discovery::new( + &ipfs, + &keypair, + &self.inner.config.store_setting().discovery, + &relays, + ) + .await; let phonebook = PhoneBook::new(discovery.clone(), pb_tx); diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 0babeaf37..7b2255bf8 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -11,7 +11,7 @@ use bytes::Bytes; use futures::channel::oneshot; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures_timeout::TimeoutExt; +use futures_timer::Delay; use indexmap::IndexSet; use pollable_map::stream::StreamMap; use rust_ipfs::libp2p::swarm::ConnectionId; @@ -261,7 +261,7 @@ impl Discovery { PeerType::DID(did_key) => did_key.to_peer_id()?, }; - let (tx, rx) = futures::channel::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let _ = self .command_tx .clone() @@ -280,7 +280,7 @@ impl Discovery { PeerType::DID(did_key) => did_key.to_peer_id()?, }; - let (tx, rx) = futures::channel::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let _ = self .command_tx .clone() @@ -299,7 +299,7 @@ impl Discovery { PeerType::DID(did_key) => did_key.to_peer_id()?, }; - let (tx, rx) = futures::channel::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let _ = self .command_tx .clone() @@ -323,7 +323,7 @@ impl Discovery { } }; - let (tx, rx) = futures::channel::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let _ = self .command_tx .clone() @@ -337,7 +337,7 @@ impl Discovery { } pub async fn list(&self) -> HashSet { - let (tx, rx) = futures::channel::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let _ = self .command_tx .clone() @@ -426,6 +426,7 @@ struct DiscoveryPeerTask { dialing_task: Option>>, is_connected_fut: Option>, ping_fut: Option>>, + ping_timer: Option, waker: Option, } @@ -454,6 +455,7 @@ impl DiscoveryPeerTask { is_connected_fut, dialing_task: None, ping_fut: None, + ping_timer: Some(Delay::new(Duration::from_secs(5))), waker: None, } } @@ -573,20 +575,6 @@ impl Stream for DiscoveryPeerTask { } } - if let Some(fut) = self.ping_fut.as_mut() { - if let Poll::Ready(result) = fut.poll_unpin(cx) { - self.ping_fut.take(); - match result { - Ok(duration) => { - tracing::info!(duration = duration.as_millis(), peer_id = ?self.peer_id, "peer responded to ping"); - } - Err(e) => { - tracing::error!(error = %e, peer_id = ?self.peer_id, "pinging failed"); - } - } - } - } - loop { match self.status { DiscoveryPeerStatus::Initial { ref mut fut } => match fut.poll_unpin(cx) { @@ -632,6 +620,31 @@ impl Stream for DiscoveryPeerTask { } } } + + if let Some(timer) = self.ping_timer.as_mut() { + if timer.poll_unpin(cx).is_ready() { + self.ping_timer.take(); + self.ping(); + } + } + + if let Some(fut) = self.ping_fut.as_mut() { + if let Poll::Ready(result) = fut.poll_unpin(cx) { + self.ping_fut.take(); + match result { + Ok(duration) => { + tracing::info!(duration = duration.as_millis(), peer_id = ?self.peer_id, "peer responded to ping"); + self.ping_timer = Some(Delay::new(Duration::from_secs(30))); + } + Err(e) => { + // TODO: probably close connection + self.ping_timer = Some(Delay::new(Duration::from_secs(60))); + tracing::error!(error = %e, peer_id = ?self.peer_id, "pinging failed"); + } + } + } + } + self.waker = Some(cx.waker().clone()); Poll::Pending } @@ -703,6 +716,7 @@ enum DiscoveryResponse { } impl DiscoveryTask { + #[allow(dead_code)] pub fn discover_initial_peers(&mut self, ns: Option) { match self.config { DiscoveryConfig::Shuttle { .. } => {} @@ -866,7 +880,7 @@ impl Future for DiscoveryTask { pl.to_bytes().expect("valid payload") } }; - + tracing::error!("ping sent to {peer_id}"); _ = response.send(bytes); } From c37d4f2e77157d0ecabfcc7cb9a98c534fbba66b Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 30 Nov 2024 15:08:38 -0500 Subject: [PATCH 10/26] chore: confirm connection by sending ping --- extensions/warp-ipfs/src/store/discovery.rs | 41 ++++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 7b2255bf8..293330ff6 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -427,6 +427,7 @@ struct DiscoveryPeerTask { is_connected_fut: Option>, ping_fut: Option>>, ping_timer: Option, + confirmed: bool, waker: Option, } @@ -453,9 +454,10 @@ impl DiscoveryPeerTask { connected: false, status, is_connected_fut, + confirmed: false, dialing_task: None, ping_fut: None, - ping_timer: Some(Delay::new(Duration::from_secs(5))), + ping_timer: None, waker: None, } } @@ -463,6 +465,9 @@ impl DiscoveryPeerTask { pub fn set_connection(mut self, connection_id: ConnectionId, addr: Multiaddr) -> Self { self.addresses.insert(addr.clone()); self.connections.insert(connection_id, addr); + if self.ping_fut.is_none() && self.ping_timer.is_none() { + self.ping(); + } self } @@ -542,6 +547,7 @@ impl DiscoveryPeerTask { enum DiscoveryPeerEvent { Connected, + Confirmed, Disconnected, } @@ -554,7 +560,10 @@ impl Stream for DiscoveryPeerTask { self.connected = connected; self.is_connected_fut.take(); let event = match connected { - true => DiscoveryPeerEvent::Connected, + true => { + self.ping(); + DiscoveryPeerEvent::Connected + } false => DiscoveryPeerEvent::Disconnected, }; return Poll::Ready(Some(event)); @@ -600,6 +609,8 @@ impl Stream for DiscoveryPeerTask { self.connections.remove(&connection_id); if self.connections.is_empty() { self.connected = false; + self.ping_fut.take(); + self.ping_timer.take(); return Poll::Ready(Some(DiscoveryPeerEvent::Disconnected)); } continue; @@ -611,6 +622,7 @@ impl Stream for DiscoveryPeerTask { self.connections.insert(id, addr); self.connected = true; if !currently_connected { + self.ping(); return Poll::Ready(Some(DiscoveryPeerEvent::Connected)); } } @@ -635,6 +647,10 @@ impl Stream for DiscoveryPeerTask { Ok(duration) => { tracing::info!(duration = duration.as_millis(), peer_id = ?self.peer_id, "peer responded to ping"); self.ping_timer = Some(Delay::new(Duration::from_secs(30))); + if !self.confirmed { + self.confirmed = true; + return Poll::Ready(Some(DiscoveryPeerEvent::Confirmed)); + } } Err(e) => { // TODO: probably close connection @@ -787,14 +803,13 @@ impl DiscoveryTask { impl Future for DiscoveryTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - loop { - if let Some(peer_id) = self.pending_broadcast.pop_front() { - if let Ok(did) = peer_id.to_did() { - let _ = self.broadcast_tx.send(did); - } - continue; + while let Some(peer_id) = self.pending_broadcast.pop_front() { + if let Ok(did) = peer_id.to_did() { + let _ = self.broadcast_tx.send(did); } + } + loop { match self.command_rx.poll_next_unpin(cx) { Poll::Ready(Some(command)) => match command { DiscoveryCommand::Insert { peer_id, response } => { @@ -880,7 +895,6 @@ impl Future for DiscoveryTask { pl.to_bytes().expect("valid payload") } }; - tracing::error!("ping sent to {peer_id}"); _ = response.send(bytes); } @@ -940,7 +954,14 @@ impl Future for DiscoveryTask { loop { match self.peers.poll_next_unpin(cx) { Poll::Ready(Some((peer_id, event))) => match event { - DiscoveryPeerEvent::Connected => { + DiscoveryPeerEvent::Connected => {} + DiscoveryPeerEvent::Confirmed => { + // Since the peer is confirmed, we can send the broadcast out for the initial identity request + let peer_task = self.peers.get(&peer_id).expect("peer apart of task"); + if !peer_task.is_connected() { + // note: peer state should be connected if it is confirmed. We could probably assert here + continue; + } if !self.pending_broadcast.contains(&peer_id) { self.pending_broadcast.push_back(peer_id); } From 45371444feacf7c60d9657b5a9a15310250247ce Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 30 Nov 2024 15:51:39 -0500 Subject: [PATCH 11/26] chore: remove connection check --- extensions/warp-ipfs/src/store/identity.rs | 8 -------- extensions/warp-ipfs/tests/common.rs | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index 217d4a9a2..d84e30bef 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -946,10 +946,6 @@ impl IdentityStore { pub async fn request(&self, out_did: &DID, option: RequestOption) -> Result<(), Error> { let out_peer_id = out_did.to_peer_id()?; - if !self.ipfs.is_connected(out_peer_id).await? { - return Err(Error::IdentityDoesntExist); - } - let pk_did = self.root_document.keypair(); let event = IdentityEvent::Request { option }; @@ -980,10 +976,6 @@ impl IdentityStore { pub async fn push(&self, out_did: &DID) -> Result<(), Error> { let out_peer_id = out_did.to_peer_id()?; - if !self.ipfs.is_connected(out_peer_id).await? { - return Err(Error::IdentityDoesntExist); - } - let pk_did = self.root_document.keypair(); let mut identity = self.own_identity_document().await?; diff --git a/extensions/warp-ipfs/tests/common.rs b/extensions/warp-ipfs/tests/common.rs index 270fcaebb..d07442eac 100644 --- a/extensions/warp-ipfs/tests/common.rs +++ b/extensions/warp-ipfs/tests/common.rs @@ -65,7 +65,7 @@ pub async fn create_account( config.ipfs_setting_mut().relay_client.relay_address = vec![]; config.ipfs_setting_mut().mdns.enable = false; config.store_setting_mut().announce_to_mesh = true; - config.store_setting_mut().auto_push = Some(Duration::from_secs(1)); + config.store_setting_mut().auto_push = Some(Duration::from_secs(5)); *config.bootstrap_mut() = Bootstrap::None; From e37a3327e12d367d0a36561ad53700232bd129a3 Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 30 Nov 2024 16:23:19 -0500 Subject: [PATCH 12/26] chore: linting --- extensions/warp-ipfs/src/store/discovery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 293330ff6..571545fd7 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -75,8 +75,8 @@ impl Discovery { } } - /// Start discovery task - /// Note: This starting will only work across a provided namespace + // Start discovery task + // Note: This starting will only work across a provided namespace // pub async fn start(&self) -> Result<(), Error> { // match &self.config { // DiscoveryConfig::Namespace { From e73e9e40916cb20b4492d75267a533026a17d815 Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 30 Nov 2024 16:50:29 -0500 Subject: [PATCH 13/26] chore: remove commented code --- extensions/warp-ipfs/src/store/discovery.rs | 171 -------------------- 1 file changed, 171 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 571545fd7..f5e194a06 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -75,177 +75,6 @@ impl Discovery { } } - // Start discovery task - // Note: This starting will only work across a provided namespace - // pub async fn start(&self) -> Result<(), Error> { - // match &self.config { - // DiscoveryConfig::Namespace { - // discovery_type: DiscoveryType::DHT, - // namespace, - // } => { - // let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); - // let cid = self.ipfs.put_dag(format!("discovery:{namespace}")).await?; - // - // let task = self.executor.spawn_abortable({ - // let discovery = self.clone(); - // async move { - // let mut cached = HashSet::new(); - // - // if let Err(e) = discovery.ipfs.provide(cid).await { - // //Maybe panic? - // tracing::error!("Error providing key: {e}"); - // return; - // } - // - // loop { - // if let Ok(mut stream) = discovery.ipfs.get_providers(cid).await { - // while let Some(peer_id) = stream.next().await { - // if discovery - // .ipfs - // .is_connected(peer_id) - // .await - // .unwrap_or_default() - // && cached.insert(peer_id) - // && !discovery.contains(peer_id).await - // { - // let entry = DiscoveryEntry::new( - // &discovery.ipfs, - // peer_id, - // discovery.config.clone(), - // discovery.events.clone(), - // discovery.relays.clone(), - // ) - // .await; - // if discovery.entries.write().await.insert(entry.clone()) { - // entry.start().await; - // } - // } - // } - // } - // futures_timer::Delay::new(Duration::from_secs(1)).await; - // } - // } - // }); - // - // *self.task.write().await = Some(task); - // } - // DiscoveryConfig::Namespace { - // discovery_type: DiscoveryType::RzPoint { addresses }, - // namespace, - // } => { - // let mut peers = vec![]; - // for mut addr in addresses.iter().cloned() { - // let Some(peer_id) = addr.extract_peer_id() else { - // continue; - // }; - // - // if let Err(e) = self.ipfs.add_peer((peer_id, addr)).await { - // tracing::error!("Error adding peer to address book {e}"); - // continue; - // } - // - // peers.push(peer_id); - // } - // - // let namespace = namespace.clone().unwrap_or_else(|| "warp-mp-ipfs".into()); - // let mut register_id = vec![]; - // - // for peer_id in &peers { - // if let Err(e) = self - // .ipfs - // .rendezvous_register_namespace(namespace.clone(), None, *peer_id) - // .await - // { - // tracing::error!("Error registering to namespace: {e}"); - // continue; - // } - // - // register_id.push(*peer_id); - // } - // - // if register_id.is_empty() { - // return Err(Error::OtherWithContext( - // "Unable to register to any external nodes".into(), - // )); - // } - // - // let task = self.executor.spawn_abortable({ - // let discovery = self.clone(); - // let register_id = register_id; - // async move { - // let mut meshed_map: HashMap> = - // HashMap::new(); - // - // loop { - // for peer_id in ®ister_id { - // let map = match discovery - // .ipfs - // .rendezvous_namespace_discovery( - // namespace.clone(), - // None, - // *peer_id, - // ) - // .await - // { - // Ok(map) => map, - // Err(e) => { - // tracing::error!(namespace = %namespace, error = %e, "failed to perform discovery over given namespace"); - // continue; - // } - // }; - // - // for (peer_id, addrs) in map { - // match meshed_map.entry(peer_id) { - // Entry::Occupied(mut entry) => { - // entry.get_mut().extend(addrs); - // } - // Entry::Vacant(entry) => { - // entry.insert(HashSet::from_iter( - // addrs.iter().cloned(), - // )); - // if !discovery - // .ipfs - // .is_connected(peer_id) - // .await - // .unwrap_or_default() - // && discovery.ipfs.connect(peer_id).await.is_ok() - // && !discovery.contains(peer_id).await - // { - // let entry = DiscoveryEntry::new( - // &discovery.ipfs, - // peer_id, - // discovery.config.clone(), - // discovery.events.clone(), - // discovery.relays.clone(), - // ) - // .await; - // - // if discovery - // .entries - // .write() - // .await - // .insert(entry.clone()) - // { - // entry.start().await; - // } - // } - // } - // } - // } - // } - // futures_timer::Delay::new(Duration::from_secs(5)).await; - // } - // } - // }); - // - // *self.task.write().await = Some(task); - // } - // DiscoveryConfig::Shuttle { addresses: _ } => {} - // _ => {} - // } - // Ok(()) - // } - pub fn events(&self) -> broadcast::Receiver { self.broadcast_tx.subscribe() } From 8f2edf64bd08c3e96e5db74e3d2349fe60e6468b Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 1 Dec 2024 07:33:16 -0500 Subject: [PATCH 14/26] chore: update logging --- extensions/warp-ipfs/src/store/identity.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/warp-ipfs/src/store/identity.rs b/extensions/warp-ipfs/src/store/identity.rs index d84e30bef..aadca00a5 100644 --- a/extensions/warp-ipfs/src/store/identity.rs +++ b/extensions/warp-ipfs/src/store/identity.rs @@ -635,10 +635,10 @@ impl IdentityStore { // Used as the initial request/push Ok(push) = discovery_rx.recv() => { if let Err(e) = store.request(&push, RequestOption::Identity).await { - tracing::error!("Error requesting identity: {e}"); + tracing::error!(error = %e, "error requesting identity"); } if let Err(e) = store.push(&push).await { - tracing::error!("Error pushing identity: {e}"); + tracing::error!(error = %e, "error pushing identity"); } } _ = &mut tick => { From 2c71d00067d1f5d06f8488f1ffc6df623005bdb0 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 1 Dec 2024 07:49:13 -0500 Subject: [PATCH 15/26] chore: misc --- extensions/warp-ipfs/src/store/discovery.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index f5e194a06..8148aae7d 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -32,9 +32,6 @@ use super::{protocols, DidExt, PeerIdExt, PeerType}; use crate::config::{Discovery as DiscoveryConfig, DiscoveryType}; use crate::store::payload::{PayloadBuilder, PayloadMessage}; -//TODO: Deprecate for separate discovery service - -#[allow(dead_code)] #[derive(Clone)] pub struct Discovery { ipfs: Ipfs, @@ -482,7 +479,7 @@ impl Stream for DiscoveryPeerTask { } } Err(e) => { - // TODO: probably close connection + // TODO: probably close connection? self.ping_timer = Some(Delay::new(Duration::from_secs(60))); tracing::error!(error = %e, peer_id = ?self.peer_id, "pinging failed"); } @@ -562,7 +559,7 @@ enum DiscoveryResponse { impl DiscoveryTask { #[allow(dead_code)] - pub fn discover_initial_peers(&mut self, ns: Option) { + pub fn discovery(&mut self, ns: Option) { match self.config { DiscoveryConfig::Shuttle { .. } => {} DiscoveryConfig::Namespace { @@ -584,6 +581,7 @@ impl DiscoveryTask { .collect::>(); if !peers.is_empty() { + // We will use the first instance instead of the whole set for now let peer_id = peers.get_index(0).copied().expect("should not fail"); self.rz_discovery(namespace, peer_id); } @@ -594,7 +592,6 @@ impl DiscoveryTask { } } - #[allow(dead_code)] pub fn dht_discovery(&mut self, namespace: String) { if self.discovery_fut.is_some() { return; @@ -604,7 +601,7 @@ impl DiscoveryTask { let bytes = namespace.as_bytes(); let stream = ipfs.dht_get_providers(bytes.to_vec()).await?; // We collect instead of passing the stream through and polling there is to try to maintain compatibility in discovery - // Note: This may change in the future where we would focus on a single discovery method + // Note: This may change in the future where we would focus on a single discovery method (ie shuttle) let peers = stream.collect::>().await; Ok::<_, Error>(Vec::from_iter(peers)) }; @@ -612,7 +609,6 @@ impl DiscoveryTask { self.discovery_fut.replace(Box::pin(fut)); } - #[allow(dead_code)] pub fn rz_discovery(&mut self, namespace: String, rz_peer_id: PeerId) { if self.discovery_fut.is_some() { return; From d552edf04d001e4a23cf2825d26cf0ad5b7a204a Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 1 Dec 2024 08:02:06 -0500 Subject: [PATCH 16/26] chore: remove unused --- extensions/warp-ipfs/src/store/discovery.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 8148aae7d..2b72de123 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -34,12 +34,10 @@ use crate::store::payload::{PayloadBuilder, PayloadMessage}; #[derive(Clone)] pub struct Discovery { - ipfs: Ipfs, config: DiscoveryConfig, command_tx: futures::channel::mpsc::Sender, broadcast_tx: broadcast::Sender, - handle: AbortableJoinHandle<()>, - executor: LocalExecutor, + _handle: AbortableJoinHandle<()>, } impl Discovery { @@ -61,14 +59,12 @@ impl Discovery { relays.to_vec(), ) .await; - let handle = executor.spawn_abortable(task); + let _handle = executor.spawn_abortable(task); Self { command_tx, - ipfs: ipfs.clone(), config: config.clone(), broadcast_tx, - handle, - executor, + _handle, } } From abe126d43b47db548cfb36d55e7421e8f5c35572 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 1 Dec 2024 16:24:55 -0500 Subject: [PATCH 17/26] chore: comment and register in rz namespace --- extensions/warp-ipfs/src/store/discovery.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 2b72de123..5a856d3a1 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -609,10 +609,15 @@ impl DiscoveryTask { if self.discovery_fut.is_some() { return; } + + // TODO: show that we are registered so we dont repeat multiple registration to the namespace when discovering peers + let ipfs = self.ipfs.clone(); let fut = async move { + ipfs.rendezvous_register_namespace(&namespace, None, rz_peer_id) + .await?; let peers = ipfs - .rendezvous_namespace_discovery(namespace, None, rz_peer_id) + .rendezvous_namespace_discovery(&namespace, None, rz_peer_id) .await?; let peers = peers.keys().copied().collect::>(); Ok::<_, Error>(peers) From db3186ea3890719f4fe284d401bf8d49ed2e7c16 Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 1 Dec 2024 16:32:17 -0500 Subject: [PATCH 18/26] chore: use discovery function --- extensions/warp-ipfs/src/store/discovery.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 5a856d3a1..3227e5d79 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -50,7 +50,7 @@ impl Discovery { let executor = LocalExecutor; let (command_tx, command_rx) = futures::channel::mpsc::channel(0); let (broadcast_tx, _) = broadcast::channel(2048); - let task = DiscoveryTask::new( + let mut task = DiscoveryTask::new( ipfs, keypair, command_rx, @@ -59,6 +59,9 @@ impl Discovery { relays.to_vec(), ) .await; + + task.discovery(None); + let _handle = executor.spawn_abortable(task); Self { command_tx, @@ -611,7 +614,6 @@ impl DiscoveryTask { } // TODO: show that we are registered so we dont repeat multiple registration to the namespace when discovering peers - let ipfs = self.ipfs.clone(); let fut = async move { ipfs.rendezvous_register_namespace(&namespace, None, rz_peer_id) From f652b912d34aad77294f42d810900af3260791ee Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 1 Dec 2024 23:44:52 -0500 Subject: [PATCH 19/26] chore: add additional discovery logic --- extensions/warp-ipfs/src/store/discovery.rs | 93 +++++++++++++++++++-- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 3227e5d79..f37e5c8b6 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -60,7 +60,7 @@ impl Discovery { ) .await; - task.discovery(None); + task.publish_discovery(); let _handle = executor.spawn_abortable(task); Self { @@ -508,6 +508,10 @@ struct DiscoveryTask { broadcast_tx: broadcast::Sender, + discovery_publish_confirmed: bool, + + discovery_publish_fut: Option>, + waker: Option, } @@ -537,6 +541,8 @@ impl DiscoveryTask { command_rx, connection_event, discovery_request_st, + discovery_publish_confirmed: false, + discovery_publish_fut: None, discovery_fut: None, waker: None, } @@ -556,6 +562,12 @@ enum DiscoveryResponse { InvalidRequest, } +enum DiscoveryPublish { + Dht, + Rz, + None, +} + impl DiscoveryTask { #[allow(dead_code)] pub fn discovery(&mut self, ns: Option) { @@ -591,10 +603,68 @@ impl DiscoveryTask { } } + pub fn publish_discovery(&mut self) { + let ipfs = self.ipfs.clone(); + let fut = match self.config { + DiscoveryConfig::Shuttle { .. } => { + futures::future::ready(DiscoveryPublish::None).boxed() + } + DiscoveryConfig::Namespace { + ref namespace, + ref discovery_type, + } => { + let namespace = namespace.clone().unwrap_or("satellite-warp".to_string()); + match discovery_type { + DiscoveryType::DHT => async move { + if let Err(e) = ipfs.dht_provide(namespace.as_bytes().to_vec()).await { + tracing::error!(error = %e, "cannot provide {namespace}"); + return DiscoveryPublish::None; + } + DiscoveryPublish::Dht + } + .boxed(), + DiscoveryType::RzPoint { addresses } => { + let peers = addresses + .iter() + .filter_map(|addr| addr.clone().extract_peer_id()) + .collect::>(); + + if peers.is_empty() { + return; + } + + // We will use the first instance instead of the whole set for now + let peer_id = peers.get_index(0).copied().expect("should not fail"); + + async move { + if let Err(e) = ipfs + .rendezvous_register_namespace(&namespace, None, peer_id) + .await + { + tracing::error!(error = %e, "cannot provide {namespace}"); + return DiscoveryPublish::None; + } + DiscoveryPublish::Rz + } + .boxed() + } + } + } + DiscoveryConfig::None => futures::future::ready(DiscoveryPublish::None).boxed(), + }; + + self.discovery_publish_fut = Some(fut); + } + pub fn dht_discovery(&mut self, namespace: String) { if self.discovery_fut.is_some() { return; } + + if !self.discovery_publish_confirmed { + return; + } + let ipfs = self.ipfs.clone(); let fut = async move { let bytes = namespace.as_bytes(); @@ -613,11 +683,12 @@ impl DiscoveryTask { return; } - // TODO: show that we are registered so we dont repeat multiple registration to the namespace when discovering peers + if !self.discovery_publish_confirmed { + return; + } + let ipfs = self.ipfs.clone(); let fut = async move { - ipfs.rendezvous_register_namespace(&namespace, None, rz_peer_id) - .await?; let peers = ipfs .rendezvous_namespace_discovery(&namespace, None, rz_peer_id) .await?; @@ -697,6 +768,15 @@ impl Future for DiscoveryTask { } } + if let Some(fut) = self.discovery_publish_fut.as_mut() { + if let Poll::Ready(discovery_publish_type) = fut.poll_unpin(cx) { + self.discovery_publish_fut.take(); + self.discovery_publish_confirmed = + !matches!(discovery_publish_type, DiscoveryPublish::None); + self.discovery(None); + } + } + while let Poll::Ready(Some((peer_id, request, response))) = self.discovery_request_st.poll_next_unpin(cx) { @@ -723,7 +803,10 @@ impl Future for DiscoveryTask { pl.to_bytes().expect("valid payload") } }; - _ = response.send(bytes); + + if response.send(bytes).is_err() { + tracing::warn!(%peer_id, "unable to respond to peer due to request being dropped."); + } } if let Some(fut) = self.discovery_fut.as_mut() { From 399dc35215d359648edb12ff99f6f62b056524c8 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 2 Dec 2024 00:17:40 -0500 Subject: [PATCH 20/26] chore: respond for invalid peer --- extensions/warp-ipfs/src/store/discovery.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index f37e5c8b6..3935005f9 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -791,7 +791,11 @@ impl Future for DiscoveryTask { if peer_id.ne(payload.sender()) { // TODO: When adding cosigner into the payload, we will check both fields before not only rejecting the connection - // + let pl = PayloadBuilder::new(&self.keypair, DiscoveryResponse::InvalidRequest) + .build() + .expect("valid payload"); + let bytes = pl.to_bytes().expect("valid payload"); + _ = response.send(bytes); continue; } From f19c8a91f22d7d5468bf6661651f48c30a3e64d1 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 2 Dec 2024 09:36:23 -0500 Subject: [PATCH 21/26] chore: misc update --- extensions/warp-ipfs/src/store/discovery.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 3935005f9..113941188 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -591,11 +591,13 @@ impl DiscoveryTask { .filter_map(|addr| addr.clone().extract_peer_id()) .collect::>(); - if !peers.is_empty() { + if peers.is_empty() { // We will use the first instance instead of the whole set for now - let peer_id = peers.get_index(0).copied().expect("should not fail"); - self.rz_discovery(namespace, peer_id); + return; } + + let peer_id = peers.get_index(0).copied().expect("should not fail"); + self.rz_discovery(namespace, peer_id); } } } @@ -625,8 +627,9 @@ impl DiscoveryTask { .boxed(), DiscoveryType::RzPoint { addresses } => { let peers = addresses - .iter() - .filter_map(|addr| addr.clone().extract_peer_id()) + .clone() + .into_iter() + .filter_map(|mut addr| addr.extract_peer_id()) .collect::>(); if peers.is_empty() { From 82fce4fb13b55ce776abe078f39e3832c9171e41 Mon Sep 17 00:00:00 2001 From: Darius Date: Mon, 2 Dec 2024 09:37:55 -0500 Subject: [PATCH 22/26] chore: announce to mesh by default --- extensions/warp-ipfs/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/warp-ipfs/src/config.rs b/extensions/warp-ipfs/src/config.rs index 827652675..6d79e4cc0 100644 --- a/extensions/warp-ipfs/src/config.rs +++ b/extensions/warp-ipfs/src/config.rs @@ -158,7 +158,7 @@ impl Default for StoreSetting { disable_images: false, with_friends: false, default_profile_picture: None, - announce_to_mesh: false, + announce_to_mesh: true, } } } From ea87613579db3fb550e04dd66d9902332b0f40cb Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 6 Dec 2024 19:11:03 -0500 Subject: [PATCH 23/26] chore: add future to process responses --- extensions/warp-ipfs/src/store/discovery.rs | 28 ++++++++++++++++----- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 113941188..6ed87f2eb 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -1,4 +1,6 @@ use futures::{FutureExt, SinkExt, Stream, StreamExt}; +use pollable_map::futures::FutureMap; +use rust_ipfs::libp2p::request_response::InboundRequestId; use rust_ipfs::{ libp2p::swarm::dial_opts::DialOpts, ConnectionEvents, Ipfs, Keypair, Multiaddr, PeerConnectionEvents, PeerId, @@ -502,7 +504,9 @@ struct DiscoveryTask { command_rx: futures::channel::mpsc::Receiver, connection_event: BoxStream<'static, ConnectionEvents>, - discovery_request_st: BoxStream<'static, (PeerId, Bytes, oneshot::Sender)>, + discovery_request_st: BoxStream<'static, (PeerId, InboundRequestId, Bytes)>, + + responsing_fut: FutureMap<(PeerId, InboundRequestId), BoxFuture<'static, Result<(), anyhow::Error>>>, discovery_fut: Option, Error>>>, @@ -537,6 +541,7 @@ impl DiscoveryTask { config: config.clone(), relays, peers: StreamMap::new(), + responsing_fut: FutureMap::new(), broadcast_tx, command_rx, connection_event, @@ -780,15 +785,18 @@ impl Future for DiscoveryTask { } } - while let Poll::Ready(Some((peer_id, request, response))) = + while let Poll::Ready(Some((peer_id, id, request))) = self.discovery_request_st.poll_next_unpin(cx) { + let ipfs = self.ipfs.clone(); let Ok(payload) = PayloadMessage::from_bytes(&request) else { let pl = PayloadBuilder::new(&self.keypair, DiscoveryResponse::InvalidRequest) .build() .expect("valid payload"); let bytes = pl.to_bytes().expect("valid payload"); - _ = response.send(bytes); + self.responsing_fut.insert((peer_id, id), async move { + ipfs.send_response(peer_id, id, bytes).await + }.boxed()); continue; }; @@ -798,7 +806,9 @@ impl Future for DiscoveryTask { .build() .expect("valid payload"); let bytes = pl.to_bytes().expect("valid payload"); - _ = response.send(bytes); + self.responsing_fut.insert((peer_id, id), async move { + ipfs.send_response(peer_id, id, bytes).await + }.boxed()); continue; } @@ -811,8 +821,14 @@ impl Future for DiscoveryTask { } }; - if response.send(bytes).is_err() { - tracing::warn!(%peer_id, "unable to respond to peer due to request being dropped."); + self.responsing_fut.insert((peer_id, id), async move { + ipfs.send_response(peer_id, id, bytes).await + }.boxed()); + } + + while let Poll::Ready(Some(((peer_id, id), result))) = self.responsing_fut.poll_next_unpin(cx) { + if let Err(e) = result { + tracing::error!(%peer_id, error = %e, %id, "unable to respond to peer request"); } } From 659957c4e43486638f0c5b560fccace813db0022 Mon Sep 17 00:00:00 2001 From: Darius Date: Sat, 7 Dec 2024 10:38:19 -0500 Subject: [PATCH 24/26] chore: format --- extensions/warp-ipfs/src/store/discovery.rs | 28 +++++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 6ed87f2eb..282e51c80 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -506,7 +506,8 @@ struct DiscoveryTask { discovery_request_st: BoxStream<'static, (PeerId, InboundRequestId, Bytes)>, - responsing_fut: FutureMap<(PeerId, InboundRequestId), BoxFuture<'static, Result<(), anyhow::Error>>>, + responsing_fut: + FutureMap<(PeerId, InboundRequestId), BoxFuture<'static, Result<(), anyhow::Error>>>, discovery_fut: Option, Error>>>, @@ -794,9 +795,10 @@ impl Future for DiscoveryTask { .build() .expect("valid payload"); let bytes = pl.to_bytes().expect("valid payload"); - self.responsing_fut.insert((peer_id, id), async move { - ipfs.send_response(peer_id, id, bytes).await - }.boxed()); + self.responsing_fut.insert( + (peer_id, id), + async move { ipfs.send_response(peer_id, id, bytes).await }.boxed(), + ); continue; }; @@ -806,9 +808,10 @@ impl Future for DiscoveryTask { .build() .expect("valid payload"); let bytes = pl.to_bytes().expect("valid payload"); - self.responsing_fut.insert((peer_id, id), async move { - ipfs.send_response(peer_id, id, bytes).await - }.boxed()); + self.responsing_fut.insert( + (peer_id, id), + async move { ipfs.send_response(peer_id, id, bytes).await }.boxed(), + ); continue; } @@ -821,12 +824,15 @@ impl Future for DiscoveryTask { } }; - self.responsing_fut.insert((peer_id, id), async move { - ipfs.send_response(peer_id, id, bytes).await - }.boxed()); + self.responsing_fut.insert( + (peer_id, id), + async move { ipfs.send_response(peer_id, id, bytes).await }.boxed(), + ); } - while let Poll::Ready(Some(((peer_id, id), result))) = self.responsing_fut.poll_next_unpin(cx) { + while let Poll::Ready(Some(((peer_id, id), result))) = + self.responsing_fut.poll_next_unpin(cx) + { if let Err(e) = result { tracing::error!(%peer_id, error = %e, %id, "unable to respond to peer request"); } From b7e1a0b1bee7c74b075b149089ada615397a4a51 Mon Sep 17 00:00:00 2001 From: Darius Date: Fri, 20 Dec 2024 17:46:38 -0600 Subject: [PATCH 25/26] fix: use correct message opt --- extensions/warp-ipfs/src/store/discovery.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 282e51c80..a10d36a51 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -361,7 +361,7 @@ impl DiscoveryPeerTask { let payload: PayloadMessage = PayloadMessage::from_bytes(&response)?; - match payload.message() { + match payload.message(None)? { DiscoveryResponse::Pong => {} _ => return Err(Error::Other), } @@ -815,13 +815,19 @@ impl Future for DiscoveryTask { continue; } - let bytes = match payload.message() { - DiscoveryRequest::Ping => { + let bytes = match payload.message(None) { + Ok(DiscoveryRequest::Ping) => { let pl = PayloadBuilder::new(&self.keypair, DiscoveryResponse::Pong) .build() .expect("valid payload"); pl.to_bytes().expect("valid payload") } + _ => { + let pl = PayloadBuilder::new(&self.keypair, DiscoveryResponse::InvalidRequest) + .build() + .expect("valid payload"); + pl.to_bytes().expect("valid payload") + } }; self.responsing_fut.insert( From 44c05f80c20ba595818d1353a37c12324f6274d9 Mon Sep 17 00:00:00 2001 From: Darius Date: Tue, 31 Dec 2024 07:33:06 -0500 Subject: [PATCH 26/26] chore: wake up task --- extensions/warp-ipfs/src/store/discovery.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/extensions/warp-ipfs/src/store/discovery.rs b/extensions/warp-ipfs/src/store/discovery.rs index 80132061b..32a69f7a2 100644 --- a/extensions/warp-ipfs/src/store/discovery.rs +++ b/extensions/warp-ipfs/src/store/discovery.rs @@ -363,6 +363,10 @@ impl DiscoveryPeerTask { }; self.ping_fut = Some(Box::pin(fut)); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } } @@ -603,6 +607,10 @@ impl DiscoveryTask { } DiscoveryConfig::None => {} } + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } pub fn publish_discovery(&mut self) { @@ -657,6 +665,10 @@ impl DiscoveryTask { }; self.discovery_publish_fut = Some(fut); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } pub fn dht_discovery(&mut self, namespace: String) { @@ -679,6 +691,10 @@ impl DiscoveryTask { }; self.discovery_fut.replace(Box::pin(fut)); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } pub fn rz_discovery(&mut self, namespace: String, rz_peer_id: PeerId) { @@ -699,6 +715,10 @@ impl DiscoveryTask { Ok::<_, Error>(peers) }; self.discovery_fut.replace(Box::pin(fut)); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } } }