diff --git a/chain/network/src/peer_manager.rs b/chain/network/src/peer_manager.rs index fd7c1a475a7..efb35cd72be 100644 --- a/chain/network/src/peer_manager.rs +++ b/chain/network/src/peer_manager.rs @@ -31,7 +31,7 @@ use crate::peer::Peer; use crate::peer_store::{PeerStore, TrustLevel}; #[cfg(feature = "metric_recorder")] use crate::recorder::{MetricRecorder, PeerMessageMetadata}; -use crate::routing::{Edge, EdgeInfo, EdgeType, ProcessEdgeResult, RoutingTable}; +use crate::routing::{Edge, EdgeInfo, EdgeType, ProcessEdgeResult, RoutingTable, MAX_NUM_PEERS}; use crate::types::{ AccountOrPeerIdOrHash, Ban, BlockedPorts, Consolidate, ConsolidateResponse, FullPeerInfo, InboundTcpConnect, KnownPeerStatus, KnownProducer, NetworkInfo, NetworkViewClientMessages, @@ -161,6 +161,10 @@ impl PeerManagerActor { client_addr: Recipient, view_client_addr: Recipient, ) -> Result> { + if config.max_num_peers as usize > MAX_NUM_PEERS { + panic!("Exceeded max peer limit: {}", MAX_NUM_PEERS); + } + let peer_store = PeerStore::new(store.clone(), &config.boot_nodes)?; debug!(target: "network", "Found known peers: {} (boot nodes={})", peer_store.len(), config.boot_nodes.len()); debug!(target: "network", "Blacklist: {:?}", config.blacklist); diff --git a/chain/network/src/recorder.rs b/chain/network/src/recorder.rs index c6b73f38b97..ca19976d039 100644 --- a/chain/network/src/recorder.rs +++ b/chain/network/src/recorder.rs @@ -109,12 +109,12 @@ impl MetricRecorder { self } - pub fn set_graph(&mut self, graph: &HashMap>) { + pub fn set_graph(&mut self, graph: HashMap>) { self.graph.clear(); - for (u, u_adj) in graph.iter() { + for (u, u_adj) in graph { for v in u_adj { if u < v { - self.graph.push((u.clone(), v.clone())); + self.graph.push((u.clone(), v)); } } } diff --git a/chain/network/src/routing.rs b/chain/network/src/routing.rs index e1d9ffb3a12..e5240b1be55 100644 --- a/chain/network/src/routing.rs +++ b/chain/network/src/routing.rs @@ -1,4 +1,4 @@ -use std::collections::{hash_map::Entry, HashMap, HashSet}; +use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; use std::ops::Sub; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -43,6 +43,8 @@ const ROUND_ROBIN_NONCE_CACHE_SIZE: usize = 10_000; /// seconds will be removed from cache and persisted in disk. pub const SAVE_PEERS_MAX_TIME: u64 = 7_200; pub const SAVE_PEERS_AFTER_TIME: u64 = 3_600; +/// Graph implementation supports up to 128 peers. +pub const MAX_NUM_PEERS: usize = 128; /// Information that will be ultimately used to create a new edge. /// It contains nonce proposed for the edge with signature from peer. @@ -257,7 +259,7 @@ pub struct RoutingTable { /// PeerId associated for every known account id. account_peers: SizedCache, /// Active PeerId that are part of the shortest path to each PeerId. - pub peer_forwarding: HashMap>, + pub peer_forwarding: HashMap>, /// Store last update for known edges. pub edges_info: HashMap<(PeerId, PeerId), Edge>, /// Hash of messages that requires routing back to respective previous hop. @@ -425,14 +427,14 @@ impl RoutingTable { ) -> Result, ()> { let enc_nonce = index_to_bytes(nonce); - let result = match self.store.get_ser::>(ColComponentEdges, enc_nonce.as_ref()) { + let res = match self.store.get_ser::>(ColComponentEdges, enc_nonce.as_ref()) { Ok(Some(edges)) => Ok(edges), _ => Err(()), }; update.delete(ColComponentEdges, enc_nonce.as_ref()); - result + res } /// If peer_id is not on memory check if it is on disk in bring it back on memory. @@ -739,8 +741,20 @@ impl RoutingTable { } } - pub fn get_raw_graph(&self) -> &HashMap> { - &self.raw_graph.adjacency + #[cfg(feature = "metric_recorder")] + pub fn get_raw_graph(&self) -> HashMap> { + let mut res = HashMap::with_capacity(self.raw_graph.adjacency.len()); + for (key, neighbors) in self.raw_graph.adjacency.iter().enumerate() { + if self.raw_graph.used[key] { + let key = self.raw_graph.id2p[key].clone(); + let neighbors = neighbors + .iter() + .map(|&node| self.raw_graph.id2p[node as usize].clone()) + .collect::>(); + res.insert(key, neighbors); + } + } + res } } @@ -752,106 +766,186 @@ pub struct ProcessEdgeResult { #[derive(Debug)] pub struct RoutingTableInfo { pub account_peers: HashMap, - pub peer_forwarding: HashMap>, + pub peer_forwarding: HashMap>, } #[derive(Clone)] pub struct Graph { pub source: PeerId, - adjacency: HashMap>, + source_id: u32, + p2id: HashMap, + id2p: Vec, + used: Vec, + unused: Vec, + adjacency: Vec>, + total_active_edges: u64, } impl Graph { pub fn new(source: PeerId) -> Self { - Self { source, adjacency: HashMap::new(), total_active_edges: 0 } + let mut res = Self { + source: source.clone(), + source_id: 0, + p2id: HashMap::default(), + id2p: Vec::default(), + used: Vec::default(), + unused: Vec::default(), + adjacency: Vec::default(), + total_active_edges: 0, + }; + res.id2p.push(source.clone()); + res.adjacency.push(Vec::default()); + res.p2id.insert(source, res.source_id); + res.used.push(true); + + res } - fn contains_edge(&mut self, peer0: &PeerId, peer1: &PeerId) -> bool { - if let Some(adj) = self.adjacency.get(&peer0) { - if adj.contains(&peer1) { - return true; + fn contains_edge(&self, peer0: &PeerId, peer1: &PeerId) -> bool { + if let Some(&id0) = self.p2id.get(&peer0) { + if let Some(&id1) = self.p2id.get(&peer1) { + return self.adjacency[id0 as usize].contains(&id1); } } - false } - fn add_directed_edge(&mut self, peer0: PeerId, peer1: PeerId) { - self.adjacency.entry(peer0).or_insert_with(HashSet::new).insert(peer1); + fn remove_if_unused(&mut self, id: u32) { + let entry = &self.adjacency[id as usize]; + + if entry.is_empty() && id != self.source_id { + self.used[id as usize] = false; + self.unused.push(id); + self.p2id.remove(&self.id2p[id as usize]); + } } - fn remove_directed_edge(&mut self, peer0: &PeerId, peer1: &PeerId) { - self.adjacency.get_mut(&peer0).unwrap().remove(&peer1); + fn get_id(&mut self, peer: &PeerId) -> u32 { + match self.p2id.entry(peer.clone()) { + Entry::Occupied(occupied) => *occupied.get(), + Entry::Vacant(vacant) => { + let val = if let Some(val) = self.unused.pop() { + assert!(!self.used[val as usize]); + assert!(self.adjacency[val as usize].is_empty()); + self.id2p[val as usize] = peer.clone(); + self.used[val as usize] = true; + val + } else { + let val = self.id2p.len() as u32; + self.id2p.push(peer.clone()); + self.used.push(true); + self.adjacency.push(Vec::default()); + val + }; + + vacant.insert(val); + val + } + } } pub fn add_edge(&mut self, peer0: PeerId, peer1: PeerId) { + assert_ne!(peer0, peer1); if !self.contains_edge(&peer0, &peer1) { - self.add_directed_edge(peer0.clone(), peer1.clone()); - self.add_directed_edge(peer1, peer0); + let id0 = self.get_id(&peer0); + let id1 = self.get_id(&peer1); + + self.adjacency[id0 as usize].push(id1); + self.adjacency[id1 as usize].push(id0); + self.total_active_edges += 1; } } pub fn remove_edge(&mut self, peer0: &PeerId, peer1: &PeerId) { + assert_ne!(peer0, peer1); if self.contains_edge(&peer0, &peer1) { - self.remove_directed_edge(&peer0, &peer1); - self.remove_directed_edge(&peer1, &peer0); + let id0 = self.get_id(&peer0); + let id1 = self.get_id(&peer1); + + self.adjacency[id0 as usize].retain(|&x| x != id1); + self.adjacency[id1 as usize].retain(|&x| x != id0); + + self.remove_if_unused(id0); + self.remove_if_unused(id1); + self.total_active_edges -= 1; } } - // TODO(MarX, #1363): This is too slow right now. (See benchmarks) /// Compute for every node `u` on the graph (other than `source`) which are the neighbors of /// `sources` which belong to the shortest path from `source` to `u`. Nodes that are /// not connected to `source` will not appear in the result. - pub fn calculate_distance(&self) -> HashMap> { - let mut queue = vec![]; - let mut distance = HashMap::new(); - // TODO(MarX, #1363): Represent routes more efficiently at least while calculating distances - let mut routes: HashMap> = HashMap::new(); - - distance.insert(&self.source, 0); - - // Add active connections - if let Some(neighbors) = self.adjacency.get(&self.source) { - for neighbor in neighbors { - queue.push(neighbor); - distance.insert(neighbor, 1); - routes.insert(neighbor.clone(), vec![neighbor.clone()].drain(..).collect()); - } - } + pub fn calculate_distance(&self) -> HashMap> { + // TODO add removal of unreachable nodes - let mut head = 0; + let mut queue = VecDeque::new(); - while head < queue.len() { - let cur_peer = queue[head]; - let cur_distance = *distance.get(cur_peer).unwrap(); - head += 1; + let nodes = self.id2p.len(); + let mut distance: Vec = vec![-1; nodes]; + let mut routes: Vec = vec![0; nodes]; - if let Some(neighbors) = self.adjacency.get(&cur_peer) { - for neighbor in neighbors { - if let Entry::Vacant(entry) = distance.entry(neighbor) { - queue.push(entry.key()); - entry.insert(cur_distance + 1); - routes.insert(neighbor.clone(), HashSet::new()); - } + distance[self.source_id as usize] = 0; - // If this edge belong to a shortest path, all paths to - // the closer nodes are also valid for the current node. - if *distance.get(neighbor).unwrap() == cur_distance + 1 { - let adding_routes = routes.get(cur_peer).unwrap().clone(); - let target_routes = routes.get_mut(neighbor).unwrap(); + { + let neighbors = &self.adjacency[self.source_id as usize]; + for (id, &neighbor) in neighbors.iter().enumerate().take(MAX_NUM_PEERS) { + queue.push_back(neighbor); + distance[neighbor as usize] = 1; + routes[neighbor as usize] = 1u128 << id; + } + } - for route in adding_routes { - target_routes.insert(route.clone()); - } - } + while let Some(cur_peer) = queue.pop_front() { + let cur_distance = distance[cur_peer as usize]; + + for &neighbor in &self.adjacency[cur_peer as usize] { + if distance[neighbor as usize] == -1 { + distance[neighbor as usize] = cur_distance + 1; + queue.push_back(neighbor); + } + // If this edge belong to a shortest path, all paths to + // the closer nodes are also valid for the current node. + if distance[neighbor as usize] == cur_distance + 1 { + routes[neighbor as usize] |= routes[cur_peer as usize]; } } } - routes.into_iter().filter(|(_, hops)| !hops.is_empty()).collect() + self.compute_result(&mut routes, &distance) + } + + fn compute_result(&self, routes: &[u128], distance: &[i32]) -> HashMap> { + let mut res = HashMap::with_capacity(routes.len()); + + let neighbors = &self.adjacency[self.source_id as usize]; + let mut unreachable_nodes = 0; + + for (key, &cur_route) in routes.iter().enumerate() { + if distance[key] == -1 && self.used[key] { + unreachable_nodes += 1; + } + if key as u32 == self.source_id + || distance[key] == -1 + || cur_route == 0u128 + || !self.used[key] + { + continue; + } + let mut peer_set: Vec = Vec::with_capacity(cur_route.count_ones() as usize); + + for (id, &neighbor) in neighbors.iter().enumerate().take(MAX_NUM_PEERS) { + if (cur_route & (1u128 << id)) != 0 { + peer_set.push(self.id2p[neighbor as usize].clone()); + }; + } + res.insert(self.id2p[key].clone(), peer_set); + } + if unreachable_nodes > 1000 { + warn!("We store more than 1000 unreachable nodes: {}", unreachable_nodes); + } + res } } @@ -894,6 +988,8 @@ mod test { let mut graph = Graph::new(source.clone()); graph.add_edge(source.clone(), node0.clone()); + graph.remove_edge(&source, &node0); + graph.add_edge(source.clone(), node0.clone()); assert!(expected_routing_tables( graph.calculate_distance(), diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index 617ff48e181..1ad8e4dcb93 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -193,7 +193,7 @@ pub fn random_epoch_id() -> EpochId { } pub fn expected_routing_tables( - current: HashMap>, + current: HashMap>, expected: Vec<(PeerId, Vec)>, ) -> bool { if current.len() != expected.len() {