diff --git a/examples/bridge/src/bin/indexer.rs b/examples/bridge/src/bin/indexer.rs index 7775b21812..0073574837 100644 --- a/examples/bridge/src/bin/indexer.rs +++ b/examples/bridge/src/bin/indexer.rs @@ -241,8 +241,14 @@ fn main() { let (peer, mut sender, mut receiver) = match listen( context.with_label("listener"), |peer| { - let out = validators.position(&peer).is_some(); - async move { out } + let valid = validators.position(&peer).is_some(); + async move { + if valid { + Ok(()) + } else { + Err(()) + } + } }, config.clone(), stream, diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index 60cc5c67cb..4c1ed39858 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -11,7 +11,7 @@ use commonware_runtime::{ spawn_cell, Clock, ContextCell, Handle, KeyedRateLimiter, Listener, Metrics, Network, Quota, SinkOf, Spawner, StreamOf, }; -use commonware_stream::{listen, Config as StreamConfig}; +use commonware_stream::{listen, Config as StreamConfig, Error as StreamError}; use commonware_utils::{concurrency::Limiter, net::SubnetMask, IpAddrExt}; use prometheus_client::metrics::counter::Counter; use rand::{CryptoRng, Rng}; @@ -105,7 +105,17 @@ impl Actor< ) { let (peer, send, recv) = match listen( context, - |peer| tracker.acceptable(peer), + |peer| { + let fut = tracker.acceptable(peer); + async move { + let status = fut.await; + if status == tracker::Acceptable::Yes { + Ok(()) + } else { + Err(status) + } + } + }, stream_cfg, stream, sink, @@ -113,6 +123,20 @@ impl Actor< .await { Ok(x) => x, + Err(StreamError::PeerRejected(reason)) => { + match reason { + tracker::Acceptable::Blocked => { + debug!(?address, "peer is blocked"); + } + tracker::Acceptable::Unknown => { + debug!(?address, "peer unknown (not in peer set)"); + } + tracker::Acceptable::Rejected | tracker::Acceptable::Yes => { + debug!(?address, "peer rejected"); + } + } + return; + } Err(err) => { debug!(?err, "failed to complete handshake"); return; @@ -300,7 +324,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -443,7 +467,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); diff --git a/p2p/src/authenticated/discovery/actors/peer/actor.rs b/p2p/src/authenticated/discovery/actors/peer/actor.rs index 78b781025c..9c9d1cb226 100644 --- a/p2p/src/authenticated/discovery/actors/peer/actor.rs +++ b/p2p/src/authenticated/discovery/actors/peer/actor.rs @@ -427,7 +427,7 @@ mod tests { move |ctx| async move { commonware_stream::listen( ctx, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, remote_config, remote_stream, remote_sink, @@ -526,7 +526,7 @@ mod tests { move |ctx| async move { commonware_stream::listen( ctx, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, remote_config, remote_stream, remote_sink, @@ -631,7 +631,7 @@ mod tests { move |ctx| async move { commonware_stream::listen( ctx, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, remote_config, remote_stream, remote_sink, diff --git a/p2p/src/authenticated/discovery/actors/tracker/actor.rs b/p2p/src/authenticated/discovery/actors/tracker/actor.rs index b069922f8d..72c7bcb199 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/actor.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/actor.rs @@ -79,6 +79,7 @@ impl Actor { max_sets: cfg.tracked_peer_sets, dial_fail_limit: cfg.dial_fail_limit, rate_limit: cfg.allowed_connection_rate_per_peer, + block_duration: cfg.block_duration, }; // Create the mailboxes @@ -278,7 +279,10 @@ mod tests { use crate::{ authenticated::{ discovery::{ - actors::{peer, tracker}, + actors::{ + peer, + tracker::{self, Acceptable}, + }, config::Bootstrapper, types, }, @@ -319,6 +323,7 @@ mod tests { peer_gossip_max_count: 5, max_peer_set_size: 128, dial_fail_limit: 1, + block_duration: Duration::from_secs(60), } } @@ -795,9 +800,18 @@ mod tests { } = setup_actor(context.clone(), cfg_initial); // None listenable because not registered - assert!(!mailbox.acceptable(peer_pk.clone()).await); - assert!(!mailbox.acceptable(peer_pk2.clone()).await); - assert!(!mailbox.acceptable(peer_pk3.clone()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone()).await, + Acceptable::Unknown + ); + assert_eq!( + mailbox.acceptable(peer_pk2.clone()).await, + Acceptable::Unknown + ); + assert_eq!( + mailbox.acceptable(peer_pk3.clone()).await, + Acceptable::Unknown + ); oracle .update(0, [peer_pk.clone(), peer_pk2.clone()].try_into().unwrap()) @@ -805,11 +819,11 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Not listenable because self - assert!(!mailbox.acceptable(peer_pk).await); + assert_eq!(mailbox.acceptable(peer_pk).await, Acceptable::Rejected); // Listenable because registered - assert!(mailbox.acceptable(peer_pk2).await); + assert_eq!(mailbox.acceptable(peer_pk2).await, Acceptable::Yes); // Not listenable because not registered - assert!(!mailbox.acceptable(peer_pk3).await); + assert_eq!(mailbox.acceptable(peer_pk3).await, Acceptable::Unknown); }); } @@ -834,12 +848,15 @@ mod tests { .await; context.sleep(Duration::from_millis(10)).await; // Allow register to process - assert!(mailbox.acceptable(peer_pk.clone()).await); + assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Acceptable::Yes); let reservation = mailbox.listen(peer_pk.clone()).await; assert!(reservation.is_some()); - assert!(!mailbox.acceptable(peer_pk.clone()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone()).await, + Acceptable::Rejected + ); let failed_reservation = mailbox.listen(peer_pk.clone()).await; assert!(failed_reservation.is_none()); diff --git a/p2p/src/authenticated/discovery/actors/tracker/directory.rs b/p2p/src/authenticated/discovery/actors/tracker/directory.rs index dc49a083c6..3dd197408c 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/directory.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/directory.rs @@ -1,4 +1,6 @@ -use super::{metrics::Metrics, record::Record, set::Set, Metadata, Reservation}; +use super::{ + ingress::Acceptable, metrics::Metrics, record::Record, set::Set, Metadata, Reservation, +}; use crate::{ authenticated::discovery::{ actors::tracker::ingress::Releaser, @@ -17,6 +19,7 @@ use rand::{seq::IteratorRandom, Rng}; use std::{ collections::{BTreeMap, HashMap}, ops::Deref, + time::Duration, }; use tracing::{debug, warn}; @@ -37,6 +40,9 @@ pub struct Config { /// The rate limit for allowing reservations per-peer. pub rate_limit: Quota, + + /// Duration after which blocked peers are automatically unblocked. + pub block_duration: Duration, } /// Represents a collection of records for all peers. @@ -57,6 +63,9 @@ pub struct Directory { /// peers for its peer info again. dial_fail_limit: usize, + /// Duration after which blocked peers are automatically unblocked. + block_duration: Duration, + // ---------- State ---------- /// The records of all peers. peers: HashMap>, @@ -107,6 +116,7 @@ impl Directory { allow_dns: cfg.allow_dns, max_sets: cfg.max_sets, dial_fail_limit: cfg.dial_fail_limit, + block_duration: cfg.block_duration, peers, sets: BTreeMap::new(), rate_limiter, @@ -131,7 +141,8 @@ impl Directory { record.dial_failure(ingress); } - let want = record.want(self.dial_fail_limit); + let now = self.context.current(); + let want = record.want(now, self.dial_fail_limit); for set in self.sets.values_mut() { set.update(peer, !want); } @@ -152,7 +163,8 @@ impl Directory { record.connect(); // We may have to update the sets. - let want = record.want(self.dial_fail_limit); + let now = self.context.current(); + let want = record.want(now, self.dial_fail_limit); for set in self.sets.values_mut() { set.update(peer, !want); } @@ -160,6 +172,7 @@ impl Directory { /// Using a list of (already-validated) peer information, update the records. pub fn update_peers(&mut self, infos: Vec>) { + let now = self.context.current(); for info in infos { // Update peer address // @@ -170,7 +183,7 @@ impl Directory { let Some(record) = self.peers.get_mut(&peer) else { continue; }; - if !record.update(info) { + if !record.update(now, info) { continue; } self.metrics @@ -179,7 +192,7 @@ impl Directory { .inc(); // We may have to update the sets. - let want = record.want(self.dial_fail_limit); + let want = record.want(now, self.dial_fail_limit); for set in self.sets.values_mut() { set.update(&peer, !want); } @@ -204,6 +217,7 @@ impl Directory { } // Create and store new peer set + let now = self.context.current(); let mut set = Set::new(peers.clone()); for peer in peers.iter() { let record = self.peers.entry(peer.clone()).or_insert_with(|| { @@ -211,7 +225,7 @@ impl Directory { Record::unknown() }); record.increment(); - set.update(peer, !record.want(self.dial_fail_limit)); + set.update(peer, !record.want(now, self.dial_fail_limit)); } self.sets.insert(index, set); @@ -271,7 +285,15 @@ impl Directory { /// Attempt to block a peer, updating the metrics accordingly. pub fn block(&mut self, peer: &C) { - if self.peers.get_mut(peer).is_some_and(|r| r.block()) { + let now = self.context.current(); + let blocked_until = now + .checked_add(self.block_duration) + .unwrap_or(now + Duration::from_secs(365 * 24 * 60 * 60 * 100)); // ~100 years + if self + .peers + .get_mut(peer) + .is_some_and(|r| r.block(blocked_until)) + { self.metrics.blocked.inc(); } } @@ -337,25 +359,31 @@ impl Directory { /// /// A peer is eligible if it is in a peer set (or is persistent), not blocked, and not ourselves. pub fn eligible(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.eligible()) + let now = self.context.current(); + self.peers.get(peer).is_some_and(|r| r.eligible(now)) } /// Returns a vector of dialable peers. That is, unconnected peers for which we have an ingress. pub fn dialable(&self) -> Vec { // Collect peers with known addresses + let now = self.context.current(); let mut result: Vec<_> = self .peers .iter() - .filter(|&(_, r)| r.dialable(self.allow_private_ips, self.allow_dns)) + .filter(|&(_, r)| r.dialable(now, self.allow_private_ips, self.allow_dns)) .map(|(peer, _)| peer.clone()) .collect(); result.sort(); result } - /// Returns true if this peer is acceptable (can accept an incoming connection from them). - pub fn acceptable(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.acceptable()) + /// Returns the acceptance status for a peer. + pub fn acceptable(&self, peer: &C) -> Acceptable { + let now = self.context.current(); + let Some(record) = self.peers.get(peer) else { + return Acceptable::Unknown; + }; + record.acceptable(now) } // --------- Helpers ---------- @@ -387,7 +415,8 @@ impl Directory { } // Reserve - if record.reserve() { + let now = self.context.current(); + if record.reserve(now) { self.metrics.reserved.inc(); return Some(Reservation::new(metadata, self.releaser.clone())); } @@ -404,7 +433,8 @@ impl Directory { if !record.deletable() { return false; } - if record.blocked() { + let now = self.context.current(); + if record.blocked(now) { self.metrics.blocked.dec(); } self.peers.remove(peer); diff --git a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs index 2759b86c62..9b261f3b45 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs @@ -11,6 +11,19 @@ use commonware_cryptography::PublicKey; use commonware_utils::ordered::Set; use futures::channel::{mpsc, oneshot}; +/// Result of checking if a peer is acceptable for connection. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Acceptable { + /// Peer is acceptable for connection. + Yes, + /// Peer is blocked. + Blocked, + /// Peer is not in any peer set. + Unknown, + /// Peer is known but rejected (already connected, reserved, ourselves, etc.). + Rejected, +} + /// Messages that can be sent to the tracker actor. #[derive(Debug)] pub enum Message { @@ -110,8 +123,8 @@ pub enum Message { /// The public key of the peer to check. public_key: C, - /// The sender to respond with whether the peer is acceptable. - responder: oneshot::Sender, + /// The sender to respond with the acceptance status. + responder: oneshot::Sender, }, /// Request a reservation for a particular peer. @@ -185,14 +198,14 @@ impl UnboundedMailbox> { } /// Send an `Acceptable` message to the tracker. - pub async fn acceptable(&mut self, public_key: C) -> bool { + pub fn acceptable(&mut self, public_key: C) -> impl std::future::Future { let (tx, rx) = oneshot::channel(); self.send(Message::Acceptable { public_key, responder: tx, }) .unwrap(); - rx.await.unwrap() + async move { rx.await.unwrap() } } /// Send a `Listen` message to the tracker. diff --git a/p2p/src/authenticated/discovery/actors/tracker/mod.rs b/p2p/src/authenticated/discovery/actors/tracker/mod.rs index 4f069d558c..a5e8a0397d 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/mod.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/mod.rs @@ -15,7 +15,7 @@ mod reservation; mod set; pub use actor::Actor; -pub use ingress::{Message, Oracle}; +pub use ingress::{Acceptable, Message, Oracle}; pub use metadata::Metadata; pub use reservation::Reservation; @@ -31,6 +31,7 @@ pub struct Config { pub tracked_peer_sets: usize, pub max_peer_set_size: u64, pub allowed_connection_rate_per_peer: Quota, + pub block_duration: Duration, pub peer_gossip_max_count: usize, pub dial_fail_limit: usize, } diff --git a/p2p/src/authenticated/discovery/actors/tracker/record.rs b/p2p/src/authenticated/discovery/actors/tracker/record.rs index 24cdaacf20..581510a006 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/record.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/record.rs @@ -1,5 +1,7 @@ +use super::ingress::Acceptable; use crate::{authenticated::discovery::types::Info, Ingress}; use commonware_cryptography::PublicKey; +use std::time::SystemTime; use tracing::trace; /// Represents information known about a peer's address. @@ -21,9 +23,9 @@ pub enum Address { /// The `usize` indicates the number of times dialing this record has failed. Discovered(Info, usize), - /// Peer is blocked. - /// We don't care to track its information. - Blocked, + /// Peer is blocked until the given time. + /// The block expires after the specified time, allowing the peer to reconnect. + Blocked(SystemTime), } /// Represents the connection status of a peer. @@ -96,11 +98,11 @@ impl Record { /// Attempt to update the [Info] of a discovered peer. /// /// Returns true if the update was successful. - pub fn update(&mut self, info: Info) -> bool { + pub fn update(&mut self, now: SystemTime, info: Info) -> bool { match &self.address { Address::Myself(_) => false, - Address::Blocked => false, - Address::Unknown | Address::Bootstrapper(_) => { + Address::Blocked(until) if now < *until => false, + Address::Blocked(_) | Address::Unknown | Address::Bootstrapper(_) => { self.address = Address::Discovered(info, 0); true } @@ -124,17 +126,26 @@ impl Record { } } - /// Attempt to mark the peer as blocked. + /// Attempt to mark the peer as blocked until the specified time. /// - /// Returns `true` if the peer was newly blocked. + /// If the peer is already blocked, extends the block to the later of the two times. + /// Returns `true` if the peer was newly blocked (not already blocked). /// Returns `false` if the peer was already blocked or is the local node (unblockable). - pub fn block(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself(_)) { - return false; + pub fn block(&mut self, blocked_until: SystemTime) -> bool { + match &self.address { + Address::Myself(_) => false, + Address::Blocked(existing_until) => { + if blocked_until > *existing_until { + self.address = Address::Blocked(blocked_until); + } + false + } + Address::Unknown | Address::Bootstrapper(_) | Address::Discovered(_, _) => { + self.address = Address::Blocked(blocked_until); + self.persistent = false; + true + } } - self.address = Address::Blocked; - self.persistent = false; - true } /// Increase the count of peer sets this peer is part of. @@ -154,9 +165,11 @@ impl Record { /// Attempt to reserve the peer for connection. /// /// Returns `true` if the reservation was successful, `false` otherwise. - pub const fn reserve(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself(_)) { - return false; + pub fn reserve(&mut self, now: SystemTime) -> bool { + match &self.address { + Address::Blocked(until) if now < *until => return false, + Address::Myself(_) => return false, + _ => {} } if matches!(self.status, Status::Inert) { self.status = Status::Reserved; @@ -202,9 +215,9 @@ impl Record { // ---------- Getters ---------- - /// Returns `true` if the record is blocked. - pub const fn blocked(&self) -> bool { - matches!(self.address, Address::Blocked) + /// Returns `true` if the record is blocked (and block hasn't expired). + pub fn blocked(&self, now: SystemTime) -> bool { + matches!(&self.address, Address::Blocked(until) if now < *until) } /// Returns the number of peer sets this peer is part of. @@ -216,16 +229,18 @@ impl Record { /// /// A record is dialable if: /// - We have the ingress address of the peer - /// - It is not ourselves or blocked + /// - It is not ourselves or blocked (or block has expired) /// - We are not already connected or reserved /// - The ingress address is allowed (DNS enabled, Socket IP is global or private IPs allowed) /// /// Note: For DNS addresses, private IP checks are performed in the dialer after resolution. - pub fn dialable(&self, allow_private_ips: bool, allow_dns: bool) -> bool { + pub fn dialable(&self, now: SystemTime, allow_private_ips: bool, allow_dns: bool) -> bool { if self.status != Status::Inert { return false; } let ingress = match &self.address { + Address::Blocked(until) if now < *until => return false, + Address::Blocked(_) => return false, // Blocked with expired time but no ingress Address::Bootstrapper(ingress) => ingress, Address::Discovered(info, _) => &info.ingress, _ => return false, @@ -233,13 +248,29 @@ impl Record { ingress.is_valid(allow_private_ips, allow_dns) } - /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for this peer. /// /// A peer is acceptable if: /// - The peer is eligible (in a peer set, not blocked, not ourselves) /// - We are not already connected or reserved - pub fn acceptable(&self) -> bool { - self.eligible() && self.status == Status::Inert + pub fn acceptable(&self, now: SystemTime) -> Acceptable { + // Check if ourselves + if matches!(self.address, Address::Myself(_)) { + return Acceptable::Rejected; + } + // Check if blocked + if self.blocked(now) { + return Acceptable::Blocked; + } + // Check if in a peer set + if self.sets == 0 && !self.persistent { + return Acceptable::Unknown; + } + // Check if already connected or reserved + if self.status != Status::Inert { + return Acceptable::Rejected; + } + Acceptable::Yes } /// Return the ingress address of the peer, if known. @@ -249,7 +280,7 @@ impl Record { Address::Myself(info) => Some(&info.ingress), Address::Bootstrapper(ingress) => Some(ingress), Address::Discovered(info, _) => Some(&info.ingress), - Address::Blocked => None, + Address::Blocked(_) => None, } } @@ -261,7 +292,7 @@ impl Record { Address::Myself(info) => Some(info), Address::Bootstrapper(_) => None, Address::Discovered(info, _) => (self.status == Status::Active).then_some(info), - Address::Blocked => None, + Address::Blocked(_) => None, } .cloned() } @@ -274,21 +305,22 @@ impl Record { /// Returns `true` if we want to ask for updated peer information for this peer. /// - /// - Returns `false` for `Myself` and `Blocked` addresses. + /// - Returns `false` for `Myself` and actively `Blocked` addresses. /// - Returns `true` for addresses for which we don't have peer info. /// - Returns true for addresses for which we do have peer info if-and-only-if we have failed to /// dial at least `min_fails` times. - pub fn want(&self, min_fails: usize) -> bool { + pub fn want(&self, now: SystemTime, min_fails: usize) -> bool { // Ignore how many sets the peer is part of. // If the peer is not in any sets, this function is not called anyway. // Return true if we either: // - Don't have signed peer info // - Are not connected to the peer and have failed dialing it - match self.address { - Address::Myself(_) | Address::Blocked => false, - Address::Unknown | Address::Bootstrapper(_) => true, - Address::Discovered(_, fails) => self.status != Status::Active && fails >= min_fails, + match &self.address { + Address::Myself(_) => false, + Address::Blocked(until) if now < *until => false, + Address::Blocked(_) | Address::Unknown | Address::Bootstrapper(_) => true, + Address::Discovered(_, fails) => self.status != Status::Active && *fails >= min_fails, } } @@ -300,14 +332,16 @@ impl Record { /// Returns `true` if this peer is eligible for connection. /// /// A peer is eligible if: - /// - It is not blocked or ourselves + /// - It is not blocked (or block has expired) or ourselves /// - It is part of at least one peer set (or is persistent, e.g., bootstrapper) - pub const fn eligible(&self) -> bool { - match self.address { - Address::Blocked | Address::Myself(_) => false, - Address::Bootstrapper(_) | Address::Unknown | Address::Discovered(_, _) => { - self.sets > 0 || self.persistent - } + pub fn eligible(&self, now: SystemTime) -> bool { + match &self.address { + Address::Blocked(until) if now < *until => false, + Address::Myself(_) => false, + Address::Blocked(_) + | Address::Bootstrapper(_) + | Address::Unknown + | Address::Discovered(_, _) => self.sets > 0 || self.persistent, } } } @@ -316,10 +350,20 @@ mod tests { use super::*; use crate::authenticated::discovery::types; use commonware_cryptography::secp256r1::standard::{PrivateKey, PublicKey}; - use std::net::SocketAddr; + use std::{net::SocketAddr, time::Duration}; const NAMESPACE: &[u8] = b"test"; + // Helper to get a SystemTime for testing + fn now() -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(1000) + } + + // Helper to get a block expiry time in the future + fn future_block_time() -> SystemTime { + now() + Duration::from_secs(3600) + } + // Helper function to create signed peer info for testing fn create_peer_info( signer_seed: u64, @@ -369,11 +413,11 @@ mod tests { assert!(!record.persistent); assert!(record.ingress().is_none()); assert!(record.sharable().is_none()); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); - assert!(record.want(0), "Should want info for unknown peer"); + assert!(record.want(now(), 0), "Should want info for unknown peer"); assert!(record.deletable()); - assert!(!record.eligible()); + assert!(!record.eligible(now())); } #[test] @@ -391,11 +435,11 @@ mod tests { record.sharable().as_ref(), &my_info )); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); - assert!(!record.want(0), "Should not want info for myself"); + assert!(!record.want(now(), 0), "Should not want info for myself"); assert!(!record.deletable()); - assert!(!record.eligible()); + assert!(!record.eligible(now())); } #[test] @@ -409,11 +453,11 @@ mod tests { assert!(record.persistent); assert_eq!(record.ingress(), Some(&ingress)); assert!(record.sharable().is_none()); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); - assert!(record.want(0), "Should want info for bootstrapper"); + assert!(record.want(now(), 0), "Should want info for bootstrapper"); assert!(!record.deletable()); - assert!(record.eligible()); + assert!(record.eligible(now())); } #[test] @@ -422,7 +466,7 @@ mod tests { let mut record = Record::::unknown(); let peer_info = create_peer_info::(1, socket, 1000); - assert!(record.update(peer_info.clone())); + assert!(record.update(now(), peer_info.clone())); assert_eq!(record.ingress(), Some(&peer_info.ingress)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info)), @@ -439,7 +483,7 @@ mod tests { let peer_info = create_peer_info::(2, socket, 1000); assert!(record.persistent, "Should start as persistent"); - assert!(record.update(peer_info.clone())); + assert!(record.update(now(), peer_info.clone())); assert_eq!(record.ingress(), Some(&peer_info.ingress)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info)), @@ -456,8 +500,8 @@ mod tests { let peer_info_old = create_peer_info::(3, socket, 1000); let peer_info_new = create_peer_info::(3, socket, 2000); - assert!(record.update(peer_info_old)); - assert!(record.update(peer_info_new.clone())); + assert!(record.update(now(), peer_info_old)); + assert!(record.update(now(), peer_info_new.clone())); assert_eq!(record.ingress(), Some(&peer_info_new.ingress)); assert!( @@ -474,15 +518,15 @@ mod tests { let peer_info_older = create_peer_info::(5, socket, 500); let peer_info_equal = create_peer_info::(5, socket, 1000); - assert!(record.update(peer_info_current.clone())); + assert!(record.update(now(), peer_info_current.clone())); - assert!(!record.update(peer_info_older)); + assert!(!record.update(now(), peer_info_older)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_current)), "Address should not update with older info" ); - assert!(!record.update(peer_info_equal)); + assert!(!record.update(now(), peer_info_equal)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_current)), "Address should not update with equal timestamp info" @@ -497,8 +541,8 @@ mod tests { let newer_my_info = create_peer_info::(0, test_socket(), 300); // Cannot update Myself record with other info or newer self info - assert!(!record_myself.update(other_info.clone())); - assert!(!record_myself.update(newer_my_info)); + assert!(!record_myself.update(now(), other_info.clone())); + assert!(!record_myself.update(now(), newer_my_info)); assert!( matches!(&record_myself.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info)), "Myself record should remain unchanged" @@ -506,9 +550,9 @@ mod tests { // Cannot update a Blocked record let mut record_blocked = Record::::unknown(); - assert!(record_blocked.block()); - assert!(!record_blocked.update(other_info)); - assert!(matches!(record_blocked.address, Address::Blocked)); + assert!(record_blocked.block(future_block_time())); + assert!(!record_blocked.update(now(), other_info)); + assert!(matches!(record_blocked.address, Address::Blocked(_))); } #[test] @@ -521,14 +565,14 @@ mod tests { let peer_info_pk1_ts1000 = create_peer_info::(10, socket, 1000); let peer_info_pk2_ts2000 = create_peer_info::(11, socket, 2000); - assert!(record.update(peer_info_pk1_ts1000.clone())); + assert!(record.update(now(), peer_info_pk1_ts1000.clone())); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_pk1_ts1000)) ); // Update should succeed based on newer timestamp, even if PK differs (though context matters) assert!( - record.update(peer_info_pk2_ts2000.clone()), + record.update(now(), peer_info_pk2_ts2000.clone()), "Update should succeed based on newer timestamp" ); assert!( @@ -549,7 +593,7 @@ mod tests { // Test Discovered (not persistent) let peer_info = create_peer_info::(7, test_socket(), 1000); let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info)); + assert!(record_disc.update(now(), peer_info)); assert!(record_disc.deletable()); record_disc.increment(); // sets = 1 assert!(!record_disc.deletable()); @@ -589,37 +633,37 @@ mod tests { // Block an Unknown record let mut record_unknown = Record::::unknown(); assert!(!record_unknown.persistent); - assert!(record_unknown.block()); // Newly blocked - assert!(record_unknown.blocked()); - assert!(matches!(record_unknown.address, Address::Blocked)); + assert!(record_unknown.block(future_block_time())); // Newly blocked + assert!(record_unknown.blocked(now())); + assert!(matches!(record_unknown.address, Address::Blocked(_))); assert_eq!(record_unknown.status, Status::Inert); assert!(!record_unknown.persistent, "Blocking sets persistent=false"); - assert!(!record_unknown.block()); // Already blocked + assert!(!record_unknown.block(future_block_time())); // Already blocked // Block a Bootstrapper record (initially persistent) let mut record_boot = Record::::bootstrapper(test_socket()); assert!(record_boot.persistent); - assert!(record_boot.block()); - assert!(record_boot.blocked()); - assert!(matches!(record_boot.address, Address::Blocked)); + assert!(record_boot.block(future_block_time())); + assert!(record_boot.blocked(now())); + assert!(matches!(record_boot.address, Address::Blocked(_))); assert!(!record_boot.persistent, "Blocking sets persistent=false"); // Block a Discovered record (initially not persistent) let mut record_disc = Record::::unknown(); - assert!(record_disc.update(sample_peer_info.clone())); + assert!(record_disc.update(now(), sample_peer_info.clone())); assert!(!record_disc.persistent); - assert!(record_disc.block()); - assert!(record_disc.blocked()); - assert!(matches!(record_disc.address, Address::Blocked)); + assert!(record_disc.block(future_block_time())); + assert!(record_disc.blocked(now())); + assert!(matches!(record_disc.address, Address::Blocked(_))); assert!(!record_disc.persistent); // Block a Discovered record that came from a Bootstrapper (initially persistent) let mut record_disc_from_boot = Record::::bootstrapper(test_socket()); - assert!(record_disc_from_boot.update(sample_peer_info.clone())); + assert!(record_disc_from_boot.update(now(), sample_peer_info.clone())); assert!(record_disc_from_boot.persistent); - assert!(record_disc_from_boot.block()); - assert!(record_disc_from_boot.blocked()); - assert!(matches!(record_disc_from_boot.address, Address::Blocked)); + assert!(record_disc_from_boot.block(future_block_time())); + assert!(record_disc_from_boot.blocked(now())); + assert!(matches!(record_disc_from_boot.address, Address::Blocked(_))); assert!( !record_disc_from_boot.persistent, "Blocking sets persistent=false" @@ -627,16 +671,16 @@ mod tests { // Check status remains unchanged when blocking let mut record_reserved = Record::::unknown(); - assert!(record_reserved.update(sample_peer_info.clone())); - assert!(record_reserved.reserve()); - assert!(record_reserved.block()); + assert!(record_reserved.update(now(), sample_peer_info.clone())); + assert!(record_reserved.reserve(now())); + assert!(record_reserved.block(future_block_time())); assert_eq!(record_reserved.status, Status::Reserved); let mut record_active = Record::::unknown(); - assert!(record_active.update(sample_peer_info)); - assert!(record_active.reserve()); + assert!(record_active.update(now(), sample_peer_info)); + assert!(record_active.reserve(now())); record_active.connect(); - assert!(record_active.block()); + assert!(record_active.block(future_block_time())); assert_eq!(record_active.status, Status::Active); } @@ -644,18 +688,21 @@ mod tests { fn test_block_myself_and_already_blocked() { let my_info = create_peer_info::(0, test_socket(), 100); let mut record_myself = Record::myself(my_info.clone()); - assert!(!record_myself.block(), "Cannot block myself"); + assert!( + !record_myself.block(future_block_time()), + "Cannot block myself" + ); assert!( matches!(&record_myself.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info)) ); let mut record_to_be_blocked = Record::::unknown(); - assert!(record_to_be_blocked.block()); + assert!(record_to_be_blocked.block(future_block_time())); assert!( - !record_to_be_blocked.block(), + !record_to_be_blocked.block(future_block_time()), "Cannot block already blocked peer" ); - assert!(matches!(record_to_be_blocked.address, Address::Blocked)); + assert!(matches!(record_to_be_blocked.address, Address::Blocked(_))); } #[test] @@ -663,25 +710,25 @@ mod tests { let mut record = Record::::unknown(); assert_eq!(record.status, Status::Inert); - assert!(record.reserve()); + assert!(record.reserve(now())); assert_eq!(record.status, Status::Reserved); assert!(record.reserved()); - assert!(!record.reserve(), "Cannot re-reserve when Reserved"); + assert!(!record.reserve(now()), "Cannot re-reserve when Reserved"); assert_eq!(record.status, Status::Reserved); record.connect(); assert_eq!(record.status, Status::Active); assert!(record.reserved()); // reserved() is true for Active too - assert!(!record.reserve(), "Cannot reserve when Active"); + assert!(!record.reserve(now()), "Cannot reserve when Active"); assert_eq!(record.status, Status::Active); record.release(); // Release from Active assert_eq!(record.status, Status::Inert); assert!(!record.reserved()); - assert!(record.reserve()); // Reserve again + assert!(record.reserve(now())); // Reserve again assert_eq!(record.status, Status::Reserved); record.release(); // Release from Reserved assert_eq!(record.status, Status::Inert); @@ -698,7 +745,7 @@ mod tests { #[should_panic] fn test_connect_when_active_panics() { let mut record = Record::::unknown(); - assert!(record.reserve()); + assert!(record.reserve(now())); record.connect(); record.connect(); // Should panic } @@ -732,14 +779,14 @@ mod tests { // Blocked: Not sharable let mut record_blocked = Record::::unknown(); - record_blocked.block(); + record_blocked.block(future_block_time()); assert!(record_blocked.sharable().is_none()); // Discovered but not Active: Not sharable let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info_data.clone())); + assert!(record_disc.update(now(), peer_info_data.clone())); assert!(record_disc.sharable().is_none()); // Status Inert - assert!(record_disc.reserve()); + assert!(record_disc.reserve(now())); assert!(record_disc.sharable().is_none()); // Status Reserved // Discovered and Active: Sharable @@ -758,7 +805,7 @@ mod tests { fn test_reserved_status_check() { let mut record = Record::::unknown(); assert!(!record.reserved()); // Inert - assert!(record.reserve()); + assert!(record.reserve(now())); assert!(record.reserved()); // Reserved record.connect(); assert!(record.reserved()); // Active @@ -778,7 +825,7 @@ mod tests { assert!(matches!(record.address, Address::Unknown)); // Discover - assert!(record.update(peer_info)); + assert!(record.update(now(), peer_info)); assert!(matches!(&record.address, Address::Discovered(_, 0))); // Fail dial 1 @@ -817,59 +864,62 @@ mod tests { let min_fails = 2; // Unknown and Bootstrapper always want info - assert!(Record::::unknown().want(min_fails)); - assert!(Record::::bootstrapper(socket).want(min_fails)); + assert!(Record::::unknown().want(now(), min_fails)); + assert!(Record::::bootstrapper(socket).want(now(), min_fails)); // Myself and Blocked never want info - assert!(!Record::myself(peer_info.clone()).want(min_fails)); + assert!(!Record::myself(peer_info.clone()).want(now(), min_fails)); let mut blocked = Record::::unknown(); - blocked.block(); - assert!(!blocked.want(min_fails)); + blocked.block(future_block_time()); + assert!(!blocked.want(now(), min_fails)); let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info)); + assert!(record_disc.update(now(), peer_info)); // Status Inert assert!( - !record_disc.want(min_fails), + !record_disc.want(now(), min_fails), "Should not want when fails=0 < min_fails" ); record_disc.dial_failure(&ingress); // fails = 1 assert!( - !record_disc.want(min_fails), + !record_disc.want(now(), min_fails), "Should not want when fails=1 < min_fails" ); record_disc.dial_failure(&ingress); // fails = 2 assert!( - record_disc.want(min_fails), + record_disc.want(now(), min_fails), "Should want when fails=2 >= min_fails" ); // Status Reserved - assert!(record_disc.reserve()); + assert!(record_disc.reserve(now())); assert!( - record_disc.want(min_fails), + record_disc.want(now(), min_fails), "Should still want when Reserved and fails >= min_fails" ); // Status Active record_disc.connect(); - assert!(!record_disc.want(min_fails), "Should not want when Active"); + assert!( + !record_disc.want(now(), min_fails), + "Should not want when Active" + ); // Status Inert again (after release) record_disc.release(); - assert!(record_disc.want(min_fails)); + assert!(record_disc.want(now(), min_fails)); // Reset failures record_disc.dial_success(); // Reset failures assert!( - !record_disc.want(min_fails), + !record_disc.want(now(), min_fails), "Should not want when Inert and fails=0" ); record_disc.dial_failure(&ingress); // fails = 1 - assert!(!record_disc.want(min_fails)); + assert!(!record_disc.want(now(), min_fails)); record_disc.dial_failure(&ingress); // fails = 2 - assert!(record_disc.want(min_fails)); + assert!(record_disc.want(now(), min_fails)); } #[test] @@ -880,7 +930,7 @@ mod tests { assert!(!Record::myself(peer_info.clone()).deletable()); assert!(!Record::::bootstrapper(test_socket()).deletable()); let mut record_pers = Record::::bootstrapper(test_socket()); - assert!(record_pers.update(peer_info)); + assert!(record_pers.update(now(), peer_info)); assert!(!record_pers.deletable()); // Non-persistent records depend on sets count and status @@ -892,7 +942,7 @@ mod tests { record.increment(); // sets = 1 assert!(!record.deletable()); // sets != 0 - assert!(record.reserve()); // status = Reserved + assert!(record.reserve(now())); // status = Reserved assert!(!record.deletable()); // status != Inert record.connect(); // status = Active @@ -908,7 +958,7 @@ mod tests { let mut record_blocked = Record::::bootstrapper(test_socket()); assert!(record_blocked.persistent); record_blocked.increment(); // sets = 1 - assert!(record_blocked.block()); + assert!(record_blocked.block(future_block_time())); assert!(!record_blocked.persistent); assert!(!record_blocked.deletable()); // sets = 1 record_blocked.decrement(); // sets = 0 @@ -921,28 +971,28 @@ mod tests { // Blocked and Myself are never allowed let mut record_blocked = Record::::unknown(); - record_blocked.block(); - assert!(!record_blocked.eligible()); - assert!(!Record::myself(peer_info.clone()).eligible()); + record_blocked.block(future_block_time()); + assert!(!record_blocked.eligible(now())); + assert!(!Record::myself(peer_info.clone()).eligible(now())); // Persistent records (Bootstrapper, Myself before blocking) are allowed even with sets=0 - assert!(Record::::bootstrapper(test_socket()).eligible()); + assert!(Record::::bootstrapper(test_socket()).eligible(now())); let mut record_pers = Record::::bootstrapper(test_socket()); - assert!(record_pers.update(peer_info.clone())); - assert!(record_pers.eligible()); + assert!(record_pers.update(now(), peer_info.clone())); + assert!(record_pers.eligible(now())); // Non-persistent records (Unknown, Discovered) require sets > 0 let mut record_unknown = Record::::unknown(); - assert!(!record_unknown.eligible()); // sets = 0, !persistent + assert!(!record_unknown.eligible(now())); // sets = 0, !persistent record_unknown.increment(); // sets = 1 - assert!(record_unknown.eligible()); // sets > 0 + assert!(record_unknown.eligible(now())); // sets > 0 record_unknown.decrement(); // sets = 0 - assert!(!record_unknown.eligible()); + assert!(!record_unknown.eligible(now())); let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info)); - assert!(!record_disc.eligible()); // sets = 0, !persistent + assert!(record_disc.update(now(), peer_info)); + assert!(!record_disc.eligible(now())); // sets = 0, !persistent record_disc.increment(); // sets = 1 - assert!(record_disc.eligible()); // sets > 0 + assert!(record_disc.eligible(now())); // sets > 0 } } diff --git a/p2p/src/authenticated/discovery/config.rs b/p2p/src/authenticated/discovery/config.rs index 15ee0a9008..92e641b5a8 100644 --- a/p2p/src/authenticated/discovery/config.rs +++ b/p2p/src/authenticated/discovery/config.rs @@ -67,6 +67,9 @@ pub struct Config { /// Quota for connection attempts per peer (incoming or outgoing). pub allowed_connection_rate_per_peer: Quota, + /// Duration after which blocked peers are automatically unblocked. + pub block_duration: Duration, + /// Maximum number of concurrent handshake attempts allowed. pub max_concurrent_handshakes: NonZeroU32, @@ -148,6 +151,7 @@ impl Config { max_handshake_age: Duration::from_secs(10), handshake_timeout: Duration::from_secs(5), allowed_connection_rate_per_peer: Quota::per_minute(NZU32!(1)), + block_duration: Duration::from_secs(3600), // 1 hour max_concurrent_handshakes: NZU32!(512), allowed_handshake_rate_per_ip: Quota::with_period(Duration::from_secs(5)).unwrap(), // 1 concurrent handshake per IP allowed_handshake_rate_per_subnet: Quota::per_second(NZU32!(64)), @@ -190,6 +194,7 @@ impl Config { max_handshake_age: Duration::from_secs(10), handshake_timeout: Duration::from_secs(5), allowed_connection_rate_per_peer: Quota::per_second(NZU32!(1)), + block_duration: Duration::from_secs(3600), // 1 hour max_concurrent_handshakes: NZU32!(1_024), allowed_handshake_rate_per_ip: Quota::per_second(NZU32!(16)), // 80 concurrent handshakes per IP allowed_handshake_rate_per_subnet: Quota::per_second(NZU32!(128)), @@ -225,6 +230,7 @@ impl Config { max_handshake_age: Duration::from_secs(10), handshake_timeout: Duration::from_secs(5), allowed_connection_rate_per_peer: Quota::per_second(NZU32!(4)), + block_duration: Duration::from_secs(60), // 1 minute for tests max_concurrent_handshakes: NZU32!(1_024), allowed_handshake_rate_per_ip: Quota::per_second(NZU32!(128)), // 640 concurrent handshakes per IP allowed_handshake_rate_per_subnet: Quota::per_second(NZU32!(256)), diff --git a/p2p/src/authenticated/discovery/network.rs b/p2p/src/authenticated/discovery/network.rs index 7d5166eee3..1dabed3b80 100644 --- a/p2p/src/authenticated/discovery/network.rs +++ b/p2p/src/authenticated/discovery/network.rs @@ -66,6 +66,7 @@ impl, - mailbox: mpsc::Receiver>, + blocked_ips: HashMap, + mailbox: mpsc::Receiver, handshakes_blocked: Counter, handshakes_concurrent_rate_limited: Counter, handshakes_ip_rate_limited: Counter, @@ -59,12 +61,16 @@ pub struct Actor Actor { - pub fn new(context: E, cfg: Config, mailbox: mpsc::Receiver>) -> Self { + pub fn new( + context: E, + cfg: Config, + mailbox: mpsc::Receiver, + ) -> Self { // Create metrics let handshakes_blocked = Counter::default(); context.register( "handshakes_blocked", - "number of handshake attempts blocked because the IP was not registered", + "number of handshake attempts blocked because the IP was blocked or not registered", handshakes_blocked.clone(), ); let handshakes_concurrent_rate_limited = Counter::default(); @@ -97,6 +103,7 @@ impl Actor< allowed_handshake_rate_per_ip: cfg.allowed_handshake_rate_per_ip, allowed_handshake_rate_per_subnet: cfg.allowed_handshake_rate_per_subnet, registered_ips: HashSet::new(), + blocked_ips: HashMap::new(), mailbox, handshakes_blocked, handshakes_concurrent_rate_limited, @@ -119,7 +126,17 @@ impl Actor< let source_ip = address.ip(); let (peer, send, recv) = match listen( context, - |peer| tracker.acceptable(peer, source_ip), + |peer| { + let fut = tracker.acceptable(peer, source_ip); + async move { + let status = fut.await; + if status == tracker::Acceptable::Yes { + Ok(()) + } else { + Err(status) + } + } + }, stream_cfg, stream, sink, @@ -127,8 +144,22 @@ impl Actor< .await { Ok(connection) => connection, + Err(StreamError::PeerRejected(reason)) => { + match reason { + tracker::Acceptable::Blocked => { + debug!(?address, "peer is blocked"); + } + tracker::Acceptable::Unknown => { + debug!(?address, "peer unknown (not in peer set)"); + } + tracker::Acceptable::Rejected | tracker::Acceptable::Yes => { + debug!(?address, "peer rejected"); + } + } + return; + } Err(err) => { - debug!(?err, ?address, "failed to upgrade connection"); + debug!(?err, "failed to complete handshake"); return; } }; @@ -185,11 +216,12 @@ impl Actor< debug!("context shutdown, stopping listener"); }, update = self.mailbox.next() => { - let Some(registered_ips) = update else { + let Some(filter) = update else { debug!("mailbox closed"); break; }; - self.registered_ips = registered_ips; + self.registered_ips = filter.registered_ips; + self.blocked_ips = filter.blocked_ips; }, listener = listener.accept() => { // Accept a new connection @@ -210,10 +242,19 @@ impl Actor< continue; } + // Check whether the IP is from a blocked peer + if let Some(&blocked_until) = self.blocked_ips.get(&ip) { + if self.context.current() < blocked_until { + self.handshakes_blocked.inc(); + debug!(?address, "rejecting blocked peer"); + continue; + } + } + // Check whether the IP is registered if !self.bypass_ip_check && !self.registered_ips.contains(&ip) { self.handshakes_blocked.inc(); - debug!(?address, "rejecting unregistered address"); + debug!(?address, "rejecting unknown address"); continue; } @@ -323,10 +364,13 @@ mod tests { updates_rx, ); - let mut allowed = HashSet::new(); - allowed.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); + let mut registered_ips = HashSet::new(); + registered_ips.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); updates_tx - .send(allowed) + .send(tracker::ListenerFilter { + registered_ips, + blocked_ips: HashMap::new(), + }) .await .expect("update registered ips"); @@ -335,7 +379,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -496,7 +540,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -577,7 +621,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -654,10 +698,13 @@ mod tests { ); // Register the IP so it would be allowed if not for the private IP check - let mut allowed = HashSet::new(); - allowed.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); + let mut registered_ips = HashSet::new(); + registered_ips.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); updates_tx - .send(allowed) + .send(tracker::ListenerFilter { + registered_ips, + blocked_ips: HashMap::new(), + }) .await .expect("update registered ips"); @@ -666,7 +713,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); diff --git a/p2p/src/authenticated/lookup/actors/tracker/actor.rs b/p2p/src/authenticated/lookup/actors/tracker/actor.rs index 0881e193f6..cc2377fe6b 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/actor.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/actor.rs @@ -1,7 +1,7 @@ use super::{ directory::{self, Directory}, ingress::{Message, Oracle}, - Config, + Config, ListenerFilter, }; use crate::authenticated::{ lookup::actors::{peer, tracker::ingress::Releaser}, @@ -16,10 +16,7 @@ use commonware_runtime::{ use commonware_utils::ordered::Set; use futures::{channel::mpsc, StreamExt}; use rand::Rng; -use std::{ - collections::{HashMap, HashSet}, - net::IpAddr, -}; +use std::collections::HashMap; use tracing::debug; /// The tracker actor that manages peer discovery and connection reservations. @@ -35,7 +32,7 @@ pub struct Actor { receiver: mpsc::UnboundedReceiver>, /// The mailbox for the listener. - listener: Mailbox>, + listener: Mailbox, // ---------- State ---------- /// Tracks peer sets and peer connectivity information. @@ -67,6 +64,7 @@ impl Actor { rate_limit: cfg.allowed_connection_rate_per_peer, allow_private_ips: cfg.allow_private_ips, allow_dns: cfg.allow_dns, + block_duration: cfg.block_duration, bypass_ip_check: cfg.bypass_ip_check, }; @@ -133,8 +131,12 @@ impl Actor { } } - // Send the updated listenable IPs to the listener. - let _ = self.listener.send(self.directory.listenable()).await; + // Send the updated filter to the listener. + let filter = ListenerFilter { + registered_ips: self.directory.listenable(), + blocked_ips: self.directory.blocked_ips(), + }; + let _ = self.listener.send(filter).await; // Notify all subscribers about the new peer set self.subscribers.retain(|subscriber| { @@ -208,8 +210,12 @@ impl Actor { peer.kill().await; } - // Send the updated listenable IPs to the listener. - let _ = self.listener.send(self.directory.listenable()).await; + // Send the updated filter to the listener. + let filter = ListenerFilter { + registered_ips: self.directory.listenable(), + blocked_ips: self.directory.blocked_ips(), + }; + let _ = self.listener.send(filter).await; } Message::Release { metadata } => { // Clear the peer handle if it exists @@ -225,7 +231,10 @@ impl Actor { #[cfg(test)] mod tests { use super::*; - use crate::{authenticated::lookup::actors::peer, Blocker, Ingress, Manager}; + use crate::{ + authenticated::lookup::actors::{peer, tracker::Acceptable}, + Blocker, Ingress, Manager, + }; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, Signer, @@ -244,7 +253,7 @@ mod tests { fn test_config( crypto: C, bypass_ip_check: bool, - ) -> (Config, mpsc::Receiver>) { + ) -> (Config, mpsc::Receiver) { let (registered_ips_sender, registered_ips_receiver) = Mailbox::new(1); ( Config { @@ -253,6 +262,7 @@ mod tests { allowed_connection_rate_per_peer: Quota::per_second(NZU32!(5)), allow_private_ips: true, allow_dns: true, + block_duration: Duration::from_secs(60), bypass_ip_check, listener: registered_ips_sender, }, @@ -396,9 +406,18 @@ mod tests { } = setup_actor(context.clone(), cfg_initial); // None acceptable because not registered - assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); - assert!(!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); - assert!(!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes + ); + assert_ne!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Acceptable::Yes + ); + assert_ne!( + mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + Acceptable::Yes + ); oracle .update( @@ -414,13 +433,25 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Not acceptable because self - assert!(!mailbox.acceptable(peer_pk, peer_addr.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk, peer_addr.ip()).await, + Acceptable::Yes + ); // Acceptable because registered with correct IP - assert!(mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Acceptable::Yes + ); // Not acceptable with wrong IP - assert!(!mailbox.acceptable(peer_pk2, peer_addr.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk2, peer_addr.ip()).await, + Acceptable::Yes + ); // Not acceptable because not registered - assert!(!mailbox.acceptable(peer_pk3, peer_addr3.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk3, peer_addr3.ip()).await, + Acceptable::Yes + ); }); } @@ -444,8 +475,9 @@ mod tests { } = setup_actor(context.clone(), cfg); // Unknown peer is NOT acceptable (bypass_ip_check only skips IP check) - assert!( - !mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + assert_ne!( + mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + Acceptable::Yes, "Unknown peer should not be acceptable" ); @@ -463,14 +495,16 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // With bypass_ip_check=true, registered peer with wrong IP is acceptable - assert!( + assert_eq!( mailbox.acceptable(peer_pk2.clone(), peer_addr.ip()).await, + Acceptable::Yes, "Registered peer with wrong IP should be acceptable with bypass_ip_check=true" ); // Self is still not acceptable - assert!( - !mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + assert_ne!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes, "Self should not be acceptable" ); @@ -478,9 +512,10 @@ mod tests { oracle.block(peer_pk2.clone()).await; context.sleep(Duration::from_millis(10)).await; - assert!( - !mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, - "Blocked peer should not be acceptable" + assert_eq!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Acceptable::Blocked, + "Blocked peer should return Blocked status" ); }); } @@ -507,12 +542,18 @@ mod tests { .await; context.sleep(Duration::from_millis(10)).await; // Allow register to process - assert!(mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes + ); let reservation = mailbox.listen(peer_pk.clone()).await; assert!(reservation.is_some()); - assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes + ); let failed_reservation = mailbox.listen(peer_pk.clone()).await; assert!(failed_reservation.is_none()); @@ -665,10 +706,10 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Wait for a listener update - let registered_ips = listener_receiver.next().await.unwrap(); - assert!(registered_ips.contains(&my_addr.ip())); - assert!(registered_ips.contains(&addr_1.ip())); - assert!(!registered_ips.contains(&addr_2.ip())); + let filter = listener_receiver.next().await.unwrap(); + assert!(filter.registered_ips.contains(&my_addr.ip())); + assert!(filter.registered_ips.contains(&addr_1.ip())); + assert!(!filter.registered_ips.contains(&addr_2.ip())); // Mark peer as connected let reservation = mailbox.listen(pk_1.clone()).await; @@ -683,10 +724,10 @@ mod tests { .await; // Wait for a listener update - let registered_ips = listener_receiver.next().await.unwrap(); - assert!(!registered_ips.contains(&my_addr.ip())); - assert!(!registered_ips.contains(&addr_1.ip())); - assert!(registered_ips.contains(&addr_2.ip())); + let filter = listener_receiver.next().await.unwrap(); + assert!(!filter.registered_ips.contains(&my_addr.ip())); + assert!(!filter.registered_ips.contains(&addr_1.ip())); + assert!(filter.registered_ips.contains(&addr_2.ip())); // The first peer should be have received a kill message because its // peer set was removed because `tracked_peer_sets` is 1. diff --git a/p2p/src/authenticated/lookup/actors/tracker/directory.rs b/p2p/src/authenticated/lookup/actors/tracker/directory.rs index eede438e7e..34ce82e5c2 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/directory.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/directory.rs @@ -1,4 +1,4 @@ -use super::{metrics::Metrics, record::Record, Metadata, Reservation}; +use super::{ingress::Acceptable, metrics::Metrics, record::Record, Metadata, Reservation}; use crate::{ authenticated::lookup::{actors::tracker::ingress::Releaser, metrics}, types::Address, @@ -17,6 +17,7 @@ use rand::Rng; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, net::IpAddr, + time::{Duration, SystemTime}, }; use tracing::{debug, warn}; @@ -36,6 +37,9 @@ pub struct Config { /// The rate limit for allowing reservations per-peer. pub rate_limit: Quota, + + /// Duration after which blocked peers are automatically unblocked. + pub block_duration: Duration, } /// Represents a collection of records for all peers. @@ -50,6 +54,9 @@ pub struct Directory { /// Whether DNS-based ingress addresses are allowed. allow_dns: bool, + /// Duration after which blocked peers are automatically unblocked. + block_duration: Duration, + /// Whether to skip IP verification for incoming connections (allows unknown IPs). bypass_ip_check: bool, @@ -63,6 +70,9 @@ pub struct Directory { /// Rate limiter for connection attempts. rate_limiter: KeyedRateLimiter, + /// Context for getting current time. + context: E, + // ---------- Message-Passing ---------- /// The releaser for the tracker actor. releaser: Releaser, @@ -82,17 +92,19 @@ impl Directory { // Other initialization. let rate_limiter = KeyedRateLimiter::hashmap_with_clock(cfg.rate_limit, context.clone()); - let metrics = Metrics::init(context); + let metrics = Metrics::init(context.clone()); let _ = metrics.tracked.try_set(peers.len() - 1); // Exclude self Self { max_sets: cfg.max_sets, allow_private_ips: cfg.allow_private_ips, allow_dns: cfg.allow_dns, + block_duration: cfg.block_duration, bypass_ip_check: cfg.bypass_ip_check, peers, sets: BTreeMap::new(), rate_limiter, + context, releaser, metrics, } @@ -139,11 +151,12 @@ impl Directory { } // Create and store new peer set (all peers are tracked regardless of address validity) + let now = self.context.current(); for (peer, addr) in &peers { let record = match self.peers.entry(peer.clone()) { Entry::Occupied(entry) => { let entry = entry.into_mut(); - entry.update(addr.clone()); + entry.update(now, addr.clone()); entry } Entry::Vacant(entry) => { @@ -207,7 +220,15 @@ impl Directory { /// Attempt to block a peer, updating the metrics accordingly. pub fn block(&mut self, peer: &C) { - if self.peers.get_mut(peer).is_some_and(|r| r.block()) { + let now = self.context.current(); + let blocked_until = now + .checked_add(self.block_duration) + .unwrap_or(now + Duration::from_secs(365 * 24 * 60 * 60 * 100)); // ~100 years + if self + .peers + .get_mut(peer) + .is_some_and(|r| r.block(blocked_until)) + { self.metrics.blocked.inc(); } } @@ -226,11 +247,12 @@ impl Directory { /// Returns true if the peer is eligible for connection. /// - /// A peer is eligible if it is in a peer set, not blocked, and not ourselves. + /// A peer is eligible if it is in a peer set, not blocked (unless expired), and not ourselves. /// This does NOT check IP validity - that is done separately for dialing (ingress) /// and accepting (egress). pub fn eligible(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.eligible()) + let now = self.context.current(); + self.peers.get(peer).is_some_and(|r| r.eligible(now)) } /// Returns a vector of dialable peers. That is, unconnected peers for which we have a socket. @@ -246,29 +268,44 @@ impl Directory { result } - /// Returns true if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for a peer. /// /// Checks eligibility (peer set membership), egress IP match (if not bypass_ip_check), and connection status. - pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> bool { - self.peers - .get(peer) - .is_some_and(|record| record.acceptable(source_ip, self.bypass_ip_check)) + pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> Acceptable { + let now = self.context.current(); + let Some(record) = self.peers.get(peer) else { + return Acceptable::Unknown; + }; + record.acceptable(now, source_ip, self.bypass_ip_check) } /// Return egress IPs we should listen for (accept incoming connections from). /// /// Only includes IPs from peers that are: - /// - Eligible (in a peer set, not blocked, not ourselves) + /// - Eligible (in a peer set, not blocked unless expired, not ourselves) /// - Have a valid egress IP (global, or private IPs are allowed) pub fn listenable(&self) -> HashSet { + let now = self.context.current(); self.peers .values() - .filter(|r| r.eligible()) + .filter(|r| r.eligible(now)) .filter_map(|r| r.egress_ip()) .filter(|ip| self.allow_private_ips || IpAddrExt::is_global(ip)) .collect() } + /// Return blocked peer IPs with their expiry times. + /// + /// Used by the listener to distinguish blocked peers from unknown peers. + pub fn blocked_ips(&self) -> HashMap { + let now = self.context.current(); + self.peers + .values() + .filter(|r| r.blocked(now)) + .filter_map(|r| r.egress_ip().zip(r.blocked_until())) + .collect() + } + // --------- Helpers ---------- /// Attempt to reserve a peer. @@ -298,7 +335,8 @@ impl Directory { } // Reserve - if record.reserve() { + let now = self.context.current(); + if record.reserve(now) { self.metrics.reserved.inc(); return Some(Reservation::new(metadata, self.releaser.clone())); } @@ -315,7 +353,8 @@ impl Directory { if !record.deletable() { return false; } - if record.blocked() { + let now = self.context.current(); + if record.blocked(now) { self.metrics.blocked.dec(); } self.peers.remove(peer); @@ -332,9 +371,12 @@ mod tests { Ingress, }; use commonware_cryptography::{ed25519, Signer}; - use commonware_runtime::{deterministic, Quota, Runner}; + use commonware_runtime::{deterministic, Clock, Quota, Runner}; use commonware_utils::{hostname, NZU32}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, + }; fn addr(socket: SocketAddr) -> Address { Address::Symmetric(socket) @@ -352,6 +394,7 @@ mod tests { bypass_ip_check: false, max_sets: 1, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); @@ -414,6 +457,7 @@ mod tests { bypass_ip_check: false, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); @@ -502,6 +546,7 @@ mod tests { bypass_ip_check: false, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); @@ -509,13 +554,14 @@ mod tests { let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 2235); runtime.start(|context| async move { - let mut directory = Directory::init(context, my_pk.clone(), config, releaser); + let mut directory = Directory::init(context.clone(), my_pk.clone(), config, releaser); + let now = context.current(); directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); directory.block(&pk_1); let record = directory.peers.get(&pk_1).unwrap(); assert!( - record.blocked(), + record.blocked(now), "Peer should be blocked after call to block" ); assert!( @@ -526,7 +572,7 @@ mod tests { directory.add_set(1, [(pk_1.clone(), addr(addr_2))].try_into().unwrap()); let record = directory.peers.get(&pk_1).unwrap(); assert!( - record.blocked(), + record.blocked(now), "Blocked peer should remain blocked after update" ); assert!( @@ -548,6 +594,7 @@ mod tests { bypass_ip_check: false, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; // Create asymmetric address where ingress differs from egress @@ -647,6 +694,7 @@ mod tests { bypass_ip_check: false, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; // Create peers with different address types @@ -712,6 +760,7 @@ mod tests { bypass_ip_check: false, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; // Create peer with public egress IP diff --git a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs index 99a6df0c5a..49ca08a03b 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs @@ -13,6 +13,19 @@ use commonware_utils::ordered::{Map, Set}; use futures::channel::{mpsc, oneshot}; use std::net::IpAddr; +/// Result of checking if a peer is acceptable for connection. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Acceptable { + /// Peer is acceptable for connection. + Yes, + /// Peer is blocked. + Blocked, + /// Peer is not in any peer set. + Unknown, + /// Peer is known but rejected (already connected, reserved, ourselves, IP mismatch, etc.). + Rejected, +} + /// Messages that can be sent to the tracker actor. #[derive(Debug)] pub enum Message { @@ -79,8 +92,8 @@ pub enum Message { /// The IP address the peer connected from. source_ip: IpAddr, - /// The sender to respond with whether the peer is acceptable. - responder: oneshot::Sender, + /// The sender to respond with the acceptance status. + responder: oneshot::Sender, }, /// Request a reservation for a particular peer. @@ -129,7 +142,11 @@ impl UnboundedMailbox> { } /// Send an `Acceptable` message to the tracker. - pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool { + pub fn acceptable( + &mut self, + public_key: C, + source_ip: IpAddr, + ) -> impl std::future::Future { let (tx, rx) = oneshot::channel(); self.send(Message::Acceptable { public_key, @@ -137,7 +154,7 @@ impl UnboundedMailbox> { responder: tx, }) .unwrap(); - rx.await.unwrap() + async move { rx.await.unwrap() } } /// Send a `Listen` message to the tracker. diff --git a/p2p/src/authenticated/lookup/actors/tracker/mod.rs b/p2p/src/authenticated/lookup/actors/tracker/mod.rs index b69c1d3fa3..8f39ac86e8 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/mod.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/mod.rs @@ -3,7 +3,11 @@ use crate::authenticated::Mailbox; use commonware_cryptography::Signer; use commonware_runtime::Quota; -use std::{collections::HashSet, net::IpAddr}; +use std::{ + collections::HashSet, + net::IpAddr, + time::{Duration, SystemTime}, +}; pub mod actor; mod directory; @@ -14,10 +18,19 @@ mod record; mod reservation; pub use actor::Actor; -pub use ingress::{Message, Oracle}; +pub use ingress::{Acceptable, Message, Oracle}; pub use metadata::Metadata; pub use reservation::Reservation; +/// Message containing listener filter information. +#[derive(Clone, Debug)] +pub struct ListenerFilter { + /// IPs of eligible peers we should accept connections from. + pub registered_ips: HashSet, + /// IPs of blocked peers with their expiry times. + pub blocked_ips: std::collections::HashMap, +} + #[derive(Clone, Debug)] pub struct Config { pub crypto: C, @@ -25,6 +38,7 @@ pub struct Config { pub allowed_connection_rate_per_peer: Quota, pub allow_private_ips: bool, pub allow_dns: bool, + pub block_duration: Duration, pub bypass_ip_check: bool, - pub listener: Mailbox>, + pub listener: Mailbox, } diff --git a/p2p/src/authenticated/lookup/actors/tracker/record.rs b/p2p/src/authenticated/lookup/actors/tracker/record.rs index 1851fb1809..7752269426 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/record.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/record.rs @@ -1,5 +1,6 @@ +use super::ingress::Acceptable; use crate::types::{self, Ingress}; -use std::net::IpAddr; +use std::{net::IpAddr, time::SystemTime}; /// Represents information known about a peer's address. #[derive(Clone, Debug)] @@ -10,9 +11,8 @@ pub enum Address { /// Address is provided when peer is registered. Known(types::Address), - /// Peer is blocked. - /// We don't care to track its information. - Blocked, + /// Peer is blocked until the given time. + Blocked(SystemTime), } /// Represents the connection status of a peer. @@ -73,24 +73,38 @@ impl Record { // ---------- Setters ---------- /// Update the record with a new address. - pub fn update(&mut self, addr: types::Address) { - if matches!(self.address, Address::Myself | Address::Blocked) { - return; + /// + /// Updates are allowed for non-blocked records, or for blocked records whose + /// block has expired. + pub fn update(&mut self, now: SystemTime, addr: types::Address) { + match &self.address { + Address::Myself => return, + Address::Blocked(until) if now < *until => return, + _ => {} } self.address = Address::Known(addr); } - /// Attempt to mark the peer as blocked. + /// Attempt to mark the peer as blocked until the given time. /// - /// Returns `true` if the peer was newly blocked. + /// If the peer is already blocked, extends the block to the later of the two times. + /// Returns `true` if the peer was newly blocked (not already blocked). /// Returns `false` if the peer was already blocked or is the local node (unblockable). - pub fn block(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself) { - return false; + pub fn block(&mut self, blocked_until: SystemTime) -> bool { + match &self.address { + Address::Myself => false, + Address::Blocked(existing_until) => { + if blocked_until > *existing_until { + self.address = Address::Blocked(blocked_until); + } + false + } + Address::Known(_) => { + self.address = Address::Blocked(blocked_until); + self.persistent = false; + true + } } - self.address = Address::Blocked; - self.persistent = false; - true } /// Increase the count of peer sets this peer is part of. @@ -110,10 +124,15 @@ impl Record { /// Attempt to reserve the peer for connection. /// /// Returns `true` if the reservation was successful, `false` otherwise. - pub const fn reserve(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself) { + pub fn reserve(&mut self, now: SystemTime) -> bool { + if matches!(self.address, Address::Myself) { return false; } + if let Address::Blocked(until) = self.address { + if now < until { + return false; + } + } if matches!(self.status, Status::Inert) { self.status = Status::Reserved; return true; @@ -137,9 +156,17 @@ impl Record { // ---------- Getters ---------- - /// Returns `true` if the record is blocked. - pub const fn blocked(&self) -> bool { - matches!(self.address, Address::Blocked) + /// Returns `true` if the record is currently blocked (block has not expired). + pub fn blocked(&self, now: SystemTime) -> bool { + matches!(self.address, Address::Blocked(until) if now < until) + } + + /// Returns the time until which this peer is blocked, if blocked. + pub const fn blocked_until(&self) -> Option { + match self.address { + Address::Blocked(until) => Some(until), + _ => None, + } } /// Returns the number of peer sets this peer is part of. @@ -154,6 +181,10 @@ impl Record { /// - It is not ourselves or blocked /// - We are not already connected or reserved /// - The ingress address is allowed (DNS enabled, Socket IP is global or private IPs allowed) + /// + /// Note: Blocked peers are never dialable regardless of expiry since we don't retain + /// their address info. Once a block expires and the peer is re-added to a peer set, + /// they will become dialable again via the normal update path. pub fn dialable(&self, allow_private_ips: bool, allow_dns: bool) -> bool { if self.status != Status::Inert { return false; @@ -165,22 +196,41 @@ impl Record { ingress.is_valid(allow_private_ips, allow_dns) } - /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for this peer. /// /// A peer is acceptable if: - /// - The peer is eligible (in a peer set, not blocked, not ourselves) + /// - The peer is eligible (in a peer set, not blocked unless expired, not ourselves) /// - The source IP matches the expected egress IP for this peer (if not bypass_ip_check) /// - We are not already connected or reserved - pub fn acceptable(&self, source_ip: IpAddr, bypass_ip_check: bool) -> bool { - if !self.eligible() || self.status != Status::Inert { - return false; + pub fn acceptable( + &self, + now: SystemTime, + source_ip: IpAddr, + bypass_ip_check: bool, + ) -> Acceptable { + // Check if ourselves + if matches!(self.address, Address::Myself) { + return Acceptable::Rejected; + } + // Check if blocked (not expired) + if self.blocked(now) { + return Acceptable::Blocked; } + // Check if in a peer set + if self.sets == 0 && !self.persistent { + return Acceptable::Unknown; + } + // Check if already connected or reserved + if self.status != Status::Inert { + return Acceptable::Rejected; + } + // Check IP match if bypass_ip_check { - return true; + return Acceptable::Yes; } match &self.address { - Address::Known(addr) => addr.egress_ip() == source_ip, - _ => false, + Address::Known(addr) if addr.egress_ip() == source_ip => Acceptable::Yes, + _ => Acceptable::Rejected, // Known peer but wrong IP } } @@ -189,7 +239,7 @@ impl Record { match &self.address { Address::Myself => None, Address::Known(addr) => Some(addr.ingress()), - Address::Blocked => None, + Address::Blocked(_) => None, } } @@ -198,7 +248,7 @@ impl Record { match &self.address { Address::Myself => None, Address::Known(addr) => Some(addr.egress_ip()), - Address::Blocked => None, + Address::Blocked(_) => None, } } @@ -216,15 +266,23 @@ impl Record { /// Returns `true` if this peer is eligible for connection. /// /// A peer is eligible if: - /// - It is not blocked or ourselves + /// - It is not blocked (unless the block has expired) or ourselves /// - It is part of at least one peer set (or is persistent) /// /// This is the base check for reserving a connection. IP validity is checked /// separately: ingress validity for dialing (in `dialable()`), egress validity /// for the IP filter (in `Directory::eligible_egress_ips()`). - pub const fn eligible(&self) -> bool { + pub fn eligible(&self, now: SystemTime) -> bool { match &self.address { - Address::Blocked | Address::Myself => false, + Address::Blocked(until) => { + // Blocked but expired - treat as eligible if in a peer set + if now >= *until { + self.sets > 0 || self.persistent + } else { + false + } + } + Address::Myself => false, Address::Known(_) => self.sets > 0 || self.persistent, } } @@ -232,7 +290,7 @@ impl Record { #[cfg(test)] mod tests { use super::*; - use std::net::SocketAddr; + use std::{net::SocketAddr, time::Duration}; fn test_socket() -> SocketAddr { SocketAddr::from(([54, 12, 1, 9], 8080)) @@ -242,6 +300,14 @@ mod tests { types::Address::Symmetric(test_socket()) } + fn now() -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(1000) + } + + fn future_time() -> SystemTime { + now() + Duration::from_secs(3600) + } + #[test] fn test_myself_initial_state() { let record = Record::myself(); @@ -250,17 +316,17 @@ mod tests { assert_eq!(record.sets, 0); assert!(record.persistent); assert!(record.ingress().is_none()); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); assert!(!record.deletable()); - assert!(!record.eligible()); + assert!(!record.eligible(now())); } #[test] fn test_myself_blocked_to_known() { let mut record = Record::myself(); - record.block(); - assert!(!record.blocked(), "Can't block myself"); + record.block(future_time()); + assert!(!record.blocked(now()), "Can't block myself"); } #[test] @@ -292,36 +358,36 @@ mod tests { fn test_block_behavior_and_persistence() { let mut record_known = Record::known(test_address()); assert!(!record_known.persistent); - assert!(record_known.block()); - assert!(record_known.blocked()); - assert!(matches!(record_known.address, Address::Blocked)); + assert!(record_known.block(future_time())); + assert!(record_known.blocked(now())); + assert!(matches!(record_known.address, Address::Blocked(_))); assert!(!record_known.persistent); let mut record_reserved = Record::known(test_address()); - assert!(record_reserved.reserve()); - assert!(record_reserved.block()); + assert!(record_reserved.reserve(now())); + assert!(record_reserved.block(future_time())); assert_eq!(record_reserved.status, Status::Reserved); let mut record_active = Record::known(test_address()); - assert!(record_active.reserve()); + assert!(record_active.reserve(now())); record_active.connect(); - assert!(record_active.block()); + assert!(record_active.block(future_time())); assert_eq!(record_active.status, Status::Active); } #[test] fn test_block_myself_and_already_blocked() { let mut record_myself = Record::myself(); - assert!(!record_myself.block(), "Cannot block myself"); + assert!(!record_myself.block(future_time()), "Cannot block myself"); assert!(matches!(&record_myself.address, Address::Myself)); let mut record_to_be_blocked = Record::known(test_address()); - assert!(record_to_be_blocked.block()); + assert!(record_to_be_blocked.block(future_time())); assert!( - !record_to_be_blocked.block(), + !record_to_be_blocked.block(future_time()), "Cannot block already blocked peer" ); - assert!(matches!(record_to_be_blocked.address, Address::Blocked)); + assert!(matches!(record_to_be_blocked.address, Address::Blocked(_))); } #[test] @@ -329,25 +395,25 @@ mod tests { let mut record = Record::known(test_address()); assert_eq!(record.status, Status::Inert); - assert!(record.reserve()); + assert!(record.reserve(now())); assert_eq!(record.status, Status::Reserved); assert!(record.reserved()); - assert!(!record.reserve(), "Cannot re-reserve when Reserved"); + assert!(!record.reserve(now()), "Cannot re-reserve when Reserved"); assert_eq!(record.status, Status::Reserved); record.connect(); assert_eq!(record.status, Status::Active); assert!(record.reserved()); - assert!(!record.reserve(), "Cannot reserve when Active"); + assert!(!record.reserve(now()), "Cannot reserve when Active"); assert_eq!(record.status, Status::Active); record.release(); assert_eq!(record.status, Status::Inert); assert!(!record.reserved()); - assert!(record.reserve()); + assert!(record.reserve(now())); assert_eq!(record.status, Status::Reserved); record.release(); assert_eq!(record.status, Status::Inert); @@ -364,7 +430,7 @@ mod tests { #[should_panic] fn test_connect_when_active_panics() { let mut record = Record::known(test_address()); - assert!(record.reserve()); + assert!(record.reserve(now())); record.connect(); record.connect(); } @@ -380,7 +446,7 @@ mod tests { fn test_reserved_status_check() { let mut record = Record::known(test_address()); assert!(!record.reserved()); - assert!(record.reserve()); + assert!(record.reserve(now())); assert!(record.reserved()); record.connect(); assert!(record.reserved()); @@ -400,7 +466,7 @@ mod tests { record.increment(); assert!(!record.deletable()); - assert!(record.reserve()); + assert!(record.reserve(now())); assert!(!record.deletable()); record.connect(); @@ -415,19 +481,40 @@ mod tests { #[test] fn test_eligible_logic() { - // Blocked and Myself are never eligible + // Blocked and Myself are never eligible (while block is active) let mut record_blocked = Record::known(test_address()); - record_blocked.block(); - assert!(!record_blocked.eligible()); - assert!(!Record::myself().eligible()); + record_blocked.block(future_time()); + assert!(!record_blocked.eligible(now())); + assert!(!Record::myself().eligible(now())); // Known records are only eligible when in a peer set let mut record_known = Record::known(test_address()); - assert!(!record_known.eligible(), "Not eligible when sets=0"); + assert!(!record_known.eligible(now()), "Not eligible when sets=0"); record_known.increment(); - assert!(record_known.eligible(), "Eligible when sets>0"); + assert!(record_known.eligible(now()), "Eligible when sets>0"); record_known.decrement(); - assert!(!record_known.eligible(), "Not eligible when sets=0 again"); + assert!( + !record_known.eligible(now()), + "Not eligible when sets=0 again" + ); + } + + #[test] + fn test_block_expiry() { + let mut record = Record::known(test_address()); + record.increment(); + + // Block the peer + assert!(record.block(future_time())); + + // Currently blocked + assert!(record.blocked(now())); + assert!(!record.eligible(now())); + + // After expiry time, no longer blocked + let after_expiry = future_time() + Duration::from_secs(1); + assert!(!record.blocked(after_expiry)); + assert!(record.eligible(after_expiry)); } #[test] @@ -441,49 +528,55 @@ mod tests { // Eligible, Inert, and correct IP - acceptable let mut record = Record::known(types::Address::Symmetric(public_socket)); record.increment(); - assert!( - record.acceptable(egress_ip, false), + assert_eq!( + record.acceptable(now(), egress_ip, false), + Acceptable::Yes, "Eligible, Inert, correct IP is acceptable" ); // Correct everything but wrong IP - not acceptable - assert!( - !record.acceptable(wrong_ip, false), + assert_eq!( + record.acceptable(now(), wrong_ip, false), + Acceptable::Rejected, "Not acceptable when IP doesn't match" ); // Not eligible (sets=0) - not acceptable let record_not_eligible = Record::known(types::Address::Symmetric(public_socket)); - assert!( - !record_not_eligible.acceptable(egress_ip, false), + assert_eq!( + record_not_eligible.acceptable(now(), egress_ip, false), + Acceptable::Unknown, "Not acceptable when not eligible" ); // Already reserved - not acceptable let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); - record_reserved.reserve(); - assert!( - !record_reserved.acceptable(egress_ip, false), + record_reserved.reserve(now()); + assert_eq!( + record_reserved.acceptable(now(), egress_ip, false), + Acceptable::Rejected, "Not acceptable when reserved" ); // Already connected - not acceptable let mut record_connected = Record::known(types::Address::Symmetric(public_socket)); record_connected.increment(); - record_connected.reserve(); + record_connected.reserve(now()); record_connected.connect(); - assert!( - !record_connected.acceptable(egress_ip, false), + assert_eq!( + record_connected.acceptable(now(), egress_ip, false), + Acceptable::Rejected, "Not acceptable when connected" ); // Blocked - not acceptable let mut record_blocked = Record::known(types::Address::Symmetric(public_socket)); record_blocked.increment(); - record_blocked.block(); - assert!( - !record_blocked.acceptable(egress_ip, false), + record_blocked.block(future_time()); + assert_eq!( + record_blocked.acceptable(now(), egress_ip, false), + Acceptable::Blocked, "Not acceptable when blocked" ); } @@ -497,50 +590,56 @@ mod tests { // With bypass_ip_check=true, accepts even with wrong IP (skips IP check) let mut record = Record::known(types::Address::Symmetric(public_socket)); record.increment(); - assert!( - record.acceptable(wrong_ip, true), + assert_eq!( + record.acceptable(now(), wrong_ip, true), + Acceptable::Yes, "Acceptable with wrong IP when bypass_ip_check=true" ); // Still requires eligible (sets > 0), even with bypass_ip_check=true let record_not_eligible = Record::known(types::Address::Symmetric(public_socket)); - assert!( - !record_not_eligible.acceptable(egress_ip, true), + assert_eq!( + record_not_eligible.acceptable(now(), egress_ip, true), + Acceptable::Unknown, "Not acceptable when not eligible (sets=0), even with bypass_ip_check=true" ); // Still not acceptable when blocked let mut record_blocked = Record::known(types::Address::Symmetric(public_socket)); record_blocked.increment(); - record_blocked.block(); - assert!( - !record_blocked.acceptable(egress_ip, true), + record_blocked.block(future_time()); + assert_eq!( + record_blocked.acceptable(now(), egress_ip, true), + Acceptable::Blocked, "Not acceptable when blocked" ); // Still not acceptable when reserved let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); - record_reserved.reserve(); - assert!( - !record_reserved.acceptable(egress_ip, true), + record_reserved.reserve(now()); + assert_eq!( + record_reserved.acceptable(now(), egress_ip, true), + Acceptable::Rejected, "Not acceptable when reserved" ); // Still not acceptable when connected let mut record_connected = Record::known(types::Address::Symmetric(public_socket)); record_connected.increment(); - record_connected.reserve(); + record_connected.reserve(now()); record_connected.connect(); - assert!( - !record_connected.acceptable(egress_ip, true), + assert_eq!( + record_connected.acceptable(now(), egress_ip, true), + Acceptable::Rejected, "Not acceptable when connected" ); // Still not acceptable when myself let record_myself = Record::myself(); - assert!( - !record_myself.acceptable(egress_ip, true), + assert_eq!( + record_myself.acceptable(now(), egress_ip, true), + Acceptable::Rejected, "Not acceptable when myself" ); } diff --git a/p2p/src/authenticated/lookup/config.rs b/p2p/src/authenticated/lookup/config.rs index e22fd1a665..3f47d22359 100644 --- a/p2p/src/authenticated/lookup/config.rs +++ b/p2p/src/authenticated/lookup/config.rs @@ -96,6 +96,12 @@ pub struct Config { /// set (if we, for example, are trying to do a reshare of a threshold /// key). pub tracked_peer_sets: usize, + + /// Duration after which blocked peers are automatically unblocked. + /// + /// This allows recovery from temporary issues like misconfigured nodes + /// without requiring restarts across all peers. + pub block_duration: Duration, } impl Config { @@ -127,6 +133,7 @@ impl Config { dial_frequency: Duration::from_secs(1), query_frequency: Duration::from_secs(60), tracked_peer_sets: 4, + block_duration: Duration::from_secs(60 * 60), // 1 hour } } @@ -158,6 +165,7 @@ impl Config { dial_frequency: Duration::from_millis(500), query_frequency: Duration::from_secs(30), tracked_peer_sets: 4, + block_duration: Duration::from_secs(60 * 60), // 1 hour } } @@ -184,6 +192,7 @@ impl Config { dial_frequency: Duration::from_millis(200), query_frequency: Duration::from_secs(5), tracked_peer_sets: 4, + block_duration: Duration::from_secs(60), // 1 minute for tests } } } diff --git a/p2p/src/authenticated/lookup/network.rs b/p2p/src/authenticated/lookup/network.rs index b2e8624857..89537085d6 100644 --- a/p2p/src/authenticated/lookup/network.rs +++ b/p2p/src/authenticated/lookup/network.rs @@ -19,7 +19,6 @@ use commonware_stream::Config as StreamConfig; use commonware_utils::union; use futures::channel::mpsc; use rand::{CryptoRng, Rng}; -use std::{collections::HashSet, net::IpAddr}; use tracing::{debug, info}; /// Unique suffix for all messages signed in a stream. @@ -35,7 +34,7 @@ pub struct Network>, router: router::Actor, router_mailbox: Mailbox>, - listener: mpsc::Receiver>, + listener: mpsc::Receiver, } impl @@ -52,7 +51,8 @@ impl) -> (Self, tracker::Oracle) { - let (listener_mailbox, listener) = Mailbox::>::new(cfg.mailbox_size); + let (listener_mailbox, listener) = + Mailbox::::new(cfg.mailbox_size); let (tracker, tracker_mailbox, oracle) = tracker::Actor::new( context.with_label("tracker"), tracker::Config { @@ -61,6 +61,7 @@ impl(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/fuzz/fuzz_targets/e2e.rs b/stream/fuzz/fuzz_targets/e2e.rs index 14ea8252dd..584d04a4bb 100644 --- a/stream/fuzz/fuzz_targets/e2e.rs +++ b/stream/fuzz/fuzz_targets/e2e.rs @@ -124,7 +124,7 @@ fn fuzz(input: FuzzInput) { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/fuzz/fuzz_targets/lazy_transport.rs b/stream/fuzz/fuzz_targets/lazy_transport.rs index 682b69cf09..6663819311 100644 --- a/stream/fuzz/fuzz_targets/lazy_transport.rs +++ b/stream/fuzz/fuzz_targets/lazy_transport.rs @@ -48,7 +48,7 @@ thread_local! { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/fuzz/fuzz_targets/transport.rs b/stream/fuzz/fuzz_targets/transport.rs index c9e4704953..eb344c6fba 100644 --- a/stream/fuzz/fuzz_targets/transport.rs +++ b/stream/fuzz/fuzz_targets/transport.rs @@ -39,7 +39,7 @@ fn fuzz(data: &[u8]) { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 0830d5687d..0b934d6cb5 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -75,7 +75,7 @@ use commonware_cryptography::{ }; use commonware_macros::select; use commonware_runtime::{Clock, Error as RuntimeError, Sink, Stream}; -use commonware_utils::{hex, SystemTimeExt}; +use commonware_utils::SystemTimeExt; use rand_core::CryptoRngCore; use std::{future::Future, ops::Range, time::Duration}; use thiserror::Error; @@ -86,14 +86,17 @@ const CIPHERTEXT_OVERHEAD: u32 = { }; /// Errors that can occur when interacting with a stream. +/// +/// The `R` type parameter represents the rejection reason when a peer is rejected +/// by the bouncer during the handshake. Use `()` if no rejection information is needed. #[derive(Error, Debug)] -pub enum Error { +pub enum Error { #[error("handshake error: {0}")] HandshakeError(HandshakeError), #[error("unable to decode: {0}")] UnableToDecode(CodecError), - #[error("peer rejected: {}", hex(_0))] - PeerRejected(Vec), + #[error("peer rejected")] + PeerRejected(R), #[error("recv failed")] RecvFailed(RuntimeError), #[error("recv too large: {0} bytes")] @@ -112,18 +115,40 @@ pub enum Error { HandshakeTimeout, } -impl From for Error { +impl From for Error { fn from(value: CodecError) -> Self { Self::UnableToDecode(value) } } -impl From for Error { +impl From for Error { fn from(value: HandshakeError) -> Self { Self::HandshakeError(value) } } +impl Error { + /// Convert an error from the default `Error<()>` to a specific `Error`. + /// + /// This is useful when helper functions return `Error<()>` but the caller + /// needs `Error`. Panics if called on `PeerRejected`. + fn from_unit(value: Error<()>) -> Self { + match value { + Error::HandshakeError(e) => Self::HandshakeError(e), + Error::UnableToDecode(e) => Self::UnableToDecode(e), + Error::PeerRejected(()) => unreachable!("PeerRejected(()) should not be converted"), + Error::RecvFailed(e) => Self::RecvFailed(e), + Error::RecvTooLarge(s) => Self::RecvTooLarge(s), + Error::InvalidVarint => Self::InvalidVarint, + Error::SendFailed(e) => Self::SendFailed(e), + Error::SendZeroSize => Self::SendZeroSize, + Error::SendTooLarge(s) => Self::SendTooLarge(s), + Error::StreamClosed => Self::StreamClosed, + Error::HandshakeTimeout => Self::HandshakeTimeout, + } + } +} + /// Configuration for a connection. /// /// # Warning @@ -227,29 +252,38 @@ pub async fn dial( /// Accepts an authenticated connection from a peer as the listener. /// Returns the peer's identity, sender, and receiver for encrypted communication. +/// +/// The `bouncer` function is called with the peer's public key to decide whether to accept +/// the connection. It should return `Ok(())` to accept or `Err(rejection_reason)` to reject. +/// The rejection reason is included in the returned `Error::PeerRejected` variant. pub async fn listen< - R: CryptoRngCore + Clock, + Ctx: CryptoRngCore + Clock, S: Signer, I: Stream, O: Sink, - Fut: Future, + R, + Fut: Future>, F: FnOnce(S::PublicKey) -> Fut, >( - mut ctx: R, + mut ctx: Ctx, bouncer: F, config: Config, mut stream: I, mut sink: O, -) -> Result<(S::PublicKey, Sender, Receiver), Error> { +) -> Result<(S::PublicKey, Sender, Receiver), Error> { let timeout = ctx.sleep(config.handshake_timeout); let inner_routine = async move { - let peer_bytes = recv_frame(&mut stream, config.max_message_size).await?; + let peer_bytes = recv_frame(&mut stream, config.max_message_size) + .await + .map_err(Error::from_unit)?; let peer = S::PublicKey::decode(peer_bytes)?; - if !bouncer(peer.clone()).await { - return Err(Error::PeerRejected(peer.encode().to_vec())); + if let Err(rejection_reason) = bouncer(peer.clone()).await { + return Err(Error::PeerRejected(rejection_reason)); } - let msg1_bytes = recv_frame(&mut stream, config.max_message_size).await?; + let msg1_bytes = recv_frame(&mut stream, config.max_message_size) + .await + .map_err(Error::from_unit)?; let msg1 = Syn::::decode(msg1_bytes)?; let (current_time, ok_timestamps) = config.time_information(&ctx); @@ -264,9 +298,13 @@ pub async fn listen< ), msg1, )?; - send_frame(&mut sink, &syn_ack.encode(), config.max_message_size).await?; + send_frame(&mut sink, &syn_ack.encode(), config.max_message_size) + .await + .map_err(Error::from_unit)?; - let ack_bytes = recv_frame(&mut stream, config.max_message_size).await?; + let ack_bytes = recv_frame(&mut stream, config.max_message_size) + .await + .map_err(Error::from_unit)?; let ack = Ack::decode(ack_bytes)?; let (send, recv) = listen_end(state, ack)?; @@ -372,7 +410,7 @@ mod test { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink,