Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ str0m = { version = "0.9.0", optional = true }

# Fuzzing related dependencies.
serde_millis = {version = "0.1", optional = true}
enum-display = "0.1.4"
# End of fuzzing related dependencies.

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl From<ed25519::PublicKey> for PublicKey {

/// The public key of a remote node's identity keypair. Supports RSA keys additionally to ed25519.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum RemotePublicKey {
pub enum RemotePublicKey {
/// A public Ed25519 key.
Ed25519(ed25519::PublicKey),
/// A public RSA key.
Expand Down
25 changes: 22 additions & 3 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::{
///
/// Quorum defines how many peers must be successfully contacted
/// in order for the query to be considered successful.
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "fuzz", derive(serde::Serialize, serde::Deserialize))]
pub enum Quorum {
/// All peers must be successfully contacted.
Expand Down Expand Up @@ -156,6 +156,9 @@ pub enum KademliaCommand {
/// Provided key.
key: RecordKey,

/// [`Quorum`] for the query.
quorum: Quorum,

/// Query ID for the query.
query_id: QueryId,
},
Expand Down Expand Up @@ -240,6 +243,15 @@ pub enum KademliaEvent {
key: RecordKey,
},

/// `ADD_PROVIDER` query succeeded.
AddProviderSuccess {
/// Query ID.
query_id: QueryId,

/// Provided key.
provided_key: RecordKey,
},

/// Query failed.
QueryFailed {
/// Query ID.
Expand Down Expand Up @@ -373,9 +385,16 @@ impl KademliaHandle {
///
/// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn start_providing(&mut self, key: RecordKey) -> QueryId {
pub async fn start_providing(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::StartProviding { key, query_id }).await;
let _ = self
.cmd_tx
.send(KademliaCommand::StartProviding {
key,
quorum,
query_id,
})
.await;

query_id
}
Expand Down
3 changes: 2 additions & 1 deletion src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ use crate::{
};

use bytes::{Bytes, BytesMut};
use enum_display::EnumDisplay;
use prost::Message;
use std::time::{Duration, Instant};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::message";

/// Kademlia message.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, EnumDisplay)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dq: This is used to display the message in the logs?

pub enum KademliaMessage {
/// `FIND_NODE` message.
FindNode {
Expand Down
39 changes: 33 additions & 6 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ impl Kademlia {
provided_key,
provider,
peers,
quorum,
} => {
tracing::trace!(
target: LOG_TARGET,
Expand All @@ -939,7 +940,7 @@ impl Kademlia {

let message = KademliaMessage::add_provider(provided_key.clone(), provider);

for peer in peers {
for peer in &peers {
if let Err(error) = self.open_substream_or_dial(
peer.peer,
PeerAction::SendAddProvider(query, message.clone()),
Expand All @@ -955,6 +956,28 @@ impl Kademlia {
}
}

self.engine.start_add_provider_to_found_nodes_requests_tracking(
query,
provided_key,
peers.into_iter().map(|peer| peer.peer).collect(),
quorum,
);

Ok(())
}
QueryAction::AddProviderQuerySucceeded {
query,
provided_key,
} => {
tracing::debug!(target: LOG_TARGET, ?query, "`ADD_PROVIDER` query succeeded");

let _ = self
.event_tx
.send(KademliaEvent::AddProviderSuccess {
query_id: query,
provided_key,
})
.await;
Ok(())
}
QueryAction::GetRecordQueryDone { query_id } => {
Expand Down Expand Up @@ -1222,6 +1245,7 @@ impl Kademlia {
}
Some(KademliaCommand::StartProviding {
key,
quorum,
query_id
}) => {
tracing::debug!(
Expand All @@ -1237,7 +1261,7 @@ impl Kademlia {
addresses,
};

self.store.put_provider(key.clone(), provider.clone());
self.store.put_local_provider(key.clone(), quorum);

self.engine.start_add_provider(
query_id,
Expand All @@ -1246,6 +1270,7 @@ impl Kademlia {
self.routing_table
.closest(&Key::new(key), self.replication_factor)
.into(),
quorum,
);
}
Some(KademliaCommand::StopProviding {
Expand Down Expand Up @@ -1356,16 +1381,17 @@ impl Kademlia {
}
},
action = self.store.next_action() => match action {
Some(MemoryStoreAction::RefreshProvider { provided_key, provider }) => {
Some(MemoryStoreAction::RefreshProvider { provided_key, provider, quorum }) => {
tracing::trace!(
target: LOG_TARGET,
?provided_key,
"republishing local provider",
);

self.store.put_provider(provided_key.clone(), provider.clone());
// We never update local provider addresses in the store when refresh
// it, as this is done anyway when replying to `GET_PROVIDERS` request.
self.store.put_local_provider(provided_key.clone(), quorum);

// We never update local provider addresses in the store during refresh,
// as this is done anyway when replying to `GET_PROVIDERS` request.

let query_id = self.next_query_id();
self.engine.start_add_provider(
Expand All @@ -1375,6 +1401,7 @@ impl Kademlia {
self.routing_table
.closest(&Key::new(provided_key), self.replication_factor)
.into(),
quorum,
);
}
None => {}
Expand Down
Loading