Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
2abe611
rewrite graph struct to be 25 times faster
pmnoxx Feb 2, 2021
f502405
ignore unreachable nodes
pmnoxx Feb 3, 2021
7e9f638
add TODO
pmnoxx Feb 3, 2021
f733270
Update chain/network/src/routing.rs
pmnoxx Feb 3, 2021
f63e85f
Update chain/network/src/routing.rs
pmnoxx Feb 3, 2021
a541932
Update chain/network/src/routing.rs
pmnoxx Feb 3, 2021
ed15aab
cleanup
pmnoxx Feb 3, 2021
27c45b4
Update chain/network/src/routing.rs
pmnoxx Feb 3, 2021
73e46e3
Update chain/network/src/routing.rs
pmnoxx Feb 3, 2021
353f5a7
Update chain/network/src/routing.rs
pmnoxx Feb 3, 2021
a4a494d
cleanup
pmnoxx Feb 3, 2021
d393015
cleanup
pmnoxx Feb 3, 2021
5a12baf
cleanup
pmnoxx Feb 3, 2021
42aa9db
cleanup
pmnoxx Feb 3, 2021
bf2dc03
rewrite Graph to use FreeList and make code cleaner
pmnoxx Feb 4, 2021
b14d0f3
fix tests
pmnoxx Feb 4, 2021
9f77149
fix warnings
pmnoxx Feb 4, 2021
737fd4e
Update chain/network/src/routing.rs
pmnoxx Feb 4, 2021
2e7a094
hotfix
pmnoxx Feb 4, 2021
fd63837
fix tests
pmnoxx Feb 4, 2021
5c4f8b7
Add a check for max_num_peers
pmnoxx Feb 4, 2021
4e6699e
fix tests
pmnoxx Feb 4, 2021
c31666b
fix tests
pmnoxx Feb 4, 2021
f2258da
cleanup
pmnoxx Feb 4, 2021
524deeb
test
pmnoxx Feb 4, 2021
06a77fb
Update Cargo.toml
pmnoxx Feb 4, 2021
c0492f6
Update routing.rs
pmnoxx Feb 4, 2021
d0440cf
Update testbed_runners.rs
pmnoxx Feb 4, 2021
6c648d8
Update graph.rs
pmnoxx Feb 4, 2021
996c514
Update lib.rs
pmnoxx Feb 4, 2021
719d396
Update peer_manager.rs
pmnoxx Feb 4, 2021
9193a0f
Update routing.rs
pmnoxx Feb 4, 2021
faca2d7
Update routing.rs
pmnoxx Feb 4, 2021
48fad97
move source_id to Graph struct
pmnoxx Feb 4, 2021
79d762c
cleanup
pmnoxx Feb 4, 2021
25ab2ab
Update routing.rs
pmnoxx Feb 4, 2021
7b1f9e9
Update routing.rs
pmnoxx Feb 4, 2021
c35f9eb
Update routing.rs
pmnoxx Feb 4, 2021
56556e6
Update routing.rs
pmnoxx Feb 4, 2021
39b6c09
Update routing.rs
pmnoxx Feb 4, 2021
ab8d686
fix bug in get_raw_graph
pmnoxx Feb 4, 2021
8cdec9a
cleanup
pmnoxx Feb 4, 2021
18f3591
remove freelist
pmnoxx Feb 4, 2021
778f20d
cleanup
pmnoxx Feb 4, 2021
8476ab4
remove usage of FxHashMap
pmnoxx Feb 4, 2021
aa5b63f
stop using rustc-hash
pmnoxx Feb 4, 2021
1527689
fix tests
pmnoxx Feb 4, 2021
6e371cf
guard get_raw_graph with feature
pmnoxx Feb 4, 2021
073c335
count number of unreachable nodes
pmnoxx Feb 4, 2021
455f870
print number of unreachable nodes
pmnoxx Feb 4, 2021
bf3e9aa
cleanup
pmnoxx Feb 4, 2021
2f23ce8
cleanup
pmnoxx Feb 4, 2021
a24bde1
hotfix
pmnoxx Feb 4, 2021
f8bf83e
modify test for the issue
pmnoxx Feb 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion chain/network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::peer::Peer;
use crate::peer_store::{PeerStore, TrustLevel};
#[cfg(feature = "metric_recorder")]
use crate::recorder::{MetricRecorder, PeerMessageMetadata};
use crate::routing::{Edge, EdgeInfo, EdgeType, ProcessEdgeResult, RoutingTable};
use crate::routing::{Edge, EdgeInfo, EdgeType, ProcessEdgeResult, RoutingTable, MAX_NUM_PEERS};
use crate::types::{
AccountOrPeerIdOrHash, Ban, BlockedPorts, Consolidate, ConsolidateResponse, FullPeerInfo,
InboundTcpConnect, KnownPeerStatus, KnownProducer, NetworkInfo, NetworkViewClientMessages,
Expand Down Expand Up @@ -161,6 +161,10 @@ impl PeerManagerActor {
client_addr: Recipient<NetworkClientMessages>,
view_client_addr: Recipient<NetworkViewClientMessages>,
) -> Result<Self, Box<dyn std::error::Error>> {
if config.max_num_peers as usize > MAX_NUM_PEERS {
panic!("Exceeded max peer limit: {}", MAX_NUM_PEERS);
}

let peer_store = PeerStore::new(store.clone(), &config.boot_nodes)?;
debug!(target: "network", "Found known peers: {} (boot nodes={})", peer_store.len(), config.boot_nodes.len());
debug!(target: "network", "Blacklist: {:?}", config.blacklist);
Expand Down
6 changes: 3 additions & 3 deletions chain/network/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ impl MetricRecorder {
self
}

pub fn set_graph(&mut self, graph: &HashMap<PeerId, HashSet<PeerId>>) {
pub fn set_graph(&mut self, graph: HashMap<PeerId, HashSet<PeerId>>) {
self.graph.clear();
for (u, u_adj) in graph.iter() {
for (u, u_adj) in graph {
for v in u_adj {
if u < v {
self.graph.push((u.clone(), v.clone()));
self.graph.push((u.clone(), v));
}
}
}
Expand Down
218 changes: 157 additions & 61 deletions chain/network/src/routing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use std::ops::Sub;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -43,6 +43,8 @@ const ROUND_ROBIN_NONCE_CACHE_SIZE: usize = 10_000;
/// seconds will be removed from cache and persisted in disk.
pub const SAVE_PEERS_MAX_TIME: u64 = 7_200;
pub const SAVE_PEERS_AFTER_TIME: u64 = 3_600;
/// Graph implementation supports up to 128 peers.
pub const MAX_NUM_PEERS: usize = 128;

/// Information that will be ultimately used to create a new edge.
/// It contains nonce proposed for the edge with signature from peer.
Expand Down Expand Up @@ -257,7 +259,7 @@ pub struct RoutingTable {
/// PeerId associated for every known account id.
account_peers: SizedCache<AccountId, AnnounceAccount>,
/// Active PeerId that are part of the shortest path to each PeerId.
pub peer_forwarding: HashMap<PeerId, HashSet<PeerId>>,
pub peer_forwarding: HashMap<PeerId, Vec<PeerId>>,
/// Store last update for known edges.
pub edges_info: HashMap<(PeerId, PeerId), Edge>,
/// Hash of messages that requires routing back to respective previous hop.
Expand Down Expand Up @@ -425,14 +427,14 @@ impl RoutingTable {
) -> Result<Vec<Edge>, ()> {
let enc_nonce = index_to_bytes(nonce);

let result = match self.store.get_ser::<Vec<Edge>>(ColComponentEdges, enc_nonce.as_ref()) {
let res = match self.store.get_ser::<Vec<Edge>>(ColComponentEdges, enc_nonce.as_ref()) {
Ok(Some(edges)) => Ok(edges),
_ => Err(()),
};

update.delete(ColComponentEdges, enc_nonce.as_ref());

result
res
}

/// If peer_id is not on memory check if it is on disk in bring it back on memory.
Expand Down Expand Up @@ -739,8 +741,20 @@ impl RoutingTable {
}
}

pub fn get_raw_graph(&self) -> &HashMap<PeerId, HashSet<PeerId>> {
&self.raw_graph.adjacency
#[cfg(feature = "metric_recorder")]
pub fn get_raw_graph(&self) -> HashMap<PeerId, HashSet<PeerId>> {
let mut res = HashMap::with_capacity(self.raw_graph.adjacency.len());
for (key, neighbors) in self.raw_graph.adjacency.iter().enumerate() {
if self.raw_graph.used[key] {
let key = self.raw_graph.id2p[key].clone();
let neighbors = neighbors
.iter()
.map(|&node| self.raw_graph.id2p[node as usize].clone())
.collect::<HashSet<_>>();
res.insert(key, neighbors);
}
}
res
}
}

Expand All @@ -752,106 +766,186 @@ pub struct ProcessEdgeResult {
#[derive(Debug)]
pub struct RoutingTableInfo {
pub account_peers: HashMap<AccountId, PeerId>,
pub peer_forwarding: HashMap<PeerId, HashSet<PeerId>>,
pub peer_forwarding: HashMap<PeerId, Vec<PeerId>>,
}

#[derive(Clone)]
pub struct Graph {
pub source: PeerId,
adjacency: HashMap<PeerId, HashSet<PeerId>>,
source_id: u32,
p2id: HashMap<PeerId, u32>,
id2p: Vec<PeerId>,
used: Vec<bool>,
unused: Vec<u32>,
adjacency: Vec<Vec<u32>>,

total_active_edges: u64,
}

impl Graph {
pub fn new(source: PeerId) -> Self {
Self { source, adjacency: HashMap::new(), total_active_edges: 0 }
let mut res = Self {
source: source.clone(),
source_id: 0,
p2id: HashMap::default(),
id2p: Vec::default(),
used: Vec::default(),
unused: Vec::default(),
adjacency: Vec::default(),
total_active_edges: 0,
};
res.id2p.push(source.clone());
res.adjacency.push(Vec::default());
res.p2id.insert(source, res.source_id);
res.used.push(true);

res
}

fn contains_edge(&mut self, peer0: &PeerId, peer1: &PeerId) -> bool {
if let Some(adj) = self.adjacency.get(&peer0) {
if adj.contains(&peer1) {
return true;
fn contains_edge(&self, peer0: &PeerId, peer1: &PeerId) -> bool {
if let Some(&id0) = self.p2id.get(&peer0) {
if let Some(&id1) = self.p2id.get(&peer1) {
return self.adjacency[id0 as usize].contains(&id1);
}
}

false
}

fn add_directed_edge(&mut self, peer0: PeerId, peer1: PeerId) {
self.adjacency.entry(peer0).or_insert_with(HashSet::new).insert(peer1);
fn remove_if_unused(&mut self, id: u32) {
let entry = &self.adjacency[id as usize];

if entry.is_empty() && id != self.source_id {
self.used[id as usize] = false;
self.unused.push(id);
self.p2id.remove(&self.id2p[id as usize]);
}
}

fn remove_directed_edge(&mut self, peer0: &PeerId, peer1: &PeerId) {
self.adjacency.get_mut(&peer0).unwrap().remove(&peer1);
fn get_id(&mut self, peer: &PeerId) -> u32 {
match self.p2id.entry(peer.clone()) {
Entry::Occupied(occupied) => *occupied.get(),
Entry::Vacant(vacant) => {
let val = if let Some(val) = self.unused.pop() {
assert!(!self.used[val as usize]);
assert!(self.adjacency[val as usize].is_empty());
self.id2p[val as usize] = peer.clone();
self.used[val as usize] = true;
val
} else {
let val = self.id2p.len() as u32;
self.id2p.push(peer.clone());
self.used.push(true);
self.adjacency.push(Vec::default());
val
};

vacant.insert(val);
val
}
}
}

pub fn add_edge(&mut self, peer0: PeerId, peer1: PeerId) {
assert_ne!(peer0, peer1);
if !self.contains_edge(&peer0, &peer1) {
self.add_directed_edge(peer0.clone(), peer1.clone());
self.add_directed_edge(peer1, peer0);
let id0 = self.get_id(&peer0);
let id1 = self.get_id(&peer1);

self.adjacency[id0 as usize].push(id1);
self.adjacency[id1 as usize].push(id0);

self.total_active_edges += 1;
}
}

pub fn remove_edge(&mut self, peer0: &PeerId, peer1: &PeerId) {
assert_ne!(peer0, peer1);
if self.contains_edge(&peer0, &peer1) {
self.remove_directed_edge(&peer0, &peer1);
self.remove_directed_edge(&peer1, &peer0);
let id0 = self.get_id(&peer0);
let id1 = self.get_id(&peer1);

self.adjacency[id0 as usize].retain(|&x| x != id1);
self.adjacency[id1 as usize].retain(|&x| x != id0);

self.remove_if_unused(id0);
self.remove_if_unused(id1);

self.total_active_edges -= 1;
}
}

// TODO(MarX, #1363): This is too slow right now. (See benchmarks)
/// Compute for every node `u` on the graph (other than `source`) which are the neighbors of
/// `sources` which belong to the shortest path from `source` to `u`. Nodes that are
/// not connected to `source` will not appear in the result.
pub fn calculate_distance(&self) -> HashMap<PeerId, HashSet<PeerId>> {
let mut queue = vec![];
let mut distance = HashMap::new();
// TODO(MarX, #1363): Represent routes more efficiently at least while calculating distances
let mut routes: HashMap<PeerId, HashSet<PeerId>> = HashMap::new();

distance.insert(&self.source, 0);

// Add active connections
if let Some(neighbors) = self.adjacency.get(&self.source) {
for neighbor in neighbors {
queue.push(neighbor);
distance.insert(neighbor, 1);
routes.insert(neighbor.clone(), vec![neighbor.clone()].drain(..).collect());
}
}
pub fn calculate_distance(&self) -> HashMap<PeerId, Vec<PeerId>> {
// TODO add removal of unreachable nodes

let mut head = 0;
let mut queue = VecDeque::new();

while head < queue.len() {
let cur_peer = queue[head];
let cur_distance = *distance.get(cur_peer).unwrap();
head += 1;
let nodes = self.id2p.len();
let mut distance: Vec<i32> = vec![-1; nodes];
let mut routes: Vec<u128> = vec![0; nodes];

if let Some(neighbors) = self.adjacency.get(&cur_peer) {
for neighbor in neighbors {
if let Entry::Vacant(entry) = distance.entry(neighbor) {
queue.push(entry.key());
entry.insert(cur_distance + 1);
routes.insert(neighbor.clone(), HashSet::new());
}
distance[self.source_id as usize] = 0;

// If this edge belong to a shortest path, all paths to
// the closer nodes are also valid for the current node.
if *distance.get(neighbor).unwrap() == cur_distance + 1 {
let adding_routes = routes.get(cur_peer).unwrap().clone();
let target_routes = routes.get_mut(neighbor).unwrap();
{
let neighbors = &self.adjacency[self.source_id as usize];
for (id, &neighbor) in neighbors.iter().enumerate().take(MAX_NUM_PEERS) {
queue.push_back(neighbor);
distance[neighbor as usize] = 1;
routes[neighbor as usize] = 1u128 << id;
}
}

for route in adding_routes {
target_routes.insert(route.clone());
}
}
while let Some(cur_peer) = queue.pop_front() {
let cur_distance = distance[cur_peer as usize];

for &neighbor in &self.adjacency[cur_peer as usize] {
if distance[neighbor as usize] == -1 {
distance[neighbor as usize] = cur_distance + 1;
queue.push_back(neighbor);
}
// If this edge belong to a shortest path, all paths to
// the closer nodes are also valid for the current node.
if distance[neighbor as usize] == cur_distance + 1 {
routes[neighbor as usize] |= routes[cur_peer as usize];
}
}
}

routes.into_iter().filter(|(_, hops)| !hops.is_empty()).collect()
self.compute_result(&mut routes, &distance)
}

fn compute_result(&self, routes: &[u128], distance: &[i32]) -> HashMap<PeerId, Vec<PeerId>> {
let mut res = HashMap::with_capacity(routes.len());

let neighbors = &self.adjacency[self.source_id as usize];
let mut unreachable_nodes = 0;

for (key, &cur_route) in routes.iter().enumerate() {
if distance[key] == -1 && self.used[key] {
unreachable_nodes += 1;
}
if key as u32 == self.source_id
|| distance[key] == -1
|| cur_route == 0u128
|| !self.used[key]
{
continue;
}
let mut peer_set: Vec<PeerId> = Vec::with_capacity(cur_route.count_ones() as usize);

for (id, &neighbor) in neighbors.iter().enumerate().take(MAX_NUM_PEERS) {
if (cur_route & (1u128 << id)) != 0 {
peer_set.push(self.id2p[neighbor as usize].clone());
};
}
res.insert(self.id2p[key].clone(), peer_set);
}
if unreachable_nodes > 1000 {
warn!("We store more than 1000 unreachable nodes: {}", unreachable_nodes);
}
res
}
}

Expand Down Expand Up @@ -894,6 +988,8 @@ mod test {

let mut graph = Graph::new(source.clone());
graph.add_edge(source.clone(), node0.clone());
graph.remove_edge(&source, &node0);
graph.add_edge(source.clone(), node0.clone());

assert!(expected_routing_tables(
graph.calculate_distance(),
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub fn random_epoch_id() -> EpochId {
}

pub fn expected_routing_tables(
current: HashMap<PeerId, HashSet<PeerId>>,
current: HashMap<PeerId, Vec<PeerId>>,
expected: Vec<(PeerId, Vec<PeerId>)>,
) -> bool {
if current.len() != expected.len() {
Expand Down