Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 47 additions & 39 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,7 @@ impl Kademlia {
tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "handle message from peer");

match KademliaMessage::from_bytes(message).ok_or(Error::InvalidData)? {
ref message @ KademliaMessage::FindNode {
ref target,
ref peers,
} => {
KademliaMessage::FindNode { target, peers } => {
match query_id {
Some(query_id) => {
tracing::trace!(
Expand All @@ -437,8 +434,12 @@ impl Kademlia {
);

// update routing table and inform user about the update
self.update_routing_table(peers).await;
self.engine.register_response(query_id, peer, message.clone());
self.update_routing_table(&peers).await;
self.engine.register_response(
query_id,
peer,
KademliaMessage::FindNode { target, peers },
);
}
None => {
tracing::trace!(
Expand All @@ -449,9 +450,9 @@ impl Kademlia {
);

let message = KademliaMessage::find_node_response(
target,
&target,
self.routing_table
.closest(Key::from(target.clone()), self.replication_factor),
.closest(&Key::new(target.as_ref()), self.replication_factor),
);
self.executor.send_message(peer, message.into(), substream);
}
Expand All @@ -471,13 +472,9 @@ impl Kademlia {

let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
}
ref message @ KademliaMessage::GetRecord {
ref key,
ref record,
ref peers,
} => {
KademliaMessage::GetRecord { key, record, peers } => {
match (query_id, key) {
(Some(query_id), _) => {
(Some(query_id), key) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
Expand All @@ -488,8 +485,12 @@ impl Kademlia {
);

// update routing table and inform user about the update
self.update_routing_table(peers).await;
self.engine.register_response(query_id, peer, message.clone());
self.update_routing_table(&peers).await;
self.engine.register_response(
query_id,
peer,
KademliaMessage::GetRecord { key, record, peers },
);
}
(None, Some(key)) => {
tracing::trace!(
Expand All @@ -499,22 +500,20 @@ impl Kademlia {
"handle `GET_VALUE` request",
);

let value = self.store.get(key).cloned();
let value = self.store.get(&key).cloned();
let closest_peers = self
.routing_table
.closest(Key::from(key.to_vec()), self.replication_factor);
.closest(&Key::new(key.as_ref()), self.replication_factor);

let message = KademliaMessage::get_value_response(
(*key).clone(),
closest_peers,
value,
);
let message =
KademliaMessage::get_value_response(key, closest_peers, value);
self.executor.send_message(peer, message.into(), substream);
}
(None, None) => tracing::debug!(
target: LOG_TARGET,
?peer,
?message,
?record,
?peers,
"unable to handle `GET_RECORD` request with empty key",
),
}
Expand Down Expand Up @@ -567,10 +566,10 @@ impl Kademlia {
}
}
}
ref message @ KademliaMessage::GetProviders {
ref key,
ref peers,
ref providers,
KademliaMessage::GetProviders {
key,
peers,
providers,
} => {
match (query_id, key) {
(Some(query_id), key) => {
Expand All @@ -586,9 +585,17 @@ impl Kademlia {
);

// update routing table and inform user about the update
self.update_routing_table(peers).await;
self.update_routing_table(&peers).await;

self.engine.register_response(query_id, peer, message.clone());
self.engine.register_response(
query_id,
peer,
KademliaMessage::GetProviders {
key,
peers,
providers,
},
);
}
(None, Some(key)) => {
tracing::trace!(
Expand All @@ -598,7 +605,7 @@ impl Kademlia {
"handle `GET_PROVIDERS` request",
);

let mut providers = self.store.get_providers(key);
let mut providers = self.store.get_providers(&key);

// Make sure local provider addresses are up to date.
let local_peer_id = self.local_key.clone().into_preimage();
Expand All @@ -608,7 +615,7 @@ impl Kademlia {

let closer_peers = self
.routing_table
.closest(Key::from(key.to_vec()), self.replication_factor);
.closest(&Key::new(key.as_ref()), self.replication_factor);

let message =
KademliaMessage::get_providers_response(providers, &closer_peers);
Expand All @@ -617,7 +624,8 @@ impl Kademlia {
(None, None) => tracing::debug!(
target: LOG_TARGET,
?peer,
?message,
?peers,
?providers,
"unable to handle `GET_PROVIDERS` request with empty key",
),
}
Expand Down Expand Up @@ -977,7 +985,7 @@ impl Kademlia {
query_id,
peer,
self.routing_table
.closest(Key::from(peer), self.replication_factor)
.closest(&Key::from(peer), self.replication_factor)
.into()
);
}
Expand Down Expand Up @@ -1005,7 +1013,7 @@ impl Kademlia {
self.engine.start_put_record(
query_id,
record,
self.routing_table.closest(key, self.replication_factor).into(),
self.routing_table.closest(&key, self.replication_factor).into(),
);
}
Some(KademliaCommand::PutRecordToPeers {
Expand Down Expand Up @@ -1074,7 +1082,7 @@ impl Kademlia {
key.clone(),
provider,
self.routing_table
.closest(Key::new(key), self.replication_factor)
.closest(&Key::new(key), self.replication_factor)
.into(),
);
}
Expand Down Expand Up @@ -1107,7 +1115,7 @@ impl Kademlia {
query_id,
key.clone(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.closest(&Key::new(key), self.replication_factor)
.into(),
quorum,
if record.is_some() { 1 } else { 0 },
Expand All @@ -1125,7 +1133,7 @@ impl Kademlia {
query_id,
key.clone(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.closest(&Key::new(key), self.replication_factor)
.into(),
known_providers,
);
Expand Down Expand Up @@ -1185,7 +1193,7 @@ impl Kademlia {
provided_key.clone(),
provider,
self.routing_table
.closest(Key::new(provided_key), self.replication_factor)
.closest(&Key::new(provided_key), self.replication_factor)
.into(),
);
}
Expand Down
8 changes: 4 additions & 4 deletions src/protocol/libp2p/kademlia/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ impl RoutingTable {
}

/// Get `limit` closest peers to `target` from the k-buckets.
pub fn closest<K: Clone>(&mut self, target: Key<K>, limit: usize) -> Vec<KademliaPeer> {
ClosestBucketsIter::new(self.local_key.distance(&target))
.flat_map(|index| self.buckets[index.get()].closest_iter(&target))
pub fn closest<K: Clone>(&mut self, target: &Key<K>, limit: usize) -> Vec<KademliaPeer> {
ClosestBucketsIter::new(self.local_key.distance(target))
.flat_map(|index| self.buckets[index.get()].closest_iter(target))
.take(limit)
.collect()
}
Expand Down Expand Up @@ -299,7 +299,7 @@ mod tests {
}

let target = Key::from(PeerId::random());
let closest = table.closest(target.clone(), 60usize);
let closest = table.closest(&target, 60usize);
let mut prev = None;

for peer in &closest {
Expand Down