diff --git a/Cargo.lock b/Cargo.lock index 3ff508bc..16b1226a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -448,6 +448,15 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -783,6 +792,26 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "enum-display" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02058bb25d8d0605829af88230427dd5cd50661590bd2b09d1baf7c64c417f24" +dependencies = [ + "enum-display-macro", +] + +[[package]] +name = "enum-display-macro" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4be2cf2fe7b971b1865febbacd4d8df544aa6bd377cca011a6d69dcf4c60d94" +dependencies = [ + "convert_case", + "quote", + "syn 1.0.109", +] + [[package]] name = "env_logger" version = "0.8.4" @@ -1887,6 +1916,7 @@ dependencies = [ "bytes", "cid", "ed25519-dalek", + "enum-display", "futures", "futures-timer", "futures_ringbuf", @@ -3974,6 +4004,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/Cargo.toml b/Cargo.toml index 88e5528d..afa6e4ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ str0m = { version = "0.9.0", optional = true } # Fuzzing related dependencies. serde_millis = {version = "0.1", optional = true} +enum-display = "0.1.4" # End of fuzzing related dependencies. [dev-dependencies] diff --git a/src/crypto/mod.rs b/src/crypto/mod.rs index a276f498..f50f77b5 100644 --- a/src/crypto/mod.rs +++ b/src/crypto/mod.rs @@ -94,7 +94,7 @@ impl From for PublicKey { /// The public key of a remote node's identity keypair. Supports RSA keys additionally to ed25519. #[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) enum RemotePublicKey { +pub enum RemotePublicKey { /// A public Ed25519 key. Ed25519(ed25519::PublicKey), /// A public RSA key. diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 07bca65d..da02d845 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -41,7 +41,7 @@ use std::{ /// /// Quorum defines how many peers must be successfully contacted /// in order for the query to be considered successful. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))] pub enum Quorum { /// All peers must be successfully contacted. @@ -156,6 +156,9 @@ pub enum KademliaCommand { /// Provided key. key: RecordKey, + /// [`Quorum`] for the query. + quorum: Quorum, + /// Query ID for the query. query_id: QueryId, }, @@ -240,6 +243,15 @@ pub enum KademliaEvent { key: RecordKey, }, + /// `ADD_PROVIDER` query succeeded. + AddProviderSuccess { + /// Query ID. + query_id: QueryId, + + /// Provided key. + provided_key: RecordKey, + }, + /// Query failed. QueryFailed { /// Query ID. 
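Reviewer note on the API surface above: `KademliaCommand::StartProviding` now carries a `Quorum`, and the new `KademliaEvent::AddProviderSuccess` event (paired with the existing `QueryFailed`) makes provider publication observable to callers. A minimal caller-side sketch of the updated flow (illustrative only, not part of this diff; the `provide_and_wait` helper is hypothetical, and the import paths assume litep2p's public `kademlia` module):

```rust
use futures::StreamExt;
use litep2p::protocol::libp2p::kademlia::{KademliaEvent, KademliaHandle, Quorum, RecordKey};

/// Hypothetical helper: publish a provider record and wait for the outcome.
async fn provide_and_wait(handle: &mut KademliaHandle, key: RecordKey) -> bool {
    // `Quorum::All` requires every contacted peer to accept the `ADD_PROVIDER`
    // request; `Quorum::One` / `Quorum::N(..)` relax that requirement.
    let query = handle.start_providing(key, Quorum::All).await;

    // `KademliaHandle` yields events as a stream; poll it until our query settles.
    while let Some(event) = handle.next().await {
        match event {
            KademliaEvent::AddProviderSuccess { query_id, .. } if query_id == query =>
                return true,
            KademliaEvent::QueryFailed { query_id } if query_id == query => return false,
            _ => {}
        }
    }
    false
}
```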
@@ -373,9 +385,16 @@ impl KademliaHandle { /// /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. /// Returns [`Err`] only if `Kademlia` is terminating. - pub async fn start_providing(&mut self, key: RecordKey) -> QueryId { + pub async fn start_providing(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::StartProviding { key, query_id }).await; + let _ = self + .cmd_tx + .send(KademliaCommand::StartProviding { + key, + quorum, + query_id, + }) + .await; query_id } diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index ff2b8f34..ad1b4d54 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -28,6 +28,7 @@ use crate::{ }; use bytes::{Bytes, BytesMut}; +use enum_display::EnumDisplay; use prost::Message; use std::time::{Duration, Instant}; @@ -35,7 +36,7 @@ use std::time::{Duration, Instant}; const LOG_TARGET: &str = "litep2p::ipfs::kademlia::message"; /// Kademlia message. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, EnumDisplay)] pub enum KademliaMessage { /// `FIND_NODE` message. FindNode { diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 9d561d89..632ed76b 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -929,6 +929,7 @@ impl Kademlia { provided_key, provider, peers, + quorum, } => { tracing::trace!( target: LOG_TARGET, @@ -939,7 +940,7 @@ impl Kademlia { let message = KademliaMessage::add_provider(provided_key.clone(), provider); - for peer in peers { + for peer in &peers { if let Err(error) = self.open_substream_or_dial( peer.peer, PeerAction::SendAddProvider(query, message.clone()), @@ -955,6 +956,28 @@ impl Kademlia { } } + self.engine.start_add_provider_to_found_nodes_requests_tracking( + query, + provided_key, + peers.into_iter().map(|peer| peer.peer).collect(), + quorum, + ); + + Ok(()) + } + QueryAction::AddProviderQuerySucceeded { + query, + provided_key, + } => { + tracing::debug!(target: LOG_TARGET, ?query, "`ADD_PROVIDER` query succeeded"); + + let _ = self + .event_tx + .send(KademliaEvent::AddProviderSuccess { + query_id: query, + provided_key, + }) + .await; Ok(()) } QueryAction::GetRecordQueryDone { query_id } => { @@ -1222,6 +1245,7 @@ impl Kademlia { } Some(KademliaCommand::StartProviding { key, + quorum, query_id }) => { tracing::debug!( @@ -1237,7 +1261,7 @@ impl Kademlia { addresses, }; - self.store.put_provider(key.clone(), provider.clone()); + self.store.put_local_provider(key.clone(), quorum); self.engine.start_add_provider( query_id, @@ -1246,6 +1270,7 @@ impl Kademlia { self.routing_table .closest(&Key::new(key), self.replication_factor) .into(), + quorum, ); } Some(KademliaCommand::StopProviding { @@ -1356,16 +1381,17 @@ impl Kademlia { } }, action = self.store.next_action() => match action { - Some(MemoryStoreAction::RefreshProvider { provided_key, provider }) => { + Some(MemoryStoreAction::RefreshProvider { provided_key, provider, quorum }) => { tracing::trace!( target: LOG_TARGET, ?provided_key, "republishing local provider", ); - self.store.put_provider(provided_key.clone(), provider.clone()); - // We never update local provider addresses in the store when refresh - // it, as this is done anyway when replying to `GET_PROVIDERS` request. 
+                        self.store.put_local_provider(provided_key.clone(), quorum);
+
+                        // We never update local provider addresses in the store during refresh,
+                        // as this is done anyway when replying to `GET_PROVIDERS` requests.
 
                         let query_id = self.next_query_id();
                         self.engine.start_add_provider(
@@ -1375,6 +1401,7 @@
                             self.routing_table
                                 .closest(&Key::new(provided_key), self.replication_factor)
                                 .into(),
+                            quorum,
                         );
                     }
                     None => {}
diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs
index 0e025ade..954a531b 100644
--- a/src/protocol/libp2p/kademlia/query/mod.rs
+++ b/src/protocol/libp2p/kademlia/query/mod.rs
@@ -88,7 +88,7 @@ enum QueryType {
 
     /// `PUT_VALUE` message sending phase.
     PutRecordToFoundNodes {
-        /// Context for tracking `PUT_VALUE` responses.
+        /// Context for tracking `PUT_VALUE` requests.
         context: PutToTargetPeersContext,
     },
 
@@ -106,10 +106,19 @@
         /// Provider record that need to be stored.
         provider: ContentProvider,
 
+        /// [`Quorum`] that needs to be reached for the query to succeed.
+        quorum: Quorum,
+
         /// Context for the `FIND_NODE` query.
         context: FindNodeContext,
     },
 
+    /// `ADD_PROVIDER` message sending phase.
+    AddProviderToFoundNodes {
+        /// Context for tracking `ADD_PROVIDER` requests.
+        context: PutToTargetPeersContext,
+    },
+
     /// `GET_PROVIDERS` query.
     GetProviders {
         /// Context for the `GET_PROVIDERS` query.
@@ -181,6 +190,18 @@ pub enum QueryAction {
 
         /// Peers for whom the `ADD_PROVIDER` must be sent to.
         peers: Vec<KademliaPeer>,
+
+        /// [`Quorum`] that needs to be reached for the query to succeed.
+        quorum: Quorum,
+    },
+
+    /// `ADD_PROVIDER` query succeeded.
+    AddProviderQuerySucceeded {
+        /// ID of the query that succeeded.
+        query: QueryId,
+
+        /// Provided key.
+        provided_key: RecordKey,
     },
 
     /// `GET_VALUE` query succeeded.
@@ -399,6 +420,7 @@ impl QueryEngine {
         provided_key: RecordKey,
         provider: ContentProvider,
         candidates: VecDeque<KademliaPeer>,
+        quorum: Quorum,
     ) -> QueryId {
         tracing::debug!(
             target: LOG_TARGET,
@@ -421,6 +443,7 @@
             QueryType::AddProvider {
                 provided_key,
                 provider,
+                quorum,
                 context: FindNodeContext::new(config, candidates),
             },
         );
@@ -475,7 +498,7 @@
             target: LOG_TARGET,
             ?query_id,
             num_peers = ?peers.len(),
-            "start `PUT_VALUE` responses tracking"
+            "start `PUT_VALUE` progress tracking"
         );
 
         self.queries.insert(
@@ -486,6 +509,29 @@
         );
     }
 
+    /// Start `ADD_PROVIDER` requests tracking.
+    pub fn start_add_provider_to_found_nodes_requests_tracking(
+        &mut self,
+        query_id: QueryId,
+        provided_key: RecordKey,
+        peers: Vec<PeerId>,
+        quorum: Quorum,
+    ) {
+        tracing::debug!(
+            target: LOG_TARGET,
+            ?query_id,
+            num_peers = ?peers.len(),
+            "start `ADD_PROVIDER` progress tracking"
+        );
+
+        self.queries.insert(
+            query_id,
+            QueryType::AddProviderToFoundNodes {
+                context: PutToTargetPeersContext::new(query_id, provided_key, peers, quorum),
+            },
+        );
+    }
+
     /// Register response failure from a queried peer.
     pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) {
         tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure");
@@ -503,7 +549,7 @@
             Some(QueryType::PutRecordToPeers { context, .. }) => {
                 context.register_response_failure(peer);
             }
-            Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
+            Some(QueryType::PutRecordToFoundNodes { context }) => {
                 context.register_response_failure(peer);
             }
             Some(QueryType::GetRecord { context }) => {
                 context.register_response_failure(peer);
             }
             Some(QueryType::AddProvider { context, .. }) => {
                 context.register_response_failure(peer);
             }
+            Some(QueryType::AddProviderToFoundNodes { context }) => {
+                context.register_response_failure(peer);
+            }
             Some(QueryType::GetProviders { context }) => {
                 context.register_response_failure(peer);
             }
@@ -519,12 +568,7 @@
     }
 
     /// Register that `response` received from `peer`.
-    pub fn register_response(
-        &mut self,
-        query: QueryId,
-        peer: PeerId,
-        message: KademliaMessage,
-    ) -> Option<QueryAction> {
+    pub fn register_response(&mut self, query: QueryId, peer: PeerId, message: KademliaMessage) {
         tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response");
 
         match self.queries.get_mut(&query) {
@@ -535,36 +579,98 @@
                 KademliaMessage::FindNode { peers, .. } => {
                     context.register_response(peer, peers);
                 }
-                _ => unreachable!(),
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `FIND_NODE`: {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
             },
             Some(QueryType::PutRecord { context, .. }) => match message {
                 KademliaMessage::FindNode { peers, .. } => {
                     context.register_response(peer, peers);
                 }
-                _ => unreachable!(),
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `FIND_NODE` during `PUT_VALUE` query: {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
             },
             Some(QueryType::PutRecordToPeers { context, .. }) => match message {
                 KademliaMessage::FindNode { peers, .. } => {
                     context.register_response(peer, peers);
                 }
-                _ => unreachable!(),
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `FIND_NODE` during `PUT_VALUE` (to peers): {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
             },
-            Some(QueryType::PutRecordToFoundNodes { context, .. }) => match message {
+            Some(QueryType::PutRecordToFoundNodes { context }) => match message {
                 KademliaMessage::PutValue { .. } => {
                     context.register_response(peer);
                 }
-                _ => unreachable!(),
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `PUT_VALUE`: {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
             },
             Some(QueryType::GetRecord { context }) => match message {
                 KademliaMessage::GetRecord { record, peers, .. } =>
                     context.register_response(peer, record, peers),
-                _ => unreachable!(),
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `GET_VALUE`: {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
             },
             Some(QueryType::AddProvider { context, .. }) => match message {
                 KademliaMessage::FindNode { peers, .. } => {
                     context.register_response(peer, peers);
                 }
-                _ => unreachable!(),
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `FIND_NODE` during `ADD_PROVIDER` query: {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
+            },
+            Some(QueryType::AddProviderToFoundNodes { context, .. }) => match message {
+                KademliaMessage::AddProvider { .. } => {
+                    context.register_response(peer);
+                }
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `ADD_PROVIDER`: {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
             },
             Some(QueryType::GetProviders { context }) => match message {
                 KademliaMessage::GetProviders {
@@ -574,11 +680,17 @@
                 } => {
                     context.register_response(peer, providers, peers);
                 }
-                _ => unreachable!(),
+                message => {
+                    tracing::debug!(
+                        target: LOG_TARGET,
+                        ?query,
+                        ?peer,
+                        "unexpected response to `GET_PROVIDERS`: {message}",
+                    );
+                    context.register_response_failure(peer);
+                }
             },
         }
-
-        None
     }
 
     pub fn register_send_failure(&mut self, query: QueryId, peer: PeerId) {
@@ -597,7 +709,7 @@
             Some(QueryType::PutRecordToPeers { context, .. }) => {
                 context.register_send_failure(peer);
             }
-            Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
+            Some(QueryType::PutRecordToFoundNodes { context }) => {
                 context.register_send_failure(peer);
             }
             Some(QueryType::GetRecord { context }) => {
                 context.register_send_failure(peer);
             }
             Some(QueryType::AddProvider { context, .. }) => {
                 context.register_send_failure(peer);
             }
+            Some(QueryType::AddProviderToFoundNodes { context }) => {
+                context.register_send_failure(peer);
+            }
             Some(QueryType::GetProviders { context }) => {
                 context.register_send_failure(peer);
             }
@@ -637,6 +752,9 @@
             Some(QueryType::AddProvider { context, .. }) => {
                 context.register_send_success(peer);
             }
+            Some(QueryType::AddProviderToFoundNodes { context, .. }) => {
+                context.register_send_success(peer);
+            }
             Some(QueryType::GetProviders { context }) => {
                 context.register_send_success(peer);
             }
@@ -675,6 +793,10 @@
                 // All `PUT_VALUE` requests were sent when initiating this query type.
                 None
             }
+            Some(QueryType::AddProviderToFoundNodes { .. }) => {
+                // All `ADD_PROVIDER` requests were sent when initiating this query type.
+                None
+            }
         }
     }
 
@@ -717,13 +839,20 @@
             QueryType::AddProvider {
                 provided_key,
                 provider,
+                quorum,
                 context,
             } => QueryAction::AddProviderToFoundNodes {
                 query: context.config.query,
                 provided_key,
                 provider,
                 peers: context.responses.into_values().collect::<Vec<_>>(),
+                quorum,
             },
+            QueryType::AddProviderToFoundNodes { context } =>
+                QueryAction::AddProviderQuerySucceeded {
+                    query: context.query,
+                    provided_key: context.key,
+                },
             QueryType::GetProviders { context } => QueryAction::GetProvidersQueryDone {
                 query_id: context.config.query,
                 provided_key: context.config.target.clone().into_preimage(),
@@ -751,6 +880,7 @@
             QueryType::AddProvider { context, .. } => context.next_action(),
             QueryType::GetProviders { context } => context.next_action(),
             QueryType::PutRecordToFoundNodes { context, .. } => context.next_action(),
+            QueryType::AddProviderToFoundNodes { context, ..
} => context.next_action(), }; match action { @@ -1479,4 +1609,537 @@ mod tests { event => panic!("invalid event received {:?}", event), } } + + #[test] + fn add_provider_fails() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let local_peer_id = PeerId::random(); + let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize); + let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]); + let local_content_provider = ContentProvider { + peer: local_peer_id, + addresses: vec![], + }; + let target_key = Key::new(original_provided_key.clone()); + + let distances = { + let mut distances = std::collections::BTreeMap::new(); + + for i in 1..64 { + let peer = make_peer_id(i, 0); + let key = Key::from(peer); + + distances.insert(target_key.distance(&key), peer); + } + + distances + }; + let mut iter = distances.iter(); + + // start add provider with one known peer + let original_query_id = QueryId(1340); + let _query = engine.start_add_provider( + original_query_id, + original_provided_key.clone(), + local_content_provider.clone(), + vec![KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + )] + .into(), + Quorum::All, + ); + + let action = engine.next_action(); + assert!(engine.next_action().is_none()); + + // the one known peer responds with 3 other peers it knows + match action { + Some(QueryAction::SendMessage { query, peer, .. }) => { + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![ + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + ], + }, + ); + } + _ => panic!("invalid event received"), + } + + // send empty response for the last three nodes + for _ in 0..3 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. 
}) => { + println!("next send message to {peer:?}"); + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![], + }, + ); + } + _ => panic!("invalid event received"), + } + } + + let mut peers = match engine.next_action() { + Some(QueryAction::AddProviderToFoundNodes { + query, + provided_key, + provider, + peers, + quorum, + }) => { + assert_eq!(query, original_query_id); + assert_eq!(provided_key, original_provided_key); + assert_eq!(provider, local_content_provider); + assert_eq!(peers.len(), 4); + assert!(matches!(quorum, Quorum::All)); + + peers + } + _ => panic!("invalid event received"), + }; + + engine.start_add_provider_to_found_nodes_requests_tracking( + original_query_id, + original_provided_key.clone(), + peers.iter().map(|p| p.peer).collect(), + Quorum::All, + ); + + // sends to all but one peer succeed + let last_peer = peers.pop().unwrap(); + for peer in peers { + engine.register_send_success(original_query_id, peer.peer); + } + engine.register_send_failure(original_query_id, last_peer.peer); + + match engine.next_action() { + Some(QueryAction::QueryFailed { query }) => { + assert_eq!(query, original_query_id); + } + _ => panic!("invalid event received"), + } + + assert!(engine.next_action().is_none()); + } + + #[test] + fn add_provider_succeeds() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let local_peer_id = PeerId::random(); + let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize); + let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]); + let local_content_provider = ContentProvider { + peer: local_peer_id, + addresses: vec![], + }; + + let target_key = Key::new(original_provided_key.clone()); + let distances = { + let mut distances = std::collections::BTreeMap::new(); + + for i in 1..64 { + let peer = make_peer_id(i, 0); + let key = Key::from(peer); + + distances.insert(target_key.distance(&key), peer); + } + + distances + }; + let mut iter = distances.iter(); + + // start add provider with one known peer + let add_query_id = QueryId(1340); + let _query = engine.start_add_provider( + add_query_id, + original_provided_key.clone(), + local_content_provider.clone(), + vec![KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + )] + .into(), + Quorum::All, + ); + + let action = engine.next_action(); + assert!(engine.next_action().is_none()); + + // the one known peer responds with 3 other peers it knows + match action { + Some(QueryAction::SendMessage { query, peer, .. }) => { + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![ + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + ], + }, + ); + } + _ => panic!("invalid event received"), + } + + // send empty response for the last three nodes + for _ in 0..3 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. 
}) => { + println!("next send message to {peer:?}"); + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![], + }, + ); + } + _ => panic!("invalid event received"), + } + } + + let peers = match engine.next_action() { + Some(QueryAction::AddProviderToFoundNodes { + query, + provided_key, + provider, + peers, + quorum, + }) => { + assert_eq!(query, add_query_id); + assert_eq!(provided_key, original_provided_key); + assert_eq!(provider, local_content_provider); + assert_eq!(peers.len(), 4); + assert!(matches!(quorum, Quorum::All)); + + peers + } + _ => panic!("invalid event received"), + }; + + engine.start_add_provider_to_found_nodes_requests_tracking( + add_query_id, + original_provided_key.clone(), + peers.iter().map(|p| p.peer).collect(), + Quorum::All, + ); + + // simulate successful sends to all peers + for peer in &peers { + engine.register_send_success(add_query_id, peer.peer); + } + + match engine.next_action() { + Some(QueryAction::AddProviderQuerySucceeded { + query, + provided_key, + }) => { + assert_eq!(query, add_query_id); + assert_eq!(provided_key, original_provided_key); + } + _ => panic!("invalid event received"), + } + + assert!(engine.next_action().is_none()); + + // get providers from those peers. + let get_query_id = QueryId(1341); + let _query = engine.start_get_providers( + get_query_id, + original_provided_key.clone(), + vec![ + KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected), + ] + .into(), + vec![], + ); + + for _ in 0..4 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. 
}) => { + assert_eq!(query, get_query_id); + engine.register_response( + query, + peer, + KademliaMessage::GetProviders { + key: Some(original_provided_key.clone()), + peers: vec![], + providers: vec![local_content_provider.clone().into()], + }, + ); + } + event => panic!("invalid event received {:?}", event), + } + } + + match engine.next_action() { + Some(QueryAction::GetProvidersQueryDone { + query_id, + provided_key, + providers, + }) => { + assert_eq!(query_id, get_query_id); + assert_eq!(provided_key, original_provided_key); + assert_eq!(providers, vec![local_content_provider]); + } + event => panic!("invalid event received {:?}", event), + } + } + + #[test] + fn add_provider_succeeds_with_quorum_one() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let local_peer_id = PeerId::random(); + let mut engine = QueryEngine::new(local_peer_id, 20usize, 3usize); + let original_provided_key = RecordKey::new(&vec![1, 2, 3, 4]); + let local_content_provider = ContentProvider { + peer: local_peer_id, + addresses: vec![], + }; + + let target_key = Key::new(original_provided_key.clone()); + let distances = { + let mut distances = std::collections::BTreeMap::new(); + + for i in 1..64 { + let peer = make_peer_id(i, 0); + let key = Key::from(peer); + + distances.insert(target_key.distance(&key), peer); + } + + distances + }; + let mut iter = distances.iter(); + + // start add provider with one known peer + let add_query_id = QueryId(1340); + let _query = engine.start_add_provider( + add_query_id, + original_provided_key.clone(), + local_content_provider.clone(), + vec![KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + )] + .into(), + Quorum::One, + ); + + let action = engine.next_action(); + assert!(engine.next_action().is_none()); + + // the one known peer responds with 3 other peers it knows + match action { + Some(QueryAction::SendMessage { query, peer, .. }) => { + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![ + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + ], + }, + ); + } + _ => panic!("invalid event received"), + } + + // send empty response for the last three nodes + for _ in 0..3 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. 
}) => { + println!("next send message to {peer:?}"); + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![], + }, + ); + } + _ => panic!("invalid event received"), + } + } + + let peers = match engine.next_action() { + Some(QueryAction::AddProviderToFoundNodes { + query, + provided_key, + provider, + peers, + quorum, + }) => { + assert_eq!(query, add_query_id); + assert_eq!(provided_key, original_provided_key); + assert_eq!(provider, local_content_provider); + assert_eq!(peers.len(), 4); + assert!(matches!(quorum, Quorum::One)); + + peers + } + _ => panic!("invalid event received"), + }; + + engine.start_add_provider_to_found_nodes_requests_tracking( + add_query_id, + original_provided_key.clone(), + peers.iter().map(|p| p.peer).collect(), + Quorum::One, + ); + + // all but one peer fail + assert!(peers.len() > 1); + engine.register_send_success(add_query_id, peers.first().unwrap().peer); + for peer in peers.iter().skip(1) { + engine.register_send_failure(add_query_id, peer.peer); + } + + match engine.next_action() { + Some(QueryAction::AddProviderQuerySucceeded { + query, + provided_key, + }) => { + assert_eq!(query, add_query_id); + assert_eq!(provided_key, original_provided_key); + } + _ => panic!("invalid event received"), + } + + assert!(engine.next_action().is_none()); + + // get providers from those peers. + let get_query_id = QueryId(1341); + let _query = engine.start_get_providers( + get_query_id, + original_provided_key.clone(), + vec![ + KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected), + ] + .into(), + vec![], + ); + + // first peer responds with the provider + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. }) => { + assert_eq!(query, get_query_id); + engine.register_response( + query, + peer, + KademliaMessage::GetProviders { + key: Some(original_provided_key.clone()), + peers: vec![], + providers: vec![local_content_provider.clone().into()], + }, + ); + } + event => panic!("invalid event received {:?}", event), + } + + // other peers respond with no providers + for _ in 1..4 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. 
}) => {
+                    assert_eq!(query, get_query_id);
+                    engine.register_response(
+                        query,
+                        peer,
+                        KademliaMessage::GetProviders {
+                            key: Some(original_provided_key.clone()),
+                            peers: vec![],
+                            providers: vec![],
+                        },
+                    );
+                }
+                event => panic!("invalid event received {:?}", event),
+            }
+        }
+
+        match engine.next_action() {
+            Some(QueryAction::GetProvidersQueryDone {
+                query_id,
+                provided_key,
+                providers,
+            }) => {
+                assert_eq!(query_id, get_query_id);
+                assert_eq!(provided_key, original_provided_key);
+                assert_eq!(providers, vec![local_content_provider]);
+            }
+            event => panic!("invalid event received {:?}", event),
+        }
+    }
 }
diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs
index e5af771e..f6491493 100644
--- a/src/protocol/libp2p/kademlia/store.rs
+++ b/src/protocol/libp2p/kademlia/store.rs
@@ -1,5 +1,3 @@
-// Copyright 2023 litep2p developers
-//
 // Permission is hereby granted, free of charge, to any person obtaining a
 // copy of this software and associated documentation files (the "Software"),
 // to deal in the Software without restriction, including without limitation
@@ -29,6 +27,7 @@ use crate::{
         },
         record::{ContentProvider, Key, ProviderRecord, Record},
         types::Key as KademliaKey,
+        Quorum,
     },
     utils::futures_stream::FuturesStream,
     PeerId,
@@ -49,6 +48,7 @@ pub enum MemoryStoreAction {
     RefreshProvider {
         provided_key: Key,
         provider: ContentProvider,
+        quorum: Quorum,
     },
 }
 
@@ -63,7 +63,7 @@ pub struct MemoryStore {
     /// Provider records.
     provider_keys: HashMap<Key, Vec<ProviderRecord>>,
     /// Local providers.
-    local_providers: HashMap<Key, ContentProvider>,
+    local_providers: HashMap<Key, (ContentProvider, Quorum)>,
     /// Futures to signal it's time to republish a local provider.
     pending_provider_refresh: FuturesStream<Pin<Box<dyn Future<Output = Key> + Send>>>,
 }
@@ -187,24 +187,9 @@ impl MemoryStore {
     /// the furthest already inserted provider. The furthest provider is then discarded.
     ///
     /// Returns `true` if the provider was added, `false` otherwise.
+    ///
+    /// A `Quorum` is only relevant for local providers; see [`MemoryStore::put_local_provider`].
     pub fn put_provider(&mut self, key: Key, provider: ContentProvider) -> bool {
-        // Helper to schedule local provider refresh.
-        let mut schedule_local_provider_refresh = |provider_record: ProviderRecord| {
-            let key = provider_record.key.clone();
-            let refresh_interval = self.config.provider_refresh_interval;
-            self.local_providers.insert(
-                key.clone(),
-                ContentProvider {
-                    peer: provider_record.provider,
-                    addresses: provider_record.addresses,
-                },
-            );
-            self.pending_provider_refresh.push(Box::pin(async move {
-                tokio::time::sleep(refresh_interval).await;
-                key
-            }));
-        };
-
         // Make sure we have no more than `max_provider_addresses`.
         let provider_record = {
             let mut record = ProviderRecord {
@@ -222,10 +207,6 @@
         match self.provider_keys.entry(provider_record.key.clone()) {
             Entry::Vacant(entry) =>
                 if can_insert_new_key {
-                    if provider_record.provider == self.local_peer_id {
-                        schedule_local_provider_refresh(provider_record.clone());
-                    }
-
                     entry.insert(vec![provider_record]);
 
                     true
@@ -249,9 +230,6 @@
                 match provider_position {
                     Ok(i) => {
-                        if provider_record.provider == self.local_peer_id {
-                            schedule_local_provider_refresh(provider_record.clone());
-                        }
                         // Update the provider in place.
                         providers[i] = provider_record.clone();
@@ -275,10 +253,6 @@
                             providers.pop();
                         }
 
-                        if provider_record.provider == self.local_peer_id {
-                            schedule_local_provider_refresh(provider_record.clone());
-                        }
-
                         providers.insert(i, provider_record.clone());
 
                         true
@@ -289,6 +263,31 @@
         }
     }
 
+    /// Try to add ourselves as a provider for `key`.
+    ///
+    /// Returns `true` if the provider was added, `false` otherwise.
+    pub fn put_local_provider(&mut self, key: Key, quorum: Quorum) -> bool {
+        let provider = ContentProvider {
+            peer: self.local_peer_id,
+            // For local providers, addresses are populated when replying to `GET_PROVIDERS`
+            // requests.
+            addresses: vec![],
+        };
+
+        if self.put_provider(key.clone(), provider.clone()) {
+            let refresh_interval = self.config.provider_refresh_interval;
+            self.local_providers.insert(key.clone(), (provider, quorum));
+            self.pending_provider_refresh.push(Box::pin(async move {
+                tokio::time::sleep(refresh_interval).await;
+                key
+            }));
+
+            true
+        } else {
+            false
+        }
+    }
+
     /// Remove local provider for `key`.
     pub fn remove_local_provider(&mut self, key: Key) {
         if self.local_providers.remove(&key).is_none() {
@@ -332,7 +331,7 @@ pub async fn next_action(&mut self) -> Option<MemoryStoreAction> {
         // [`FuturesStream`] never terminates, so `and_then()` below is always triggered.
         self.pending_provider_refresh.next().await.and_then(|key| {
-            if let Some(provider) = self.local_providers.get(&key).cloned() {
+            if let Some((provider, quorum)) = self.local_providers.get(&key).cloned() {
                 tracing::trace!(
                     target: LOG_TARGET,
                     ?key,
@@ -342,6 +341,7 @@
                 Some(MemoryStoreAction::RefreshProvider {
                     provided_key: key,
                     provider,
+                    quorum,
                 })
             } else {
                 tracing::trace!(
@@ -851,15 +851,19 @@
         let key = Key::from(vec![1, 2, 3]);
         let local_provider = ContentProvider {
             peer: local_peer_id,
-            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+            addresses: vec![],
         };
+        let quorum = Quorum::All;
 
         assert!(store.local_providers.is_empty());
         assert_eq!(store.pending_provider_refresh.len(), 0);
 
-        assert!(store.put_provider(key.clone(), local_provider.clone()));
+        assert!(store.put_local_provider(key.clone(), quorum));
 
-        assert_eq!(store.local_providers.get(&key), Some(&local_provider),);
+        assert_eq!(
+            store.local_providers.get(&key),
+            Some(&(local_provider, quorum)),
+        );
         assert_eq!(store.pending_provider_refresh.len(), 1);
     }
 
@@ -878,21 +882,25 @@
         let local_provider = ContentProvider {
             peer: local_peer_id,
-            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+            addresses: vec![],
         };
+        let quorum = Quorum::N(5.try_into().unwrap());
 
         assert!(store.local_providers.is_empty());
         assert_eq!(store.pending_provider_refresh.len(), 0);
 
         assert!(store.put_provider(key.clone(), remote_provider.clone()));
-        assert!(store.put_provider(key.clone(), local_provider.clone()));
+        assert!(store.put_local_provider(key.clone(), quorum));
 
         let got_providers = store.get_providers(&key);
         assert_eq!(got_providers.len(), 2);
         assert!(got_providers.contains(&remote_provider));
         assert!(got_providers.contains(&local_provider));
 
-        assert_eq!(store.local_providers.get(&key), Some(&local_provider),);
+        assert_eq!(
+            store.local_providers.get(&key),
+            Some(&(local_provider, quorum))
+        );
         assert_eq!(store.pending_provider_refresh.len(), 1);
     }
 
@@ -904,14 +912,18 @@
         let key = Key::from(vec![1, 2, 3]);
         let local_provider = ContentProvider {
             peer: local_peer_id,
-            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+
addresses: vec![], }; + let quorum = Quorum::One; assert!(store.local_providers.is_empty()); - assert!(store.put_provider(key.clone(), local_provider.clone())); + assert!(store.put_local_provider(key.clone(), quorum)); - assert_eq!(store.local_providers.get(&key), Some(&local_provider),); + assert_eq!( + store.local_providers.get(&key), + Some(&(local_provider, quorum)) + ); store.remove_local_provider(key.clone()); @@ -934,18 +946,22 @@ mod tests { let local_provider = ContentProvider { peer: local_peer_id, - addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))], + addresses: vec![], }; + let quorum = Quorum::One; assert!(store.put_provider(key.clone(), remote_provider.clone())); - assert!(store.put_provider(key.clone(), local_provider.clone())); + assert!(store.put_local_provider(key.clone(), quorum)); let got_providers = store.get_providers(&key); assert_eq!(got_providers.len(), 2); assert!(got_providers.contains(&remote_provider)); assert!(got_providers.contains(&local_provider)); - assert_eq!(store.local_providers.get(&key), Some(&local_provider),); + assert_eq!( + store.local_providers.get(&key), + Some(&(local_provider, quorum)) + ); store.remove_local_provider(key.clone()); @@ -967,13 +983,17 @@ mod tests { let key = Key::from(vec![1, 2, 3]); let local_provider = ContentProvider { peer: local_peer_id, - addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))], + addresses: vec![], }; + let quorum = Quorum::One; - assert!(store.put_provider(key.clone(), local_provider.clone())); + assert!(store.put_local_provider(key.clone(), quorum)); assert_eq!(store.get_providers(&key), vec![local_provider.clone()]); - assert_eq!(store.local_providers.get(&key), Some(&local_provider)); + assert_eq!( + store.local_providers.get(&key), + Some(&(local_provider.clone(), quorum)) + ); // No actions are instantly generated. assert!(matches!( @@ -987,7 +1007,8 @@ mod tests { .unwrap(), Some(MemoryStoreAction::RefreshProvider { provided_key: key, - provider: local_provider + provider: local_provider, + quorum, }), ); } @@ -1013,18 +1034,22 @@ mod tests { let local_provider = ContentProvider { peer: local_peer_id, - addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))], + addresses: vec![], }; + let quorum = Quorum::One; assert!(store.put_provider(key.clone(), remote_provider.clone())); - assert!(store.put_provider(key.clone(), local_provider.clone())); + assert!(store.put_local_provider(key.clone(), quorum)); let got_providers = store.get_providers(&key); assert_eq!(got_providers.len(), 2); assert!(got_providers.contains(&remote_provider)); assert!(got_providers.contains(&local_provider)); - assert_eq!(store.local_providers.get(&key), Some(&local_provider)); + assert_eq!( + store.local_providers.get(&key), + Some(&(local_provider.clone(), quorum)) + ); // No actions are instantly generated. 
assert!(matches!( @@ -1038,7 +1063,8 @@ mod tests { .unwrap(), Some(MemoryStoreAction::RefreshProvider { provided_key: key, - provider: local_provider + provider: local_provider, + quorum, }), ); } @@ -1057,13 +1083,17 @@ mod tests { let key = Key::from(vec![1, 2, 3]); let local_provider = ContentProvider { peer: local_peer_id, - addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))], + addresses: vec![], }; + let quorum = Quorum::One; - assert!(store.put_provider(key.clone(), local_provider.clone())); + assert!(store.put_local_provider(key.clone(), quorum)); assert_eq!(store.get_providers(&key), vec![local_provider.clone()]); - assert_eq!(store.local_providers.get(&key), Some(&local_provider)); + assert_eq!( + store.local_providers.get(&key), + Some(&(local_provider, quorum)) + ); store.remove_local_provider(key); diff --git a/tests/conformance/rust/kademlia.rs b/tests/conformance/rust/kademlia.rs index 2b01a44d..1a11bb06 100644 --- a/tests/conformance/rust/kademlia.rs +++ b/tests/conformance/rust/kademlia.rs @@ -439,7 +439,7 @@ async fn litep2p_add_provider_to_libp2p() { let litep2p_peer_id = PeerId::from_bytes(&litep2p.local_peer_id().to_bytes()).unwrap(); let key = vec![1u8, 2u8, 3u8]; - litep2p_kad.start_providing(RecordKey::new(&key)).await; + litep2p_kad.start_providing(RecordKey::new(&key), Quorum::All).await; loop { tokio::select! { @@ -606,7 +606,7 @@ async fn libp2p_get_providers_from_litep2p() { // Store provider locally in litep2p. let original_key = vec![1u8, 2u8, 3u8]; - litep2p_kad.start_providing(original_key.clone().into()).await; + litep2p_kad.start_providing(original_key.clone().into(), Quorum::All).await; // Drive litep2p a little bit to make sure the provider record is stored and no `ADD_PROVIDER` // requests are generated (because no peers are know yet). diff --git a/tests/protocol/kademlia.rs b/tests/protocol/kademlia.rs index c0d6b3a9..48141f17 100644 --- a/tests/protocol/kademlia.rs +++ b/tests/protocol/kademlia.rs @@ -680,7 +680,7 @@ async fn provider_retrieved_by_remote_node() { // Store provider locally. let key = RecordKey::new(&vec![1, 2, 3]); - kad_handle1.start_providing(key.clone()).await; + let query0 = kad_handle1.start_providing(key.clone(), Quorum::All).await; // This is the expected provider. let expected_provider = ContentProvider { @@ -688,8 +688,7 @@ async fn provider_retrieved_by_remote_node() { addresses: vec![peer1_public_address], }; - // This request to get rpovider should fail because the nodes are not connected. - let query1 = kad_handle2.get_providers(key.clone()).await; + let mut query1 = None; let mut query2 = None; loop { @@ -699,12 +698,20 @@ async fn provider_retrieved_by_remote_node() { } event = litep2p1.next_event() => {} event = litep2p2.next_event() => {} - event = kad_handle1.next() => {} + event = kad_handle1.next() => { + if let Some(KademliaEvent::QueryFailed { query_id }) = event { + // Publishing the provider failed, because the nodes are not connected. + assert_eq!(query_id, query0); + // This request to get provider should fail because the nodes are still + // not connected. + query1 = Some(kad_handle2.get_providers(key.clone()).await); + } + } event = kad_handle2.next() => { match event { Some(KademliaEvent::QueryFailed { query_id }) => { // Query failed, because the nodes don't know about each other yet. - assert_eq!(query_id, query1); + assert_eq!(Some(query_id), query1); // Let the node know about `litep2p1`. kad_handle2 @@ -782,7 +789,9 @@ async fn provider_added_to_remote_node() { // Start provodong. 
let key = RecordKey::new(&vec![1, 2, 3]); - kad_handle1.start_providing(key.clone()).await; + let query = kad_handle1.start_providing(key.clone(), Quorum::All).await; + let mut add_provider_success = false; + let mut incoming_provider = false; // This is the expected provider. let expected_provider = ContentProvider { @@ -797,12 +806,24 @@ async fn provider_added_to_remote_node() { } event = litep2p1.next_event() => {} event = litep2p2.next_event() => {} - event = kad_handle1.next() => {} + event = kad_handle1.next() => { + if let Some(KademliaEvent::AddProviderSuccess { query_id, provided_key }) = event { + assert_eq!(query_id, query); + assert_eq!(provided_key, key); + add_provider_success = true; + if incoming_provider { + break + } + } + } event = kad_handle2.next() => { if let Some(KademliaEvent::IncomingProvider { provided_key, provider }) = event { assert_eq!(provided_key, key); assert_eq!(provider, expected_provider); - break + incoming_provider = true; + if add_provider_success { + break + } } } }
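The engine tests above pin down the quorum semantics of the new `ADD_PROVIDER` send-tracking phase: under `Quorum::All` a single failed send fails the whole query, while under `Quorum::One` a single successful send is enough. The diff never shows `PutToTargetPeersContext` itself, but the decision it makes reduces to a small predicate over the number of targeted peers and successful sends; a self-contained sketch under that assumption (the `quorum_reached` helper and the local `Quorum` mirror are illustrative, not the actual implementation):

```rust
use std::num::NonZeroUsize;

// Local mirror of the handle's `Quorum` type, so the example runs standalone.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum Quorum {
    All,
    One,
    N(NonZeroUsize),
}

/// Decide whether a send-tracking phase met its quorum, given how many peers
/// were targeted and how many `ADD_PROVIDER` sends succeeded.
fn quorum_reached(quorum: Quorum, targeted: usize, succeeded: usize) -> bool {
    match quorum {
        // Every targeted peer must have been reached.
        Quorum::All => succeeded == targeted,
        // One successful send suffices.
        Quorum::One => succeeded >= 1,
        // At least `n` successful sends are required.
        Quorum::N(n) => succeeded >= n.get(),
    }
}

fn main() {
    // Mirrors `add_provider_fails`: 4 targeted peers, 3 successes, `Quorum::All` fails.
    assert!(!quorum_reached(Quorum::All, 4, 3));
    // Mirrors `add_provider_succeeds_with_quorum_one`: 1 success out of 4 is enough.
    assert!(quorum_reached(Quorum::One, 4, 1));
    // `Quorum::N(2)` needs at least two acknowledged sends.
    assert!(quorum_reached(Quorum::N(NonZeroUsize::new(2).unwrap()), 4, 2));
}
```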