diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs
index 28f55947..65b9f68c 100644
--- a/src/protocol/libp2p/kademlia/executor.rs
+++ b/src/protocol/libp2p/kademlia/executor.rs
@@ -37,15 +37,33 @@ const READ_TIMEOUT: Duration = Duration::from_secs(15);
/// Write timeout for outbound messages.
const WRITE_TIMEOUT: Duration = Duration::from_secs(15);
+/// Failure reason.
+#[derive(Debug)]
+pub enum FailureReason {
+ /// Substream was closed while reading/writing message to remote peer.
+ SubstreamClosed,
+
+ /// Timeout while reading/writing to substream.
+ Timeout,
+}
+
/// Query result.
#[derive(Debug)]
pub enum QueryResult {
/// Message was sent to remote peer successfully.
+ /// This result is only reported for send-only queries. Queries that include reading a
+ /// response won't report it and will only yield a [`QueryResult::ReadSuccess`].
SendSuccess {
/// Substream.
substream: Substream,
},
+ /// Failed to send message to remote peer.
+ SendFailure {
+ /// Failure reason.
+ reason: FailureReason,
+ },
+
/// Message was read from the remote peer successfully.
ReadSuccess {
/// Substream.
@@ -55,11 +73,16 @@ pub enum QueryResult {
message: BytesMut,
},
- /// Timeout while reading a response from the substream.
- Timeout,
+ /// Failed to read message from remote peer.
+ ReadFailure {
+ /// Failure reason.
+ reason: FailureReason,
+ },
- /// Substream was closed wile reading/writing message to remote peer.
- SubstreamClosed,
+ /// Result that must be treated as send success. This is needed as a workaround to support
+ /// older litep2p nodes not sending `PUT_VALUE` ACK messages and not reading them.
+ // TODO: remove this as part of https://github.com/paritytech/litep2p/issues/429.
+ AssumeSendSuccess,
}
/// Query result.
@@ -90,24 +113,69 @@ impl QueryExecutor {
}
/// Send message to remote peer.
- pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) {
+ pub fn send_message(
+ &mut self,
+ peer: PeerId,
+ query_id: Option<QueryId>,
+ message: Bytes,
+ mut substream: Substream,
+ ) {
self.futures.push(Box::pin(async move {
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) => QueryContext {
peer,
- query_id: None,
- result: QueryResult::Timeout,
+ query_id,
+ result: QueryResult::SendFailure {
+ reason: FailureReason::Timeout,
+ },
},
// Writing message to substream failed.
Ok(Err(_)) => QueryContext {
peer,
- query_id: None,
- result: QueryResult::SubstreamClosed,
+ query_id,
+ result: QueryResult::SendFailure {
+ reason: FailureReason::SubstreamClosed,
+ },
},
Ok(Ok(())) => QueryContext {
peer,
- query_id: None,
+ query_id,
+ result: QueryResult::SendSuccess { substream },
+ },
+ }
+ }));
+ }
+
+ /// Send message and ignore sending errors.
+ ///
+ /// This is a hackish way of dealing with older litep2p nodes not expecting to receive
+ /// `PUT_VALUE` ACK messages. This should eventually be removed.
+ // TODO: remove this as part of https://github.com/paritytech/litep2p/issues/429.
+ pub fn send_message_eat_failure(
+ &mut self,
+ peer: PeerId,
+ query_id: Option<QueryId>,
+ message: Bytes,
+ mut substream: Substream,
+ ) {
+ self.futures.push(Box::pin(async move {
+ match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
+ // Timeout error.
+ Err(_) => QueryContext {
+ peer,
+ query_id,
+ result: QueryResult::AssumeSendSuccess,
+ },
+ // Writing message to substream failed.
+ Ok(Err(_)) => QueryContext { + peer, + query_id, + result: QueryResult::AssumeSendSuccess, + }, + Ok(Ok(())) => QueryContext { + peer, + query_id, result: QueryResult::SendSuccess { substream }, }, } @@ -126,7 +194,9 @@ impl QueryExecutor { Err(_) => QueryContext { peer, query_id, - result: QueryResult::Timeout, + result: QueryResult::ReadFailure { + reason: FailureReason::Timeout, + }, }, Ok(Some(Ok(message))) => QueryContext { peer, @@ -136,7 +206,9 @@ impl QueryExecutor { Ok(None) | Ok(Some(Err(_))) => QueryContext { peer, query_id, - result: QueryResult::SubstreamClosed, + result: QueryResult::ReadFailure { + reason: FailureReason::SubstreamClosed, + }, }, } })); @@ -157,7 +229,9 @@ impl QueryExecutor { return QueryContext { peer, query_id, - result: QueryResult::Timeout, + result: QueryResult::SendFailure { + reason: FailureReason::Timeout, + }, }, // Writing message to substream failed. Ok(Err(_)) => { @@ -165,9 +239,12 @@ impl QueryExecutor { return QueryContext { peer, query_id, - result: QueryResult::SubstreamClosed, + result: QueryResult::SendFailure { + reason: FailureReason::SubstreamClosed, + }, }; } + // This will result in either `SendAndReadSuccess` or `SendSuccessReadFailure`. Ok(Ok(())) => (), }; @@ -175,7 +252,9 @@ impl QueryExecutor { Err(_) => QueryContext { peer, query_id, - result: QueryResult::Timeout, + result: QueryResult::ReadFailure { + reason: FailureReason::Timeout, + }, }, Ok(Some(Ok(message))) => QueryContext { peer, @@ -185,11 +264,70 @@ impl QueryExecutor { Ok(None) | Ok(Some(Err(_))) => QueryContext { peer, query_id, - result: QueryResult::SubstreamClosed, + result: QueryResult::ReadFailure { + reason: FailureReason::SubstreamClosed, + }, }, } })); } + + /// Send request to remote peer and read the response, ignoring it and any read errors. + /// + /// This is a hackish way of dealing with older litep2p nodes not sending `PUT_VALUE` ACK + /// messages. This should eventually be removed. + // TODO: remove this as part of https://github.com/paritytech/litep2p/issues/429. + pub fn send_request_eat_response_failure( + &mut self, + peer: PeerId, + query_id: Option, + message: Bytes, + mut substream: Substream, + ) { + self.futures.push(Box::pin(async move { + match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await { + // Timeout error. + Err(_) => + return QueryContext { + peer, + query_id, + result: QueryResult::SendFailure { + reason: FailureReason::Timeout, + }, + }, + // Writing message to substream failed. + Ok(Err(_)) => { + let _ = substream.close().await; + return QueryContext { + peer, + query_id, + result: QueryResult::SendFailure { + reason: FailureReason::SubstreamClosed, + }, + }; + } + // This will result in either `SendAndReadSuccess` or `SendSuccessReadFailure`. + Ok(Ok(())) => (), + }; + + // Ignore the read result (including errors). 
+ if let Ok(Some(Ok(message))) = + tokio::time::timeout(READ_TIMEOUT, substream.next()).await + { + QueryContext { + peer, + query_id, + result: QueryResult::ReadSuccess { substream, message }, + } + } else { + QueryContext { + peer, + query_id, + result: QueryResult::AssumeSendSuccess, + } + } + })); + } } impl Stream for QueryExecutor { @@ -223,7 +361,12 @@ mod tests { })) => { assert_eq!(peer, queried_peer); assert!(query_id.is_none()); - assert!(std::matches!(result, QueryResult::Timeout)); + assert!(std::matches!( + result, + QueryResult::ReadFailure { + reason: FailureReason::Timeout + } + )); } result => panic!("invalid result received: {result:?}"), } @@ -252,7 +395,12 @@ mod tests { })) => { assert_eq!(peer, queried_peer); assert_eq!(query_id, Some(QueryId(1338))); - assert!(std::matches!(result, QueryResult::SubstreamClosed)); + assert!(std::matches!( + result, + QueryResult::ReadFailure { + reason: FailureReason::SubstreamClosed + } + )); } result => panic!("invalid result received: {result:?}"), } @@ -287,7 +435,12 @@ mod tests { })) => { assert_eq!(peer, queried_peer); assert_eq!(query_id, Some(QueryId(1337))); - assert!(std::matches!(result, QueryResult::SubstreamClosed)); + assert!(std::matches!( + result, + QueryResult::ReadFailure { + reason: FailureReason::SubstreamClosed + } + )); } result => panic!("invalid result received: {result:?}"), } @@ -321,7 +474,12 @@ mod tests { })) => { assert_eq!(peer, queried_peer); assert_eq!(query_id, Some(QueryId(1337))); - assert!(std::matches!(result, QueryResult::SubstreamClosed)); + assert!(std::matches!( + result, + QueryResult::SendFailure { + reason: FailureReason::SubstreamClosed + } + )); } result => panic!("invalid result received: {result:?}"), } @@ -350,7 +508,12 @@ mod tests { })) => { assert_eq!(peer, queried_peer); assert_eq!(query_id, Some(QueryId(1336))); - assert!(std::matches!(result, QueryResult::Timeout)); + assert!(std::matches!( + result, + QueryResult::ReadFailure { + reason: FailureReason::Timeout + } + )); } result => panic!("invalid result received: {result:?}"), } @@ -382,7 +545,12 @@ mod tests { })) => { assert_eq!(peer, queried_peer); assert_eq!(query_id, Some(QueryId(1335))); - assert!(std::matches!(result, QueryResult::SubstreamClosed)); + assert!(std::matches!( + result, + QueryResult::ReadFailure { + reason: FailureReason::SubstreamClosed + } + )); } result => panic!("invalid result received: {result:?}"), } diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index cb6ab67e..43dafd91 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -89,14 +89,25 @@ mod schema { #[derive(Debug, Clone)] #[allow(clippy::enum_variant_names)] enum PeerAction { - /// Send `FIND_NODE` message to peer. + /// Find nodes (and values/providers) as part of `FIND_NODE`/`GET_VALUE`/`GET_PROVIDERS` query. + // TODO: may be a better naming would be `SendFindRequest`? SendFindNode(QueryId), /// Send `PUT_VALUE` message to peer. SendPutValue(QueryId, Bytes), /// Send `ADD_PROVIDER` message to peer. - SendAddProvider(Bytes), + SendAddProvider(QueryId, Bytes), +} + +impl PeerAction { + fn query_id(&self) -> QueryId { + match self { + PeerAction::SendFindNode(query_id) => *query_id, + PeerAction::SendPutValue(query_id, _) => *query_id, + PeerAction::SendAddProvider(query_id, _) => *query_id, + } + } } /// Peer context. 
@@ -252,6 +263,7 @@ impl Kademlia { ); if let PeerAction::SendFindNode(query_id) = action { + self.engine.register_send_failure(query_id, peer); self.engine.register_response_failure(query_id, peer); } } @@ -291,13 +303,19 @@ impl Kademlia { tracing::trace!(target: LOG_TARGET, ?peer, ?query, "disconnect peer"); if let Some(query) = query { - self.engine.register_response_failure(query, peer); + self.engine.register_peer_failure(query, peer); } + // Apart from the failing query, we need to fail all other pending queries for the peer + // being disconnected. if let Some(PeerContext { pending_actions }) = self.peers.remove(&peer) { pending_actions.into_iter().for_each(|(_, action)| { - if let PeerAction::SendFindNode(query_id) = action { - self.engine.register_response_failure(query_id, peer); + // Don't report failure twice for the same `query_id` if it was already reported + // above. (We can still have other pending queries for the peer that + // need to be reported.) + let query_id = action.query_id(); + if Some(query_id) != query { + self.engine.register_peer_failure(query_id, peer); } }); } @@ -371,12 +389,19 @@ impl Kademlia { Some(PeerAction::SendPutValue(query, message)) => { tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message"); - self.executor.send_request_read_response(peer, Some(query), message, substream); + self.executor.send_request_eat_response_failure( + peer, + Some(query), + message, + substream, + ); + // TODO: replace this with `send_request_read_response` as part of + // https://github.com/paritytech/litep2p/issues/429. } - Some(PeerAction::SendAddProvider(message)) => { + Some(PeerAction::SendAddProvider(query, message)) => { tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message"); - self.executor.send_message(peer, message, substream); + self.executor.send_message(peer, Some(query), message, substream); } } @@ -454,6 +479,7 @@ impl Kademlia { peer, KademliaMessage::FindNode { target, peers }, ); + substream.close().await; } None => { tracing::trace!( @@ -468,7 +494,7 @@ impl Kademlia { self.routing_table .closest(&Key::new(target.as_ref()), self.replication_factor), ); - self.executor.send_message(peer, message.into(), substream); + self.executor.send_message(peer, None, message.into(), substream); } } } @@ -487,6 +513,7 @@ impl Kademlia { peer, KademliaMessage::PutValue { record }, ); + substream.close().await; } None => { tracing::trace!( @@ -506,7 +533,9 @@ impl Kademlia { record.key.clone(), record.value.clone(), ); - self.executor.send_message(peer, message, substream); + self.executor.send_message_eat_failure(peer, None, message, substream); + // TODO: replace this with `send_message` as part of + // https://github.com/paritytech/litep2p/issues/429. 
let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await; } @@ -531,6 +560,8 @@ impl Kademlia { peer, KademliaMessage::GetRecord { key, record, peers }, ); + + substream.close().await; } (None, Some(key)) => { tracing::trace!( @@ -547,7 +578,7 @@ impl Kademlia { let message = KademliaMessage::get_value_response(key, closest_peers, value); - self.executor.send_message(peer, message.into(), substream); + self.executor.send_message(peer, None, message.into(), substream); } (None, None) => tracing::debug!( target: LOG_TARGET, @@ -639,6 +670,8 @@ impl Kademlia { providers, }, ); + + substream.close().await; } (None, Some(key)) => { tracing::trace!( @@ -664,7 +697,7 @@ impl Kademlia { let message = KademliaMessage::get_providers_response(providers, &closer_peers); - self.executor.send_message(peer, message.into(), substream); + self.executor.send_message(peer, None, message.into(), substream); } (None, None) => tracing::debug!( target: LOG_TARGET, @@ -703,10 +736,8 @@ impl Kademlia { }; if let Some(context) = self.peers.get_mut(&peer) { - let query = match context.pending_actions.remove(&substream_id) { - Some(PeerAction::SendFindNode(query)) => Some(query), - _ => None, - }; + let query = + context.pending_actions.remove(&substream_id).as_ref().map(PeerAction::query_id); self.disconnect_peer(peer, query).await; } @@ -723,17 +754,19 @@ impl Kademlia { }; for action in actions { - if let PeerAction::SendFindNode(query_id) = action { - tracing::trace!( - target: LOG_TARGET, - ?peer, - query = ?query_id, - ?addresses, - "report failure for pending query", - ); + let query = action.query_id(); - self.engine.register_response_failure(query_id, peer); - } + tracing::trace!( + target: LOG_TARGET, + ?peer, + ?query, + ?addresses, + "report failure for pending query", + ); + + // Fail both sending and receiving due to dial failure. + self.engine.register_send_failure(query, peer); + self.engine.register_response_failure(query, peer); } } @@ -793,11 +826,13 @@ impl Kademlia { async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> { match action { QueryAction::SendMessage { query, peer, .. } => { + // This action is used for `FIND_NODE`, `GET_VALUE` and `GET_PROVIDERS` queries. if self .open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query)) .is_err() { // Announce the error to the query engine. + self.engine.register_send_failure(query, peer); self.engine.register_response_failure(query, peer); } Ok(()) @@ -883,6 +918,7 @@ impl Kademlia { Ok(()) } QueryAction::AddProviderToFoundNodes { + query, provided_key, provider, peers, @@ -899,7 +935,7 @@ impl Kademlia { for peer in peers { if let Err(error) = self.open_substream_or_dial( peer.peer, - PeerAction::SendAddProvider(message.clone()), + PeerAction::SendAddProvider(query, message.clone()), None, ) { tracing::debug!( @@ -1014,33 +1050,72 @@ impl Kademlia { "message sent to peer", ); let _ = substream.close().await; + + if let Some(query_id) = query_id { + self.engine.register_send_success(query_id, peer); + } + } + // This is a workaround to gracefully handle older litep2p nodes not + // sending/receiving `PUT_VALUE` ACKs. This should eventually be removed. + // TODO: remove this as part of + // https://github.com/paritytech/litep2p/issues/429. 
+ QueryResult::AssumeSendSuccess => {
+ tracing::trace!(
+ target: LOG_TARGET,
+ ?peer,
+ query = ?query_id,
+ "treating message as sent to peer",
+ );
+
+ if let Some(query_id) = query_id {
+ self.engine.register_send_success(query_id, peer);
+ }
+ }
+ QueryResult::SendFailure { reason } => {
+ tracing::debug!(
+ target: LOG_TARGET,
+ ?peer,
+ query = ?query_id,
+ ?reason,
+ "failed to send message to peer",
+ );
+
+ self.disconnect_peer(peer, query_id).await;
}
QueryResult::ReadSuccess { substream, message } => {
- tracing::trace!(target: LOG_TARGET,
+ tracing::trace!(
+ target: LOG_TARGET,
?peer,
query = ?query_id,
"message read from peer",
);
+ if let Some(query_id) = query_id {
+ // Read success for locally originating requests implies send
+ // success.
+ self.engine.register_send_success(query_id, peer);
+ }
+
if let Err(error) = self.on_message_received( peer, query_id, message, substream ).await {
- tracing::debug!(target: LOG_TARGET,
+ tracing::debug!(
+ target: LOG_TARGET,
?peer,
?error,
"failed to process message",
);
}
}
- QueryResult::SubstreamClosed | QueryResult::Timeout => {
+ QueryResult::ReadFailure { reason } => {
tracing::debug!(
target: LOG_TARGET,
?peer,
query = ?query_id,
- ?result,
+ ?reason,
"failed to read message from substream",
);
diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs
index 5bc1155b..4be51b0d 100644
--- a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs
+++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs
@@ -52,6 +52,12 @@ impl FindManyNodesContext {
/// Register `FIND_NODE` response from `peer`.
pub fn register_response(&mut self, _peer: PeerId, _peers: Vec<KademliaPeer>) {}
+ /// Register a failure of sending a request to `peer`.
+ pub fn register_send_failure(&mut self, _peer: PeerId) {}
+
+ /// Register a success of sending a request to `peer`.
+ pub fn register_send_success(&mut self, _peer: PeerId) {}
+
/// Get next action for `peer`.
pub fn next_peer_action(&mut self, _peer: &PeerId) -> Option<QueryAction> {
None
}
diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs
index 8163451b..a354c397 100644
--- a/src/protocol/libp2p/kademlia/query/find_node.rs
+++ b/src/protocol/libp2p/kademlia/query/find_node.rs
@@ -194,6 +194,18 @@ impl>> FindNodeContext {
}
}
+ /// Register a failure of sending `FIND_NODE` request to `peer`.
+ pub fn register_send_failure(&mut self, _peer: PeerId) {
+ // In case of a send failure, `register_response_failure` is called as well.
+ // Failure is handled there.
+ }
+
+ /// Register a success of sending `FIND_NODE` request to `peer`.
+ pub fn register_send_success(&mut self, _peer: PeerId) {
+ // `FIND_NODE` requests are compound request-response pairs of messages,
+ // so we handle final success/failure in `register_response`/`register_response_failure`.
+ }
+
/// Get next action for `peer`.
pub fn next_peer_action(&mut self, peer: &PeerId) -> Option<QueryAction> {
self.pending.contains_key(peer).then_some(QueryAction::SendMessage {
diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs
index d6874bd1..9596e036 100644
--- a/src/protocol/libp2p/kademlia/query/get_providers.rs
+++ b/src/protocol/libp2p/kademlia/query/get_providers.rs
@@ -207,6 +207,18 @@ impl GetProvidersContext {
}
}
+ /// Register a failure of sending a `GET_PROVIDERS` request to `peer`.
+ pub fn register_send_failure(&mut self, _peer: PeerId) { + // In case of a send failure, `register_response_failure` is called as well. + // Failure is handled there. + } + + /// Register a success of sending a `GET_PROVIDERS` request to `peer`. + pub fn register_send_success(&mut self, _peer: PeerId) { + // `GET_PROVIDERS` requests are compound request-response pairs of messages, + // so we handle final success/failure in `register_response`/`register_response_failure`. + } + /// Get next action for `peer`. // TODO: https://github.com/paritytech/litep2p/issues/40 remove this and store the next action to `PeerAction` pub fn next_peer_action(&mut self, peer: &PeerId) -> Option { diff --git a/src/protocol/libp2p/kademlia/query/get_record.rs b/src/protocol/libp2p/kademlia/query/get_record.rs index 420d34f6..cc143efa 100644 --- a/src/protocol/libp2p/kademlia/query/get_record.rs +++ b/src/protocol/libp2p/kademlia/query/get_record.rs @@ -216,6 +216,18 @@ impl GetRecordContext { } } + /// Register a failure of sending a `GET_VALUE` request to `peer`. + pub fn register_send_failure(&mut self, _peer: PeerId) { + // In case of a send failure, `register_response_failure` is called as well. + // Failure is handled there. + } + + /// Register a success of sending a `GET_VALUE` request to `peer`. + pub fn register_send_success(&mut self, _peer: PeerId) { + // `GET_VALUE` requests are compound request-response pairs of messages, + // so we handle final success/failure in `register_response`/`register_response_failure`. + } + /// Get next action for `peer`. // TODO: https://github.com/paritytech/litep2p/issues/40 remove this and store the next action to `PeerAction` pub fn next_peer_action(&mut self, peer: &PeerId) -> Option { diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 3fcd4650..e5d237a1 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -37,13 +37,13 @@ use bytes::Bytes; use std::collections::{HashMap, VecDeque}; -use self::{find_many_nodes::FindManyNodesContext, put_record::PutRecordToFoundNodesContext}; +use self::{find_many_nodes::FindManyNodesContext, target_peers::PutToTargetPeersContext}; mod find_many_nodes; mod find_node; mod get_providers; mod get_record; -mod put_record; +mod target_peers; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; @@ -89,7 +89,7 @@ enum QueryType { /// `PUT_VALUE` message sending phase. PutRecordToFoundNodes { /// Context for tracking `PUT_VALUE` responses. - context: PutRecordToFoundNodesContext, + context: PutToTargetPeersContext, }, /// `GET_VALUE` query. @@ -170,6 +170,9 @@ pub enum QueryAction { /// Add the provider record to nodes closest to the target key. AddProviderToFoundNodes { + /// Query ID of the original ADD_PROVIDER request. + query: QueryId, + /// Provided key. provided_key: RecordKey, @@ -478,7 +481,7 @@ impl QueryEngine { self.queries.insert( query_id, QueryType::PutRecordToFoundNodes { - context: PutRecordToFoundNodesContext::new(query_id, key, peers, quorum), + context: PutToTargetPeersContext::new(query_id, key, peers, quorum), }, ); } @@ -526,7 +529,7 @@ impl QueryEngine { match self.queries.get_mut(&query) { None => { - tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query"); + tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response for a stale query"); } Some(QueryType::FindNode { context }) => match message { KademliaMessage::FindNode { peers, .. 
} => {
@@ -578,6 +581,81 @@ impl QueryEngine {
None
}
+ pub fn register_send_failure(&mut self, query: QueryId, peer: PeerId) {
+ tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send failure");
+
+ match self.queries.get_mut(&query) {
+ None => {
+ tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send failure for a stale query");
+ }
+ Some(QueryType::FindNode { context }) => {
+ context.register_send_failure(peer);
+ }
+ Some(QueryType::PutRecord { context, .. }) => {
+ context.register_send_failure(peer);
+ }
+ Some(QueryType::PutRecordToPeers { context, .. }) => {
+ context.register_send_failure(peer);
+ }
+ Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
+ context.register_send_failure(peer);
+ }
+ Some(QueryType::GetRecord { context }) => {
+ context.register_send_failure(peer);
+ }
+ Some(QueryType::AddProvider { context, .. }) => {
+ context.register_send_failure(peer);
+ }
+ Some(QueryType::GetProviders { context }) => {
+ context.register_send_failure(peer);
+ }
+ }
+ }
+
+ pub fn register_send_success(&mut self, query: QueryId, peer: PeerId) {
+ tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register send success");
+
+ match self.queries.get_mut(&query) {
+ None => {
+ tracing::trace!(target: LOG_TARGET, ?query, ?peer, "send success for a stale query");
+ }
+ Some(QueryType::FindNode { context }) => {
+ context.register_send_success(peer);
+ }
+ Some(QueryType::PutRecord { context, .. }) => {
+ context.register_send_success(peer);
+ }
+ Some(QueryType::PutRecordToPeers { context, .. }) => {
+ context.register_send_success(peer);
+ }
+ Some(QueryType::PutRecordToFoundNodes { context, .. }) => {
+ context.register_send_success(peer);
+ }
+ Some(QueryType::GetRecord { context }) => {
+ context.register_send_success(peer);
+ }
+ Some(QueryType::AddProvider { context, .. }) => {
+ context.register_send_success(peer);
+ }
+ Some(QueryType::GetProviders { context }) => {
+ context.register_send_success(peer);
+ }
+ }
+ }
+
+ /// Register peer failure when it is not known whether sending or receiving failed.
+ /// This is called from [`super::Kademlia::disconnect_peer`].
+ pub fn register_peer_failure(&mut self, query: QueryId, peer: PeerId) {
+ tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register peer failure");
+
+ // Because currently queries track either send success/failure (`PUT_VALUE`, `ADD_PROVIDER`)
+ // or response success/failure (`FIND_NODE`, `GET_VALUE`, `GET_PROVIDERS`),
+ // but not both, we can just call both here and not propagate this different type of
+ // failure to specific queries knowing this will result in the correct behaviour.
+ self.register_send_failure(query, peer);
+ self.register_response_failure(query, peer);
+ }
+
/// Get next action for `peer` from the [`QueryEngine`].
pub fn next_peer_action(&mut self, query: &QueryId, peer: &PeerId) -> Option { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "get next peer action"); @@ -641,6 +719,7 @@ impl QueryEngine { provider, context, } => QueryAction::AddProviderToFoundNodes { + query: context.config.query, provided_key, provider, peers: context.responses.into_values().collect::>(), @@ -711,7 +790,7 @@ mod tests { } #[test] - fn query_fails() { + fn find_node_query_fails() { let _ = tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .try_init(); @@ -746,7 +825,7 @@ mod tests { } #[test] - fn lookup_paused() { + fn find_node_lookup_paused() { let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize); let target_peer = PeerId::random(); let _target_key = Key::from(target_peer); @@ -868,6 +947,139 @@ mod tests { assert!(engine.next_action().is_none()); } + #[test] + fn put_record_fails() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize); + let record_key = RecordKey::new(&vec![1, 2, 3, 4]); + let target_key = Key::new(record_key.clone()); + let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]); + + let distances = { + let mut distances = std::collections::BTreeMap::new(); + + for i in 1..64 { + let peer = make_peer_id(i, 0); + let key = Key::from(peer); + + distances.insert(target_key.distance(&key), peer); + } + + distances + }; + let mut iter = distances.iter(); + + // start find node with one known peer + let original_query_id = QueryId(1340); + let _query = engine.start_put_record( + original_query_id, + original_record.clone(), + vec![KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + )] + .into(), + Quorum::All, + ); + + let action = engine.next_action(); + assert!(engine.next_action().is_none()); + + // the one known peer responds with 3 other peers it knows + match action { + Some(QueryAction::SendMessage { query, peer, .. }) => { + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![ + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + ], + }, + ); + } + _ => panic!("invalid event received"), + } + + // send empty response for the last three nodes + for _ in 0..3 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. 
}) => { + println!("next send message to {peer:?}"); + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![], + }, + ); + } + _ => panic!("invalid event received"), + } + } + + let mut peers = match engine.next_action() { + Some(QueryAction::PutRecordToFoundNodes { + query, + peers, + record, + quorum, + }) => { + assert_eq!(query, original_query_id); + assert_eq!(peers.len(), 4); + assert_eq!(record.key, original_record.key); + assert_eq!(record.value, original_record.value); + assert!(matches!(quorum, Quorum::All)); + + peers + } + _ => panic!("invalid event received"), + }; + + engine.start_put_record_to_found_nodes_requests_tracking( + original_query_id, + record_key.clone(), + peers.iter().map(|p| p.peer).collect(), + Quorum::All, + ); + + // sends to all but one peer succeed + let last_peer = peers.pop().unwrap(); + for peer in peers { + engine.register_send_success(original_query_id, peer.peer); + } + engine.register_send_failure(original_query_id, last_peer.peer); + + match engine.next_action() { + Some(QueryAction::QueryFailed { query }) => { + assert_eq!(query, original_query_id); + } + _ => panic!("invalid event received"), + } + + assert!(engine.next_action().is_none()); + } + #[test] fn put_record_succeeds() { let _ = tracing_subscriber::fmt() @@ -984,16 +1196,211 @@ mod tests { Quorum::All, ); - // Receive ACKs for PUT_VALUE requests. + // simulate successful sends to all peers for peer in &peers { - engine.register_response( - original_query_id, - peer.peer, - KademliaMessage::PutValue { - record: original_record.clone(), - }, - ); + engine.register_send_success(original_query_id, peer.peer); + } + + match engine.next_action() { + Some(QueryAction::PutRecordQuerySucceeded { query, key }) => { + assert_eq!(query, original_query_id); + assert_eq!(key, record_key); + } + _ => panic!("invalid event received"), + } + + assert!(engine.next_action().is_none()); + + // get records from those peers. + let _query = engine.start_get_record( + QueryId(1341), + record_key.clone(), + vec![ + KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected), + ] + .into(), + Quorum::All, + false, + ); + + let mut records = Vec::new(); + for _ in 0..4 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. }) => { + assert_eq!(query, QueryId(1341)); + engine.register_response( + query, + peer, + KademliaMessage::GetRecord { + record: Some(original_record.clone()), + peers: vec![], + key: Some(record_key.clone()), + }, + ); + } + event => panic!("invalid event received {:?}", event), + } + + // GetRecordPartialResult is emitted after the `register_response` if the record is + // valid. + match engine.next_action() { + Some(QueryAction::GetRecordPartialResult { query_id, record }) => { + println!("Partial result {:?}", record); + assert_eq!(query_id, QueryId(1341)); + records.push(record); + } + event => panic!("invalid event received {:?}", event), + } + } + + let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect(); + match engine.next_action() { + Some(QueryAction::GetRecordQueryDone { .. 
}) => { + println!("Records {:?}", records); + let query_peers = records + .iter() + .map(|peer_record| peer_record.peer) + .collect::>(); + assert_eq!(peers, query_peers); + + let records: std::collections::HashSet<_> = + records.into_iter().map(|peer_record| peer_record.record).collect(); + // One single record found across peers. + assert_eq!(records.len(), 1); + let record = records.into_iter().next().unwrap(); + + assert_eq!(record.key, original_record.key); + assert_eq!(record.value, original_record.value); + } + event => panic!("invalid event received {:?}", event), + } + } + + #[test] + fn put_record_succeeds_with_quorum_one() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize); + let record_key = RecordKey::new(&vec![1, 2, 3, 4]); + let target_key = Key::new(record_key.clone()); + let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]); + + let distances = { + let mut distances = std::collections::BTreeMap::new(); + + for i in 1..64 { + let peer = make_peer_id(i, 0); + let key = Key::from(peer); + + distances.insert(target_key.distance(&key), peer); + } + + distances + }; + let mut iter = distances.iter(); + + // start find node with one known peer + let original_query_id = QueryId(1340); + let _query = engine.start_put_record( + original_query_id, + original_record.clone(), + vec![KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + )] + .into(), + Quorum::One, + ); + + let action = engine.next_action(); + assert!(engine.next_action().is_none()); + + // the one known peer responds with 3 other peers it knows + match action { + Some(QueryAction::SendMessage { query, peer, .. }) => { + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![ + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + KademliaPeer::new( + *iter.next().unwrap().1, + vec![], + ConnectionType::NotConnected, + ), + ], + }, + ); + } + _ => panic!("invalid event received"), + } + + // send empty response for the last three nodes + for _ in 0..3 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. 
}) => { + println!("next send message to {peer:?}"); + engine.register_response( + query, + peer, + KademliaMessage::FindNode { + target: Vec::new(), + peers: vec![], + }, + ); + } + _ => panic!("invalid event received"), + } + } + + let peers = match engine.next_action() { + Some(QueryAction::PutRecordToFoundNodes { + query, + peers, + record, + quorum, + }) => { + assert_eq!(query, original_query_id); + assert_eq!(peers.len(), 4); + assert_eq!(record.key, original_record.key); + assert_eq!(record.value, original_record.value); + assert!(matches!(quorum, Quorum::One)); + + peers + } + _ => panic!("invalid event received"), + }; + + engine.start_put_record_to_found_nodes_requests_tracking( + original_query_id, + record_key.clone(), + peers.iter().map(|p| p.peer).collect(), + Quorum::One, + ); + + // all but one peer fail + assert!(peers.len() > 1); + for peer in peers.iter().take(peers.len() - 1) { + engine.register_send_failure(original_query_id, peer.peer); } + engine.register_send_success(original_query_id, peers.last().unwrap().peer); match engine.next_action() { Some(QueryAction::PutRecordQuerySucceeded { query, key }) => { diff --git a/src/protocol/libp2p/kademlia/query/target_peers.rs b/src/protocol/libp2p/kademlia/query/target_peers.rs new file mode 100644 index 00000000..964aca4a --- /dev/null +++ b/src/protocol/libp2p/kademlia/query/target_peers.rs @@ -0,0 +1,149 @@ +// Copyright 2025 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +use crate::{ + protocol::libp2p::kademlia::{handle::Quorum, query::QueryAction, QueryId, RecordKey}, + PeerId, +}; + +use std::{cmp, collections::HashSet}; + +/// Logging target for this file. +const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::target_peers"; + +/// Context for tracking `PUT_VALUE`/`ADD_PROVIDER` requests to peers. +#[derive(Debug)] +pub struct PutToTargetPeersContext { + /// Query ID. + pub query: QueryId, + + /// Record/provider key. + pub key: RecordKey, + + /// Quorum that needs to be reached for the query to succeed. + peers_to_succeed: usize, + + /// Peers we're waiting for responses from. + pending_peers: HashSet, + + /// Number of successfully responded peers. + n_succeeded: usize, +} + +impl PutToTargetPeersContext { + /// Create new [`PutToTargetPeersContext`]. + pub fn new(query: QueryId, key: RecordKey, peers: Vec, quorum: Quorum) -> Self { + Self { + query, + key, + peers_to_succeed: match quorum { + Quorum::One => 1, + // Clamp by the number of discovered peers. 
This should only ever be relevant on
+ // small networks with fewer peers than the replication factor. Without such
+ // clamping the query would always fail in small testnets.
+ Quorum::N(n) => cmp::min(n.get(), cmp::max(peers.len(), 1)),
+ Quorum::All => cmp::max(peers.len(), 1),
+ },
+ pending_peers: peers.into_iter().collect(),
+ n_succeeded: 0,
+ }
+ }
+
+ /// Register a success of sending a message to `peer`.
+ pub fn register_send_success(&mut self, peer: PeerId) {
+ if self.pending_peers.remove(&peer) {
+ self.n_succeeded += 1;
+
+ tracing::trace!(
+ target: LOG_TARGET,
+ query = ?self.query,
+ ?peer,
+ "successful `PUT_VALUE`/`ADD_PROVIDER` to peer",
+ );
+ } else {
+ tracing::debug!(
+ target: LOG_TARGET,
+ query = ?self.query,
+ ?peer,
+ "`PutToTargetPeersContext::register_send_success`: pending peer does not exist",
+ );
+ }
+ }
+
+ /// Register a failure of sending a message to `peer`.
+ pub fn register_send_failure(&mut self, peer: PeerId) {
+ if self.pending_peers.remove(&peer) {
+ tracing::trace!(
+ target: LOG_TARGET,
+ query = ?self.query,
+ ?peer,
+ "failed `PUT_VALUE`/`ADD_PROVIDER` to peer",
+ );
+ } else {
+ tracing::debug!(
+ target: LOG_TARGET,
+ query = ?self.query,
+ ?peer,
+ "`PutToTargetPeersContext::register_send_failure`: pending peer does not exist",
+ );
+ }
+ }
+
+ /// Register successful response from peer.
+ pub fn register_response(&mut self, _peer: PeerId) {
+ // Currently we only track if we successfully sent the message to the peer both for
+ // `PUT_VALUE` and `ADD_PROVIDER`. While `PUT_VALUE` has a response message, due to litep2p
+ // not sending it in the past, tracking it would frequently result in reporting query
+ // failures. `ADD_PROVIDER` does not have a response message at all.
+
+ // TODO: once most of the network is on a litep2p version that sends `PUT_VALUE` responses,
+ // we should track them.
+ }
+
+ /// Register failed response from peer.
+ pub fn register_response_failure(&mut self, _peer: PeerId) {
+ // See a comment in `register_response`.
+
+ // Also note that due to the implementation of [`QueryEngine::register_peer_failure`], only
+ // one of `register_response_failure` or `register_send_failure` needs to be implemented.
+ }
+
+ /// Check if send results have been registered for all pending peers.
+ pub fn is_finished(&self) -> bool {
+ self.pending_peers.is_empty()
+ }
+
+ /// Check if enough requests succeeded to satisfy the quorum.
+ pub fn is_succeded(&self) -> bool {
+ self.n_succeeded >= self.peers_to_succeed
+ }
+
+ /// Get next action if the context is finished.
+ pub fn next_action(&self) -> Option<QueryAction> {
+ if self.is_finished() {
+ if self.is_succeded() {
+ Some(QueryAction::QuerySucceeded { query: self.query })
+ } else {
+ Some(QueryAction::QueryFailed { query: self.query })
+ }
+ } else {
+ None
+ }
+ }
+}
diff --git a/tests/protocol/kademlia.rs b/tests/protocol/kademlia.rs
index 242ce670..c0d6b3a9 100644
--- a/tests/protocol/kademlia.rs
+++ b/tests/protocol/kademlia.rs
@@ -184,6 +184,11 @@ async fn records_are_stored_automatically() {
let _ = kad_handle2
.get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await;
}
+ Some(KademliaEvent::QueryFailed { query_id: got_query_id }) => {
+ assert_eq!(got_query_id, query_id);
+
+ panic!("query failed")
+ }
_ => {}
}
}
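Note on the quorum handling introduced in target_peers.rs: the sketch below is a minimal, self-contained illustration (not part of the patch) of how `PutToTargetPeersContext` derives `peers_to_succeed` and resolves success once every pending peer has reported a send result. The `QuorumSketch` type and the `u64` peer identifiers are hypothetical stand-ins for `PutToTargetPeersContext` and `PeerId`; only the clamping and counting logic mirrors the code added above.

use std::{cmp, collections::HashSet, num::NonZeroUsize};

enum Quorum {
    One,
    N(NonZeroUsize),
    All,
}

struct QuorumSketch {
    peers_to_succeed: usize,
    pending_peers: HashSet<u64>,
    n_succeeded: usize,
}

impl QuorumSketch {
    fn new(peers: Vec<u64>, quorum: Quorum) -> Self {
        Self {
            peers_to_succeed: match quorum {
                Quorum::One => 1,
                // Clamp the requested quorum by the number of discovered peers,
                // so small networks can still satisfy it.
                Quorum::N(n) => cmp::min(n.get(), cmp::max(peers.len(), 1)),
                Quorum::All => cmp::max(peers.len(), 1),
            },
            pending_peers: peers.into_iter().collect(),
            n_succeeded: 0,
        }
    }

    fn register_send_success(&mut self, peer: u64) {
        if self.pending_peers.remove(&peer) {
            self.n_succeeded += 1;
        }
    }

    fn register_send_failure(&mut self, peer: u64) {
        self.pending_peers.remove(&peer);
    }

    fn is_finished(&self) -> bool {
        self.pending_peers.is_empty()
    }

    fn is_succeeded(&self) -> bool {
        self.n_succeeded >= self.peers_to_succeed
    }
}

fn main() {
    // Quorum::One: one successful send is enough, even if every other send fails.
    let mut ctx = QuorumSketch::new(vec![1, 2, 3, 4], Quorum::One);
    ctx.register_send_failure(1);
    ctx.register_send_failure(2);
    ctx.register_send_failure(3);
    ctx.register_send_success(4);
    assert!(ctx.is_finished() && ctx.is_succeeded());

    // Quorum::All: a single failed send fails the whole query.
    let mut ctx = QuorumSketch::new(vec![1, 2, 3, 4], Quorum::All);
    ctx.register_send_success(1);
    ctx.register_send_success(2);
    ctx.register_send_success(3);
    ctx.register_send_failure(4);
    assert!(ctx.is_finished() && !ctx.is_succeeded());

    // Quorum::N is clamped by the number of discovered peers: asking for 5 of 2 behaves like All.
    let mut ctx = QuorumSketch::new(vec![1, 2], Quorum::N(NonZeroUsize::new(5).unwrap()));
    ctx.register_send_success(1);
    ctx.register_send_success(2);
    assert!(ctx.is_finished() && ctx.is_succeeded());
}

The third case shows why `Quorum::N(n)` is clamped: requesting a quorum larger than the number of discovered peers degrades to `Quorum::All` instead of failing unconditionally.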