From 104832058e337d5d90063f5ba4a53df33e6003fa Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 22 Aug 2024 15:15:19 +0000 Subject: [PATCH 01/16] Introduce `ADD_PROVIDER` query --- src/protocol/libp2p/kademlia/handle.rs | 36 ++++++- src/protocol/libp2p/kademlia/mod.rs | 113 +++++++++++++++++++--- src/protocol/libp2p/kademlia/query/mod.rs | 18 ++-- 3 files changed, 145 insertions(+), 22 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 5d3b4630..f1b4c218 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -130,6 +130,18 @@ pub(crate) enum KademliaCommand { query_id: QueryId, }, + /// Register as a content provider for `key`. + StartProviding { + /// Provided key. + key: RecordKey, + + /// Our external addresses to publish. + public_addresses: Vec, + + /// Query ID for the query. + query_id: QueryId, + }, + /// Store record locally. StoreRecord { // Record. @@ -175,7 +187,8 @@ pub enum KademliaEvent { }, /// `PUT_VALUE` query succeeded. - PutRecordSucess { + // TODO: this is never emitted. Implement + add `AddProviderSuccess`. + PutRecordSuccess { /// Query ID. query_id: QueryId, @@ -299,6 +312,27 @@ impl KademliaHandle { query_id } + /// Register as a content provider on the DHT. + /// + /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. + pub async fn start_providing( + &mut self, + key: RecordKey, + public_addresses: Vec, + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx + .send(KademliaCommand::StartProviding { + key, + public_addresses, + query_id, + }) + .await; + + query_id + } + /// Store the record in the local store. Used in combination with /// [`IncomingRecordValidationMode::Manual`]. pub async fn store_record(&mut self, record: Record) { diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index b59a1fcf..c0f809e5 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -791,7 +791,11 @@ impl Kademlia { event = self.service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. 
}) => { if let Err(error) = self.on_connection_established(peer) { - tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection"); + tracing::debug!( + target: LOG_TARGET, + ?error, + "failed to handle established connection", + ); } } Some(TransportEvent::ConnectionClosed { peer }) => { @@ -801,7 +805,10 @@ impl Kademlia { match direction { Direction::Inbound => self.on_inbound_substream(peer, substream).await, Direction::Outbound(substream_id) => { - if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await { + if let Err(error) = self + .on_outbound_substream(peer, substream_id, substream) + .await + { tracing::debug!( target: LOG_TARGET, ?peer, @@ -816,7 +823,8 @@ impl Kademlia { Some(TransportEvent::SubstreamOpenFailure { substream, error }) => { self.on_substream_open_failure(substream, error).await; } - Some(TransportEvent::DialFailure { peer, address }) => self.on_dial_failure(peer, address), + Some(TransportEvent::DialFailure { peer, address }) => + self.on_dial_failure(peer, address), None => return Err(Error::EssentialTaskClosed), }, context = self.executor.next() => { @@ -824,14 +832,32 @@ impl Kademlia { match result { QueryResult::SendSuccess { substream } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer"); + tracing::trace!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "message sent to peer", + ); let _ = substream.close().await; } QueryResult::ReadSuccess { substream, message } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer"); + tracing::trace!(target: LOG_TARGET, + ?peer, + query = ?query_id, + "message read from peer", + ); - if let Err(error) = self.on_message_received(peer, query_id, message, substream).await { - tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message"); + if let Err(error) = self.on_message_received( + peer, + query_id, + message, + substream + ).await { + tracing::debug!(target: LOG_TARGET, + ?peer, + ?error, + "failed to process message", + ); } } QueryResult::SubstreamClosed | QueryResult::Timeout => { @@ -850,22 +876,36 @@ impl Kademlia { command = self.cmd_rx.recv() => { match command { Some(KademliaCommand::FindNode { peer, query_id }) => { - tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query"); + tracing::debug!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "starting `FIND_NODE` query", + ); self.engine.start_find_node( query_id, peer, - self.routing_table.closest(Key::from(peer), self.replication_factor).into() + self.routing_table + .closest(Key::from(peer), self.replication_factor) + .into() ); } Some(KademliaCommand::PutRecord { mut record, query_id }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT"); + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT", + ); // For `PUT_VALUE` requests originating locally we are always the publisher. record.publisher = Some(self.local_key.clone().into_preimage()); // Make sure TTL is set. 
- record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); let key = Key::new(record.key.clone()); @@ -877,11 +917,23 @@ impl Kademlia { self.routing_table.closest(key, self.replication_factor).into(), ); } - Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers"); + Some(KademliaCommand::PutRecordToPeers { + mut record, + query_id, + peers, + update_local_store, + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT to specified peers", + ); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); if update_local_store { self.store.put(record.clone()); @@ -895,7 +947,8 @@ impl Kademlia { match self.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => Some(entry.clone()), - KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()), + KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => + Some(entry.clone()), _ => None, } }).collect(); @@ -906,6 +959,36 @@ impl Kademlia { peers, ); } + Some(KademliaCommand::StartProviding { + key, + public_addresses, + query_id + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + ?key, + ?public_addresses, + "register as content provider" + ); + + let provider = ProviderRecord { + key: key.clone(), + provider: self.service.local_peer_id, + addresses: public_addresses, + expires: Instant::now() + self.provider_ttl, + }; + + self.store.put_provider(provider); + + self.engine.start_add_provider( + query_id, + provider, + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); + } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index f29af805..da293556 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -25,7 +25,7 @@ use crate::{ find_node::{FindNodeConfig, FindNodeContext}, get_record::{GetRecordConfig, GetRecordContext}, }, - record::{Key as RecordKey, Record}, + record::{Key as RecordKey, ProviderRecord, Record}, types::{KademliaPeer, Key}, PeerRecord, Quorum, }, @@ -45,8 +45,6 @@ mod get_record; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; -// TODO: store record key instead of the actual record - /// Type representing a query ID. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct QueryId(pub usize); @@ -56,7 +54,7 @@ pub struct QueryId(pub usize); enum QueryType { /// `FIND_NODE` query. FindNode { - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -65,7 +63,7 @@ enum QueryType { /// Record that needs to be stored. record: Record, - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -83,6 +81,15 @@ enum QueryType { /// Context for the `GET_VALUE` query. context: GetRecordContext, }, + + /// `ADD_PROVIDER` query. + AddProvider { + /// Provider record that need to be stored. 
+ provider: ProviderRecord, + + /// Context for the `FIND_NODE` query. + context: FindNodeConfig, + }, } /// Query action. @@ -131,7 +138,6 @@ pub enum QueryAction { records: Vec, }, - // TODO: remove /// Query succeeded. QuerySucceeded { /// ID of the query that succeeded. From 778078b52790e82d6674e73a07cc32be5353dd9f Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 23 Aug 2024 15:33:22 +0000 Subject: [PATCH 02/16] Execute `ADD_PROVIDER` query --- src/protocol/libp2p/kademlia/message.rs | 7 ++- src/protocol/libp2p/kademlia/mod.rs | 60 ++++++++++++++++++++-- src/protocol/libp2p/kademlia/query/mod.rs | 61 ++++++++++++++++++++++- 3 files changed, 120 insertions(+), 8 deletions(-) diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index 4f53fbc1..bba2b285 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -172,8 +172,7 @@ impl KademliaMessage { } /// Create `ADD_PROVIDER` message with `provider`. - #[allow(unused)] - pub fn add_provider(provider: ProviderRecord) -> Vec { + pub fn add_provider(provider: ProviderRecord) -> Bytes { let peer = KademliaPeer::new( provider.provider, provider.addresses, @@ -187,10 +186,10 @@ impl KademliaMessage { ..Default::default() }; - let mut buf = Vec::with_capacity(message.encoded_len()); + let mut buf = BytesMut::with_capacity(message.encoded_len()); message.encode(&mut buf).expect("Vec to provide needed capacity"); - buf + buf.freeze() } /// Create `GET_PROVIDERS` request for `key`. diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index c0f809e5..fc6373cd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -83,13 +83,16 @@ mod schema { } /// Peer action. -#[derive(Debug)] +#[derive(Debug, Clone)] enum PeerAction { /// Send `FIND_NODE` message to peer. SendFindNode(QueryId), /// Send `PUT_VALUE` message to peer. SendPutValue(Bytes), + + /// Send `ADD_PROVIDER` message to peer. + SendAddProvider(Bytes), } /// Peer context. 
@@ -335,7 +338,12 @@ impl Kademlia { } } Some(PeerAction::SendPutValue(message)) => { - tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` response"); + tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message"); + + self.executor.send_message(peer, message, substream); + } + Some(PeerAction::SendAddProvider(message)) => { + tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message"); self.executor.send_message(peer, message, substream); } @@ -755,6 +763,52 @@ impl Kademlia { Ok(()) } + QueryAction::AddProviderToFoundNodes { provider, peers } => { + tracing::trace!( + target: LOG_TARGET, + provided_key = ?provider.key, + num_peers = ?peers.len(), + "add provider record to found peers", + ); + + let provided_key = provider.key.clone(); + let message = KademliaMessage::add_provider(provider); + let peer_action = PeerAction::SendAddProvider(message); + + for peer in peers { + match self.service.open_substream(peer.peer) { + Ok(substream_id) => { + self.pending_substreams.insert(substream_id, peer.peer); + self.peers + .entry(peer.peer) + .or_default() + .pending_actions + .insert(substream_id, peer_action.clone()); + } + Err(_) => match self.service.dial(&peer.peer) { + Ok(_) => match self.pending_dials.entry(peer.peer) { + Entry::Occupied(entry) => { + entry.into_mut().push(peer_action.clone()); + } + Entry::Vacant(entry) => { + entry.insert(vec![peer_action.clone()]); + } + }, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?provided_key, + ?error, + "failed to dial peer", + ) + } + }, + } + } + + Ok(()) + } QueryAction::GetRecordQueryDone { query_id, records } => { let _ = self .event_tx @@ -979,7 +1033,7 @@ impl Kademlia { expires: Instant::now() + self.provider_ttl, }; - self.store.put_provider(provider); + self.store.put_provider(provider.clone()); self.engine.start_add_provider( query_id, diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index da293556..34f6e84e 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -88,7 +88,7 @@ enum QueryType { provider: ProviderRecord, /// Context for the `FIND_NODE` query. - context: FindNodeConfig, + context: FindNodeContext, }, } @@ -129,6 +129,15 @@ pub enum QueryAction { peers: Vec, }, + /// Add the provider record to nodes closest to the target key. + AddProviderToFoundNodes { + /// Provider record. + provider: ProviderRecord, + + /// Peers for whom the `ADD_PROVIDER` must be sent to. + peers: Vec, + }, + /// `GET_VALUE` query succeeded. GetRecordQueryDone { /// Query ID. @@ -314,6 +323,41 @@ impl QueryEngine { query_id } + /// Start `ADD_PROVIDER` query. + pub fn start_add_provider( + &mut self, + query_id: QueryId, + provider: ProviderRecord, + candidates: VecDeque, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + ?provider, + num_peers = ?candidates.len(), + "start `ADD_PROVIDER` query", + ); + + let target = Key::new(provider.key.clone()); + let config = FindNodeConfig { + local_peer_id: self.local_peer_id, + replication_factor: self.replication_factor, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; + + self.queries.insert( + query_id, + QueryType::AddProvider { + provider, + context: FindNodeContext::new(config, candidates), + }, + ); + + query_id + } + /// Register response failure from a queried peer. 
pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure"); @@ -334,6 +378,9 @@ impl QueryEngine { Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } + Some(QueryType::AddProvider { context, .. }) => { + context.register_response_failure(peer); + } } } @@ -369,6 +416,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::AddProvider { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, } } @@ -385,6 +438,7 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), + Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer), } } @@ -409,6 +463,10 @@ impl QueryEngine { query_id: context.config.query, records: context.found_records(), }, + QueryType::AddProvider { provider, context } => QueryAction::AddProviderToFoundNodes { + provider, + peers: context.responses.into_values().collect::>(), + }, } } @@ -428,6 +486,7 @@ impl QueryEngine { QueryType::PutRecord { context, .. } => context.next_action(), QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), + QueryType::AddProvider { context, .. } => context.next_action(), }; match action { From 323a26f58067a7f25c2b56442a5b9cbf9244a4e1 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 27 Aug 2024 14:15:30 +0000 Subject: [PATCH 03/16] Introduce local providers in `MemoryStore` --- src/protocol/libp2p/kademlia/store.rs | 140 +++++++++++++++++--------- 1 file changed, 91 insertions(+), 49 deletions(-) diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index fced9372..8d255913 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -21,8 +21,12 @@ //! Memory store implementation for Kademlia. #![allow(unused)] -use crate::protocol::libp2p::kademlia::record::{Key, ProviderRecord, Record}; +use crate::{ + protocol::libp2p::kademlia::record::{Key, ProviderRecord, Record}, + PeerId, +}; +use futures::{future::BoxFuture, stream::FuturesUnordered}; use std::{ collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, @@ -36,30 +40,42 @@ pub enum MemoryStoreEvent {} /// Memory store. pub struct MemoryStore { + /// Local peer ID. Used to track local providers. + local_peer_id: PeerId, + /// Configuration. + config: MemoryStoreConfig, /// Records. records: HashMap, /// Provider records. provider_keys: HashMap>, - /// Configuration. - config: MemoryStoreConfig, + /// Local providers. + local_providers: HashMap, + /// Futures to signal it's time to republish a local provider. + pending_provider_republish: FuturesUnordered>, } impl MemoryStore { /// Create new [`MemoryStore`]. - pub fn new() -> Self { + pub fn new(local_peer_id: PeerId) -> Self { Self { + local_peer_id, + config: MemoryStoreConfig::default(), records: HashMap::new(), provider_keys: HashMap::new(), - config: MemoryStoreConfig::default(), + local_providers: HashMap::new(), + pending_provider_republish: FuturesUnordered::new(), } } /// Create new [`MemoryStore`] with the provided configuration. 
- pub fn with_config(config: MemoryStoreConfig) -> Self { + pub fn with_config(local_peer_id: PeerId, config: MemoryStoreConfig) -> Self { Self { + local_peer_id, + config, records: HashMap::new(), provider_keys: HashMap::new(), - config, + local_providers: HashMap::new(), + pending_provider_republish: FuturesUnordered::new(), } } @@ -263,7 +279,7 @@ mod tests { #[test] fn put_get_record() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record = Record::new(key.clone(), vec![4, 5, 6]); @@ -273,11 +289,14 @@ mod tests { #[test] fn max_records() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_records: 1, - max_record_size_bytes: 1024, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_records: 1, + max_record_size_bytes: 1024, + ..Default::default() + }, + ); let key1 = Key::from(vec![1, 2, 3]); let key2 = Key::from(vec![4, 5, 6]); @@ -293,7 +312,7 @@ mod tests { #[test] fn expired_record_removed() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record = Record { key: key.clone(), @@ -310,7 +329,7 @@ mod tests { #[test] fn new_record_overwrites() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record1 = Record { key: key.clone(), @@ -334,11 +353,14 @@ mod tests { #[test] fn max_record_size() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_records: 1024, - max_record_size_bytes: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_records: 1024, + max_record_size_bytes: 2, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let record = Record::new(key.clone(), vec![4, 5]); @@ -352,7 +374,7 @@ mod tests { #[test] fn put_get_provider() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), provider: PeerId::random(), @@ -366,7 +388,7 @@ mod tests { #[test] fn multiple_providers_per_key() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let provider1 = ProviderRecord { key: key.clone(), @@ -392,7 +414,7 @@ mod tests { #[test] fn providers_sorted_by_distance() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let providers = (0..10) .map(|_| ProviderRecord { @@ -418,10 +440,13 @@ mod tests { #[test] fn max_providers_per_key() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..20) .map(|_| ProviderRecord { @@ -440,10 +465,13 @@ mod tests { #[test] fn closest_providers_kept() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..20) .map(|_| 
ProviderRecord { @@ -470,10 +498,13 @@ mod tests { #[test] fn furthest_provider_discarded() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..11) .map(|_| ProviderRecord { @@ -503,10 +534,13 @@ mod tests { #[test] fn update_provider_in_place() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let peer_ids = (0..10).map(|_| PeerId::random()).collect::>(); let peer_id0 = peer_ids[0]; @@ -558,7 +592,7 @@ mod tests { #[test] fn provider_record_expires() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), provider: PeerId::random(), @@ -575,7 +609,7 @@ mod tests { #[test] fn individual_provider_record_expires() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let provider1 = ProviderRecord { key: key.clone(), @@ -600,10 +634,13 @@ mod tests { #[test] fn max_addresses_per_provider() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_provider_addresses: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_provider_addresses: 2, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), @@ -628,10 +665,13 @@ mod tests { #[test] fn max_provider_keys() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_provider_keys: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_provider_keys: 2, + ..Default::default() + }, + ); let provider1 = ProviderRecord { key: Key::from(vec![1, 2, 3]), @@ -660,4 +700,6 @@ mod tests { assert_eq!(store.get_providers(&provider2.key), vec![provider2]); assert_eq!(store.get_providers(&provider3.key), vec![]); } + + // TODO: test local providers. } From 8946809921642715a36baa7ea0c87b91a271ef99 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 28 Aug 2024 08:03:29 +0000 Subject: [PATCH 04/16] Add provider refresh interval to Kademlia config --- src/protocol/libp2p/kademlia/config.rs | 26 ++++++++++++++++++++++++-- src/protocol/libp2p/kademlia/mod.rs | 13 +++++++++++-- src/protocol/libp2p/kademlia/store.rs | 10 +++++++++- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/protocol/libp2p/kademlia/config.rs b/src/protocol/libp2p/kademlia/config.rs index 8a02a3e3..c9e79050 100644 --- a/src/protocol/libp2p/kademlia/config.rs +++ b/src/protocol/libp2p/kademlia/config.rs @@ -39,6 +39,9 @@ const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60); /// Default provider record TTL. const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60); +/// Default provider republish interval. +pub(super) const DEFAULT_PROVIDER_REFRESH_INTERVAL: Duration = Duration::from_secs(22 * 60 * 60); + /// Protocol name. 
const PROTOCOL_NAME: &str = "/ipfs/kad/1.0.0"; @@ -74,6 +77,9 @@ pub struct Config { /// Provider record TTL. pub(super) provider_ttl: Duration, + /// Provider republish interval. + pub(super) provider_refresh_interval: Duration, + /// TX channel for sending events to `KademliaHandle`. pub(super) event_tx: Sender, @@ -90,6 +96,7 @@ impl Config { validation_mode: IncomingRecordValidationMode, record_ttl: Duration, provider_ttl: Duration, + provider_refresh_interval: Duration, ) -> (Self, KademliaHandle) { let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE); let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE); @@ -106,6 +113,7 @@ impl Config { validation_mode, record_ttl, provider_ttl, + provider_refresh_interval, codec: ProtocolCodec::UnsignedVarint(None), replication_factor, known_peers, @@ -126,6 +134,7 @@ impl Config { IncomingRecordValidationMode::Automatic, DEFAULT_TTL, DEFAULT_PROVIDER_TTL, + DEFAULT_PROVIDER_REFRESH_INTERVAL, ) } } @@ -151,8 +160,11 @@ pub struct ConfigBuilder { /// Default TTL for the records. pub(super) record_ttl: Duration, - /// Default TTL for the provider records. + /// TTL for the provider records. pub(super) provider_ttl: Duration, + + /// Republish interval for the provider records. + pub(super) provider_refresh_interval: Duration, } impl Default for ConfigBuilder { @@ -172,6 +184,7 @@ impl ConfigBuilder { validation_mode: IncomingRecordValidationMode::Automatic, record_ttl: DEFAULT_TTL, provider_ttl: DEFAULT_PROVIDER_TTL, + provider_refresh_interval: DEFAULT_PROVIDER_REFRESH_INTERVAL, } } @@ -224,7 +237,7 @@ impl ConfigBuilder { self } - /// Set default TTL for the provider records. Recommended value is 2 * (refresh interval) + 20%. + /// Set TTL for the provider records. Recommended value is 2 * (refresh interval) + 10%. /// /// If unspecified, the default TTL is 48 hours. pub fn with_provider_record_ttl(mut self, provider_record_ttl: Duration) -> Self { @@ -232,6 +245,14 @@ impl ConfigBuilder { self } + /// Set the refresh (republish) interval for provider records. + /// + /// If unspecified, the default interval is 22 hours. + pub fn with_provider_refresh_interval(mut self, provider_refresh_interval: Duration) -> Self { + self.provider_refresh_interval = provider_refresh_interval; + self + } + /// Build Kademlia [`Config`]. 
pub fn build(self) -> (Config, KademliaHandle) { Config::new( @@ -242,6 +263,7 @@ impl ConfigBuilder { self.validation_mode, self.record_ttl, self.provider_ttl, + self.provider_refresh_interval, ) } } diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index fc6373cd..4a8f163a 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -31,7 +31,7 @@ use crate::{ query::{QueryAction, QueryEngine}, record::ProviderRecord, routing_table::RoutingTable, - store::MemoryStore, + store::{MemoryStore, MemoryStoreConfig}, types::{ConnectionType, KademliaPeer, Key}, }, Direction, TransportEvent, TransportService, @@ -181,12 +181,20 @@ impl Kademlia { service.add_known_address(&peer, addresses.into_iter()); } + let store = MemoryStore::with_config( + local_peer_id, + MemoryStoreConfig { + provider_refresh_interval: config.provider_refresh_interval, + ..Default::default() + }, + ); + Self { service, routing_table, peers: HashMap::new(), cmd_rx: config.cmd_rx, - store: MemoryStore::new(), + store, event_tx: config.event_tx, local_key, pending_dials: HashMap::new(), @@ -1156,6 +1164,7 @@ mod tests { validation_mode: IncomingRecordValidationMode::Automatic, record_ttl: Duration::from_secs(36 * 60 * 60), provider_ttl: Duration::from_secs(48 * 60 * 60), + provider_refresh_interval: Duration::from_secs(22 * 60 * 60), event_tx, cmd_rx, }; diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 8d255913..96321340 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -22,7 +22,10 @@ #![allow(unused)] use crate::{ - protocol::libp2p::kademlia::record::{Key, ProviderRecord, Record}, + protocol::libp2p::kademlia::{ + config::DEFAULT_PROVIDER_REFRESH_INTERVAL, + record::{Key, ProviderRecord, Record}, + }, PeerId, }; @@ -30,6 +33,7 @@ use futures::{future::BoxFuture, stream::FuturesUnordered}; use std::{ collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, + time::Duration, }; /// Logging target for the file. @@ -254,6 +258,9 @@ pub struct MemoryStoreConfig { /// Maximum number of providers per key. Only providers with peer IDs closest to the key are /// kept. pub max_providers_per_key: usize, + + /// Local providers republish interval. + pub provider_refresh_interval: Duration, } impl Default for MemoryStoreConfig { @@ -264,6 +271,7 @@ impl Default for MemoryStoreConfig { max_provider_keys: 1024, max_provider_addresses: 30, max_providers_per_key: 20, + provider_refresh_interval: DEFAULT_PROVIDER_REFRESH_INTERVAL, } } } From d02aceee693793404779e1f9ad6fc692e04aa430 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 28 Aug 2024 13:28:09 +0000 Subject: [PATCH 05/16] Move `FuturesStream` to a separate file --- src/protocol/libp2p/kademlia/executor.rs | 58 ++------------- .../libp2p/kademlia/futures_stream.rs | 74 +++++++++++++++++++ src/protocol/libp2p/kademlia/mod.rs | 1 + 3 files changed, 82 insertions(+), 51 deletions(-) create mode 100644 src/protocol/libp2p/kademlia/futures_stream.rs diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index 9b8bd8ce..5d695346 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -18,15 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. 
-use crate::{protocol::libp2p::kademlia::query::QueryId, substream::Substream, PeerId}; +use crate::{ + protocol::libp2p::kademlia::{futures_stream::FuturesStream, query::QueryId}, + substream::Substream, + PeerId, +}; use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{future::BoxFuture, Stream, StreamExt}; use std::{ - future::Future, pin::Pin, - task::{Context, Poll, Waker}, + task::{Context, Poll}, time::Duration, }; @@ -71,53 +74,6 @@ pub struct QueryContext { pub result: QueryResult, } -/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. -#[derive(Default)] -pub struct FuturesStream { - futures: FuturesUnordered, - waker: Option, -} - -impl FuturesStream { - /// Create new [`FuturesStream`]. - pub fn new() -> Self { - Self { - futures: FuturesUnordered::new(), - waker: None, - } - } - - /// Push a future for processing. - pub fn push(&mut self, future: F) { - self.futures.push(future); - - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } -} - -impl Stream for FuturesStream { - type Item = ::Output; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else { - // We must save the current waker to wake up the task when new futures are inserted. - // - // Otherwise, simply returning `Poll::Pending` here would cause the task to never be - // woken up again. - // - // We were previously relying on some other task from the `loop tokio::select!` to - // finish. - self.waker = Some(cx.waker().clone()); - - return Poll::Pending; - }; - - Poll::Ready(Some(result)) - } -} - /// Query executor. pub struct QueryExecutor { /// Pending futures. diff --git a/src/protocol/libp2p/kademlia/futures_stream.rs b/src/protocol/libp2p/kademlia/futures_stream.rs new file mode 100644 index 00000000..1f208f88 --- /dev/null +++ b/src/protocol/libp2p/kademlia/futures_stream.rs @@ -0,0 +1,74 @@ +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{stream::FuturesUnordered, Stream, StreamExt}; + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll, Waker}, +}; + +/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. +#[derive(Default)] +pub struct FuturesStream { + futures: FuturesUnordered, + waker: Option, +} + +impl FuturesStream { + /// Create new [`FuturesStream`]. 
+ pub fn new() -> Self { + Self { + futures: FuturesUnordered::new(), + waker: None, + } + } + + /// Push a future for processing. + pub fn push(&mut self, future: F) { + self.futures.push(future); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +impl Stream for FuturesStream { + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else { + // We must save the current waker to wake up the task when new futures are inserted. + // + // Otherwise, simply returning `Poll::Pending` here would cause the task to never be + // woken up again. + // + // We were previously relying on some other task from the `loop tokio::select!` to + // finish. + self.waker = Some(cx.waker().clone()); + + return Poll::Pending; + }; + + Poll::Ready(Some(result)) + } +} diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 4a8f163a..63f9fcac 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -68,6 +68,7 @@ const PARALLELISM_FACTOR: usize = 3; mod bucket; mod config; mod executor; +mod futures_stream; mod handle; mod message; mod query; From 5432c9b0a0bd7d1db4e526de934f9cbbccb6ef3c Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 28 Aug 2024 14:07:31 +0000 Subject: [PATCH 06/16] Refresh providers: dry-run without network queries --- .../libp2p/kademlia/futures_stream.rs | 2 + src/protocol/libp2p/kademlia/mod.rs | 29 ++++++++- src/protocol/libp2p/kademlia/store.rs | 65 ++++++++++++++----- 3 files changed, 79 insertions(+), 17 deletions(-) diff --git a/src/protocol/libp2p/kademlia/futures_stream.rs b/src/protocol/libp2p/kademlia/futures_stream.rs index 1f208f88..9c7d8039 100644 --- a/src/protocol/libp2p/kademlia/futures_stream.rs +++ b/src/protocol/libp2p/kademlia/futures_stream.rs @@ -27,6 +27,8 @@ use std::{ }; /// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. +/// The [`Stream`] implemented by [`FuturesStream`] never terminates and can be +/// polled when contains no futures. #[derive(Default)] pub struct FuturesStream { futures: FuturesUnordered, diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 63f9fcac..f89ed728 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -31,7 +31,7 @@ use crate::{ query::{QueryAction, QueryEngine}, record::ProviderRecord, routing_table::RoutingTable, - store::{MemoryStore, MemoryStoreConfig}, + store::{MemoryStore, MemoryStoreAction, MemoryStoreConfig}, types::{ConnectionType, KademliaPeer, Key}, }, Direction, TransportEvent, TransportService, @@ -935,7 +935,7 @@ impl Kademlia { self.disconnect_peer(peer, query_id).await; } } - } + }, command = self.cmd_rx.recv() => { match command { Some(KademliaCommand::FindNode { peer, query_id }) => { @@ -1107,6 +1107,31 @@ impl Kademlia { None => return Err(Error::EssentialTaskClosed), } }, + action = self.store.next_action() => match action { + Some(MemoryStoreAction::RefreshProvider { mut provider }) => { + tracing::trace!( + target: LOG_TARGET, + key = ?provider.key, + "republishing local provider", + ); + + // Make sure to roll expiration time. 
+ provider.expires = Instant::now() + self.provider_ttl; + + self.store.put_provider(provider.clone()); + + + todo!("obtain a query ID and start query"); + // self.engine.start_add_provider( + // query_id, + // provider, + // self.routing_table + // .closest(Key::new(provider.key), self.replication_factor) + // .into(), + // ); + } + None => {} + } } } } diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 96321340..e5410f94 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -24,12 +24,13 @@ use crate::{ protocol::libp2p::kademlia::{ config::DEFAULT_PROVIDER_REFRESH_INTERVAL, + futures_stream::FuturesStream, record::{Key, ProviderRecord, Record}, }, PeerId, }; -use futures::{future::BoxFuture, stream::FuturesUnordered}; +use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use std::{ collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, @@ -40,7 +41,9 @@ use std::{ const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store"; /// Memory store events. -pub enum MemoryStoreEvent {} +pub enum MemoryStoreAction { + RefreshProvider { provider: ProviderRecord }, +} /// Memory store. pub struct MemoryStore { @@ -55,7 +58,7 @@ pub struct MemoryStore { /// Local providers. local_providers: HashMap, /// Futures to signal it's time to republish a local provider. - pending_provider_republish: FuturesUnordered>, + pending_provider_refresh: FuturesStream>, } impl MemoryStore { @@ -67,7 +70,7 @@ impl MemoryStore { records: HashMap::new(), provider_keys: HashMap::new(), local_providers: HashMap::new(), - pending_provider_republish: FuturesUnordered::new(), + pending_provider_refresh: FuturesStream::new(), } } @@ -79,7 +82,7 @@ impl MemoryStore { records: HashMap::new(), provider_keys: HashMap::new(), local_providers: HashMap::new(), - pending_provider_republish: FuturesUnordered::new(), + pending_provider_refresh: FuturesStream::new(), } } @@ -204,9 +207,7 @@ impl MemoryStore { match provider_position { Ok(i) => { // Update the provider in place. - providers[i] = provider_record; - - true + providers[i] = provider_record.clone(); } Err(i) => { // `Err(i)` contains the insertion point. @@ -220,25 +221,59 @@ impl MemoryStore { existing `max_providers_per_key`", ); - false + return false; } else { if providers.len() == usize::from(self.config.max_providers_per_key) { providers.pop(); } - providers.insert(i, provider_record); - - true + providers.insert(i, provider_record.clone()); } } } + + if provider_record.provider == self.local_peer_id { + // We must make sure to refresh the local provider. + let key = provider_record.key.clone(); + let refresh_interval = self.config.provider_refresh_interval; + self.local_providers.insert(key.clone(), provider_record); + self.pending_provider_refresh.push(Box::pin(async move { + tokio::time::sleep(refresh_interval).await; + key + })); + } + + true } } } - /// Poll next event from the store. - async fn next_event() -> Option { - None + /// Poll next action from the store. + pub async fn next_action(&mut self) -> Option { + // [`FuturesStream`] never terminates, so `map()` below is always triggered. 
+ self.pending_provider_refresh + .next() + .await + .map(|key| { + if let Some(provider) = self.local_providers.get(&key).cloned() { + tracing::trace!( + target: LOG_TARGET, + ?key, + "refresh provider" + ); + + Some(MemoryStoreAction::RefreshProvider { provider }) + } else { + tracing::trace!( + target: LOG_TARGET, + ?key, + "it's time to refresh a provider, but we do not provide this key anymore", + ); + + None + } + }) + .flatten() } } From feb493df9c08176a5a544f835ee1436f81c78296 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 29 Aug 2024 13:58:53 +0000 Subject: [PATCH 07/16] Remove `try_get_record()` and other `try_...()` non-async methods --- src/protocol/libp2p/kademlia/handle.rs | 64 -------------------------- 1 file changed, 64 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index f1b4c218..64c80a61 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -338,70 +338,6 @@ impl KademliaHandle { pub async fn store_record(&mut self, record: Record) { let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await; } - - /// Try to add known peer and if the channel is clogged, return an error. - pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec) -> Result<(), ()> { - self.cmd_tx - .try_send(KademliaCommand::AddKnownPeer { peer, addresses }) - .map_err(|_| ()) - } - - /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error. - pub fn try_find_node(&mut self, peer: PeerId) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::FindNode { peer, query_id }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error. - pub fn try_put_record(&mut self, record: Record) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::PutRecord { record, query_id }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, - /// return an error. - pub fn try_put_record_to_peers( - &mut self, - record: Record, - peers: Vec, - update_local_store: bool, - ) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::PutRecordToPeers { - record, - query_id, - peers, - update_local_store, - }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error. - pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::GetRecord { - key, - quorum, - query_id, - }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to store the record in the local store, and if the channel is clogged, return an error. - /// Used in combination with [`IncomingRecordValidationMode::Manual`]. 
- pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> { - self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ()) - } } impl Stream for KademliaHandle { From 982c73dcf0b02ba03febe4e892d69411bf41a207 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 29 Aug 2024 13:59:44 +0000 Subject: [PATCH 08/16] Move query ID generation from `KademliaHandle` to `Kademlia` --- src/protocol/libp2p/kademlia/handle.rs | 113 +++++++++++++------------ src/protocol/libp2p/kademlia/mod.rs | 37 ++++++-- 2 files changed, 89 insertions(+), 61 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 64c80a61..7168b127 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -25,7 +25,10 @@ use crate::{ use futures::Stream; use multiaddr::Multiaddr; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{ + mpsc::{Receiver, Sender}, + oneshot, +}; use std::{ num::NonZeroUsize, @@ -88,8 +91,8 @@ pub(crate) enum KademliaCommand { /// Peer ID. peer: PeerId, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Store record to DHT. @@ -97,8 +100,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Store record to DHT to the given peers. @@ -108,8 +111,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, /// Use the following peers for the put request. peers: Vec, @@ -126,8 +129,8 @@ pub(crate) enum KademliaCommand { /// [`Quorum`] for the query. quorum: Quorum, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Register as a content provider for `key`. @@ -139,7 +142,7 @@ pub(crate) enum KademliaCommand { public_addresses: Vec, /// Query ID for the query. - query_id: QueryId, + query_id_tx: oneshot::Sender, }, /// Store record locally. @@ -232,27 +235,12 @@ pub struct KademliaHandle { /// RX channel for receiving events from `Kademlia`. event_rx: Receiver, - - /// Next query ID. - next_query_id: usize, } impl KademliaHandle { /// Create new [`KademliaHandle`]. pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { - Self { - cmd_tx, - event_rx, - next_query_id: 0usize, - } - } - - /// Allocate next query ID. - fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; - - QueryId(query_id) + Self { cmd_tx, event_rx } } /// Add known peer. @@ -261,19 +249,32 @@ impl KademliaHandle { } /// Send `FIND_NODE` query to known peers. - pub async fn find_node(&mut self, peer: PeerId) -> QueryId { - let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await; - - query_id + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + pub async fn find_node(&mut self, peer: PeerId) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx + .send(KademliaCommand::FindNode { peer, query_id_tx }) + .await + .map_err(|_| ())?; + + query_id_rx.await.map_err(|_| ()) } /// Store record to DHT. 
- pub async fn put_record(&mut self, record: Record) -> QueryId { - let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + pub async fn put_record(&mut self, record: Record) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx + .send(KademliaCommand::PutRecord { + record, + query_id_tx, + }) + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Store record to DHT to the given peers. @@ -282,34 +283,34 @@ impl KademliaHandle { record: Record, peers: Vec, update_local_store: bool, - ) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + ) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::PutRecordToPeers { record, - query_id, + query_id_tx, peers, update_local_store, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Get record from DHT. - pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::GetRecord { key, quorum, - query_id, + query_id_tx, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Register as a content provider on the DHT. @@ -319,18 +320,18 @@ impl KademliaHandle { &mut self, key: RecordKey, public_addresses: Vec, - ) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + ) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::StartProviding { key, public_addresses, - query_id, + query_id_tx, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Store the record in the local store. Used in combination with diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index f89ed728..7698bde0 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -166,6 +166,9 @@ pub(crate) struct Kademlia { /// Query executor. executor: QueryExecutor, + + /// Next query ID. 
+ next_query_id: usize, } impl Kademlia { @@ -207,6 +210,7 @@ impl Kademlia { provider_ttl: config.provider_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), + next_query_id: 0usize, } } @@ -938,7 +942,10 @@ impl Kademlia { }, command = self.cmd_rx.recv() => { match command { - Some(KademliaCommand::FindNode { peer, query_id }) => { + Some(KademliaCommand::FindNode { peer, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, ?peer, @@ -954,7 +961,10 @@ impl Kademlia { .into() ); } - Some(KademliaCommand::PutRecord { mut record, query_id }) => { + Some(KademliaCommand::PutRecord { mut record, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -982,10 +992,13 @@ impl Kademlia { } Some(KademliaCommand::PutRecordToPeers { mut record, - query_id, + query_id_tx, peers, update_local_store, }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1025,8 +1038,11 @@ impl Kademlia { Some(KademliaCommand::StartProviding { key, public_addresses, - query_id + query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1052,7 +1068,10 @@ impl Kademlia { .into(), ); } - Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { + Some(KademliaCommand::GetRecord { key, quorum, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); match (self.store.get(&key), quorum) { @@ -1135,6 +1154,14 @@ impl Kademlia { } } } + + /// Allocate next query ID. 
+ fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id; + self.next_query_id += 1; + + QueryId(query_id) + } } #[cfg(test)] From 68c7a87b29d0a5225dd06f0006ce1115023ed63f Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 29 Aug 2024 14:02:55 +0000 Subject: [PATCH 09/16] Republish providers --- src/protocol/libp2p/kademlia/mod.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 7698bde0..c6d306a1 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1139,15 +1139,15 @@ impl Kademlia { self.store.put_provider(provider.clone()); - - todo!("obtain a query ID and start query"); - // self.engine.start_add_provider( - // query_id, - // provider, - // self.routing_table - // .closest(Key::new(provider.key), self.replication_factor) - // .into(), - // ); + let key = provider.key.clone(); + let query_id = self.next_query_id(); + self.engine.start_add_provider( + query_id, + provider, + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); } None => {} } From 5e97484bfc2f54b2477c563aea4b193480d7f90e Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 30 Aug 2024 14:48:13 +0000 Subject: [PATCH 10/16] Use getter `TransportService::local_peer_id()` instead of accessing directly --- src/protocol/libp2p/kademlia/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index a6135dbc..25976bdd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1031,7 +1031,7 @@ impl Kademlia { let provider = ProviderRecord { key: key.clone(), - provider: self.service.local_peer_id, + provider: self.service.local_peer_id(), addresses: public_addresses, expires: Instant::now() + self.provider_ttl, }; From 7caf29002bd085e52b1bfbd6d2cb96ac11dadad8 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 30 Aug 2024 14:48:13 +0000 Subject: [PATCH 11/16] Use getter `TransportService::local_peer_id()` instead of accessing directly --- src/protocol/libp2p/kademlia/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index edd7905a..973527ad 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1056,7 +1056,7 @@ impl Kademlia { let provider = ProviderRecord { key: key.clone(), - provider: self.service.local_peer_id, + provider: self.service.local_peer_id(), addresses: public_addresses, expires: Instant::now() + self.provider_ttl, }; From e895a44a2fa2aec28d8f3b0adb7baa15bc7fe20a Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 4 Sep 2024 07:51:11 +0000 Subject: [PATCH 12/16] Revert "Remove `try_get_record()` and other `try_...()` non-async methods" This reverts commit feb493df9c08176a5a544f835ee1436f81c78296. 
--- src/protocol/libp2p/kademlia/handle.rs | 64 ++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 7168b127..bf1581f9 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -339,6 +339,70 @@ impl KademliaHandle { pub async fn store_record(&mut self, record: Record) { let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await; } + + /// Try to add known peer and if the channel is clogged, return an error. + pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec) -> Result<(), ()> { + self.cmd_tx + .try_send(KademliaCommand::AddKnownPeer { peer, addresses }) + .map_err(|_| ()) + } + + /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error. + pub fn try_find_node(&mut self, peer: PeerId) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::FindNode { peer, query_id }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error. + pub fn try_put_record(&mut self, record: Record) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::PutRecord { record, query_id }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, + /// return an error. + pub fn try_put_record_to_peers( + &mut self, + record: Record, + peers: Vec, + update_local_store: bool, + ) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::PutRecordToPeers { + record, + query_id, + peers, + update_local_store, + }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error. + pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::GetRecord { + key, + quorum, + query_id, + }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to store the record in the local store, and if the channel is clogged, return an error. + /// Used in combination with [`IncomingRecordValidationMode::Manual`]. + pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> { + self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ()) + } } impl Stream for KademliaHandle { From ce18594d3c3466649ed6a3dacf32f1cfb5fb53c4 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 4 Sep 2024 08:01:24 +0000 Subject: [PATCH 13/16] Revert "Move query ID generation from `KademliaHandle` to `Kademlia`" This reverts commit 982c73dcf0b02ba03febe4e892d69411bf41a207. --- src/protocol/libp2p/kademlia/handle.rs | 111 ++++++++++++------------- src/protocol/libp2p/kademlia/mod.rs | 37 ++------- 2 files changed, 60 insertions(+), 88 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index bf1581f9..f1b4c218 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -25,10 +25,7 @@ use crate::{ use futures::Stream; use multiaddr::Multiaddr; -use tokio::sync::{ - mpsc::{Receiver, Sender}, - oneshot, -}; +use tokio::sync::mpsc::{Receiver, Sender}; use std::{ num::NonZeroUsize, @@ -91,8 +88,8 @@ pub(crate) enum KademliaCommand { /// Peer ID. peer: PeerId, - /// Query ID callback. 
- query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, }, /// Store record to DHT. @@ -100,8 +97,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID callback. - query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, }, /// Store record to DHT to the given peers. @@ -111,8 +108,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID callback. - query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, /// Use the following peers for the put request. peers: Vec, @@ -129,8 +126,8 @@ pub(crate) enum KademliaCommand { /// [`Quorum`] for the query. quorum: Quorum, - /// Query ID callback. - query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, }, /// Register as a content provider for `key`. @@ -142,7 +139,7 @@ pub(crate) enum KademliaCommand { public_addresses: Vec, /// Query ID for the query. - query_id_tx: oneshot::Sender, + query_id: QueryId, }, /// Store record locally. @@ -235,12 +232,27 @@ pub struct KademliaHandle { /// RX channel for receiving events from `Kademlia`. event_rx: Receiver, + + /// Next query ID. + next_query_id: usize, } impl KademliaHandle { /// Create new [`KademliaHandle`]. pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { - Self { cmd_tx, event_rx } + Self { + cmd_tx, + event_rx, + next_query_id: 0usize, + } + } + + /// Allocate next query ID. + fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id; + self.next_query_id += 1; + + QueryId(query_id) } /// Add known peer. @@ -249,32 +261,19 @@ impl KademliaHandle { } /// Send `FIND_NODE` query to known peers. - /// - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. - pub async fn find_node(&mut self, peer: PeerId) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx - .send(KademliaCommand::FindNode { peer, query_id_tx }) - .await - .map_err(|_| ())?; + pub async fn find_node(&mut self, peer: PeerId) -> QueryId { + let query_id = self.next_query_id(); + let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Store record to DHT. - /// - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. - pub async fn put_record(&mut self, record: Record) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx - .send(KademliaCommand::PutRecord { - record, - query_id_tx, - }) - .await - .map_err(|_| ())?; + pub async fn put_record(&mut self, record: Record) -> QueryId { + let query_id = self.next_query_id(); + let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Store record to DHT to the given peers. @@ -283,34 +282,34 @@ impl KademliaHandle { record: Record, peers: Vec, update_local_store: bool, - ) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx .send(KademliaCommand::PutRecordToPeers { record, - query_id_tx, + query_id, peers, update_local_store, }) - .await - .map_err(|_| ())?; + .await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Get record from DHT. 
- pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx + pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx .send(KademliaCommand::GetRecord { key, quorum, - query_id_tx, + query_id, }) - .await - .map_err(|_| ())?; + .await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Register as a content provider on the DHT. @@ -320,18 +319,18 @@ impl KademliaHandle { &mut self, key: RecordKey, public_addresses: Vec, - ) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx .send(KademliaCommand::StartProviding { key, public_addresses, - query_id_tx, + query_id, }) - .await - .map_err(|_| ())?; + .await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Store the record in the local store. Used in combination with diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 973527ad..01a66d7f 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -166,9 +166,6 @@ pub(crate) struct Kademlia { /// Query executor. executor: QueryExecutor, - - /// Next query ID. - next_query_id: usize, } impl Kademlia { @@ -210,7 +207,6 @@ impl Kademlia { provider_ttl: config.provider_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), - next_query_id: 0usize, } } @@ -945,10 +941,7 @@ impl Kademlia { }, command = self.cmd_rx.recv() => { match command { - Some(KademliaCommand::FindNode { peer, query_id_tx }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - + Some(KademliaCommand::FindNode { peer, query_id }) => { tracing::debug!( target: LOG_TARGET, ?peer, @@ -964,10 +957,7 @@ impl Kademlia { .into() ); } - Some(KademliaCommand::PutRecord { mut record, query_id_tx }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - + Some(KademliaCommand::PutRecord { mut record, query_id }) => { tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -995,13 +985,10 @@ impl Kademlia { } Some(KademliaCommand::PutRecordToPeers { mut record, - query_id_tx, + query_id, peers, update_local_store, }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1041,11 +1028,8 @@ impl Kademlia { Some(KademliaCommand::StartProviding { key, public_addresses, - query_id_tx + query_id }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1071,10 +1055,7 @@ impl Kademlia { .into(), ); } - Some(KademliaCommand::GetRecord { key, quorum, query_id_tx }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - + Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); match (self.store.get(&key), quorum) { @@ -1157,14 +1138,6 @@ impl Kademlia { } } } - - /// Allocate next query ID. 
- fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; - - QueryId(query_id) - } } #[cfg(test)] From 0fcd621b3eb1baba90d8e169dd0efd3384df32a0 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 4 Sep 2024 08:16:59 +0000 Subject: [PATCH 14/16] Use `AtomicUsize` to generate `QueryId` in both `KademliaHandle` and `Kademlia` --- src/protocol/libp2p/kademlia/config.rs | 13 +++++++++++-- src/protocol/libp2p/kademlia/handle.rs | 17 ++++++++++++----- src/protocol/libp2p/kademlia/mod.rs | 17 +++++++++++++++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/protocol/libp2p/kademlia/config.rs b/src/protocol/libp2p/kademlia/config.rs index c9e79050..f4fb5f29 100644 --- a/src/protocol/libp2p/kademlia/config.rs +++ b/src/protocol/libp2p/kademlia/config.rs @@ -31,7 +31,11 @@ use crate::{ use multiaddr::Multiaddr; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::HashMap, + sync::{atomic::AtomicUsize, Arc}, + time::Duration, +}; /// Default TTL for the records. const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60); @@ -85,6 +89,9 @@ pub struct Config { /// RX channel for receiving commands from `KademliaHandle`. pub(super) cmd_rx: Receiver, + + /// Next query ID counter shared with the handle. + pub(super) next_query_id: Arc, } impl Config { @@ -100,6 +107,7 @@ impl Config { ) -> (Self, KademliaHandle) { let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE); let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE); + let next_query_id = Arc::new(AtomicUsize::new(0usize)); // if no protocol names were provided, use the default protocol if protocol_names.is_empty() { @@ -119,8 +127,9 @@ impl Config { known_peers, cmd_rx, event_tx, + next_query_id: next_query_id.clone(), }, - KademliaHandle::new(cmd_tx, event_rx), + KademliaHandle::new(cmd_tx, event_rx, next_query_id), ) } diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index f1b4c218..bf6ee78c 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -30,6 +30,10 @@ use tokio::sync::mpsc::{Receiver, Sender}; use std::{ num::NonZeroUsize, pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll}, }; @@ -234,23 +238,26 @@ pub struct KademliaHandle { event_rx: Receiver, /// Next query ID. - next_query_id: usize, + next_query_id: Arc, } impl KademliaHandle { /// Create new [`KademliaHandle`]. - pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { + pub(super) fn new( + cmd_tx: Sender, + event_rx: Receiver, + next_query_id: Arc, + ) -> Self { Self { cmd_tx, event_rx, - next_query_id: 0usize, + next_query_id, } } /// Allocate next query ID. fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; + let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed); QueryId(query_id) } diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 01a66d7f..c25bff5b 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -48,6 +48,10 @@ use tokio::sync::mpsc::{Receiver, Sender}; use std::{ collections::{hash_map::Entry, HashMap}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::{Duration, Instant}, }; @@ -134,6 +138,9 @@ pub(crate) struct Kademlia { /// RX channel for receiving commands from `KademliaHandle`. cmd_rx: Receiver, + /// Next query ID. 
+ next_query_id: Arc, + /// Routing table. routing_table: RoutingTable, @@ -195,6 +202,7 @@ impl Kademlia { routing_table, peers: HashMap::new(), cmd_rx: config.cmd_rx, + next_query_id: config.next_query_id, store, event_tx: config.event_tx, local_key, @@ -210,6 +218,13 @@ impl Kademlia { } } + /// Allocate next query ID. + fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed); + + QueryId(query_id) + } + /// Connection established to remote peer. fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> { tracing::trace!(target: LOG_TARGET, ?peer, "connection established"); @@ -1183,6 +1198,7 @@ mod tests { ); let (event_tx, event_rx) = channel(64); let (_cmd_tx, cmd_rx) = channel(64); + let next_query_id = Arc::new(AtomicUsize::new(0usize)); let config = Config { protocol_names: vec![ProtocolName::from("/kad/1")], @@ -1196,6 +1212,7 @@ mod tests { provider_refresh_interval: Duration::from_secs(22 * 60 * 60), event_tx, cmd_rx, + next_query_id, }; ( From 57f17ce3890d4bda0484fc359efb884d05beb74d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 5 Sep 2024 13:49:11 +0000 Subject: [PATCH 15/16] Use `open_substream_or_dial()` to add provider records to peers --- src/protocol/libp2p/kademlia/mod.rs | 41 +++++++++-------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 25976bdd..3d00c7f8 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -776,37 +776,20 @@ impl Kademlia { let provided_key = provider.key.clone(); let message = KademliaMessage::add_provider(provider); - let peer_action = PeerAction::SendAddProvider(message); for peer in peers { - match self.service.open_substream(peer.peer) { - Ok(substream_id) => { - self.pending_substreams.insert(substream_id, peer.peer); - self.peers - .entry(peer.peer) - .or_default() - .pending_actions - .insert(substream_id, peer_action.clone()); - } - Err(_) => match self.service.dial(&peer.peer) { - Ok(_) => match self.pending_dials.entry(peer.peer) { - Entry::Occupied(entry) => { - entry.into_mut().push(peer_action.clone()); - } - Entry::Vacant(entry) => { - entry.insert(vec![peer_action.clone()]); - } - }, - Err(error) => { - tracing::debug!( - target: LOG_TARGET, - ?peer, - ?provided_key, - ?error, - "failed to dial peer", - ) - } - }, + if let Err(error) = self.open_substream_or_dial( + peer.peer, + PeerAction::SendAddProvider(message.clone()), + None, + ) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?provided_key, + ?error, + "failed to add provider record to peer", + ) } } From 48619591293e02faca116221ee3732234dd87aaa Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 9 Sep 2024 16:51:03 +0000 Subject: [PATCH 16/16] Fix refresh when we are the only provider --- src/protocol/libp2p/kademlia/store.rs | 41 ++++++++++++++++++--------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index e5410f94..74bf7b1d 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -171,6 +171,17 @@ impl MemoryStore { /// /// Returns `true` if the provider was added, `false` otherwise. pub fn put_provider(&mut self, provider_record: ProviderRecord) -> bool { + // Helper to schedule local provider refresh. 
+ let mut schedule_local_provider_refresh = |provider_record: ProviderRecord| { + let key = provider_record.key.clone(); + let refresh_interval = self.config.provider_refresh_interval; + self.local_providers.insert(key.clone(), provider_record); + self.pending_provider_refresh.push(Box::pin(async move { + tokio::time::sleep(refresh_interval).await; + key + })); + }; + // Make sure we have no more than `max_provider_addresses`. let provider_record = { let mut record = provider_record; @@ -183,6 +194,10 @@ impl MemoryStore { match self.provider_keys.entry(provider_record.key.clone()) { Entry::Vacant(entry) => if can_insert_new_key { + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } + entry.insert(vec![provider_record]); true @@ -206,8 +221,13 @@ impl MemoryStore { match provider_position { Ok(i) => { + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } // Update the provider in place. providers[i] = provider_record.clone(); + + true } Err(i) => { // `Err(i)` contains the insertion point. @@ -221,29 +241,22 @@ impl MemoryStore { existing `max_providers_per_key`", ); - return false; + false } else { if providers.len() == usize::from(self.config.max_providers_per_key) { providers.pop(); } + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } + providers.insert(i, provider_record.clone()); + + true } } } - - if provider_record.provider == self.local_peer_id { - // We must make sure to refresh the local provider. - let key = provider_record.key.clone(); - let refresh_interval = self.config.provider_refresh_interval; - self.local_providers.insert(key.clone(), provider_record); - self.pending_provider_refresh.push(Box::pin(async move { - tokio::time::sleep(refresh_interval).await; - key - })); - } - - true } } }
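Taken together, the series lets a user announce content on the DHT and have the local provider record republished automatically. Below is a minimal usage sketch against the API as it stands after PATCH 16; the function name and import paths are illustrative assumptions, not part of the patches:

    use litep2p::protocol::libp2p::kademlia::{KademliaHandle, QueryId, RecordKey};
    use multiaddr::Multiaddr;

    /// Announce `key` on the DHT, returning the query ID so the caller can
    /// match the corresponding event on the handle's event stream.
    async fn announce_content(
        handle: &mut KademliaHandle,
        key: RecordKey,
        public_addresses: Vec<Multiaddr>,
    ) -> QueryId {
        // `start_providing()` registers the local peer ID together with
        // `public_addresses` as a provider for `key` and starts an
        // `ADD_PROVIDER` query towards the closest peers. The local
        // `MemoryStore` keeps the record and schedules a republish after
        // `provider_refresh_interval`, which PATCH 16 makes work even when
        // we are the only provider for the key.
        handle.start_providing(key, public_addresses).await
    }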