Skip to content
Open
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ str0m = { version = "0.9.0", optional = true }

# Fuzzing related dependencies.
serde_millis = {version = "0.1", optional = true}
enum-display = "0.1.4"
# End of fuzzing related dependencies.

[dev-dependencies]
Expand Down
25 changes: 22 additions & 3 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::{
///
/// Quorum defines how many peers must be successfully contacted
/// in order for the query to be considered successful.
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub enum Quorum {
/// All peers must be successfully contacted.
Expand Down Expand Up @@ -156,6 +156,9 @@ pub enum KademliaCommand {
/// Provided key.
key: RecordKey,

/// [`Quorum`] for the query.
quorum: Quorum,

/// Query ID for the query.
query_id: QueryId,
},
Expand Down Expand Up @@ -240,6 +243,15 @@ pub enum KademliaEvent {
key: RecordKey,
},

/// `ADD_PROVIDER` query succeeded.
AddProviderSuccess {
/// Query ID.
query_id: QueryId,

/// Provided key.
provided_key: RecordKey,
},

/// Query failed.
QueryFailed {
/// Query ID.
Expand Down Expand Up @@ -373,9 +385,16 @@ impl KademliaHandle {
///
/// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn start_providing(&mut self, key: RecordKey) -> QueryId {
pub async fn start_providing(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::StartProviding { key, query_id }).await;
let _ = self
.cmd_tx
.send(KademliaCommand::StartProviding {
key,
quorum,
query_id,
})
.await;

query_id
}
Expand Down
3 changes: 2 additions & 1 deletion src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ use crate::{
};

use bytes::{Bytes, BytesMut};
use enum_display::EnumDisplay;
use prost::Message;
use std::time::{Duration, Instant};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::message";

/// Kademlia message.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, EnumDisplay)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dq: This is used to display the message in the logs?

pub enum KademliaMessage {
/// `FIND_NODE` message.
FindNode {
Expand Down
39 changes: 33 additions & 6 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ impl Kademlia {
provided_key,
provider,
peers,
quorum,
} => {
tracing::trace!(
target: LOG_TARGET,
Expand All @@ -939,7 +940,7 @@ impl Kademlia {

let message = KademliaMessage::add_provider(provided_key.clone(), provider);

for peer in peers {
for peer in &peers {
if let Err(error) = self.open_substream_or_dial(
peer.peer,
PeerAction::SendAddProvider(query, message.clone()),
Expand All @@ -955,6 +956,28 @@ impl Kademlia {
}
}

self.engine.start_add_provider_to_found_nodes_requests_tracking(
query,
provided_key,
peers.into_iter().map(|peer| peer.peer).collect(),
quorum,
);

Ok(())
}
QueryAction::AddProviderQuerySucceeded {
query,
provided_key,
} => {
tracing::debug!(target: LOG_TARGET, ?query, "`ADD_PROVIDER` query succeeded");

let _ = self
.event_tx
.send(KademliaEvent::AddProviderSuccess {
query_id: query,
provided_key,
})
.await;
Ok(())
}
QueryAction::GetRecordQueryDone { query_id } => {
Expand Down Expand Up @@ -1222,6 +1245,7 @@ impl Kademlia {
}
Some(KademliaCommand::StartProviding {
key,
quorum,
query_id
}) => {
tracing::debug!(
Expand All @@ -1237,7 +1261,7 @@ impl Kademlia {
addresses,
};

self.store.put_provider(key.clone(), provider.clone());
self.store.put_local_provider(key.clone(), quorum);

self.engine.start_add_provider(
query_id,
Expand All @@ -1246,6 +1270,7 @@ impl Kademlia {
self.routing_table
.closest(&Key::new(key), self.replication_factor)
.into(),
quorum,
);
}
Some(KademliaCommand::StopProviding {
Expand Down Expand Up @@ -1356,16 +1381,17 @@ impl Kademlia {
}
},
action = self.store.next_action() => match action {
Some(MemoryStoreAction::RefreshProvider { provided_key, provider }) => {
Some(MemoryStoreAction::RefreshProvider { provided_key, provider, quorum }) => {
tracing::trace!(
target: LOG_TARGET,
?provided_key,
"republishing local provider",
);

self.store.put_provider(provided_key.clone(), provider.clone());
// We never update local provider addresses in the store when refresh
// it, as this is done anyway when replying to `GET_PROVIDERS` request.
self.store.put_local_provider(provided_key.clone(), quorum);

// We never update local provider addresses in the store during refresh,
// as this is done anyway when replying to `GET_PROVIDERS` request.

let query_id = self.next_query_id();
self.engine.start_add_provider(
Expand All @@ -1375,6 +1401,7 @@ impl Kademlia {
self.routing_table
.closest(&Key::new(provided_key), self.replication_factor)
.into(),
quorum,
);
}
None => {}
Expand Down
Loading