diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index ee7a3d7f95..44e74c079a 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -3,7 +3,7 @@ use crate::authenticated::{ discovery::actors::{spawner, tracker}, mailbox::UnboundedMailbox, - Mailbox, + Attempt, Mailbox, }; use commonware_cryptography::Signer; use commonware_macros::select_loop; @@ -105,7 +105,11 @@ impl Actor { - let _ = responder.send(true); + let _ = responder.send(Attempt::Ok); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -443,7 +447,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(Attempt::Ok); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); diff --git a/p2p/src/authenticated/discovery/actors/tracker/actor.rs b/p2p/src/authenticated/discovery/actors/tracker/actor.rs index 01e61adb83..dac10b52a9 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/actor.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/actor.rs @@ -286,7 +286,7 @@ mod tests { config::Bootstrapper, types, }, - Mailbox, + Attempt, Mailbox, }, Blocker, Ingress, Manager, }; @@ -799,22 +799,28 @@ mod tests { .. } = setup_actor(context.clone(), cfg_initial); - // None listenable because not registered - assert!(!mailbox.acceptable(peer_pk.clone()).await); - assert!(!mailbox.acceptable(peer_pk2.clone()).await); - assert!(!mailbox.acceptable(peer_pk3.clone()).await); + // peer_pk is ourselves, others not registered + assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Attempt::Myself); + assert_eq!( + mailbox.acceptable(peer_pk2.clone()).await, + Attempt::Unregistered + ); + assert_eq!( + mailbox.acceptable(peer_pk3.clone()).await, + Attempt::Unregistered + ); oracle .update(0, [peer_pk.clone(), peer_pk2.clone()].try_into().unwrap()) .await; context.sleep(Duration::from_millis(10)).await; - // Not listenable because self - assert!(!mailbox.acceptable(peer_pk).await); - // Listenable because registered - assert!(mailbox.acceptable(peer_pk2).await); - // Not listenable because not registered - assert!(!mailbox.acceptable(peer_pk3).await); + // Not acceptable because self + assert_eq!(mailbox.acceptable(peer_pk).await, Attempt::Myself); + // Acceptable because registered + assert_eq!(mailbox.acceptable(peer_pk2).await, Attempt::Ok); + // Not acceptable because not registered + assert_eq!(mailbox.acceptable(peer_pk3).await, Attempt::Unregistered); }); } @@ -839,12 +845,12 @@ mod tests { .await; context.sleep(Duration::from_millis(10)).await; // Allow register to process - assert!(mailbox.acceptable(peer_pk.clone()).await); + assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Attempt::Ok); let reservation = mailbox.listen(peer_pk.clone()).await; assert!(reservation.is_some()); - assert!(!mailbox.acceptable(peer_pk.clone()).await); + assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Attempt::Reserved); let failed_reservation = mailbox.listen(peer_pk.clone()).await; assert!(failed_reservation.is_none()); diff --git a/p2p/src/authenticated/discovery/actors/tracker/directory.rs b/p2p/src/authenticated/discovery/actors/tracker/directory.rs index 630b2edce9..9ef8e57e69 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/directory.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/directory.rs @@ -1,9 +1,12 @@ use super::{metrics::Metrics, record::Record, set::Set, Metadata, Reservation}; use crate::{ - authenticated::discovery::{ - actors::tracker::ingress::Releaser, - metrics, - types::{self, Info}, + authenticated::{ + discovery::{ + actors::tracker::ingress::Releaser, + metrics, + types::{self, Info}, + }, + Attempt, }, Ingress, }; @@ -384,9 +387,39 @@ impl Directory { result } - /// Returns true if this peer is acceptable (can accept an incoming connection from them). - pub fn acceptable(&self, peer: &C) -> bool { - !self.blocked.contains(peer) && self.peers.get(peer).is_some_and(|r| r.acceptable()) + /// Returns the acceptance status for this peer (can accept an incoming connection from them). + /// + /// Checks blocked status first, then delegates to record for eligibility and connection status. + /// Increments metrics for rejections. + pub fn acceptable(&self, peer: &C) -> Attempt { + if self.blocked.contains(peer) { + self.metrics.rejected_blocked.inc(); + debug!(?peer, "peer rejected: blocked"); + return Attempt::Blocked; + } + let result = self + .peers + .get(peer) + .map(|r| r.acceptable()) + .unwrap_or(Attempt::Unregistered); + match result { + Attempt::Ok => {} + Attempt::Blocked => unreachable!("record does not return Blocked"), + Attempt::Unregistered => { + self.metrics.rejected_unregistered.inc(); + debug!(?peer, "peer rejected: unregistered or ineligible"); + } + Attempt::Reserved => { + self.metrics.rejected_reserved.inc(); + debug!(?peer, "peer rejected: already connected"); + } + Attempt::Mismatch => unreachable!("discovery record does not return Mismatch"), + Attempt::Myself => { + self.metrics.rejected_myself.inc(); + debug!(?peer, "peer rejected: is ourselves"); + } + } + result } /// Unblock all peers whose block has expired and update the knowledge bitmap. @@ -485,7 +518,7 @@ impl Directory { #[cfg(test)] mod tests { use super::*; - use crate::authenticated::{discovery::types, mailbox::UnboundedMailbox}; + use crate::authenticated::{discovery::types, mailbox::UnboundedMailbox, Attempt}; use commonware_cryptography::{secp256r1::standard::PrivateKey, Signer}; use commonware_runtime::{deterministic, Clock, Runner}; use commonware_utils::{bitmap::BitMap, NZU32}; @@ -1032,8 +1065,9 @@ mod tests { directory.update_peers(vec![peer_info]); // Peer should be acceptable before blocking - assert!( + assert_eq!( directory.acceptable(&peer_pk), + Attempt::Ok, "Peer should be acceptable before blocking" ); @@ -1041,8 +1075,9 @@ mod tests { directory.block(&peer_pk); // Peer should NOT be acceptable while blocked - assert!( - !directory.acceptable(&peer_pk), + assert_eq!( + directory.acceptable(&peer_pk), + Attempt::Blocked, "Blocked peer should not be acceptable" ); @@ -1051,8 +1086,9 @@ mod tests { directory.unblock_expired(); // Peer should be acceptable again after unblock - assert!( + assert_eq!( directory.acceptable(&peer_pk), + Attempt::Ok, "Peer should be acceptable after unblock" ); }); diff --git a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs index 2759b86c62..fc7751b914 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs @@ -5,7 +5,7 @@ use crate::authenticated::{ types, }, mailbox::UnboundedMailbox, - Mailbox, + Attempt, Mailbox, }; use commonware_cryptography::PublicKey; use commonware_utils::ordered::Set; @@ -110,8 +110,8 @@ pub enum Message { /// The public key of the peer to check. public_key: C, - /// The sender to respond with whether the peer is acceptable. - responder: oneshot::Sender, + /// The sender to respond with the acceptance result. + responder: oneshot::Sender, }, /// Request a reservation for a particular peer. @@ -185,7 +185,7 @@ impl UnboundedMailbox> { } /// Send an `Acceptable` message to the tracker. - pub async fn acceptable(&mut self, public_key: C) -> bool { + pub async fn acceptable(&mut self, public_key: C) -> Attempt { let (tx, rx) = oneshot::channel(); self.send(Message::Acceptable { public_key, diff --git a/p2p/src/authenticated/discovery/actors/tracker/metrics.rs b/p2p/src/authenticated/discovery/actors/tracker/metrics.rs index e0a2352373..a17da4a884 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/metrics.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/metrics.rs @@ -21,6 +21,18 @@ pub struct Metrics { /// A count of the number of updates for each peer. pub updates: Family, + + /// Number of times a peer was rejected because they were blocked. + pub rejected_blocked: Counter, + + /// Number of times a peer was rejected because they were unregistered. + pub rejected_unregistered: Counter, + + /// Number of times a peer was rejected because they were already connected. + pub rejected_reserved: Counter, + + /// Number of times a peer was rejected because they are ourselves. + pub rejected_myself: Counter, } impl Metrics { @@ -52,6 +64,26 @@ impl Metrics { "Count of the number of updates for each peer", metrics.updates.clone(), ); + context.register( + "rejected_blocked", + "Number of times a peer was rejected because they were blocked", + metrics.rejected_blocked.clone(), + ); + context.register( + "rejected_unregistered", + "Number of times a peer was rejected because they were unregistered", + metrics.rejected_unregistered.clone(), + ); + context.register( + "rejected_reserved", + "Number of times a peer was rejected because they were already connected", + metrics.rejected_reserved.clone(), + ); + context.register( + "rejected_myself", + "Number of times a peer was rejected because they are ourselves", + metrics.rejected_myself.clone(), + ); metrics } } diff --git a/p2p/src/authenticated/discovery/actors/tracker/record.rs b/p2p/src/authenticated/discovery/actors/tracker/record.rs index b0a2f732de..c5add62ed1 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/record.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/record.rs @@ -1,4 +1,7 @@ -use crate::{authenticated::discovery::types::Info, Ingress}; +use crate::{ + authenticated::{discovery::types::Info, Attempt}, + Ingress, +}; use commonware_cryptography::PublicKey; use tracing::trace; @@ -218,13 +221,20 @@ impl Record { ingress.is_valid(allow_private_ips, allow_dns) } - /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for this peer (can accept an incoming connection from them). /// - /// A peer is acceptable if: - /// - The peer is eligible (in a peer set, not ourselves) - /// - We are not already connected or reserved - pub fn acceptable(&self) -> bool { - self.eligible() && self.status == Status::Inert + /// Checks for self, eligibility (peer set membership), and connection status. + pub const fn acceptable(&self) -> Attempt { + if matches!(self.address, Address::Myself(_)) { + return Attempt::Myself; + } + if !self.eligible() { + return Attempt::Unregistered; + } + if self.reserved() { + return Attempt::Reserved; + } + Attempt::Ok } /// Return the ingress address of the peer, if known. diff --git a/p2p/src/authenticated/lookup/actors/listener.rs b/p2p/src/authenticated/lookup/actors/listener.rs index 899d325ef5..b5ce0e934c 100644 --- a/p2p/src/authenticated/lookup/actors/listener.rs +++ b/p2p/src/authenticated/lookup/actors/listener.rs @@ -1,9 +1,9 @@ //! Listener use crate::authenticated::{ - lookup::actors::{spawner, tracker}, + lookup::actors::{spawner, tracker, tracker::directory::ListenableIps}, mailbox::UnboundedMailbox, - Mailbox, + Attempt, Mailbox, }; use commonware_cryptography::Signer; use commonware_macros::select_loop; @@ -16,11 +16,7 @@ use commonware_utils::{concurrency::Limiter, net::SubnetMask, IpAddrExt}; use futures::{channel::mpsc, StreamExt}; use prometheus_client::metrics::counter::Counter; use rand_core::CryptoRngCore; -use std::{ - collections::HashSet, - net::{IpAddr, SocketAddr}, - num::NonZeroU32, -}; +use std::{net::SocketAddr, num::NonZeroU32}; use tracing::debug; /// Subnet mask of `/24` for IPv4 and `/48` for IPv6 networks. @@ -50,23 +46,37 @@ pub struct Actor, - mailbox: mpsc::Receiver>, + listenable_ips: ListenableIps, + mailbox: mpsc::Receiver, handshakes_blocked: Counter, + handshakes_ip_blocked: Counter, + handshakes_ip_unregistered: Counter, handshakes_concurrent_rate_limited: Counter, handshakes_ip_rate_limited: Counter, handshakes_subnet_rate_limited: Counter, } impl Actor { - pub fn new(context: E, cfg: Config, mailbox: mpsc::Receiver>) -> Self { + pub fn new(context: E, cfg: Config, mailbox: mpsc::Receiver) -> Self { // Create metrics let handshakes_blocked = Counter::default(); context.register( "handshakes_blocked", - "number of handshake attempts blocked because the IP was not registered", + "number of handshake attempts blocked because the IP was private", handshakes_blocked.clone(), ); + let handshakes_ip_blocked = Counter::default(); + context.register( + "handshakes_ip_blocked", + "number of handshake attempts blocked because the IP belongs to a blocked peer", + handshakes_ip_blocked.clone(), + ); + let handshakes_ip_unregistered = Counter::default(); + context.register( + "handshakes_ip_unregistered", + "number of handshake attempts blocked because the IP was not registered", + handshakes_ip_unregistered.clone(), + ); let handshakes_concurrent_rate_limited = Counter::default(); context.register( "handshake_concurrent_rate_limited", @@ -96,9 +106,11 @@ impl Actor Actor Actor { - let Some(registered_ips) = update else { + let Some(listenable_ips) = update else { debug!("mailbox closed"); break; }; - self.registered_ips = registered_ips; + self.listenable_ips = listenable_ips; }, listener = listener.accept() => { // Accept a new connection @@ -210,10 +226,17 @@ impl Actor { - let _ = responder.send(true); + let _ = responder.send(Attempt::Ok); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -496,7 +523,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(Attempt::Ok); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -532,7 +559,7 @@ mod tests { // Check metrics let metrics = context.encode(); assert!( - metrics.contains("handshakes_blocked_total 1"), + metrics.contains("handshakes_ip_unregistered_total 1"), "{}", metrics ); @@ -577,7 +604,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(Attempt::Ok); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); @@ -657,7 +684,10 @@ mod tests { let mut allowed = HashSet::new(); allowed.insert(IpAddr::V4(Ipv4Addr::LOCALHOST)); updates_tx - .send(allowed) + .send(ListenableIps { + allowed, + blocked: HashSet::new(), + }) .await .expect("update registered ips"); @@ -666,7 +696,7 @@ mod tests { while let Some(message) = tracker_rx.next().await { match message { tracker::Message::Acceptable { responder, .. } => { - let _ = responder.send(true); + let _ = responder.send(Attempt::Ok); } tracker::Message::Listen { reservation, .. } => { let _ = reservation.send(None); diff --git a/p2p/src/authenticated/lookup/actors/tracker/actor.rs b/p2p/src/authenticated/lookup/actors/tracker/actor.rs index c857339ef8..17cc51ef1d 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/actor.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/actor.rs @@ -1,5 +1,5 @@ use super::{ - directory::{self, Directory}, + directory::{self, Directory, ListenableIps}, ingress::{Message, Oracle}, Config, }; @@ -16,10 +16,7 @@ use commonware_runtime::{ use commonware_utils::ordered::Set; use futures::{channel::mpsc, StreamExt}; use rand::Rng; -use std::{ - collections::{HashMap, HashSet}, - net::IpAddr, -}; +use std::collections::HashMap; use tracing::debug; /// The tracker actor that manages peer discovery and connection reservations. @@ -35,7 +32,7 @@ pub struct Actor { receiver: mpsc::UnboundedReceiver>, /// The mailbox for the listener. - listener: Mailbox>, + listener: Mailbox, // ---------- State ---------- /// Tracks peer sets and peer connectivity information. @@ -231,7 +228,10 @@ impl Actor { #[cfg(test)] mod tests { use super::*; - use crate::{authenticated::lookup::actors::peer, Blocker, Ingress, Manager}; + use crate::{ + authenticated::{lookup::actors::peer, Attempt}, + Blocker, Ingress, Manager, + }; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, Signer, @@ -250,7 +250,7 @@ mod tests { fn test_config( crypto: C, bypass_ip_check: bool, - ) -> (Config, mpsc::Receiver>) { + ) -> (Config, mpsc::Receiver) { let (registered_ips_sender, registered_ips_receiver) = Mailbox::new(1); ( Config { @@ -402,10 +402,19 @@ mod tests { .. } = setup_actor(context.clone(), cfg_initial); - // None acceptable because not registered - assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); - assert!(!mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); - assert!(!mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await); + // peer_pk is ourselves, others not registered + assert_eq!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Attempt::Myself + ); + assert_eq!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Attempt::Unregistered + ); + assert_eq!( + mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + Attempt::Unregistered + ); oracle .update( @@ -421,13 +430,25 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Not acceptable because self - assert!(!mailbox.acceptable(peer_pk, peer_addr.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk, peer_addr.ip()).await, + Attempt::Myself + ); // Acceptable because registered with correct IP - assert!(mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Attempt::Ok + ); // Not acceptable with wrong IP - assert!(!mailbox.acceptable(peer_pk2, peer_addr.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk2, peer_addr.ip()).await, + Attempt::Mismatch + ); // Not acceptable because not registered - assert!(!mailbox.acceptable(peer_pk3, peer_addr3.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk3, peer_addr3.ip()).await, + Attempt::Unregistered + ); }); } @@ -451,8 +472,9 @@ mod tests { } = setup_actor(context.clone(), cfg); // Unknown peer is NOT acceptable (bypass_ip_check only skips IP check) - assert!( - !mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + assert_eq!( + mailbox.acceptable(peer_pk3.clone(), peer_addr3.ip()).await, + Attempt::Unregistered, "Unknown peer should not be acceptable" ); @@ -470,14 +492,16 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // With bypass_ip_check=true, registered peer with wrong IP is acceptable - assert!( + assert_eq!( mailbox.acceptable(peer_pk2.clone(), peer_addr.ip()).await, + Attempt::Ok, "Registered peer with wrong IP should be acceptable with bypass_ip_check=true" ); // Self is still not acceptable - assert!( - !mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + assert_eq!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Attempt::Myself, "Self should not be acceptable" ); @@ -485,8 +509,9 @@ mod tests { oracle.block(peer_pk2.clone()).await; context.sleep(Duration::from_millis(10)).await; - assert!( - !mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + assert_eq!( + mailbox.acceptable(peer_pk2.clone(), peer_addr2.ip()).await, + Attempt::Blocked, "Blocked peer should not be acceptable" ); }); @@ -514,12 +539,18 @@ mod tests { .await; context.sleep(Duration::from_millis(10)).await; // Allow register to process - assert!(mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Attempt::Ok + ); let reservation = mailbox.listen(peer_pk.clone()).await; assert!(reservation.is_some()); - assert!(!mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await); + assert_eq!( + mailbox.acceptable(peer_pk.clone(), peer_addr.ip()).await, + Attempt::Reserved + ); let failed_reservation = mailbox.listen(peer_pk.clone()).await; assert!(failed_reservation.is_none()); @@ -672,10 +703,10 @@ mod tests { context.sleep(Duration::from_millis(10)).await; // Wait for a listener update - let registered_ips = listener_receiver.next().await.unwrap(); - assert!(registered_ips.contains(&my_addr.ip())); - assert!(registered_ips.contains(&addr_1.ip())); - assert!(!registered_ips.contains(&addr_2.ip())); + let listenable_ips: ListenableIps = listener_receiver.next().await.unwrap(); + assert!(listenable_ips.allowed.contains(&my_addr.ip())); + assert!(listenable_ips.allowed.contains(&addr_1.ip())); + assert!(!listenable_ips.allowed.contains(&addr_2.ip())); // Mark peer as connected let reservation = mailbox.listen(pk_1.clone()).await; @@ -690,10 +721,10 @@ mod tests { .await; // Wait for a listener update - let registered_ips = listener_receiver.next().await.unwrap(); - assert!(!registered_ips.contains(&my_addr.ip())); - assert!(!registered_ips.contains(&addr_1.ip())); - assert!(registered_ips.contains(&addr_2.ip())); + let listenable_ips: ListenableIps = listener_receiver.next().await.unwrap(); + assert!(!listenable_ips.allowed.contains(&my_addr.ip())); + assert!(!listenable_ips.allowed.contains(&addr_1.ip())); + assert!(listenable_ips.allowed.contains(&addr_2.ip())); // The first peer should be have received a kill message because its // peer set was removed because `tracked_peer_sets` is 1. diff --git a/p2p/src/authenticated/lookup/actors/tracker/directory.rs b/p2p/src/authenticated/lookup/actors/tracker/directory.rs index ed0f31c872..b9e479f7fa 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/directory.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/directory.rs @@ -1,6 +1,9 @@ use super::{metrics::Metrics, record::Record, Metadata, Reservation}; use crate::{ - authenticated::lookup::{actors::tracker::ingress::Releaser, metrics}, + authenticated::{ + lookup::{actors::tracker::ingress::Releaser, metrics}, + Attempt, + }, types::Address, Ingress, }; @@ -21,6 +24,15 @@ use std::{ }; use tracing::{debug, warn}; +/// IPs that the listener should filter on. +#[derive(Debug, Clone, Default)] +pub struct ListenableIps { + /// IPs that are allowed to connect (eligible peers). + pub allowed: HashSet, + /// IPs that belong to blocked peers (should be rejected with blocked metric). + pub blocked: HashSet, +} + /// Configuration for the [Directory]. pub struct Config { /// Whether private IPs are connectable. @@ -279,30 +291,67 @@ impl Directory { result } - /// Returns true if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for this peer (can accept an incoming connection from them). /// - /// Checks eligibility (peer set membership), blocked status, egress IP match (if not bypass_ip_check), - /// and connection status. - pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> bool { - !self.blocked.contains(peer) - && self - .peers - .get(peer) - .is_some_and(|record| record.acceptable(source_ip, self.bypass_ip_check)) + /// Checks blocked status first, then delegates to record for eligibility, IP match, + /// and connection status. Increments metrics for rejections. + pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> Attempt { + if self.blocked.contains(peer) { + self.metrics.rejected_blocked.inc(); + debug!(?peer, "peer rejected: blocked"); + return Attempt::Blocked; + } + let result = self + .peers + .get(peer) + .map(|r| r.acceptable(source_ip, self.bypass_ip_check)) + .unwrap_or(Attempt::Unregistered); + match result { + Attempt::Ok => {} + Attempt::Blocked => unreachable!("record does not return Blocked"), + Attempt::Unregistered => { + self.metrics.rejected_unregistered.inc(); + debug!(?peer, "peer rejected: unregistered or ineligible"); + } + Attempt::Reserved => { + self.metrics.rejected_reserved.inc(); + debug!(?peer, "peer rejected: already connected"); + } + Attempt::Mismatch => { + self.metrics.rejected_mismatch.inc(); + debug!(?peer, "peer rejected: data mismatch"); + } + Attempt::Myself => { + self.metrics.rejected_myself.inc(); + debug!(?peer, "peer rejected: is ourselves"); + } + } + result } - /// Return egress IPs we should listen for (accept incoming connections from). + /// Return IPs the listener should filter on. /// - /// Only includes IPs from peers that are: - /// - Currently eligible (not blocked, in a peer set) - /// - Have a valid egress IP (global, or private IPs are allowed) - pub fn listenable(&self) -> HashSet { - self.peers + /// Returns both allowed IPs (eligible peers) and blocked IPs (for proper metric tracking). + pub fn listenable(&self) -> ListenableIps { + let is_valid_ip = |ip: &IpAddr| self.allow_private_ips || IpAddrExt::is_global(ip); + + let allowed = self + .peers .iter() .filter(|(peer, r)| !self.blocked.contains(peer) && r.eligible()) .filter_map(|(_, r)| r.egress_ip()) - .filter(|ip| self.allow_private_ips || IpAddrExt::is_global(ip)) - .collect() + .filter(is_valid_ip) + .collect(); + + let blocked = self + .blocked + .iter() + .filter_map(|(peer, _)| self.peers.get(peer)) + .filter_map(|r| r.egress_ip()) + .filter(is_valid_ip) + .collect(); + + ListenableIps { allowed, blocked } } /// Unblock all peers whose block has expired. @@ -399,7 +448,9 @@ impl Directory { #[cfg(test)] mod tests { use crate::{ - authenticated::{lookup::actors::tracker::directory::Directory, mailbox::UnboundedMailbox}, + authenticated::{ + lookup::actors::tracker::directory::Directory, mailbox::UnboundedMailbox, Attempt, + }, types::Address, Ingress, }; @@ -720,16 +771,16 @@ mod tests { // Verify listenable() returns egress IPs for IP filtering let listenable = directory.listenable(); assert!( - listenable.contains(&egress_socket.ip()), - "Listenable should contain peer 1's egress IP" + listenable.allowed.contains(&egress_socket.ip()), + "Listenable should contain peer 1's egress IP in allowed" ); assert!( - listenable.contains(&egress_socket_2.ip()), - "Listenable should contain peer 2's egress IP" + listenable.allowed.contains(&egress_socket_2.ip()), + "Listenable should contain peer 2's egress IP in allowed" ); assert!( - !listenable.contains(&ingress_socket.ip()), - "Listenable should NOT contain peer 1's ingress IP" + !listenable.allowed.contains(&ingress_socket.ip()), + "Listenable should NOT contain peer 1's ingress IP in allowed" ); }); } @@ -863,8 +914,12 @@ mod tests { // Verify listenable() only returns public IP (private IP excluded from filter) let listenable = directory.listenable(); - assert!(listenable.contains(&Ipv4Addr::new(8, 8, 8, 8).into())); - assert!(!listenable.contains(&Ipv4Addr::new(10, 0, 0, 1).into())); + assert!(listenable + .allowed + .contains(&Ipv4Addr::new(8, 8, 8, 8).into())); + assert!(!listenable + .allowed + .contains(&Ipv4Addr::new(10, 0, 0, 1).into())); }); } @@ -901,31 +956,39 @@ mod tests { .unwrap(), ); - // Both peers eligible: IP should be in listenable set + // Both peers eligible: IP should be in allowed set let listenable = directory.listenable(); assert!( - listenable.contains(&shared_ip), - "IP should be listenable when both peers are eligible" + listenable.allowed.contains(&shared_ip), + "IP should be in allowed when both peers are eligible" + ); + assert!( + !listenable.blocked.contains(&shared_ip), + "IP should not be in blocked when both peers are eligible" ); // Block one peer directory.block(&pk_1); - // One eligible, one blocked: IP should still be listenable + // One eligible, one blocked: IP should still be in allowed (eligible wins) let listenable = directory.listenable(); assert!( - listenable.contains(&shared_ip), - "IP should be listenable when at least one peer is eligible" + listenable.allowed.contains(&shared_ip), + "IP should be in allowed when at least one peer is eligible" ); // Block the other peer directory.block(&pk_2); - // Both blocked: IP should NOT be in listenable set + // Both blocked: IP should be in blocked, not allowed let listenable = directory.listenable(); assert!( - !listenable.contains(&shared_ip), - "IP should not be listenable when all peers are blocked" + !listenable.allowed.contains(&shared_ip), + "IP should not be in allowed when all peers are blocked" + ); + assert!( + listenable.blocked.contains(&shared_ip), + "IP should be in blocked when all peers are blocked" ); }); } @@ -957,10 +1020,15 @@ mod tests { // Block the peer directory.block(&pk_1); - // Verify peer is blocked and not listenable + // Verify peer is blocked and not in allowed (but is in blocked) + let listenable = directory.listenable(); + assert!( + !listenable.allowed.contains(&addr_1.ip()), + "Blocked peer should not be in allowed" + ); assert!( - !directory.listenable().contains(&addr_1.ip()), - "Blocked peer should not be listenable" + listenable.blocked.contains(&addr_1.ip()), + "Blocked peer should be in blocked" ); // Verify peer is blocked @@ -978,10 +1046,15 @@ mod tests { // Now unblock_expired should unblock the peer assert!(directory.unblock_expired(), "Should have unblocked a peer"); - // Verify peer is now listenable + // Verify peer is now listenable (in allowed, not in blocked) + let listenable = directory.listenable(); + assert!( + listenable.allowed.contains(&addr_1.ip()), + "Unblocked peer should be in allowed" + ); assert!( - directory.listenable().contains(&addr_1.ip()), - "Unblocked peer should be listenable" + !listenable.blocked.contains(&addr_1.ip()), + "Unblocked peer should not be in blocked" ); // Verify no more blocked peers @@ -1392,8 +1465,9 @@ mod tests { directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); // Peer should be acceptable before blocking - assert!( + assert_eq!( directory.acceptable(&pk_1, addr_1.ip()), + Attempt::Ok, "Peer should be acceptable before blocking" ); @@ -1401,8 +1475,9 @@ mod tests { directory.block(&pk_1); // Peer should NOT be acceptable while blocked - assert!( - !directory.acceptable(&pk_1, addr_1.ip()), + assert_eq!( + directory.acceptable(&pk_1, addr_1.ip()), + Attempt::Blocked, "Blocked peer should not be acceptable" ); @@ -1411,8 +1486,9 @@ mod tests { directory.unblock_expired(); // Peer should be acceptable again after unblock - assert!( + assert_eq!( directory.acceptable(&pk_1, addr_1.ip()), + Attempt::Ok, "Peer should be acceptable after unblock" ); }); @@ -1442,29 +1518,44 @@ mod tests { // Add peer to a set directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); - // Peer's IP should be listenable before blocking + // Peer's IP should be in allowed before blocking + let listenable = directory.listenable(); assert!( - directory.listenable().contains(&addr_1.ip()), - "Peer's IP should be listenable before blocking" + listenable.allowed.contains(&addr_1.ip()), + "Peer's IP should be in allowed before blocking" + ); + assert!( + !listenable.blocked.contains(&addr_1.ip()), + "Peer's IP should not be in blocked before blocking" ); // Block the peer directory.block(&pk_1); - // Peer's IP should NOT be listenable while blocked + // Peer's IP should be in blocked, not allowed + let listenable = directory.listenable(); assert!( - !directory.listenable().contains(&addr_1.ip()), - "Blocked peer's IP should not be listenable" + !listenable.allowed.contains(&addr_1.ip()), + "Blocked peer's IP should not be in allowed" + ); + assert!( + listenable.blocked.contains(&addr_1.ip()), + "Blocked peer's IP should be in blocked" ); // Advance time and unblock context.sleep(block_duration + Duration::from_secs(1)).await; directory.unblock_expired(); - // Peer's IP should be listenable again after unblock + // Peer's IP should be back in allowed after unblock + let listenable = directory.listenable(); + assert!( + listenable.allowed.contains(&addr_1.ip()), + "Peer's IP should be in allowed after unblock" + ); assert!( - directory.listenable().contains(&addr_1.ip()), - "Peer's IP should be listenable after unblock" + !listenable.blocked.contains(&addr_1.ip()), + "Peer's IP should not be in blocked after unblock" ); }); } diff --git a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs index 99a6df0c5a..3ccf12a4cc 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs @@ -3,7 +3,7 @@ use crate::{ authenticated::{ lookup::actors::{peer, tracker::Metadata}, mailbox::UnboundedMailbox, - Mailbox, + Attempt, Mailbox, }, types::Address, Ingress, @@ -79,8 +79,8 @@ pub enum Message { /// The IP address the peer connected from. source_ip: IpAddr, - /// The sender to respond with whether the peer is acceptable. - responder: oneshot::Sender, + /// The sender to respond with the acceptance result. + responder: oneshot::Sender, }, /// Request a reservation for a particular peer. @@ -129,7 +129,7 @@ impl UnboundedMailbox> { } /// Send an `Acceptable` message to the tracker. - pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool { + pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> Attempt { let (tx, rx) = oneshot::channel(); self.send(Message::Acceptable { public_key, diff --git a/p2p/src/authenticated/lookup/actors/tracker/metrics.rs b/p2p/src/authenticated/lookup/actors/tracker/metrics.rs index 1dad93fa38..210115ec3a 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/metrics.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/metrics.rs @@ -20,6 +20,21 @@ pub struct Metrics { /// A count of the number of updates for each peer. pub updates: Family, + + /// Number of times a peer was rejected because they were blocked. + pub rejected_blocked: Counter, + + /// Number of times a peer was rejected because they were unregistered. + pub rejected_unregistered: Counter, + + /// Number of times a peer was rejected because they were already connected. + pub rejected_reserved: Counter, + + /// Number of times a peer was rejected because of data mismatch (e.g., IP). + pub rejected_mismatch: Counter, + + /// Number of times a peer was rejected because they are ourselves. + pub rejected_myself: Counter, } impl Metrics { @@ -51,6 +66,31 @@ impl Metrics { "Count of the number of updates for each peer", metrics.updates.clone(), ); + context.register( + "rejected_blocked", + "Number of times a peer was rejected because they were blocked", + metrics.rejected_blocked.clone(), + ); + context.register( + "rejected_unregistered", + "Number of times a peer was rejected because they were unregistered", + metrics.rejected_unregistered.clone(), + ); + context.register( + "rejected_reserved", + "Number of times a peer was rejected because they were already connected", + metrics.rejected_reserved.clone(), + ); + context.register( + "rejected_mismatch", + "Number of times a peer was rejected because of data mismatch", + metrics.rejected_mismatch.clone(), + ); + context.register( + "rejected_myself", + "Number of times a peer was rejected because they are ourselves", + metrics.rejected_myself.clone(), + ); metrics } } diff --git a/p2p/src/authenticated/lookup/actors/tracker/mod.rs b/p2p/src/authenticated/lookup/actors/tracker/mod.rs index 197545a472..c09a1e3d8d 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/mod.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/mod.rs @@ -3,10 +3,10 @@ use crate::authenticated::Mailbox; use commonware_cryptography::Signer; use commonware_runtime::Quota; -use std::{collections::HashSet, net::IpAddr, time::Duration}; +use std::time::Duration; pub mod actor; -mod directory; +pub mod directory; mod ingress; mod metadata; mod metrics; @@ -14,6 +14,7 @@ mod record; mod reservation; pub use actor::Actor; +pub use directory::ListenableIps; pub use ingress::{Message, Oracle}; pub use metadata::Metadata; pub use reservation::Reservation; @@ -26,6 +27,6 @@ pub struct Config { pub allow_private_ips: bool, pub allow_dns: bool, pub bypass_ip_check: bool, - pub listener: Mailbox>, + pub listener: Mailbox, pub block_duration: Duration, } diff --git a/p2p/src/authenticated/lookup/actors/tracker/record.rs b/p2p/src/authenticated/lookup/actors/tracker/record.rs index bb9c0bf504..bffea3a7cf 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/record.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/record.rs @@ -1,4 +1,7 @@ -use crate::types::{self, Ingress}; +use crate::{ + authenticated::Attempt, + types::{self, Ingress}, +}; use std::net::IpAddr; /// Represents information known about a peer's address. @@ -151,23 +154,27 @@ impl Record { ingress.is_valid(allow_private_ips, allow_dns) } - /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). + /// Returns the acceptance status for this peer (can accept an incoming connection from them). /// - /// A peer is acceptable if: - /// - The peer is eligible (in a peer set, not ourselves) - /// - The source IP matches the expected egress IP for this peer (if not bypass_ip_check) - /// - We are not already connected or reserved - pub fn acceptable(&self, source_ip: IpAddr, bypass_ip_check: bool) -> bool { - if !self.eligible() || self.status != Status::Inert { - return false; + /// Checks for self, eligibility (peer set membership), egress IP match (if not bypass_ip_check), + /// and connection status. + pub fn acceptable(&self, source_ip: IpAddr, bypass_ip_check: bool) -> Attempt { + if matches!(self.address, Address::Myself) { + return Attempt::Myself; } - if bypass_ip_check { - return true; + if !self.eligible() { + return Attempt::Unregistered; } - match &self.address { - Address::Known(addr) => addr.egress_ip() == source_ip, - Address::Myself => false, + if !bypass_ip_check { + let ip_matches = self.egress_ip().is_some_and(|ip| ip == source_ip); + if !ip_matches { + return Attempt::Mismatch; + } } + if self.reserved() { + return Attempt::Reserved; + } + Attempt::Ok } /// Return the ingress address for dialing, if known. @@ -391,6 +398,7 @@ mod tests { #[test] fn test_acceptable_checks_eligibility_status_and_ip() { + use crate::authenticated::Attempt; use std::net::IpAddr; let egress_ip: IpAddr = [8, 8, 8, 8].into(); @@ -400,46 +408,42 @@ mod tests { // Eligible, Inert, and correct IP - acceptable let mut record = Record::known(types::Address::Symmetric(public_socket)); record.increment(); - assert!( - record.acceptable(egress_ip, false), - "Eligible, Inert, correct IP is acceptable" - ); + assert_eq!(record.acceptable(egress_ip, false), Attempt::Ok); - // Correct everything but wrong IP - not acceptable - assert!( - !record.acceptable(wrong_ip, false), - "Not acceptable when IP doesn't match" - ); + // Correct everything but wrong IP - mismatch + assert_eq!(record.acceptable(wrong_ip, false), Attempt::Mismatch); - // Not eligible (sets=0) - not acceptable + // Not eligible (sets=0) - unregistered let record_not_eligible = Record::known(types::Address::Symmetric(public_socket)); - assert!( - !record_not_eligible.acceptable(egress_ip, false), - "Not acceptable when not eligible" + assert_eq!( + record_not_eligible.acceptable(egress_ip, false), + Attempt::Unregistered ); - // Already reserved - not acceptable + // Already reserved - reserved let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); record_reserved.reserve(); - assert!( - !record_reserved.acceptable(egress_ip, false), - "Not acceptable when reserved" + assert_eq!( + record_reserved.acceptable(egress_ip, false), + Attempt::Reserved ); - // Already connected - not acceptable + // Already connected - reserved let mut record_connected = Record::known(types::Address::Symmetric(public_socket)); record_connected.increment(); record_connected.reserve(); record_connected.connect(); - assert!( - !record_connected.acceptable(egress_ip, false), - "Not acceptable when connected" + assert_eq!( + record_connected.acceptable(egress_ip, false), + Attempt::Reserved ); } #[test] fn test_acceptable_bypass_ip_check() { + use crate::authenticated::Attempt; + let egress_ip: IpAddr = [8, 8, 8, 8].into(); let wrong_ip: IpAddr = [1, 2, 3, 4].into(); let public_socket = SocketAddr::from(([8, 8, 8, 8], 8080)); @@ -447,25 +451,22 @@ mod tests { // With bypass_ip_check=true, accepts even with wrong IP (skips IP check) let mut record = Record::known(types::Address::Symmetric(public_socket)); record.increment(); - assert!( - record.acceptable(wrong_ip, true), - "Acceptable with wrong IP when bypass_ip_check=true" - ); + assert_eq!(record.acceptable(wrong_ip, true), Attempt::Ok); // Still requires eligible (sets > 0), even with bypass_ip_check=true let record_not_eligible = Record::known(types::Address::Symmetric(public_socket)); - assert!( - !record_not_eligible.acceptable(egress_ip, true), - "Not acceptable when not eligible (sets=0), even with bypass_ip_check=true" + assert_eq!( + record_not_eligible.acceptable(egress_ip, true), + Attempt::Unregistered ); // Still not acceptable when reserved let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); record_reserved.reserve(); - assert!( - !record_reserved.acceptable(egress_ip, true), - "Not acceptable when reserved" + assert_eq!( + record_reserved.acceptable(egress_ip, true), + Attempt::Reserved ); // Still not acceptable when connected @@ -473,17 +474,14 @@ mod tests { record_connected.increment(); record_connected.reserve(); record_connected.connect(); - assert!( - !record_connected.acceptable(egress_ip, true), - "Not acceptable when connected" + assert_eq!( + record_connected.acceptable(egress_ip, true), + Attempt::Reserved ); // Still not acceptable when myself let record_myself = Record::myself(); - assert!( - !record_myself.acceptable(egress_ip, true), - "Not acceptable when myself" - ); + assert_eq!(record_myself.acceptable(egress_ip, true), Attempt::Myself); } #[test] diff --git a/p2p/src/authenticated/lookup/network.rs b/p2p/src/authenticated/lookup/network.rs index c97ffbcbb3..08d8320345 100644 --- a/p2p/src/authenticated/lookup/network.rs +++ b/p2p/src/authenticated/lookup/network.rs @@ -19,7 +19,6 @@ use commonware_stream::Config as StreamConfig; use commonware_utils::union; use futures::channel::mpsc; use rand_core::CryptoRngCore; -use std::{collections::HashSet, net::IpAddr}; use tracing::{debug, info}; /// Unique suffix for all messages signed in a stream. @@ -35,7 +34,7 @@ pub struct Network>, router: router::Actor, router_mailbox: Mailbox>, - listener: mpsc::Receiver>, + listener: mpsc::Receiver, } impl Network { @@ -50,7 +49,7 @@ impl) -> (Self, tracker::Oracle) { - let (listener_mailbox, listener) = Mailbox::>::new(cfg.mailbox_size); + let (listener_mailbox, listener) = Mailbox::::new(cfg.mailbox_size); let (tracker, tracker_mailbox, oracle) = tracker::Actor::new( context.with_label("tracker"), tracker::Config { diff --git a/p2p/src/authenticated/mod.rs b/p2p/src/authenticated/mod.rs index 73c4c6b8cd..c181d14f35 100644 --- a/p2p/src/authenticated/mod.rs +++ b/p2p/src/authenticated/mod.rs @@ -13,3 +13,20 @@ pub mod lookup; mod mailbox; pub use mailbox::Mailbox; mod relay; + +/// Result of checking if a peer is acceptable for an incoming connection. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Attempt { + /// Peer is acceptable. + Ok, + /// Peer is explicitly blocked. + Blocked, + /// Peer is not in any tracked peer set (or failed other eligibility checks). + Unregistered, + /// Peer is already connected or has a pending connection. + Reserved, + /// Some expected data doesn't match (e.g., source IP doesn't match expected egress IP). + Mismatch, + /// Peer is ourselves. + Myself, +}