From 27860f7eef3a3a6cee579bafa777ba905bde42bd Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Thu, 18 Dec 2025 17:31:14 -0800 Subject: [PATCH 1/6] spike --- .../discovery/actors/listener.rs | 36 ++- .../discovery/actors/tracker/actor.rs | 32 +- .../discovery/actors/tracker/directory.rs | 59 +++- .../discovery/actors/tracker/ingress.rs | 17 +- .../discovery/actors/tracker/mod.rs | 3 +- .../discovery/actors/tracker/record.rs | 276 ++++++++++-------- p2p/src/authenticated/discovery/config.rs | 6 + p2p/src/authenticated/discovery/network.rs | 1 + .../authenticated/lookup/actors/listener.rs | 49 +++- .../lookup/actors/tracker/actor.rs | 45 +-- .../lookup/actors/tracker/directory.rs | 75 ++++- .../lookup/actors/tracker/mod.rs | 18 +- .../lookup/actors/tracker/record.rs | 184 ++++++++---- p2p/src/authenticated/lookup/config.rs | 9 + p2p/src/authenticated/lookup/network.rs | 7 +- 15 files changed, 547 insertions(+), 270 deletions(-) diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index 2b61307c0c..e671fd9143 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -11,7 +11,7 @@ use commonware_runtime::{ spawn_cell, Clock, ContextCell, Handle, KeyedRateLimiter, Listener, Metrics, Network, Quota, SinkOf, Spawner, StreamOf, }; -use commonware_stream::{listen, Config as StreamConfig}; +use commonware_stream::{listen, Config as StreamConfig, Error as StreamError}; use commonware_utils::{concurrency::Limiter, net::SubnetMask, IpAddrExt}; use prometheus_client::metrics::counter::Counter; use rand::{CryptoRng, Rng}; @@ -103,9 +103,23 @@ impl Actor< mut tracker: UnboundedMailbox>, mut supervisor: Mailbox, StreamOf, C::PublicKey>>, ) { + // Track the rejection reason from the bouncer + let rejection_reason = std::sync::Arc::new(std::sync::Mutex::new(None)); + let rejection_reason_clone = rejection_reason.clone(); + let (peer, send, recv) = match listen( context, - |peer| tracker.acceptable(peer), + |peer| { + let mut tracker = tracker.clone(); + let rejection_reason = rejection_reason_clone.clone(); + async move { + let result = tracker.acceptable(peer).await; + if result != tracker::Acceptable::Yes { + *rejection_reason.lock().unwrap() = Some(result); + } + result == tracker::Acceptable::Yes + } + }, stream_cfg, stream, sink, @@ -113,6 +127,20 @@ impl Actor< .await { Ok(x) => x, + Err(StreamError::PeerRejected(_)) => { + // The bouncer returned false - check the captured reason + let reason = rejection_reason.lock().unwrap().take(); + match reason { + Some(tracker::Acceptable::Blocked) => { + debug!(?address, "peer is blocked"); + } + Some(tracker::Acceptable::Unknown) | None => { + debug!(?address, "peer not acceptable (unknown or not in peer set)"); + } + Some(tracker::Acceptable::Yes) => unreachable!(), + } + return; + } Err(err) => { debug!(?err, "failed to complete handshake"); return; @@ -300,7 +328,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -443,7 +471,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); diff --git a/p2p/src/authenticated/discovery/actors/tracker/actor.rs b/p2p/src/authenticated/discovery/actors/tracker/actor.rs index b069922f8d..246282f70d 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/actor.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/actor.rs @@ -1,3 +1,5 @@ +#[cfg(test)] +use super::ingress::Acceptable; use super::{ directory::{self, Directory}, ingress::{Message, Oracle}, @@ -79,6 +81,7 @@ impl Actor { max_sets: cfg.tracked_peer_sets, dial_fail_limit: cfg.dial_fail_limit, rate_limit: cfg.allowed_connection_rate_per_peer, + block_duration: cfg.block_duration, }; // Create the mailboxes @@ -319,6 +322,7 @@ mod tests { peer_gossip_max_count: 5, max_peer_set_size: 128, dial_fail_limit: 1, + block_duration: Duration::from_secs(60), } } @@ -795,9 +799,18 @@ mod tests { } = setup_actor(context.clone(), cfg_initial); // None listenable because not registered - assert!(!mailbox.acceptable(peer_pk.clone()).await); - assert!(!mailbox.acceptable(peer_pk2.clone()).await); - assert!(!mailbox.acceptable(peer_pk3.clone()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone()).await, + Acceptable::Unknown + ); + assert_eq!( + mailbox.acceptable(peer_pk2.clone()).await, + Acceptable::Unknown + ); + assert_eq!( + mailbox.acceptable(peer_pk3.clone()).await, + Acceptable::Unknown + ); oracle .update(0, [peer_pk.clone(), peer_pk2.clone()].try_into().unwrap()) @@ -805,11 +818,11 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Not listenable because self - assert!(!mailbox.acceptable(peer_pk).await); + assert_eq!(mailbox.acceptable(peer_pk).await, Acceptable::Unknown); // Listenable because registered - assert!(mailbox.acceptable(peer_pk2).await); + assert_eq!(mailbox.acceptable(peer_pk2).await, Acceptable::Yes); // Not listenable because not registered - assert!(!mailbox.acceptable(peer_pk3).await); + assert_eq!(mailbox.acceptable(peer_pk3).await, Acceptable::Unknown); }); } @@ -834,12 +847,15 @@ mod tests { .await; context.sleep(Duration::from_millis(10)).await; // Allow register to process - assert!(mailbox.acceptable(peer_pk.clone()).await); + assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Acceptable::Yes); let reservation = mailbox.listen(peer_pk.clone()).await; assert!(reservation.is_some()); - assert!(!mailbox.acceptable(peer_pk.clone()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone()).await, + Acceptable::Unknown + ); let failed_reservation = mailbox.listen(peer_pk.clone()).await; assert!(failed_reservation.is_none()); diff --git a/p2p/src/authenticated/discovery/actors/tracker/directory.rs b/p2p/src/authenticated/discovery/actors/tracker/directory.rs index c3bfa6093c..d89bd6288e 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/directory.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/directory.rs @@ -17,6 +17,7 @@ use rand::{seq::IteratorRandom, Rng}; use std::{ collections::{BTreeMap, HashMap}, ops::Deref, + time::Duration, }; use tracing::{debug, warn}; @@ -37,6 +38,9 @@ pub struct Config { /// The rate limit for allowing reservations per-peer. pub rate_limit: Quota, + + /// Duration after which blocked peers are automatically unblocked. + pub block_duration: Duration, } /// Represents a collection of records for all peers. @@ -57,6 +61,9 @@ pub struct Directory { /// peers for its peer info again. dial_fail_limit: usize, + /// Duration after which blocked peers are automatically unblocked. + block_duration: Duration, + // ---------- State ---------- /// The records of all peers. peers: HashMap>, @@ -107,6 +114,7 @@ impl Directory { allow_dns: cfg.allow_dns, max_sets: cfg.max_sets, dial_fail_limit: cfg.dial_fail_limit, + block_duration: cfg.block_duration, peers, sets: BTreeMap::new(), rate_limiter, @@ -131,7 +139,8 @@ impl Directory { record.dial_failure(ingress); } - let want = record.want(self.dial_fail_limit); + let now = self.context.current(); + let want = record.want(now, self.dial_fail_limit); for set in self.sets.values_mut() { set.update(peer, !want); } @@ -152,7 +161,8 @@ impl Directory { record.connect(); // We may have to update the sets. - let want = record.want(self.dial_fail_limit); + let now = self.context.current(); + let want = record.want(now, self.dial_fail_limit); for set in self.sets.values_mut() { set.update(peer, !want); } @@ -160,6 +170,7 @@ impl Directory { /// Using a list of (already-validated) peer information, update the records. pub fn update_peers(&mut self, infos: Vec>) { + let now = self.context.current(); for info in infos { // Update peer address // @@ -170,7 +181,7 @@ impl Directory { let Some(record) = self.peers.get_mut(&peer) else { continue; }; - if !record.update(info) { + if !record.update(now, info) { continue; } self.metrics @@ -179,7 +190,7 @@ impl Directory { .inc(); // We may have to update the sets. - let want = record.want(self.dial_fail_limit); + let want = record.want(now, self.dial_fail_limit); for set in self.sets.values_mut() { set.update(&peer, !want); } @@ -204,6 +215,7 @@ impl Directory { } // Create and store new peer set + let now = self.context.current(); let mut set = Set::new(peers.clone()); for peer in peers.iter() { let record = self.peers.entry(peer.clone()).or_insert_with(|| { @@ -211,7 +223,7 @@ impl Directory { Record::unknown() }); record.increment(); - set.update(peer, !record.want(self.dial_fail_limit)); + set.update(peer, !record.want(now, self.dial_fail_limit)); } self.sets.insert(index, set); @@ -268,7 +280,12 @@ impl Directory { /// Attempt to block a peer, updating the metrics accordingly. pub fn block(&mut self, peer: &C) { - if self.peers.get_mut(peer).is_some_and(|r| r.block()) { + let blocked_until = self.context.current() + self.block_duration; + if self + .peers + .get_mut(peer) + .is_some_and(|r| r.block(blocked_until)) + { self.metrics.blocked.inc(); } } @@ -334,25 +351,39 @@ impl Directory { /// /// A peer is eligible if it is in a peer set (or is persistent), not blocked, and not ourselves. pub fn eligible(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.eligible()) + let now = self.context.current(); + self.peers.get(peer).is_some_and(|r| r.eligible(now)) } /// Returns a vector of dialable peers. That is, unconnected peers for which we have an ingress. pub fn dialable(&self) -> Vec { // Collect peers with known addresses + let now = self.context.current(); let mut result: Vec<_> = self .peers .iter() - .filter(|&(_, r)| r.dialable(self.allow_private_ips, self.allow_dns)) + .filter(|&(_, r)| r.dialable(now, self.allow_private_ips, self.allow_dns)) .map(|(peer, _)| peer.clone()) .collect(); result.sort(); result } - /// Returns true if this peer is acceptable (can accept an incoming connection from them). - pub fn acceptable(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.acceptable()) + /// Returns the acceptance status for a peer. + pub fn acceptable(&self, peer: &C) -> super::ingress::Acceptable { + use super::ingress::Acceptable; + let now = self.context.current(); + let Some(record) = self.peers.get(peer) else { + return Acceptable::Unknown; + }; + if record.blocked(now) { + return Acceptable::Blocked; + } + if record.acceptable(now) { + Acceptable::Yes + } else { + Acceptable::Unknown + } } // --------- Helpers ---------- @@ -384,7 +415,8 @@ impl Directory { } // Reserve - if record.reserve() { + let now = self.context.current(); + if record.reserve(now) { self.metrics.reserved.inc(); return Some(Reservation::new(metadata, self.releaser.clone())); } @@ -401,7 +433,8 @@ impl Directory { if !record.deletable() { return false; } - if record.blocked() { + let now = self.context.current(); + if record.blocked(now) { self.metrics.blocked.dec(); } self.peers.remove(peer); diff --git a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs index 2759b86c62..4afcdb15a3 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs @@ -11,6 +11,17 @@ use commonware_cryptography::PublicKey; use commonware_utils::ordered::Set; use futures::channel::{mpsc, oneshot}; +/// Result of checking if a peer is acceptable for connection. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Acceptable { + /// Peer is acceptable for connection. + Yes, + /// Peer is blocked. + Blocked, + /// Peer is unknown (not in any peer set). + Unknown, +} + /// Messages that can be sent to the tracker actor. #[derive(Debug)] pub enum Message { @@ -110,8 +121,8 @@ pub enum Message { /// The public key of the peer to check. public_key: C, - /// The sender to respond with whether the peer is acceptable. - responder: oneshot::Sender, + /// The sender to respond with the acceptance status. + responder: oneshot::Sender, }, /// Request a reservation for a particular peer. @@ -185,7 +196,7 @@ impl UnboundedMailbox> { } /// Send an `Acceptable` message to the tracker. - pub async fn acceptable(&mut self, public_key: C) -> bool { + pub async fn acceptable(&mut self, public_key: C) -> Acceptable { let (tx, rx) = oneshot::channel(); self.send(Message::Acceptable { public_key, diff --git a/p2p/src/authenticated/discovery/actors/tracker/mod.rs b/p2p/src/authenticated/discovery/actors/tracker/mod.rs index 4f069d558c..a5e8a0397d 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/mod.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/mod.rs @@ -15,7 +15,7 @@ mod reservation; mod set; pub use actor::Actor; -pub use ingress::{Message, Oracle}; +pub use ingress::{Acceptable, Message, Oracle}; pub use metadata::Metadata; pub use reservation::Reservation; @@ -31,6 +31,7 @@ pub struct Config { pub tracked_peer_sets: usize, pub max_peer_set_size: u64, pub allowed_connection_rate_per_peer: Quota, + pub block_duration: Duration, pub peer_gossip_max_count: usize, pub dial_fail_limit: usize, } diff --git a/p2p/src/authenticated/discovery/actors/tracker/record.rs b/p2p/src/authenticated/discovery/actors/tracker/record.rs index 24cdaacf20..8ac8006c69 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/record.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/record.rs @@ -1,5 +1,6 @@ use crate::{authenticated::discovery::types::Info, Ingress}; use commonware_cryptography::PublicKey; +use std::time::SystemTime; use tracing::trace; /// Represents information known about a peer's address. @@ -21,9 +22,9 @@ pub enum Address { /// The `usize` indicates the number of times dialing this record has failed. Discovered(Info, usize), - /// Peer is blocked. - /// We don't care to track its information. - Blocked, + /// Peer is blocked until the given time. + /// The block expires after the specified time, allowing the peer to reconnect. + Blocked(SystemTime), } /// Represents the connection status of a peer. @@ -96,11 +97,11 @@ impl Record { /// Attempt to update the [Info] of a discovered peer. /// /// Returns true if the update was successful. - pub fn update(&mut self, info: Info) -> bool { + pub fn update(&mut self, now: SystemTime, info: Info) -> bool { match &self.address { Address::Myself(_) => false, - Address::Blocked => false, - Address::Unknown | Address::Bootstrapper(_) => { + Address::Blocked(until) if now < *until => false, + Address::Blocked(_) | Address::Unknown | Address::Bootstrapper(_) => { self.address = Address::Discovered(info, 0); true } @@ -124,15 +125,15 @@ impl Record { } } - /// Attempt to mark the peer as blocked. + /// Attempt to mark the peer as blocked until the specified time. /// /// Returns `true` if the peer was newly blocked. /// Returns `false` if the peer was already blocked or is the local node (unblockable). - pub fn block(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself(_)) { + pub fn block(&mut self, blocked_until: SystemTime) -> bool { + if matches!(self.address, Address::Blocked(_) | Address::Myself(_)) { return false; } - self.address = Address::Blocked; + self.address = Address::Blocked(blocked_until); self.persistent = false; true } @@ -154,9 +155,11 @@ impl Record { /// Attempt to reserve the peer for connection. /// /// Returns `true` if the reservation was successful, `false` otherwise. - pub const fn reserve(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself(_)) { - return false; + pub fn reserve(&mut self, now: SystemTime) -> bool { + match &self.address { + Address::Blocked(until) if now < *until => return false, + Address::Myself(_) => return false, + _ => {} } if matches!(self.status, Status::Inert) { self.status = Status::Reserved; @@ -202,9 +205,9 @@ impl Record { // ---------- Getters ---------- - /// Returns `true` if the record is blocked. - pub const fn blocked(&self) -> bool { - matches!(self.address, Address::Blocked) + /// Returns `true` if the record is blocked (and block hasn't expired). + pub fn blocked(&self, now: SystemTime) -> bool { + matches!(&self.address, Address::Blocked(until) if now < *until) } /// Returns the number of peer sets this peer is part of. @@ -216,16 +219,18 @@ impl Record { /// /// A record is dialable if: /// - We have the ingress address of the peer - /// - It is not ourselves or blocked + /// - It is not ourselves or blocked (or block has expired) /// - We are not already connected or reserved /// - The ingress address is allowed (DNS enabled, Socket IP is global or private IPs allowed) /// /// Note: For DNS addresses, private IP checks are performed in the dialer after resolution. - pub fn dialable(&self, allow_private_ips: bool, allow_dns: bool) -> bool { + pub fn dialable(&self, now: SystemTime, allow_private_ips: bool, allow_dns: bool) -> bool { if self.status != Status::Inert { return false; } let ingress = match &self.address { + Address::Blocked(until) if now < *until => return false, + Address::Blocked(_) => return false, // Blocked with expired time but no ingress Address::Bootstrapper(ingress) => ingress, Address::Discovered(info, _) => &info.ingress, _ => return false, @@ -238,8 +243,8 @@ impl Record { /// A peer is acceptable if: /// - The peer is eligible (in a peer set, not blocked, not ourselves) /// - We are not already connected or reserved - pub fn acceptable(&self) -> bool { - self.eligible() && self.status == Status::Inert + pub fn acceptable(&self, now: SystemTime) -> bool { + self.eligible(now) && self.status == Status::Inert } /// Return the ingress address of the peer, if known. @@ -249,7 +254,7 @@ impl Record { Address::Myself(info) => Some(&info.ingress), Address::Bootstrapper(ingress) => Some(ingress), Address::Discovered(info, _) => Some(&info.ingress), - Address::Blocked => None, + Address::Blocked(_) => None, } } @@ -261,7 +266,7 @@ impl Record { Address::Myself(info) => Some(info), Address::Bootstrapper(_) => None, Address::Discovered(info, _) => (self.status == Status::Active).then_some(info), - Address::Blocked => None, + Address::Blocked(_) => None, } .cloned() } @@ -274,21 +279,22 @@ impl Record { /// Returns `true` if we want to ask for updated peer information for this peer. /// - /// - Returns `false` for `Myself` and `Blocked` addresses. + /// - Returns `false` for `Myself` and actively `Blocked` addresses. /// - Returns `true` for addresses for which we don't have peer info. /// - Returns true for addresses for which we do have peer info if-and-only-if we have failed to /// dial at least `min_fails` times. - pub fn want(&self, min_fails: usize) -> bool { + pub fn want(&self, now: SystemTime, min_fails: usize) -> bool { // Ignore how many sets the peer is part of. // If the peer is not in any sets, this function is not called anyway. // Return true if we either: // - Don't have signed peer info // - Are not connected to the peer and have failed dialing it - match self.address { - Address::Myself(_) | Address::Blocked => false, - Address::Unknown | Address::Bootstrapper(_) => true, - Address::Discovered(_, fails) => self.status != Status::Active && fails >= min_fails, + match &self.address { + Address::Myself(_) => false, + Address::Blocked(until) if now < *until => false, + Address::Blocked(_) | Address::Unknown | Address::Bootstrapper(_) => true, + Address::Discovered(_, fails) => self.status != Status::Active && *fails >= min_fails, } } @@ -300,14 +306,16 @@ impl Record { /// Returns `true` if this peer is eligible for connection. /// /// A peer is eligible if: - /// - It is not blocked or ourselves + /// - It is not blocked (or block has expired) or ourselves /// - It is part of at least one peer set (or is persistent, e.g., bootstrapper) - pub const fn eligible(&self) -> bool { - match self.address { - Address::Blocked | Address::Myself(_) => false, - Address::Bootstrapper(_) | Address::Unknown | Address::Discovered(_, _) => { - self.sets > 0 || self.persistent - } + pub fn eligible(&self, now: SystemTime) -> bool { + match &self.address { + Address::Blocked(until) if now < *until => false, + Address::Myself(_) => false, + Address::Blocked(_) + | Address::Bootstrapper(_) + | Address::Unknown + | Address::Discovered(_, _) => self.sets > 0 || self.persistent, } } } @@ -316,10 +324,20 @@ mod tests { use super::*; use crate::authenticated::discovery::types; use commonware_cryptography::secp256r1::standard::{PrivateKey, PublicKey}; - use std::net::SocketAddr; + use std::{net::SocketAddr, time::Duration}; const NAMESPACE: &[u8] = b"test"; + // Helper to get a SystemTime for testing + fn now() -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(1000) + } + + // Helper to get a block expiry time in the future + fn future_block_time() -> SystemTime { + now() + Duration::from_secs(3600) + } + // Helper function to create signed peer info for testing fn create_peer_info( signer_seed: u64, @@ -369,11 +387,11 @@ mod tests { assert!(!record.persistent); assert!(record.ingress().is_none()); assert!(record.sharable().is_none()); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); - assert!(record.want(0), "Should want info for unknown peer"); + assert!(record.want(now(), 0), "Should want info for unknown peer"); assert!(record.deletable()); - assert!(!record.eligible()); + assert!(!record.eligible(now())); } #[test] @@ -391,11 +409,11 @@ mod tests { record.sharable().as_ref(), &my_info )); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); - assert!(!record.want(0), "Should not want info for myself"); + assert!(!record.want(now(), 0), "Should not want info for myself"); assert!(!record.deletable()); - assert!(!record.eligible()); + assert!(!record.eligible(now())); } #[test] @@ -409,11 +427,11 @@ mod tests { assert!(record.persistent); assert_eq!(record.ingress(), Some(&ingress)); assert!(record.sharable().is_none()); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); - assert!(record.want(0), "Should want info for bootstrapper"); + assert!(record.want(now(), 0), "Should want info for bootstrapper"); assert!(!record.deletable()); - assert!(record.eligible()); + assert!(record.eligible(now())); } #[test] @@ -422,7 +440,7 @@ mod tests { let mut record = Record::::unknown(); let peer_info = create_peer_info::(1, socket, 1000); - assert!(record.update(peer_info.clone())); + assert!(record.update(now(), peer_info.clone())); assert_eq!(record.ingress(), Some(&peer_info.ingress)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info)), @@ -439,7 +457,7 @@ mod tests { let peer_info = create_peer_info::(2, socket, 1000); assert!(record.persistent, "Should start as persistent"); - assert!(record.update(peer_info.clone())); + assert!(record.update(now(), peer_info.clone())); assert_eq!(record.ingress(), Some(&peer_info.ingress)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info)), @@ -456,8 +474,8 @@ mod tests { let peer_info_old = create_peer_info::(3, socket, 1000); let peer_info_new = create_peer_info::(3, socket, 2000); - assert!(record.update(peer_info_old)); - assert!(record.update(peer_info_new.clone())); + assert!(record.update(now(), peer_info_old)); + assert!(record.update(now(), peer_info_new.clone())); assert_eq!(record.ingress(), Some(&peer_info_new.ingress)); assert!( @@ -474,15 +492,15 @@ mod tests { let peer_info_older = create_peer_info::(5, socket, 500); let peer_info_equal = create_peer_info::(5, socket, 1000); - assert!(record.update(peer_info_current.clone())); + assert!(record.update(now(), peer_info_current.clone())); - assert!(!record.update(peer_info_older)); + assert!(!record.update(now(), peer_info_older)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_current)), "Address should not update with older info" ); - assert!(!record.update(peer_info_equal)); + assert!(!record.update(now(), peer_info_equal)); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_current)), "Address should not update with equal timestamp info" @@ -497,8 +515,8 @@ mod tests { let newer_my_info = create_peer_info::(0, test_socket(), 300); // Cannot update Myself record with other info or newer self info - assert!(!record_myself.update(other_info.clone())); - assert!(!record_myself.update(newer_my_info)); + assert!(!record_myself.update(now(), other_info.clone())); + assert!(!record_myself.update(now(), newer_my_info)); assert!( matches!(&record_myself.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info)), "Myself record should remain unchanged" @@ -506,9 +524,9 @@ mod tests { // Cannot update a Blocked record let mut record_blocked = Record::::unknown(); - assert!(record_blocked.block()); - assert!(!record_blocked.update(other_info)); - assert!(matches!(record_blocked.address, Address::Blocked)); + assert!(record_blocked.block(future_block_time())); + assert!(!record_blocked.update(now(), other_info)); + assert!(matches!(record_blocked.address, Address::Blocked(_))); } #[test] @@ -521,14 +539,14 @@ mod tests { let peer_info_pk1_ts1000 = create_peer_info::(10, socket, 1000); let peer_info_pk2_ts2000 = create_peer_info::(11, socket, 2000); - assert!(record.update(peer_info_pk1_ts1000.clone())); + assert!(record.update(now(), peer_info_pk1_ts1000.clone())); assert!( matches!(&record.address, Address::Discovered(info, 0) if peer_info_contents_are_equal(info, &peer_info_pk1_ts1000)) ); // Update should succeed based on newer timestamp, even if PK differs (though context matters) assert!( - record.update(peer_info_pk2_ts2000.clone()), + record.update(now(), peer_info_pk2_ts2000.clone()), "Update should succeed based on newer timestamp" ); assert!( @@ -549,7 +567,7 @@ mod tests { // Test Discovered (not persistent) let peer_info = create_peer_info::(7, test_socket(), 1000); let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info)); + assert!(record_disc.update(now(), peer_info)); assert!(record_disc.deletable()); record_disc.increment(); // sets = 1 assert!(!record_disc.deletable()); @@ -589,37 +607,37 @@ mod tests { // Block an Unknown record let mut record_unknown = Record::::unknown(); assert!(!record_unknown.persistent); - assert!(record_unknown.block()); // Newly blocked - assert!(record_unknown.blocked()); - assert!(matches!(record_unknown.address, Address::Blocked)); + assert!(record_unknown.block(future_block_time())); // Newly blocked + assert!(record_unknown.blocked(now())); + assert!(matches!(record_unknown.address, Address::Blocked(_))); assert_eq!(record_unknown.status, Status::Inert); assert!(!record_unknown.persistent, "Blocking sets persistent=false"); - assert!(!record_unknown.block()); // Already blocked + assert!(!record_unknown.block(future_block_time())); // Already blocked // Block a Bootstrapper record (initially persistent) let mut record_boot = Record::::bootstrapper(test_socket()); assert!(record_boot.persistent); - assert!(record_boot.block()); - assert!(record_boot.blocked()); - assert!(matches!(record_boot.address, Address::Blocked)); + assert!(record_boot.block(future_block_time())); + assert!(record_boot.blocked(now())); + assert!(matches!(record_boot.address, Address::Blocked(_))); assert!(!record_boot.persistent, "Blocking sets persistent=false"); // Block a Discovered record (initially not persistent) let mut record_disc = Record::::unknown(); - assert!(record_disc.update(sample_peer_info.clone())); + assert!(record_disc.update(now(), sample_peer_info.clone())); assert!(!record_disc.persistent); - assert!(record_disc.block()); - assert!(record_disc.blocked()); - assert!(matches!(record_disc.address, Address::Blocked)); + assert!(record_disc.block(future_block_time())); + assert!(record_disc.blocked(now())); + assert!(matches!(record_disc.address, Address::Blocked(_))); assert!(!record_disc.persistent); // Block a Discovered record that came from a Bootstrapper (initially persistent) let mut record_disc_from_boot = Record::::bootstrapper(test_socket()); - assert!(record_disc_from_boot.update(sample_peer_info.clone())); + assert!(record_disc_from_boot.update(now(), sample_peer_info.clone())); assert!(record_disc_from_boot.persistent); - assert!(record_disc_from_boot.block()); - assert!(record_disc_from_boot.blocked()); - assert!(matches!(record_disc_from_boot.address, Address::Blocked)); + assert!(record_disc_from_boot.block(future_block_time())); + assert!(record_disc_from_boot.blocked(now())); + assert!(matches!(record_disc_from_boot.address, Address::Blocked(_))); assert!( !record_disc_from_boot.persistent, "Blocking sets persistent=false" @@ -627,16 +645,16 @@ mod tests { // Check status remains unchanged when blocking let mut record_reserved = Record::::unknown(); - assert!(record_reserved.update(sample_peer_info.clone())); - assert!(record_reserved.reserve()); - assert!(record_reserved.block()); + assert!(record_reserved.update(now(), sample_peer_info.clone())); + assert!(record_reserved.reserve(now())); + assert!(record_reserved.block(future_block_time())); assert_eq!(record_reserved.status, Status::Reserved); let mut record_active = Record::::unknown(); - assert!(record_active.update(sample_peer_info)); - assert!(record_active.reserve()); + assert!(record_active.update(now(), sample_peer_info)); + assert!(record_active.reserve(now())); record_active.connect(); - assert!(record_active.block()); + assert!(record_active.block(future_block_time())); assert_eq!(record_active.status, Status::Active); } @@ -644,18 +662,21 @@ mod tests { fn test_block_myself_and_already_blocked() { let my_info = create_peer_info::(0, test_socket(), 100); let mut record_myself = Record::myself(my_info.clone()); - assert!(!record_myself.block(), "Cannot block myself"); + assert!( + !record_myself.block(future_block_time()), + "Cannot block myself" + ); assert!( matches!(&record_myself.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info)) ); let mut record_to_be_blocked = Record::::unknown(); - assert!(record_to_be_blocked.block()); + assert!(record_to_be_blocked.block(future_block_time())); assert!( - !record_to_be_blocked.block(), + !record_to_be_blocked.block(future_block_time()), "Cannot block already blocked peer" ); - assert!(matches!(record_to_be_blocked.address, Address::Blocked)); + assert!(matches!(record_to_be_blocked.address, Address::Blocked(_))); } #[test] @@ -663,25 +684,25 @@ mod tests { let mut record = Record::::unknown(); assert_eq!(record.status, Status::Inert); - assert!(record.reserve()); + assert!(record.reserve(now())); assert_eq!(record.status, Status::Reserved); assert!(record.reserved()); - assert!(!record.reserve(), "Cannot re-reserve when Reserved"); + assert!(!record.reserve(now()), "Cannot re-reserve when Reserved"); assert_eq!(record.status, Status::Reserved); record.connect(); assert_eq!(record.status, Status::Active); assert!(record.reserved()); // reserved() is true for Active too - assert!(!record.reserve(), "Cannot reserve when Active"); + assert!(!record.reserve(now()), "Cannot reserve when Active"); assert_eq!(record.status, Status::Active); record.release(); // Release from Active assert_eq!(record.status, Status::Inert); assert!(!record.reserved()); - assert!(record.reserve()); // Reserve again + assert!(record.reserve(now())); // Reserve again assert_eq!(record.status, Status::Reserved); record.release(); // Release from Reserved assert_eq!(record.status, Status::Inert); @@ -698,7 +719,7 @@ mod tests { #[should_panic] fn test_connect_when_active_panics() { let mut record = Record::::unknown(); - assert!(record.reserve()); + assert!(record.reserve(now())); record.connect(); record.connect(); // Should panic } @@ -732,14 +753,14 @@ mod tests { // Blocked: Not sharable let mut record_blocked = Record::::unknown(); - record_blocked.block(); + record_blocked.block(future_block_time()); assert!(record_blocked.sharable().is_none()); // Discovered but not Active: Not sharable let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info_data.clone())); + assert!(record_disc.update(now(), peer_info_data.clone())); assert!(record_disc.sharable().is_none()); // Status Inert - assert!(record_disc.reserve()); + assert!(record_disc.reserve(now())); assert!(record_disc.sharable().is_none()); // Status Reserved // Discovered and Active: Sharable @@ -758,7 +779,7 @@ mod tests { fn test_reserved_status_check() { let mut record = Record::::unknown(); assert!(!record.reserved()); // Inert - assert!(record.reserve()); + assert!(record.reserve(now())); assert!(record.reserved()); // Reserved record.connect(); assert!(record.reserved()); // Active @@ -778,7 +799,7 @@ mod tests { assert!(matches!(record.address, Address::Unknown)); // Discover - assert!(record.update(peer_info)); + assert!(record.update(now(), peer_info)); assert!(matches!(&record.address, Address::Discovered(_, 0))); // Fail dial 1 @@ -817,59 +838,62 @@ mod tests { let min_fails = 2; // Unknown and Bootstrapper always want info - assert!(Record::::unknown().want(min_fails)); - assert!(Record::::bootstrapper(socket).want(min_fails)); + assert!(Record::::unknown().want(now(), min_fails)); + assert!(Record::::bootstrapper(socket).want(now(), min_fails)); // Myself and Blocked never want info - assert!(!Record::myself(peer_info.clone()).want(min_fails)); + assert!(!Record::myself(peer_info.clone()).want(now(), min_fails)); let mut blocked = Record::::unknown(); - blocked.block(); - assert!(!blocked.want(min_fails)); + blocked.block(future_block_time()); + assert!(!blocked.want(now(), min_fails)); let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info)); + assert!(record_disc.update(now(), peer_info)); // Status Inert assert!( - !record_disc.want(min_fails), + !record_disc.want(now(), min_fails), "Should not want when fails=0 < min_fails" ); record_disc.dial_failure(&ingress); // fails = 1 assert!( - !record_disc.want(min_fails), + !record_disc.want(now(), min_fails), "Should not want when fails=1 < min_fails" ); record_disc.dial_failure(&ingress); // fails = 2 assert!( - record_disc.want(min_fails), + record_disc.want(now(), min_fails), "Should want when fails=2 >= min_fails" ); // Status Reserved - assert!(record_disc.reserve()); + assert!(record_disc.reserve(now())); assert!( - record_disc.want(min_fails), + record_disc.want(now(), min_fails), "Should still want when Reserved and fails >= min_fails" ); // Status Active record_disc.connect(); - assert!(!record_disc.want(min_fails), "Should not want when Active"); + assert!( + !record_disc.want(now(), min_fails), + "Should not want when Active" + ); // Status Inert again (after release) record_disc.release(); - assert!(record_disc.want(min_fails)); + assert!(record_disc.want(now(), min_fails)); // Reset failures record_disc.dial_success(); // Reset failures assert!( - !record_disc.want(min_fails), + !record_disc.want(now(), min_fails), "Should not want when Inert and fails=0" ); record_disc.dial_failure(&ingress); // fails = 1 - assert!(!record_disc.want(min_fails)); + assert!(!record_disc.want(now(), min_fails)); record_disc.dial_failure(&ingress); // fails = 2 - assert!(record_disc.want(min_fails)); + assert!(record_disc.want(now(), min_fails)); } #[test] @@ -880,7 +904,7 @@ mod tests { assert!(!Record::myself(peer_info.clone()).deletable()); assert!(!Record::::bootstrapper(test_socket()).deletable()); let mut record_pers = Record::::bootstrapper(test_socket()); - assert!(record_pers.update(peer_info)); + assert!(record_pers.update(now(), peer_info)); assert!(!record_pers.deletable()); // Non-persistent records depend on sets count and status @@ -892,7 +916,7 @@ mod tests { record.increment(); // sets = 1 assert!(!record.deletable()); // sets != 0 - assert!(record.reserve()); // status = Reserved + assert!(record.reserve(now())); // status = Reserved assert!(!record.deletable()); // status != Inert record.connect(); // status = Active @@ -908,7 +932,7 @@ mod tests { let mut record_blocked = Record::::bootstrapper(test_socket()); assert!(record_blocked.persistent); record_blocked.increment(); // sets = 1 - assert!(record_blocked.block()); + assert!(record_blocked.block(future_block_time())); assert!(!record_blocked.persistent); assert!(!record_blocked.deletable()); // sets = 1 record_blocked.decrement(); // sets = 0 @@ -921,28 +945,28 @@ mod tests { // Blocked and Myself are never allowed let mut record_blocked = Record::::unknown(); - record_blocked.block(); - assert!(!record_blocked.eligible()); - assert!(!Record::myself(peer_info.clone()).eligible()); + record_blocked.block(future_block_time()); + assert!(!record_blocked.eligible(now())); + assert!(!Record::myself(peer_info.clone()).eligible(now())); // Persistent records (Bootstrapper, Myself before blocking) are allowed even with sets=0 - assert!(Record::::bootstrapper(test_socket()).eligible()); + assert!(Record::::bootstrapper(test_socket()).eligible(now())); let mut record_pers = Record::::bootstrapper(test_socket()); - assert!(record_pers.update(peer_info.clone())); - assert!(record_pers.eligible()); + assert!(record_pers.update(now(), peer_info.clone())); + assert!(record_pers.eligible(now())); // Non-persistent records (Unknown, Discovered) require sets > 0 let mut record_unknown = Record::::unknown(); - assert!(!record_unknown.eligible()); // sets = 0, !persistent + assert!(!record_unknown.eligible(now())); // sets = 0, !persistent record_unknown.increment(); // sets = 1 - assert!(record_unknown.eligible()); // sets > 0 + assert!(record_unknown.eligible(now())); // sets > 0 record_unknown.decrement(); // sets = 0 - assert!(!record_unknown.eligible()); + assert!(!record_unknown.eligible(now())); let mut record_disc = Record::::unknown(); - assert!(record_disc.update(peer_info)); - assert!(!record_disc.eligible()); // sets = 0, !persistent + assert!(record_disc.update(now(), peer_info)); + assert!(!record_disc.eligible(now())); // sets = 0, !persistent record_disc.increment(); // sets = 1 - assert!(record_disc.eligible()); // sets > 0 + assert!(record_disc.eligible(now())); // sets > 0 } } diff --git a/p2p/src/authenticated/discovery/config.rs b/p2p/src/authenticated/discovery/config.rs index fc828500e4..4c0b6d970e 100644 --- a/p2p/src/authenticated/discovery/config.rs +++ b/p2p/src/authenticated/discovery/config.rs @@ -67,6 +67,9 @@ pub struct Config { /// Quota for connection attempts per peer (incoming or outgoing). pub allowed_connection_rate_per_peer: Quota, + /// Duration after which blocked peers are automatically unblocked. + pub block_duration: Duration, + /// Maximum number of concurrent handshake attempts allowed. pub max_concurrent_handshakes: NonZeroU32, @@ -148,6 +151,7 @@ impl Config { max_handshake_age: Duration::from_secs(10), handshake_timeout: Duration::from_secs(5), allowed_connection_rate_per_peer: Quota::per_minute(NZU32!(1)), + block_duration: Duration::from_secs(3600), // 1 hour max_concurrent_handshakes: NZU32!(512), allowed_handshake_rate_per_ip: Quota::with_period(Duration::from_secs(5)).unwrap(), // 1 concurrent handshake per IP allowed_handshake_rate_per_subnet: Quota::per_second(NZU32!(64)), @@ -190,6 +194,7 @@ impl Config { max_handshake_age: Duration::from_secs(10), handshake_timeout: Duration::from_secs(5), allowed_connection_rate_per_peer: Quota::per_second(NZU32!(1)), + block_duration: Duration::from_secs(3600), // 1 hour max_concurrent_handshakes: NZU32!(1_024), allowed_handshake_rate_per_ip: Quota::per_second(NZU32!(16)), // 80 concurrent handshakes per IP allowed_handshake_rate_per_subnet: Quota::per_second(NZU32!(128)), @@ -225,6 +230,7 @@ impl Config { max_handshake_age: Duration::from_secs(10), handshake_timeout: Duration::from_secs(5), allowed_connection_rate_per_peer: Quota::per_second(NZU32!(4)), + block_duration: Duration::from_secs(60), // 1 minute for tests max_concurrent_handshakes: NZU32!(1_024), allowed_handshake_rate_per_ip: Quota::per_second(NZU32!(128)), // 640 concurrent handshakes per IP allowed_handshake_rate_per_subnet: Quota::per_second(NZU32!(256)), diff --git a/p2p/src/authenticated/discovery/network.rs b/p2p/src/authenticated/discovery/network.rs index 4590217269..11db6e9eb6 100644 --- a/p2p/src/authenticated/discovery/network.rs +++ b/p2p/src/authenticated/discovery/network.rs @@ -66,6 +66,7 @@ impl, - mailbox: mpsc::Receiver>, + blocked_ips: HashMap, + mailbox: mpsc::Receiver, handshakes_blocked: Counter, handshakes_concurrent_rate_limited: Counter, handshakes_ip_rate_limited: Counter, @@ -59,12 +61,16 @@ pub struct Actor Actor { - pub fn new(context: E, cfg: Config, mailbox: mpsc::Receiver>) -> Self { + pub fn new( + context: E, + cfg: Config, + mailbox: mpsc::Receiver, + ) -> Self { // Create metrics let handshakes_blocked = Counter::default(); context.register( "handshakes_blocked", - "number of handshake attempts blocked because the IP was not registered", + "number of handshake attempts blocked because the IP was blocked or not registered", handshakes_blocked.clone(), ); let handshakes_concurrent_rate_limited = Counter::default(); @@ -97,6 +103,7 @@ impl Actor< allowed_handshake_rate_per_ip: cfg.allowed_handshake_rate_per_ip, allowed_handshake_rate_per_subnet: cfg.allowed_handshake_rate_per_subnet, registered_ips: HashSet::new(), + blocked_ips: HashMap::new(), mailbox, handshakes_blocked, handshakes_concurrent_rate_limited, @@ -185,11 +192,12 @@ impl Actor< debug!("context shutdown, stopping listener"); }, update = self.mailbox.next() => { - let Some(registered_ips) = update else { + let Some(filter) = update else { debug!("mailbox closed"); break; }; - self.registered_ips = registered_ips; + self.registered_ips = filter.registered_ips; + self.blocked_ips = filter.blocked_ips; }, listener = listener.accept() => { // Accept a new connection @@ -210,10 +218,19 @@ impl Actor< continue; } + // Check whether the IP is from a blocked peer + if let Some(&blocked_until) = self.blocked_ips.get(&ip) { + if self.context.current() < blocked_until { + self.handshakes_blocked.inc(); + debug!(?address, "rejecting blocked peer"); + continue; + } + } + // Check whether the IP is registered if !self.attempt_unregistered_handshakes && !self.registered_ips.contains(&ip) { self.handshakes_blocked.inc(); - debug!(?address, "rejecting unregistered address"); + debug!(?address, "rejecting unknown address"); continue; } @@ -322,10 +339,13 @@ mod tests { updates_rx, ); - let mut allowed = HashSet::new(); - allowed.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); + let mut registered_ips = HashSet::new(); + registered_ips.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); updates_tx - .send(allowed) + .send(tracker::ListenerFilter { + registered_ips, + blocked_ips: HashMap::new(), + }) .await .expect("update registered ips"); @@ -653,10 +673,13 @@ mod tests { ); // Register the IP so it would be allowed if not for the private IP check - let mut allowed = HashSet::new(); - allowed.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); + let mut registered_ips = HashSet::new(); + registered_ips.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); updates_tx - .send(allowed) + .send(tracker::ListenerFilter { + registered_ips, + blocked_ips: HashMap::new(), + }) .await .expect("update registered ips"); diff --git a/p2p/src/authenticated/lookup/actors/tracker/actor.rs b/p2p/src/authenticated/lookup/actors/tracker/actor.rs index a1b89e0c5f..f243259d5b 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/actor.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/actor.rs @@ -1,7 +1,7 @@ use super::{ directory::{self, Directory}, ingress::{Message, Oracle}, - Config, + Config, ListenerFilter, }; use crate::authenticated::{ lookup::actors::{peer, tracker::ingress::Releaser}, @@ -16,10 +16,7 @@ use commonware_runtime::{ use commonware_utils::ordered::Set; use futures::{channel::mpsc, StreamExt}; use rand::Rng; -use std::{ - collections::{HashMap, HashSet}, - net::IpAddr, -}; +use std::collections::HashMap; use tracing::debug; /// The tracker actor that manages peer discovery and connection reservations. @@ -35,7 +32,7 @@ pub struct Actor { receiver: mpsc::UnboundedReceiver>, /// The mailbox for the listener. - listener: Mailbox>, + listener: Mailbox, // ---------- State ---------- /// Tracks peer sets and peer connectivity information. @@ -67,6 +64,7 @@ impl Actor { rate_limit: cfg.allowed_connection_rate_per_peer, allow_private_ips: cfg.allow_private_ips, allow_dns: cfg.allow_dns, + block_duration: cfg.block_duration, }; // Create the mailboxes @@ -132,8 +130,12 @@ impl Actor { } } - // Send the updated listenable IPs to the listener. - let _ = self.listener.send(self.directory.listenable()).await; + // Send the updated filter to the listener. + let filter = ListenerFilter { + registered_ips: self.directory.listenable(), + blocked_ips: self.directory.blocked_ips(), + }; + let _ = self.listener.send(filter).await; // Notify all subscribers about the new peer set self.subscribers.retain(|subscriber| { @@ -207,8 +209,12 @@ impl Actor { peer.kill().await; } - // Send the updated listenable IPs to the listener. - let _ = self.listener.send(self.directory.listenable()).await; + // Send the updated filter to the listener. + let filter = ListenerFilter { + registered_ips: self.directory.listenable(), + blocked_ips: self.directory.blocked_ips(), + }; + let _ = self.listener.send(filter).await; } Message::Release { metadata } => { // Clear the peer handle if it exists @@ -240,7 +246,7 @@ mod tests { }; // Test Configuration Setup - fn default_test_config(crypto: C) -> (Config, mpsc::Receiver>) { + fn default_test_config(crypto: C) -> (Config, mpsc::Receiver) { let (registered_ips_sender, registered_ips_receiver) = Mailbox::new(1); ( Config { @@ -249,6 +255,7 @@ mod tests { allowed_connection_rate_per_peer: Quota::per_second(NZU32!(5)), allow_private_ips: true, allow_dns: true, + block_duration: Duration::from_secs(60), listener: registered_ips_sender, }, registered_ips_receiver, @@ -599,10 +606,10 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Wait for a listener update - let registered_ips = listener_receiver.next().await.unwrap(); - assert!(registered_ips.contains(&my_addr.ip())); - assert!(registered_ips.contains(&addr_1.ip())); - assert!(!registered_ips.contains(&addr_2.ip())); + let filter = listener_receiver.next().await.unwrap(); + assert!(filter.registered_ips.contains(&my_addr.ip())); + assert!(filter.registered_ips.contains(&addr_1.ip())); + assert!(!filter.registered_ips.contains(&addr_2.ip())); // Mark peer as connected let reservation = mailbox.listen(pk_1.clone()).await; @@ -617,10 +624,10 @@ mod tests { .await; // Wait for a listener update - let registered_ips = listener_receiver.next().await.unwrap(); - assert!(!registered_ips.contains(&my_addr.ip())); - assert!(!registered_ips.contains(&addr_1.ip())); - assert!(registered_ips.contains(&addr_2.ip())); + let filter = listener_receiver.next().await.unwrap(); + assert!(!filter.registered_ips.contains(&my_addr.ip())); + assert!(!filter.registered_ips.contains(&addr_1.ip())); + assert!(filter.registered_ips.contains(&addr_2.ip())); // The first peer should be have received a kill message because its // peer set was removed because `tracked_peer_sets` is 1. diff --git a/p2p/src/authenticated/lookup/actors/tracker/directory.rs b/p2p/src/authenticated/lookup/actors/tracker/directory.rs index 9d78141c01..4e51805c67 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/directory.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/directory.rs @@ -17,6 +17,7 @@ use rand::Rng; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, net::IpAddr, + time::{Duration, SystemTime}, }; use tracing::{debug, warn}; @@ -33,6 +34,9 @@ pub struct Config { /// The rate limit for allowing reservations per-peer. pub rate_limit: Quota, + + /// Duration after which blocked peers are automatically unblocked. + pub block_duration: Duration, } /// Represents a collection of records for all peers. @@ -47,6 +51,9 @@ pub struct Directory { /// Whether DNS-based ingress addresses are allowed. allow_dns: bool, + /// Duration after which blocked peers are automatically unblocked. + block_duration: Duration, + // ---------- State ---------- /// The records of all peers. peers: HashMap, @@ -57,6 +64,9 @@ pub struct Directory { /// Rate limiter for connection attempts. rate_limiter: KeyedRateLimiter, + /// Context for getting current time. + context: E, + // ---------- Message-Passing ---------- /// The releaser for the tracker actor. releaser: Releaser, @@ -76,16 +86,18 @@ impl Directory { // Other initialization. let rate_limiter = KeyedRateLimiter::hashmap_with_clock(cfg.rate_limit, context.clone()); - let metrics = Metrics::init(context); + let metrics = Metrics::init(context.clone()); let _ = metrics.tracked.try_set(peers.len() - 1); // Exclude self Self { max_sets: cfg.max_sets, allow_private_ips: cfg.allow_private_ips, allow_dns: cfg.allow_dns, + block_duration: cfg.block_duration, peers, sets: BTreeMap::new(), rate_limiter, + context, releaser, metrics, } @@ -132,11 +144,12 @@ impl Directory { } // Create and store new peer set (all peers are tracked regardless of address validity) + let now = self.context.current(); for (peer, addr) in &peers { let record = match self.peers.entry(peer.clone()) { Entry::Occupied(entry) => { let entry = entry.into_mut(); - entry.update(addr.clone()); + entry.update(now, addr.clone()); entry } Entry::Vacant(entry) => { @@ -196,7 +209,12 @@ impl Directory { /// Attempt to block a peer, updating the metrics accordingly. pub fn block(&mut self, peer: &C) { - if self.peers.get_mut(peer).is_some_and(|r| r.block()) { + let blocked_until = self.context.current() + self.block_duration; + if self + .peers + .get_mut(peer) + .is_some_and(|r| r.block(blocked_until)) + { self.metrics.blocked.inc(); } } @@ -215,11 +233,12 @@ impl Directory { /// Returns true if the peer is eligible for connection. /// - /// A peer is eligible if it is in a peer set, not blocked, and not ourselves. + /// A peer is eligible if it is in a peer set, not blocked (unless expired), and not ourselves. /// This does NOT check IP validity - that is done separately for dialing (ingress) /// and accepting (egress). pub fn eligible(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.eligible()) + let now = self.context.current(); + self.peers.get(peer).is_some_and(|r| r.eligible(now)) } /// Returns a vector of dialable peers. That is, unconnected peers for which we have a socket. @@ -239,25 +258,39 @@ impl Directory { /// /// Checks eligibility (peer set membership), egress IP match, and connection status. pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> bool { + let now = self.context.current(); self.peers .get(peer) - .is_some_and(|r| r.acceptable(source_ip)) + .is_some_and(|r| r.acceptable(now, source_ip)) } /// Return egress IPs we should listen for (accept incoming connections from). /// /// Only includes IPs from peers that are: - /// - Eligible (in a peer set, not blocked, not ourselves) + /// - Eligible (in a peer set, not blocked unless expired, not ourselves) /// - Have a valid egress IP (global, or private IPs are allowed) pub fn listenable(&self) -> HashSet { + let now = self.context.current(); self.peers .values() - .filter(|r| r.eligible()) + .filter(|r| r.eligible(now)) .filter_map(|r| r.egress_ip()) .filter(|ip| self.allow_private_ips || IpAddrExt::is_global(ip)) .collect() } + /// Return blocked peer IPs with their expiry times. + /// + /// Used by the listener to distinguish blocked peers from unknown peers. + pub fn blocked_ips(&self) -> HashMap { + let now = self.context.current(); + self.peers + .values() + .filter(|r| r.blocked(now)) + .filter_map(|r| r.egress_ip().zip(r.blocked_until())) + .collect() + } + // --------- Helpers ---------- /// Attempt to reserve a peer. @@ -287,7 +320,8 @@ impl Directory { } // Reserve - if record.reserve() { + let now = self.context.current(); + if record.reserve(now) { self.metrics.reserved.inc(); return Some(Reservation::new(metadata, self.releaser.clone())); } @@ -304,7 +338,8 @@ impl Directory { if !record.deletable() { return false; } - if record.blocked() { + let now = self.context.current(); + if record.blocked(now) { self.metrics.blocked.dec(); } self.peers.remove(peer); @@ -321,9 +356,12 @@ mod tests { Ingress, }; use commonware_cryptography::{ed25519, Signer}; - use commonware_runtime::{deterministic, Quota, Runner}; + use commonware_runtime::{deterministic, Clock, Quota, Runner}; use commonware_utils::{hostname, NZU32}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, + }; fn addr(socket: SocketAddr) -> Address { Address::Symmetric(socket) @@ -340,6 +378,7 @@ mod tests { allow_dns: true, max_sets: 1, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); @@ -401,6 +440,7 @@ mod tests { allow_dns: true, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); @@ -488,6 +528,7 @@ mod tests { allow_dns: true, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); @@ -495,13 +536,14 @@ mod tests { let addr_2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 2235); runtime.start(|context| async move { - let mut directory = Directory::init(context, my_pk.clone(), config, releaser); + let mut directory = Directory::init(context.clone(), my_pk.clone(), config, releaser); + let now = context.current(); directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); directory.block(&pk_1); let record = directory.peers.get(&pk_1).unwrap(); assert!( - record.blocked(), + record.blocked(now), "Peer should be blocked after call to block" ); assert!( @@ -512,7 +554,7 @@ mod tests { directory.add_set(1, [(pk_1.clone(), addr(addr_2))].try_into().unwrap()); let record = directory.peers.get(&pk_1).unwrap(); assert!( - record.blocked(), + record.blocked(now), "Blocked peer should remain blocked after update" ); assert!( @@ -533,6 +575,7 @@ mod tests { allow_dns: true, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; // Create asymmetric address where ingress differs from egress @@ -631,6 +674,7 @@ mod tests { allow_dns: false, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; // Create peers with different address types @@ -695,6 +739,7 @@ mod tests { allow_dns: true, max_sets: 3, rate_limit: Quota::per_second(NZU32!(10)), + block_duration: Duration::from_secs(3600), }; // Create peer with public egress IP diff --git a/p2p/src/authenticated/lookup/actors/tracker/mod.rs b/p2p/src/authenticated/lookup/actors/tracker/mod.rs index 36c2c58254..7251fe5d4c 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/mod.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/mod.rs @@ -3,7 +3,11 @@ use crate::authenticated::Mailbox; use commonware_cryptography::Signer; use commonware_runtime::Quota; -use std::{collections::HashSet, net::IpAddr}; +use std::{ + collections::HashSet, + net::IpAddr, + time::{Duration, SystemTime}, +}; pub mod actor; mod directory; @@ -18,6 +22,15 @@ pub use ingress::{Message, Oracle}; pub use metadata::Metadata; pub use reservation::Reservation; +/// Message containing listener filter information. +#[derive(Clone, Debug)] +pub struct ListenerFilter { + /// IPs of eligible peers we should accept connections from. + pub registered_ips: HashSet, + /// IPs of blocked peers with their expiry times. + pub blocked_ips: std::collections::HashMap, +} + #[derive(Clone, Debug)] pub struct Config { pub crypto: C, @@ -25,5 +38,6 @@ pub struct Config { pub allowed_connection_rate_per_peer: Quota, pub allow_private_ips: bool, pub allow_dns: bool, - pub listener: Mailbox>, + pub block_duration: Duration, + pub listener: Mailbox, } diff --git a/p2p/src/authenticated/lookup/actors/tracker/record.rs b/p2p/src/authenticated/lookup/actors/tracker/record.rs index 418f4133e4..19f93f84e2 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/record.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/record.rs @@ -1,5 +1,5 @@ use crate::types::{self, Ingress}; -use std::net::IpAddr; +use std::{net::IpAddr, time::SystemTime}; /// Represents information known about a peer's address. #[derive(Clone, Debug)] @@ -10,9 +10,8 @@ pub enum Address { /// Address is provided when peer is registered. Known(types::Address), - /// Peer is blocked. - /// We don't care to track its information. - Blocked, + /// Peer is blocked until the given time. + Blocked(SystemTime), } /// Represents the connection status of a peer. @@ -73,22 +72,27 @@ impl Record { // ---------- Setters ---------- /// Update the record with a new address. - pub fn update(&mut self, addr: types::Address) { - if matches!(self.address, Address::Myself | Address::Blocked) { - return; + /// + /// Updates are allowed for non-blocked records, or for blocked records whose + /// block has expired. + pub fn update(&mut self, now: SystemTime, addr: types::Address) { + match &self.address { + Address::Myself => return, + Address::Blocked(until) if now < *until => return, + _ => {} } self.address = Address::Known(addr); } - /// Attempt to mark the peer as blocked. + /// Attempt to mark the peer as blocked until the given time. /// /// Returns `true` if the peer was newly blocked. /// Returns `false` if the peer was already blocked or is the local node (unblockable). - pub fn block(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself) { + pub fn block(&mut self, blocked_until: SystemTime) -> bool { + if matches!(self.address, Address::Blocked(_) | Address::Myself) { return false; } - self.address = Address::Blocked; + self.address = Address::Blocked(blocked_until); self.persistent = false; true } @@ -110,10 +114,15 @@ impl Record { /// Attempt to reserve the peer for connection. /// /// Returns `true` if the reservation was successful, `false` otherwise. - pub const fn reserve(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself) { + pub fn reserve(&mut self, now: SystemTime) -> bool { + if matches!(self.address, Address::Myself) { return false; } + if let Address::Blocked(until) = self.address { + if now < until { + return false; + } + } if matches!(self.status, Status::Inert) { self.status = Status::Reserved; return true; @@ -137,9 +146,17 @@ impl Record { // ---------- Getters ---------- - /// Returns `true` if the record is blocked. - pub const fn blocked(&self) -> bool { - matches!(self.address, Address::Blocked) + /// Returns `true` if the record is currently blocked (block has not expired). + pub fn blocked(&self, now: SystemTime) -> bool { + matches!(self.address, Address::Blocked(until) if now < until) + } + + /// Returns the time until which this peer is blocked, if blocked. + pub const fn blocked_until(&self) -> Option { + match self.address { + Address::Blocked(until) => Some(until), + _ => None, + } } /// Returns the number of peer sets this peer is part of. @@ -154,6 +171,10 @@ impl Record { /// - It is not ourselves or blocked /// - We are not already connected or reserved /// - The ingress address is allowed (DNS enabled, Socket IP is global or private IPs allowed) + /// + /// Note: Blocked peers are never dialable regardless of expiry since we don't retain + /// their address info. Once a block expires and the peer is re-added to a peer set, + /// they will become dialable again via the normal update path. pub fn dialable(&self, allow_private_ips: bool, allow_dns: bool) -> bool { if self.status != Status::Inert { return false; @@ -168,11 +189,11 @@ impl Record { /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). /// /// A peer is acceptable if: - /// - The peer is eligible (in a peer set, not blocked, not ourselves) + /// - The peer is eligible (in a peer set, not blocked unless expired, not ourselves) /// - The source IP matches the expected egress IP for this peer /// - We are not already connected or reserved - pub fn acceptable(&self, source_ip: IpAddr) -> bool { - if !self.eligible() || self.status != Status::Inert { + pub fn acceptable(&self, now: SystemTime, source_ip: IpAddr) -> bool { + if !self.eligible(now) || self.status != Status::Inert { return false; } match &self.address { @@ -186,7 +207,7 @@ impl Record { match &self.address { Address::Myself => None, Address::Known(addr) => Some(addr.ingress()), - Address::Blocked => None, + Address::Blocked(_) => None, } } @@ -195,7 +216,7 @@ impl Record { match &self.address { Address::Myself => None, Address::Known(addr) => Some(addr.egress_ip()), - Address::Blocked => None, + Address::Blocked(_) => None, } } @@ -213,15 +234,23 @@ impl Record { /// Returns `true` if this peer is eligible for connection. /// /// A peer is eligible if: - /// - It is not blocked or ourselves + /// - It is not blocked (unless the block has expired) or ourselves /// - It is part of at least one peer set (or is persistent) /// /// This is the base check for reserving a connection. IP validity is checked /// separately: ingress validity for dialing (in `dialable()`), egress validity /// for the IP filter (in `Directory::eligible_egress_ips()`). - pub const fn eligible(&self) -> bool { + pub fn eligible(&self, now: SystemTime) -> bool { match &self.address { - Address::Blocked | Address::Myself => false, + Address::Blocked(until) => { + // Blocked but expired - treat as eligible if in a peer set + if now >= *until { + self.sets > 0 || self.persistent + } else { + false + } + } + Address::Myself => false, Address::Known(_) => self.sets > 0 || self.persistent, } } @@ -229,7 +258,7 @@ impl Record { #[cfg(test)] mod tests { use super::*; - use std::net::SocketAddr; + use std::{net::SocketAddr, time::Duration}; fn test_socket() -> SocketAddr { SocketAddr::from(([54, 12, 1, 9], 8080)) @@ -239,6 +268,14 @@ mod tests { types::Address::Symmetric(test_socket()) } + fn now() -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(1000) + } + + fn future_time() -> SystemTime { + now() + Duration::from_secs(3600) + } + #[test] fn test_myself_initial_state() { let record = Record::myself(); @@ -247,17 +284,17 @@ mod tests { assert_eq!(record.sets, 0); assert!(record.persistent); assert!(record.ingress().is_none()); - assert!(!record.blocked()); + assert!(!record.blocked(now())); assert!(!record.reserved()); assert!(!record.deletable()); - assert!(!record.eligible()); + assert!(!record.eligible(now())); } #[test] fn test_myself_blocked_to_known() { let mut record = Record::myself(); - record.block(); - assert!(!record.blocked(), "Can't block myself"); + record.block(future_time()); + assert!(!record.blocked(now()), "Can't block myself"); } #[test] @@ -289,36 +326,36 @@ mod tests { fn test_block_behavior_and_persistence() { let mut record_known = Record::known(test_address()); assert!(!record_known.persistent); - assert!(record_known.block()); - assert!(record_known.blocked()); - assert!(matches!(record_known.address, Address::Blocked)); + assert!(record_known.block(future_time())); + assert!(record_known.blocked(now())); + assert!(matches!(record_known.address, Address::Blocked(_))); assert!(!record_known.persistent); let mut record_reserved = Record::known(test_address()); - assert!(record_reserved.reserve()); - assert!(record_reserved.block()); + assert!(record_reserved.reserve(now())); + assert!(record_reserved.block(future_time())); assert_eq!(record_reserved.status, Status::Reserved); let mut record_active = Record::known(test_address()); - assert!(record_active.reserve()); + assert!(record_active.reserve(now())); record_active.connect(); - assert!(record_active.block()); + assert!(record_active.block(future_time())); assert_eq!(record_active.status, Status::Active); } #[test] fn test_block_myself_and_already_blocked() { let mut record_myself = Record::myself(); - assert!(!record_myself.block(), "Cannot block myself"); + assert!(!record_myself.block(future_time()), "Cannot block myself"); assert!(matches!(&record_myself.address, Address::Myself)); let mut record_to_be_blocked = Record::known(test_address()); - assert!(record_to_be_blocked.block()); + assert!(record_to_be_blocked.block(future_time())); assert!( - !record_to_be_blocked.block(), + !record_to_be_blocked.block(future_time()), "Cannot block already blocked peer" ); - assert!(matches!(record_to_be_blocked.address, Address::Blocked)); + assert!(matches!(record_to_be_blocked.address, Address::Blocked(_))); } #[test] @@ -326,25 +363,25 @@ mod tests { let mut record = Record::known(test_address()); assert_eq!(record.status, Status::Inert); - assert!(record.reserve()); + assert!(record.reserve(now())); assert_eq!(record.status, Status::Reserved); assert!(record.reserved()); - assert!(!record.reserve(), "Cannot re-reserve when Reserved"); + assert!(!record.reserve(now()), "Cannot re-reserve when Reserved"); assert_eq!(record.status, Status::Reserved); record.connect(); assert_eq!(record.status, Status::Active); assert!(record.reserved()); - assert!(!record.reserve(), "Cannot reserve when Active"); + assert!(!record.reserve(now()), "Cannot reserve when Active"); assert_eq!(record.status, Status::Active); record.release(); assert_eq!(record.status, Status::Inert); assert!(!record.reserved()); - assert!(record.reserve()); + assert!(record.reserve(now())); assert_eq!(record.status, Status::Reserved); record.release(); assert_eq!(record.status, Status::Inert); @@ -361,7 +398,7 @@ mod tests { #[should_panic] fn test_connect_when_active_panics() { let mut record = Record::known(test_address()); - assert!(record.reserve()); + assert!(record.reserve(now())); record.connect(); record.connect(); } @@ -377,7 +414,7 @@ mod tests { fn test_reserved_status_check() { let mut record = Record::known(test_address()); assert!(!record.reserved()); - assert!(record.reserve()); + assert!(record.reserve(now())); assert!(record.reserved()); record.connect(); assert!(record.reserved()); @@ -397,7 +434,7 @@ mod tests { record.increment(); assert!(!record.deletable()); - assert!(record.reserve()); + assert!(record.reserve(now())); assert!(!record.deletable()); record.connect(); @@ -412,19 +449,40 @@ mod tests { #[test] fn test_eligible_logic() { - // Blocked and Myself are never eligible + // Blocked and Myself are never eligible (while block is active) let mut record_blocked = Record::known(test_address()); - record_blocked.block(); - assert!(!record_blocked.eligible()); - assert!(!Record::myself().eligible()); + record_blocked.block(future_time()); + assert!(!record_blocked.eligible(now())); + assert!(!Record::myself().eligible(now())); // Known records are only eligible when in a peer set let mut record_known = Record::known(test_address()); - assert!(!record_known.eligible(), "Not eligible when sets=0"); + assert!(!record_known.eligible(now()), "Not eligible when sets=0"); record_known.increment(); - assert!(record_known.eligible(), "Eligible when sets>0"); + assert!(record_known.eligible(now()), "Eligible when sets>0"); record_known.decrement(); - assert!(!record_known.eligible(), "Not eligible when sets=0 again"); + assert!( + !record_known.eligible(now()), + "Not eligible when sets=0 again" + ); + } + + #[test] + fn test_block_expiry() { + let mut record = Record::known(test_address()); + record.increment(); + + // Block the peer + assert!(record.block(future_time())); + + // Currently blocked + assert!(record.blocked(now())); + assert!(!record.eligible(now())); + + // After expiry time, no longer blocked + let after_expiry = future_time() + Duration::from_secs(1); + assert!(!record.blocked(after_expiry)); + assert!(record.eligible(after_expiry)); } #[test] @@ -439,48 +497,48 @@ mod tests { let mut record = Record::known(types::Address::Symmetric(public_socket)); record.increment(); assert!( - record.acceptable(egress_ip), + record.acceptable(now(), egress_ip), "Eligible, Inert, correct IP is acceptable" ); // Correct everything but wrong IP - not acceptable assert!( - !record.acceptable(wrong_ip), + !record.acceptable(now(), wrong_ip), "Not acceptable when IP doesn't match" ); // Not eligible (sets=0) - not acceptable let record_not_eligible = Record::known(types::Address::Symmetric(public_socket)); assert!( - !record_not_eligible.acceptable(egress_ip), + !record_not_eligible.acceptable(now(), egress_ip), "Not acceptable when not eligible" ); // Already reserved - not acceptable let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); - record_reserved.reserve(); + record_reserved.reserve(now()); assert!( - !record_reserved.acceptable(egress_ip), + !record_reserved.acceptable(now(), egress_ip), "Not acceptable when reserved" ); // Already connected - not acceptable let mut record_connected = Record::known(types::Address::Symmetric(public_socket)); record_connected.increment(); - record_connected.reserve(); + record_connected.reserve(now()); record_connected.connect(); assert!( - !record_connected.acceptable(egress_ip), + !record_connected.acceptable(now(), egress_ip), "Not acceptable when connected" ); // Blocked - not acceptable let mut record_blocked = Record::known(types::Address::Symmetric(public_socket)); record_blocked.increment(); - record_blocked.block(); + record_blocked.block(future_time()); assert!( - !record_blocked.acceptable(egress_ip), + !record_blocked.acceptable(now(), egress_ip), "Not acceptable when blocked" ); } diff --git a/p2p/src/authenticated/lookup/config.rs b/p2p/src/authenticated/lookup/config.rs index cb52424e12..2fca45ae90 100644 --- a/p2p/src/authenticated/lookup/config.rs +++ b/p2p/src/authenticated/lookup/config.rs @@ -96,6 +96,12 @@ pub struct Config { /// set (if we, for example, are trying to do a reshare of a threshold /// key). pub tracked_peer_sets: usize, + + /// Duration after which blocked peers are automatically unblocked. + /// + /// This allows recovery from temporary issues like misconfigured nodes + /// without requiring restarts across all peers. + pub block_duration: Duration, } impl Config { @@ -127,6 +133,7 @@ impl Config { dial_frequency: Duration::from_secs(1), query_frequency: Duration::from_secs(60), tracked_peer_sets: 4, + block_duration: Duration::from_secs(60 * 60), // 1 hour } } @@ -158,6 +165,7 @@ impl Config { dial_frequency: Duration::from_millis(500), query_frequency: Duration::from_secs(30), tracked_peer_sets: 4, + block_duration: Duration::from_secs(60 * 60), // 1 hour } } @@ -184,6 +192,7 @@ impl Config { dial_frequency: Duration::from_millis(200), query_frequency: Duration::from_secs(5), tracked_peer_sets: 4, + block_duration: Duration::from_secs(60), // 1 minute for tests } } } diff --git a/p2p/src/authenticated/lookup/network.rs b/p2p/src/authenticated/lookup/network.rs index d50aeef2dd..f9f89bc974 100644 --- a/p2p/src/authenticated/lookup/network.rs +++ b/p2p/src/authenticated/lookup/network.rs @@ -19,7 +19,6 @@ use commonware_stream::Config as StreamConfig; use commonware_utils::union; use futures::channel::mpsc; use rand::{CryptoRng, Rng}; -use std::{collections::HashSet, net::IpAddr}; use tracing::{debug, info}; /// Unique suffix for all messages signed in a stream. @@ -35,7 +34,7 @@ pub struct Network>, router: router::Actor, router_mailbox: Mailbox>, - listener: mpsc::Receiver>, + listener: mpsc::Receiver, } impl @@ -52,7 +51,8 @@ impl) -> (Self, tracker::Oracle) { - let (listener_mailbox, listener) = Mailbox::>::new(cfg.mailbox_size); + let (listener_mailbox, listener) = + Mailbox::::new(cfg.mailbox_size); let (tracker, tracker_mailbox, oracle) = tracker::Actor::new( context.with_label("tracker"), tracker::Config { @@ -61,6 +61,7 @@ impl Date: Tue, 30 Dec 2025 10:27:23 -0800 Subject: [PATCH 2/6] nit --- .../discovery/actors/listener.rs | 36 ++++++++----------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index d9e4537726..8fef5e389e 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -5,6 +5,7 @@ use crate::authenticated::{ mailbox::UnboundedMailbox, Mailbox, }; +use commonware_codec::ReadExt; use commonware_cryptography::Signer; use commonware_macros::select_loop; use commonware_runtime::{ @@ -103,22 +104,11 @@ impl Actor< mut tracker: UnboundedMailbox>, mut supervisor: Mailbox, StreamOf, C::PublicKey>>, ) { - // Track the rejection reason from the bouncer - let rejection_reason = std::sync::Arc::new(std::sync::Mutex::new(None)); - let rejection_reason_clone = rejection_reason.clone(); - let (peer, send, recv) = match listen( context, |peer| { let mut tracker = tracker.clone(); - let rejection_reason = rejection_reason_clone.clone(); - async move { - let result = tracker.acceptable(peer).await; - if result != tracker::Acceptable::Yes { - *rejection_reason.lock().unwrap() = Some(result); - } - result == tracker::Acceptable::Yes - } + async move { tracker.acceptable(peer).await == tracker::Acceptable::Yes } }, stream_cfg, stream, @@ -127,17 +117,19 @@ impl Actor< .await { Ok(x) => x, - Err(StreamError::PeerRejected(_)) => { - // The bouncer returned false - check the captured reason - let reason = rejection_reason.lock().unwrap().take(); - match reason { - Some(tracker::Acceptable::Blocked) => { - debug!(?address, "peer is blocked"); - } - Some(tracker::Acceptable::Unknown) | None => { - debug!(?address, "peer not acceptable (unknown or not in peer set)"); + Err(StreamError::PeerRejected(bytes)) => { + // Decode the peer public key and query for rejection reason + if let Ok(peer) = C::PublicKey::read(&mut bytes.as_slice()) { + match tracker.acceptable(peer).await { + tracker::Acceptable::Blocked => { + debug!(?address, "peer is blocked"); + } + tracker::Acceptable::Unknown | tracker::Acceptable::Yes => { + debug!(?address, "peer not acceptable (unknown or not in peer set)"); + } } - Some(tracker::Acceptable::Yes) => unreachable!(), + } else { + debug!(?address, "peer rejected (unable to decode peer key)"); } return; } From 547e27ec59aa21617c3830af27ef698460dab2b7 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Thu, 1 Jan 2026 14:44:42 -0800 Subject: [PATCH 3/6] nits --- examples/bridge/src/bin/indexer.rs | 10 ++- .../discovery/actors/listener.rs | 29 ++++---- .../discovery/actors/peer/actor.rs | 6 +- .../discovery/actors/tracker/directory.rs | 12 ++-- .../discovery/actors/tracker/record.rs | 21 ++++-- .../authenticated/lookup/actors/listener.rs | 19 ++++- .../lookup/actors/tracker/directory.rs | 5 +- .../lookup/actors/tracker/record.rs | 21 ++++-- stream/fuzz/fuzz_targets/connection.rs | 2 +- stream/fuzz/fuzz_targets/e2e.rs | 2 +- stream/fuzz/fuzz_targets/lazy_transport.rs | 2 +- stream/fuzz/fuzz_targets/transport.rs | 2 +- stream/src/lib.rs | 72 ++++++++++++++----- 13 files changed, 143 insertions(+), 60 deletions(-) diff --git a/examples/bridge/src/bin/indexer.rs b/examples/bridge/src/bin/indexer.rs index 7775b21812..0073574837 100644 --- a/examples/bridge/src/bin/indexer.rs +++ b/examples/bridge/src/bin/indexer.rs @@ -241,8 +241,14 @@ fn main() { let (peer, mut sender, mut receiver) = match listen( context.with_label("listener"), |peer| { - let out = validators.position(&peer).is_some(); - async move { out } + let valid = validators.position(&peer).is_some(); + async move { + if valid { + Ok(()) + } else { + Err(()) + } + } }, config.clone(), stream, diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index 8fef5e389e..9b577b5319 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -5,7 +5,6 @@ use crate::authenticated::{ mailbox::UnboundedMailbox, Mailbox, }; -use commonware_codec::ReadExt; use commonware_cryptography::Signer; use commonware_macros::select_loop; use commonware_runtime::{ @@ -108,7 +107,14 @@ impl Actor< context, |peer| { let mut tracker = tracker.clone(); - async move { tracker.acceptable(peer).await == tracker::Acceptable::Yes } + async move { + let status = tracker.acceptable(peer).await; + if status == tracker::Acceptable::Yes { + Ok(()) + } else { + Err(status) + } + } }, stream_cfg, stream, @@ -117,19 +123,14 @@ impl Actor< .await { Ok(x) => x, - Err(StreamError::PeerRejected(bytes)) => { - // Decode the peer public key and query for rejection reason - if let Ok(peer) = C::PublicKey::read(&mut bytes.as_slice()) { - match tracker.acceptable(peer).await { - tracker::Acceptable::Blocked => { - debug!(?address, "peer is blocked"); - } - tracker::Acceptable::Unknown | tracker::Acceptable::Yes => { - debug!(?address, "peer not acceptable (unknown or not in peer set)"); - } + Err(StreamError::PeerRejected(reason)) => { + match reason { + tracker::Acceptable::Blocked => { + debug!(?address, "peer is blocked"); + } + tracker::Acceptable::Unknown | tracker::Acceptable::Yes => { + debug!(?address, "peer not acceptable (unknown or not in peer set)"); } - } else { - debug!(?address, "peer rejected (unable to decode peer key)"); } return; } diff --git a/p2p/src/authenticated/discovery/actors/peer/actor.rs b/p2p/src/authenticated/discovery/actors/peer/actor.rs index 78b781025c..9c9d1cb226 100644 --- a/p2p/src/authenticated/discovery/actors/peer/actor.rs +++ b/p2p/src/authenticated/discovery/actors/peer/actor.rs @@ -427,7 +427,7 @@ mod tests { move |ctx| async move { commonware_stream::listen( ctx, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, remote_config, remote_stream, remote_sink, @@ -526,7 +526,7 @@ mod tests { move |ctx| async move { commonware_stream::listen( ctx, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, remote_config, remote_stream, remote_sink, @@ -631,7 +631,7 @@ mod tests { move |ctx| async move { commonware_stream::listen( ctx, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, remote_config, remote_stream, remote_sink, diff --git a/p2p/src/authenticated/discovery/actors/tracker/directory.rs b/p2p/src/authenticated/discovery/actors/tracker/directory.rs index 24ab2372fe..daafa2106e 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/directory.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/directory.rs @@ -1,4 +1,6 @@ -use super::{metrics::Metrics, record::Record, set::Set, Metadata, Reservation}; +use super::{ + ingress::Acceptable, metrics::Metrics, record::Record, set::Set, Metadata, Reservation, +}; use crate::{ authenticated::discovery::{ actors::tracker::ingress::Releaser, @@ -283,7 +285,10 @@ impl Directory { /// Attempt to block a peer, updating the metrics accordingly. pub fn block(&mut self, peer: &C) { - let blocked_until = self.context.current() + self.block_duration; + let now = self.context.current(); + let blocked_until = now + .checked_add(self.block_duration) + .unwrap_or(now + Duration::from_secs(365 * 24 * 60 * 60 * 100)); // ~100 years if self .peers .get_mut(peer) @@ -373,8 +378,7 @@ impl Directory { } /// Returns the acceptance status for a peer. - pub fn acceptable(&self, peer: &C) -> super::ingress::Acceptable { - use super::ingress::Acceptable; + pub fn acceptable(&self, peer: &C) -> Acceptable { let now = self.context.current(); let Some(record) = self.peers.get(peer) else { return Acceptable::Unknown; diff --git a/p2p/src/authenticated/discovery/actors/tracker/record.rs b/p2p/src/authenticated/discovery/actors/tracker/record.rs index 8ac8006c69..835b4d91ac 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/record.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/record.rs @@ -127,15 +127,24 @@ impl Record { /// Attempt to mark the peer as blocked until the specified time. /// - /// Returns `true` if the peer was newly blocked. + /// If the peer is already blocked, extends the block to the later of the two times. + /// Returns `true` if the peer was newly blocked (not already blocked). /// Returns `false` if the peer was already blocked or is the local node (unblockable). pub fn block(&mut self, blocked_until: SystemTime) -> bool { - if matches!(self.address, Address::Blocked(_) | Address::Myself(_)) { - return false; + match &self.address { + Address::Myself(_) => false, + Address::Blocked(existing_until) => { + if blocked_until > *existing_until { + self.address = Address::Blocked(blocked_until); + } + false + } + Address::Unknown | Address::Bootstrapper(_) | Address::Discovered(_, _) => { + self.address = Address::Blocked(blocked_until); + self.persistent = false; + true + } } - self.address = Address::Blocked(blocked_until); - self.persistent = false; - true } /// Increase the count of peer sets this peer is part of. diff --git a/p2p/src/authenticated/lookup/actors/listener.rs b/p2p/src/authenticated/lookup/actors/listener.rs index 0b652a0c62..e9540c42aa 100644 --- a/p2p/src/authenticated/lookup/actors/listener.rs +++ b/p2p/src/authenticated/lookup/actors/listener.rs @@ -11,7 +11,7 @@ use commonware_runtime::{ spawn_cell, Clock, ContextCell, Handle, KeyedRateLimiter, Listener, Metrics, Network, Quota, SinkOf, Spawner, StreamOf, }; -use commonware_stream::{listen, Config as StreamConfig}; +use commonware_stream::{listen, Config as StreamConfig, Error as StreamError}; use commonware_utils::{concurrency::Limiter, net::SubnetMask, IpAddrExt}; use futures::{channel::mpsc, StreamExt}; use prometheus_client::metrics::counter::Counter; @@ -126,7 +126,16 @@ impl Actor< let source_ip = address.ip(); let (peer, send, recv) = match listen( context, - |peer| tracker.acceptable(peer, source_ip), + |peer| { + let fut = tracker.acceptable(peer, source_ip); + async move { + if fut.await { + Ok(()) + } else { + Err(()) + } + } + }, stream_cfg, stream, sink, @@ -134,8 +143,12 @@ impl Actor< .await { Ok(connection) => connection, + Err(StreamError::PeerRejected(())) => { + debug!(?address, "peer not acceptable (unknown or not in peer set)"); + return; + } Err(err) => { - debug!(?err, ?address, "failed to upgrade connection"); + debug!(?err, "failed to complete handshake"); return; } }; diff --git a/p2p/src/authenticated/lookup/actors/tracker/directory.rs b/p2p/src/authenticated/lookup/actors/tracker/directory.rs index 27999a2d91..655dbee62c 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/directory.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/directory.rs @@ -220,7 +220,10 @@ impl Directory { /// Attempt to block a peer, updating the metrics accordingly. pub fn block(&mut self, peer: &C) { - let blocked_until = self.context.current() + self.block_duration; + let now = self.context.current(); + let blocked_until = now + .checked_add(self.block_duration) + .unwrap_or(now + Duration::from_secs(365 * 24 * 60 * 60 * 100)); // ~100 years if self .peers .get_mut(peer) diff --git a/p2p/src/authenticated/lookup/actors/tracker/record.rs b/p2p/src/authenticated/lookup/actors/tracker/record.rs index 1ccf37a65b..65f9474c80 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/record.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/record.rs @@ -86,15 +86,24 @@ impl Record { /// Attempt to mark the peer as blocked until the given time. /// - /// Returns `true` if the peer was newly blocked. + /// If the peer is already blocked, extends the block to the later of the two times. + /// Returns `true` if the peer was newly blocked (not already blocked). /// Returns `false` if the peer was already blocked or is the local node (unblockable). pub fn block(&mut self, blocked_until: SystemTime) -> bool { - if matches!(self.address, Address::Blocked(_) | Address::Myself) { - return false; + match &self.address { + Address::Myself => false, + Address::Blocked(existing_until) => { + if blocked_until > *existing_until { + self.address = Address::Blocked(blocked_until); + } + false + } + Address::Known(_) => { + self.address = Address::Blocked(blocked_until); + self.persistent = false; + true + } } - self.address = Address::Blocked(blocked_until); - self.persistent = false; - true } /// Increase the count of peer sets this peer is part of. diff --git a/stream/fuzz/fuzz_targets/connection.rs b/stream/fuzz/fuzz_targets/connection.rs index c0ed37fa03..5af8372c7c 100644 --- a/stream/fuzz/fuzz_targets/connection.rs +++ b/stream/fuzz/fuzz_targets/connection.rs @@ -114,7 +114,7 @@ fn fuzz(input: FuzzInput) { move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/fuzz/fuzz_targets/e2e.rs b/stream/fuzz/fuzz_targets/e2e.rs index 14ea8252dd..584d04a4bb 100644 --- a/stream/fuzz/fuzz_targets/e2e.rs +++ b/stream/fuzz/fuzz_targets/e2e.rs @@ -124,7 +124,7 @@ fn fuzz(input: FuzzInput) { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/fuzz/fuzz_targets/lazy_transport.rs b/stream/fuzz/fuzz_targets/lazy_transport.rs index 682b69cf09..6663819311 100644 --- a/stream/fuzz/fuzz_targets/lazy_transport.rs +++ b/stream/fuzz/fuzz_targets/lazy_transport.rs @@ -48,7 +48,7 @@ thread_local! { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/fuzz/fuzz_targets/transport.rs b/stream/fuzz/fuzz_targets/transport.rs index c9e4704953..eb344c6fba 100644 --- a/stream/fuzz/fuzz_targets/transport.rs +++ b/stream/fuzz/fuzz_targets/transport.rs @@ -39,7 +39,7 @@ fn fuzz(data: &[u8]) { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 0830d5687d..0b934d6cb5 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -75,7 +75,7 @@ use commonware_cryptography::{ }; use commonware_macros::select; use commonware_runtime::{Clock, Error as RuntimeError, Sink, Stream}; -use commonware_utils::{hex, SystemTimeExt}; +use commonware_utils::SystemTimeExt; use rand_core::CryptoRngCore; use std::{future::Future, ops::Range, time::Duration}; use thiserror::Error; @@ -86,14 +86,17 @@ const CIPHERTEXT_OVERHEAD: u32 = { }; /// Errors that can occur when interacting with a stream. +/// +/// The `R` type parameter represents the rejection reason when a peer is rejected +/// by the bouncer during the handshake. Use `()` if no rejection information is needed. #[derive(Error, Debug)] -pub enum Error { +pub enum Error { #[error("handshake error: {0}")] HandshakeError(HandshakeError), #[error("unable to decode: {0}")] UnableToDecode(CodecError), - #[error("peer rejected: {}", hex(_0))] - PeerRejected(Vec), + #[error("peer rejected")] + PeerRejected(R), #[error("recv failed")] RecvFailed(RuntimeError), #[error("recv too large: {0} bytes")] @@ -112,18 +115,40 @@ pub enum Error { HandshakeTimeout, } -impl From for Error { +impl From for Error { fn from(value: CodecError) -> Self { Self::UnableToDecode(value) } } -impl From for Error { +impl From for Error { fn from(value: HandshakeError) -> Self { Self::HandshakeError(value) } } +impl Error { + /// Convert an error from the default `Error<()>` to a specific `Error`. + /// + /// This is useful when helper functions return `Error<()>` but the caller + /// needs `Error`. Panics if called on `PeerRejected`. + fn from_unit(value: Error<()>) -> Self { + match value { + Error::HandshakeError(e) => Self::HandshakeError(e), + Error::UnableToDecode(e) => Self::UnableToDecode(e), + Error::PeerRejected(()) => unreachable!("PeerRejected(()) should not be converted"), + Error::RecvFailed(e) => Self::RecvFailed(e), + Error::RecvTooLarge(s) => Self::RecvTooLarge(s), + Error::InvalidVarint => Self::InvalidVarint, + Error::SendFailed(e) => Self::SendFailed(e), + Error::SendZeroSize => Self::SendZeroSize, + Error::SendTooLarge(s) => Self::SendTooLarge(s), + Error::StreamClosed => Self::StreamClosed, + Error::HandshakeTimeout => Self::HandshakeTimeout, + } + } +} + /// Configuration for a connection. /// /// # Warning @@ -227,29 +252,38 @@ pub async fn dial( /// Accepts an authenticated connection from a peer as the listener. /// Returns the peer's identity, sender, and receiver for encrypted communication. +/// +/// The `bouncer` function is called with the peer's public key to decide whether to accept +/// the connection. It should return `Ok(())` to accept or `Err(rejection_reason)` to reject. +/// The rejection reason is included in the returned `Error::PeerRejected` variant. pub async fn listen< - R: CryptoRngCore + Clock, + Ctx: CryptoRngCore + Clock, S: Signer, I: Stream, O: Sink, - Fut: Future, + R, + Fut: Future>, F: FnOnce(S::PublicKey) -> Fut, >( - mut ctx: R, + mut ctx: Ctx, bouncer: F, config: Config, mut stream: I, mut sink: O, -) -> Result<(S::PublicKey, Sender, Receiver), Error> { +) -> Result<(S::PublicKey, Sender, Receiver), Error> { let timeout = ctx.sleep(config.handshake_timeout); let inner_routine = async move { - let peer_bytes = recv_frame(&mut stream, config.max_message_size).await?; + let peer_bytes = recv_frame(&mut stream, config.max_message_size) + .await + .map_err(Error::from_unit)?; let peer = S::PublicKey::decode(peer_bytes)?; - if !bouncer(peer.clone()).await { - return Err(Error::PeerRejected(peer.encode().to_vec())); + if let Err(rejection_reason) = bouncer(peer.clone()).await { + return Err(Error::PeerRejected(rejection_reason)); } - let msg1_bytes = recv_frame(&mut stream, config.max_message_size).await?; + let msg1_bytes = recv_frame(&mut stream, config.max_message_size) + .await + .map_err(Error::from_unit)?; let msg1 = Syn::::decode(msg1_bytes)?; let (current_time, ok_timestamps) = config.time_information(&ctx); @@ -264,9 +298,13 @@ pub async fn listen< ), msg1, )?; - send_frame(&mut sink, &syn_ack.encode(), config.max_message_size).await?; + send_frame(&mut sink, &syn_ack.encode(), config.max_message_size) + .await + .map_err(Error::from_unit)?; - let ack_bytes = recv_frame(&mut stream, config.max_message_size).await?; + let ack_bytes = recv_frame(&mut stream, config.max_message_size) + .await + .map_err(Error::from_unit)?; let ack = Ack::decode(ack_bytes)?; let (send, recv) = listen_end(state, ack)?; @@ -372,7 +410,7 @@ mod test { let listener_handle = context.clone().spawn(move |context| async move { listen( context, - |_| async { true }, + |_| async { Ok::<(), ()>(()) }, listener_config, listener_stream, listener_sink, From 4f5e21f141ee968105ab53a800ac17790745ecb4 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Thu, 1 Jan 2026 15:05:59 -0800 Subject: [PATCH 4/6] nit --- p2p/src/authenticated/discovery/actors/tracker/actor.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/p2p/src/authenticated/discovery/actors/tracker/actor.rs b/p2p/src/authenticated/discovery/actors/tracker/actor.rs index 246282f70d..6eb3e7b908 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/actor.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/actor.rs @@ -1,5 +1,3 @@ -#[cfg(test)] -use super::ingress::Acceptable; use super::{ directory::{self, Directory}, ingress::{Message, Oracle}, @@ -281,7 +279,10 @@ mod tests { use crate::{ authenticated::{ discovery::{ - actors::{peer, tracker}, + actors::{ + peer, + tracker::{self, Acceptable}, + }, config::Bootstrapper, types, }, From 9786009a5a813dc15a186bae6a657135cfd052a6 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Thu, 1 Jan 2026 15:20:24 -0800 Subject: [PATCH 5/6] nits --- .../discovery/actors/listener.rs | 4 +- .../discovery/actors/tracker/ingress.rs | 4 +- .../authenticated/lookup/actors/listener.rs | 24 ++++-- .../lookup/actors/tracker/actor.rs | 70 +++++++++++----- .../lookup/actors/tracker/directory.rs | 13 +-- .../lookup/actors/tracker/ingress.rs | 23 +++++- .../lookup/actors/tracker/mod.rs | 2 +- .../lookup/actors/tracker/record.rs | 80 ++++++++++++------- 8 files changed, 151 insertions(+), 69 deletions(-) diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index 9b577b5319..ed2766c4a6 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -106,9 +106,9 @@ impl Actor< let (peer, send, recv) = match listen( context, |peer| { - let mut tracker = tracker.clone(); + let fut = tracker.acceptable(peer); async move { - let status = tracker.acceptable(peer).await; + let status = fut.await; if status == tracker::Acceptable::Yes { Ok(()) } else { diff --git a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs index 4afcdb15a3..87caef98ad 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs @@ -196,14 +196,14 @@ impl UnboundedMailbox> { } /// Send an `Acceptable` message to the tracker. - pub async fn acceptable(&mut self, public_key: C) -> Acceptable { + pub fn acceptable(&mut self, public_key: C) -> impl std::future::Future { let (tx, rx) = oneshot::channel(); self.send(Message::Acceptable { public_key, responder: tx, }) .unwrap(); - rx.await.unwrap() + async move { rx.await.unwrap() } } /// Send a `Listen` message to the tracker. diff --git a/p2p/src/authenticated/lookup/actors/listener.rs b/p2p/src/authenticated/lookup/actors/listener.rs index e9540c42aa..321b0e651b 100644 --- a/p2p/src/authenticated/lookup/actors/listener.rs +++ b/p2p/src/authenticated/lookup/actors/listener.rs @@ -129,10 +129,11 @@ impl Actor< |peer| { let fut = tracker.acceptable(peer, source_ip); async move { - if fut.await { + let status = fut.await; + if status == tracker::Acceptable::Yes { Ok(()) } else { - Err(()) + Err(status) } } }, @@ -143,8 +144,15 @@ impl Actor< .await { Ok(connection) => connection, - Err(StreamError::PeerRejected(())) => { - debug!(?address, "peer not acceptable (unknown or not in peer set)"); + Err(StreamError::PeerRejected(reason)) => { + match reason { + tracker::Acceptable::Blocked => { + debug!(?address, "peer is blocked"); + } + tracker::Acceptable::Unknown | tracker::Acceptable::Yes => { + debug!(?address, "peer not acceptable (unknown or not in peer set)"); + } + } return; } Err(err) => { @@ -368,7 +376,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -529,7 +537,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -610,7 +618,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -702,7 +710,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(tracker::Acceptable::Yes); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); diff --git a/p2p/src/authenticated/lookup/actors/tracker/actor.rs b/p2p/src/authenticated/lookup/actors/tracker/actor.rs index 3b3e0a629e..cc2377fe6b 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/actor.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/actor.rs @@ -231,7 +231,10 @@ impl Actor { #[cfg(test)] mod tests { use super::*; - use crate::{authenticated::lookup::actors::peer, Blocker, Ingress, Manager}; + use crate::{ + authenticated::lookup::actors::{peer, tracker::Acceptable}, + Blocker, Ingress, Manager, + }; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, Signer, @@ -403,9 +406,18 @@ mod tests { } = setup_actor(context.clone(), cfg_initial); // None acceptable because not registered - assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); - assert!(!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); - assert!(!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes + ); + assert_ne!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Acceptable::Yes + ); + assert_ne!( + mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + Acceptable::Yes + ); oracle .update( @@ -421,13 +433,25 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Not acceptable because self - assert!(!mailbox.acceptable(peer_pk, peer_addr.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk, peer_addr.ip()).await, + Acceptable::Yes + ); // Acceptable because registered with correct IP - assert!(mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Acceptable::Yes + ); // Not acceptable with wrong IP - assert!(!mailbox.acceptable(peer_pk2, peer_addr.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk2, peer_addr.ip()).await, + Acceptable::Yes + ); // Not acceptable because not registered - assert!(!mailbox.acceptable(peer_pk3, peer_addr3.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk3, peer_addr3.ip()).await, + Acceptable::Yes + ); }); } @@ -451,8 +475,9 @@ mod tests { } = setup_actor(context.clone(), cfg); // Unknown peer is NOT acceptable (bypass_ip_check only skips IP check) - assert!( - !mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + assert_ne!( + mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + Acceptable::Yes, "Unknown peer should not be acceptable" ); @@ -470,14 +495,16 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // With bypass_ip_check=true, registered peer with wrong IP is acceptable - assert!( + assert_eq!( mailbox.acceptable(peer_pk2.clone(), peer_addr.ip()).await, + Acceptable::Yes, "Registered peer with wrong IP should be acceptable with bypass_ip_check=true" ); // Self is still not acceptable - assert!( - !mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + assert_ne!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes, "Self should not be acceptable" ); @@ -485,9 +512,10 @@ mod tests { oracle.block(peer_pk2.clone()).await; context.sleep(Duration::from_millis(10)).await; - assert!( - !mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, - "Blocked peer should not be acceptable" + assert_eq!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Acceptable::Blocked, + "Blocked peer should return Blocked status" ); }); } @@ -514,12 +542,18 @@ mod tests { .await; context.sleep(Duration::from_millis(10)).await; // Allow register to process - assert!(mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes + ); let reservation = mailbox.listen(peer_pk.clone()).await; assert!(reservation.is_some()); - assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); + assert_ne!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Acceptable::Yes + ); let failed_reservation = mailbox.listen(peer_pk.clone()).await; assert!(failed_reservation.is_none()); diff --git a/p2p/src/authenticated/lookup/actors/tracker/directory.rs b/p2p/src/authenticated/lookup/actors/tracker/directory.rs index 655dbee62c..34ce82e5c2 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/directory.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/directory.rs @@ -1,4 +1,4 @@ -use super::{metrics::Metrics, record::Record, Metadata, Reservation}; +use super::{ingress::Acceptable, metrics::Metrics, record::Record, Metadata, Reservation}; use crate::{ authenticated::lookup::{actors::tracker::ingress::Releaser, metrics}, types::Address, @@ -268,14 +268,15 @@ impl Directory { result } - /// Returns true if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for a peer. /// /// Checks eligibility (peer set membership), egress IP match (if not bypass_ip_check), and connection status. - pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> bool { + pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> Acceptable { let now = self.context.current(); - self.peers - .get(peer) - .is_some_and(|r| r.acceptable(now, source_ip, self.bypass_ip_check)) + let Some(record) = self.peers.get(peer) else { + return Acceptable::Unknown; + }; + record.acceptable(now, source_ip, self.bypass_ip_check) } /// Return egress IPs we should listen for (accept incoming connections from). diff --git a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs index 99a6df0c5a..0c273f7868 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs @@ -13,6 +13,17 @@ use commonware_utils::ordered::{Map, Set}; use futures::channel::{mpsc, oneshot}; use std::net::IpAddr; +/// Result of checking if a peer is acceptable for connection. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Acceptable { + /// Peer is acceptable for connection. + Yes, + /// Peer is blocked. + Blocked, + /// Peer is unknown (not in any peer set or IP mismatch). + Unknown, +} + /// Messages that can be sent to the tracker actor. #[derive(Debug)] pub enum Message { @@ -79,8 +90,8 @@ pub enum Message { /// The IP address the peer connected from. source_ip: IpAddr, - /// The sender to respond with whether the peer is acceptable. - responder: oneshot::Sender, + /// The sender to respond with the acceptance status. + responder: oneshot::Sender, }, /// Request a reservation for a particular peer. @@ -129,7 +140,11 @@ impl UnboundedMailbox> { } /// Send an `Acceptable` message to the tracker. - pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool { + pub fn acceptable( + &mut self, + public_key: C, + source_ip: IpAddr, + ) -> impl std::future::Future { let (tx, rx) = oneshot::channel(); self.send(Message::Acceptable { public_key, @@ -137,7 +152,7 @@ impl UnboundedMailbox> { responder: tx, }) .unwrap(); - rx.await.unwrap() + async move { rx.await.unwrap() } } /// Send a `Listen` message to the tracker. diff --git a/p2p/src/authenticated/lookup/actors/tracker/mod.rs b/p2p/src/authenticated/lookup/actors/tracker/mod.rs index 9803361946..8f39ac86e8 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/mod.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/mod.rs @@ -18,7 +18,7 @@ mod record; mod reservation; pub use actor::Actor; -pub use ingress::{Message, Oracle}; +pub use ingress::{Acceptable, Message, Oracle}; pub use metadata::Metadata; pub use reservation::Reservation; diff --git a/p2p/src/authenticated/lookup/actors/tracker/record.rs b/p2p/src/authenticated/lookup/actors/tracker/record.rs index 65f9474c80..34c1d3254d 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/record.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/record.rs @@ -1,3 +1,4 @@ +use super::ingress::Acceptable; use crate::types::{self, Ingress}; use std::{net::IpAddr, time::SystemTime}; @@ -195,22 +196,33 @@ impl Record { ingress.is_valid(allow_private_ips, allow_dns) } - /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for this peer. /// /// A peer is acceptable if: /// - The peer is eligible (in a peer set, not blocked unless expired, not ourselves) /// - The source IP matches the expected egress IP for this peer (if not bypass_ip_check) /// - We are not already connected or reserved - pub fn acceptable(&self, now: SystemTime, source_ip: IpAddr, bypass_ip_check: bool) -> bool { + pub fn acceptable( + &self, + now: SystemTime, + source_ip: IpAddr, + bypass_ip_check: bool, + ) -> Acceptable { + // Check if blocked (not expired) + if self.blocked(now) { + return Acceptable::Blocked; + } + // Check eligibility (peer set membership, not ourselves) and connection status if !self.eligible(now) || self.status != Status::Inert { - return false; + return Acceptable::Unknown; } + // Check IP match if bypass_ip_check { - return true; + return Acceptable::Yes; } match &self.address { - Address::Known(addr) => addr.egress_ip() == source_ip, - _ => false, + Address::Known(addr) if addr.egress_ip() == source_ip => Acceptable::Yes, + _ => Acceptable::Unknown, } } @@ -508,21 +520,24 @@ mod tests { // Eligible, Inert, and correct IP - acceptable let mut record = Record::known(types::Address::Symmetric(public_socket)); record.increment(); - assert!( + assert_eq!( record.acceptable(now(), egress_ip, false), + Acceptable::Yes, "Eligible, Inert, correct IP is acceptable" ); // Correct everything but wrong IP - not acceptable - assert!( - !record.acceptable(now(), wrong_ip, false), + assert_eq!( + record.acceptable(now(), wrong_ip, false), + Acceptable::Unknown, "Not acceptable when IP doesn't match" ); // Not eligible (sets=0) - not acceptable let record_not_eligible = Record::known(types::Address::Symmetric(public_socket)); - assert!( - !record_not_eligible.acceptable(now(), egress_ip, false), + assert_eq!( + record_not_eligible.acceptable(now(), egress_ip, false), + Acceptable::Unknown, "Not acceptable when not eligible" ); @@ -530,8 +545,9 @@ mod tests { let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); record_reserved.reserve(now()); - assert!( - !record_reserved.acceptable(now(), egress_ip, false), + assert_eq!( + record_reserved.acceptable(now(), egress_ip, false), + Acceptable::Unknown, "Not acceptable when reserved" ); @@ -540,8 +556,9 @@ mod tests { record_connected.increment(); record_connected.reserve(now()); record_connected.connect(); - assert!( - !record_connected.acceptable(now(), egress_ip, false), + assert_eq!( + record_connected.acceptable(now(), egress_ip, false), + Acceptable::Unknown, "Not acceptable when connected" ); @@ -549,8 +566,9 @@ mod tests { let mut record_blocked = Record::known(types::Address::Symmetric(public_socket)); record_blocked.increment(); record_blocked.block(future_time()); - assert!( - !record_blocked.acceptable(now(), egress_ip, false), + assert_eq!( + record_blocked.acceptable(now(), egress_ip, false), + Acceptable::Blocked, "Not acceptable when blocked" ); } @@ -564,15 +582,17 @@ mod tests { // With bypass_ip_check=true, accepts even with wrong IP (skips IP check) let mut record = Record::known(types::Address::Symmetric(public_socket)); record.increment(); - assert!( + assert_eq!( record.acceptable(now(), wrong_ip, true), + Acceptable::Yes, "Acceptable with wrong IP when bypass_ip_check=true" ); // Still requires eligible (sets > 0), even with bypass_ip_check=true let record_not_eligible = Record::known(types::Address::Symmetric(public_socket)); - assert!( - !record_not_eligible.acceptable(now(), egress_ip, true), + assert_eq!( + record_not_eligible.acceptable(now(), egress_ip, true), + Acceptable::Unknown, "Not acceptable when not eligible (sets=0), even with bypass_ip_check=true" ); @@ -580,8 +600,9 @@ mod tests { let mut record_blocked = Record::known(types::Address::Symmetric(public_socket)); record_blocked.increment(); record_blocked.block(future_time()); - assert!( - !record_blocked.acceptable(now(), egress_ip, true), + assert_eq!( + record_blocked.acceptable(now(), egress_ip, true), + Acceptable::Blocked, "Not acceptable when blocked" ); @@ -589,8 +610,9 @@ mod tests { let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); record_reserved.reserve(now()); - assert!( - !record_reserved.acceptable(now(), egress_ip, true), + assert_eq!( + record_reserved.acceptable(now(), egress_ip, true), + Acceptable::Unknown, "Not acceptable when reserved" ); @@ -599,15 +621,17 @@ mod tests { record_connected.increment(); record_connected.reserve(now()); record_connected.connect(); - assert!( - !record_connected.acceptable(now(), egress_ip, true), + assert_eq!( + record_connected.acceptable(now(), egress_ip, true), + Acceptable::Unknown, "Not acceptable when connected" ); // Still not acceptable when myself let record_myself = Record::myself(); - assert!( - !record_myself.acceptable(now(), egress_ip, true), + assert_eq!( + record_myself.acceptable(now(), egress_ip, true), + Acceptable::Unknown, "Not acceptable when myself" ); } From f7f6369f3b3079d2cfbb968b2b3656f4702e0d90 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Thu, 1 Jan 2026 15:28:30 -0800 Subject: [PATCH 6/6] progress --- .../discovery/actors/listener.rs | 7 +++-- .../discovery/actors/tracker/actor.rs | 4 +-- .../discovery/actors/tracker/directory.rs | 9 +------ .../discovery/actors/tracker/ingress.rs | 4 ++- .../discovery/actors/tracker/record.rs | 23 +++++++++++++--- .../authenticated/lookup/actors/listener.rs | 7 +++-- .../lookup/actors/tracker/ingress.rs | 4 ++- .../lookup/actors/tracker/record.rs | 26 ++++++++++++------- 8 files changed, 56 insertions(+), 28 deletions(-) diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index ed2766c4a6..4c1ed39858 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -128,8 +128,11 @@ impl Actor< tracker::Acceptable::Blocked => { debug!(?address, "peer is blocked"); } - tracker::Acceptable::Unknown | tracker::Acceptable::Yes => { - debug!(?address, "peer not acceptable (unknown or not in peer set)"); + tracker::Acceptable::Unknown => { + debug!(?address, "peer unknown (not in peer set)"); + } + tracker::Acceptable::Rejected | tracker::Acceptable::Yes => { + debug!(?address, "peer rejected"); } } return; diff --git a/p2p/src/authenticated/discovery/actors/tracker/actor.rs b/p2p/src/authenticated/discovery/actors/tracker/actor.rs index 6eb3e7b908..72c7bcb199 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/actor.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/actor.rs @@ -819,7 +819,7 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Not listenable because self - assert_eq!(mailbox.acceptable(peer_pk).await, Acceptable::Unknown); + assert_eq!(mailbox.acceptable(peer_pk).await, Acceptable::Rejected); // Listenable because registered assert_eq!(mailbox.acceptable(peer_pk2).await, Acceptable::Yes); // Not listenable because not registered @@ -855,7 +855,7 @@ mod tests { assert_eq!( mailbox.acceptable(peer_pk.clone()).await, - Acceptable::Unknown + Acceptable::Rejected ); let failed_reservation = mailbox.listen(peer_pk.clone()).await; diff --git a/p2p/src/authenticated/discovery/actors/tracker/directory.rs b/p2p/src/authenticated/discovery/actors/tracker/directory.rs index daafa2106e..3dd197408c 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/directory.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/directory.rs @@ -383,14 +383,7 @@ impl Directory { let Some(record) = self.peers.get(peer) else { return Acceptable::Unknown; }; - if record.blocked(now) { - return Acceptable::Blocked; - } - if record.acceptable(now) { - Acceptable::Yes - } else { - Acceptable::Unknown - } + record.acceptable(now) } // --------- Helpers ---------- diff --git a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs index 87caef98ad..9b261f3b45 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs @@ -18,8 +18,10 @@ pub enum Acceptable { Yes, /// Peer is blocked. Blocked, - /// Peer is unknown (not in any peer set). + /// Peer is not in any peer set. Unknown, + /// Peer is known but rejected (already connected, reserved, ourselves, etc.). + Rejected, } /// Messages that can be sent to the tracker actor. diff --git a/p2p/src/authenticated/discovery/actors/tracker/record.rs b/p2p/src/authenticated/discovery/actors/tracker/record.rs index 835b4d91ac..581510a006 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/record.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/record.rs @@ -1,3 +1,4 @@ +use super::ingress::Acceptable; use crate::{authenticated::discovery::types::Info, Ingress}; use commonware_cryptography::PublicKey; use std::time::SystemTime; @@ -247,13 +248,29 @@ impl Record { ingress.is_valid(allow_private_ips, allow_dns) } - /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for this peer. /// /// A peer is acceptable if: /// - The peer is eligible (in a peer set, not blocked, not ourselves) /// - We are not already connected or reserved - pub fn acceptable(&self, now: SystemTime) -> bool { - self.eligible(now) && self.status == Status::Inert + pub fn acceptable(&self, now: SystemTime) -> Acceptable { + // Check if ourselves + if matches!(self.address, Address::Myself(_)) { + return Acceptable::Rejected; + } + // Check if blocked + if self.blocked(now) { + return Acceptable::Blocked; + } + // Check if in a peer set + if self.sets == 0 && !self.persistent { + return Acceptable::Unknown; + } + // Check if already connected or reserved + if self.status != Status::Inert { + return Acceptable::Rejected; + } + Acceptable::Yes } /// Return the ingress address of the peer, if known. diff --git a/p2p/src/authenticated/lookup/actors/listener.rs b/p2p/src/authenticated/lookup/actors/listener.rs index 321b0e651b..88495cc9e8 100644 --- a/p2p/src/authenticated/lookup/actors/listener.rs +++ b/p2p/src/authenticated/lookup/actors/listener.rs @@ -149,8 +149,11 @@ impl Actor< tracker::Acceptable::Blocked => { debug!(?address, "peer is blocked"); } - tracker::Acceptable::Unknown | tracker::Acceptable::Yes => { - debug!(?address, "peer not acceptable (unknown or not in peer set)"); + tracker::Acceptable::Unknown => { + debug!(?address, "peer unknown (not in peer set)"); + } + tracker::Acceptable::Rejected | tracker::Acceptable::Yes => { + debug!(?address, "peer rejected"); } } return; diff --git a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs index 0c273f7868..49ca08a03b 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs @@ -20,8 +20,10 @@ pub enum Acceptable { Yes, /// Peer is blocked. Blocked, - /// Peer is unknown (not in any peer set or IP mismatch). + /// Peer is not in any peer set. Unknown, + /// Peer is known but rejected (already connected, reserved, ourselves, IP mismatch, etc.). + Rejected, } /// Messages that can be sent to the tracker actor. diff --git a/p2p/src/authenticated/lookup/actors/tracker/record.rs b/p2p/src/authenticated/lookup/actors/tracker/record.rs index 34c1d3254d..7752269426 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/record.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/record.rs @@ -208,21 +208,29 @@ impl Record { source_ip: IpAddr, bypass_ip_check: bool, ) -> Acceptable { + // Check if ourselves + if matches!(self.address, Address::Myself) { + return Acceptable::Rejected; + } // Check if blocked (not expired) if self.blocked(now) { return Acceptable::Blocked; } - // Check eligibility (peer set membership, not ourselves) and connection status - if !self.eligible(now) || self.status != Status::Inert { + // Check if in a peer set + if self.sets == 0 && !self.persistent { return Acceptable::Unknown; } + // Check if already connected or reserved + if self.status != Status::Inert { + return Acceptable::Rejected; + } // Check IP match if bypass_ip_check { return Acceptable::Yes; } match &self.address { Address::Known(addr) if addr.egress_ip() == source_ip => Acceptable::Yes, - _ => Acceptable::Unknown, + _ => Acceptable::Rejected, // Known peer but wrong IP } } @@ -529,7 +537,7 @@ mod tests { // Correct everything but wrong IP - not acceptable assert_eq!( record.acceptable(now(), wrong_ip, false), - Acceptable::Unknown, + Acceptable::Rejected, "Not acceptable when IP doesn't match" ); @@ -547,7 +555,7 @@ mod tests { record_reserved.reserve(now()); assert_eq!( record_reserved.acceptable(now(), egress_ip, false), - Acceptable::Unknown, + Acceptable::Rejected, "Not acceptable when reserved" ); @@ -558,7 +566,7 @@ mod tests { record_connected.connect(); assert_eq!( record_connected.acceptable(now(), egress_ip, false), - Acceptable::Unknown, + Acceptable::Rejected, "Not acceptable when connected" ); @@ -612,7 +620,7 @@ mod tests { record_reserved.reserve(now()); assert_eq!( record_reserved.acceptable(now(), egress_ip, true), - Acceptable::Unknown, + Acceptable::Rejected, "Not acceptable when reserved" ); @@ -623,7 +631,7 @@ mod tests { record_connected.connect(); assert_eq!( record_connected.acceptable(now(), egress_ip, true), - Acceptable::Unknown, + Acceptable::Rejected, "Not acceptable when connected" ); @@ -631,7 +639,7 @@ mod tests { let record_myself = Record::myself(); assert_eq!( record_myself.acceptable(now(), egress_ip, true), - Acceptable::Unknown, + Acceptable::Rejected, "Not acceptable when myself" ); }