diff --git a/p2p/src/authenticated/discovery/actors/tracker/directory.rs b/p2p/src/authenticated/discovery/actors/tracker/directory.rs index d96cbe3191..14449c363a 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/directory.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/directory.rs @@ -225,10 +225,6 @@ impl Directory { self.metrics.tracked.inc(); Record::unknown() }); - // If peer is blocked (from before they were removed), mark the new record - if self.blocked.is_blocked(peer) { - record.block(); - } record.increment(); set.update(peer, !record.want(self.dial_fail_limit)); } @@ -294,20 +290,20 @@ impl Directory { /// when they are added to a peer set via `add_set`. pub fn block(&mut self, peer: &C) { // Already blocked in queue - if self.blocked.is_blocked(peer) { + if self.blocked.contains(peer) { return; } - // If record exists, use record.block() which handles Myself/Blocked - if let Some(record) = self.peers.get_mut(peer) { - if !record.block() { + // If record exists, check if it's blockable (not Myself) + if let Some(record) = self.peers.get(peer) { + if !record.is_blockable() { return; } } let blocked_until = self.context.current() + self.block_duration; self.blocked.block(peer.clone(), blocked_until); - self.metrics.blocked.inc(); + let _ = self.metrics.blocked.try_set(self.blocked.len()); } // ---------- Getters ---------- @@ -371,16 +367,18 @@ impl Directory { /// /// A peer is eligible if it is in a peer set (or is persistent), not blocked, and not ourselves. pub fn eligible(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.eligible()) + !self.blocked.contains(peer) && self.peers.get(peer).is_some_and(|r| r.eligible()) } /// Returns a vector of dialable peers. That is, unconnected peers for which we have an ingress. pub fn dialable(&self) -> Vec { - // Collect peers with known addresses + // Collect peers with known addresses that are not blocked let mut result: Vec<_> = self .peers .iter() - .filter(|&(_, r)| r.dialable(self.allow_private_ips, self.allow_dns)) + .filter(|&(peer, r)| { + !self.blocked.contains(peer) && r.dialable(self.allow_private_ips, self.allow_dns) + }) .map(|(peer, _)| peer.clone()) .collect(); result.sort(); @@ -389,21 +387,21 @@ impl Directory { /// Returns true if this peer is acceptable (can accept an incoming connection from them). pub fn acceptable(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.acceptable()) + !self.blocked.contains(peer) && self.peers.get(peer).is_some_and(|r| r.acceptable()) } /// Unblock all peers whose block has expired and update the knowledge bitmap. pub fn unblock_expired(&mut self) { let now = self.context.current(); let unblocked = self.blocked.unblock_expired(now); + if unblocked.is_empty() { + return; + } + let _ = self.metrics.blocked.try_set(self.blocked.len()); - // Update metrics and clear blocks on records + // Update knowledge bitmaps for peer in unblocked { - self.metrics.blocked.dec(); - if let Some(record) = self.peers.get_mut(&peer) { - record.clear_expired_block(); - - // Update the knowledge bitmap for this peer + if let Some(record) = self.peers.get(&peer) { let want = record.want(self.dial_fail_limit); for set in self.sets.values_mut() { set.update(&peer, !want); @@ -482,7 +480,7 @@ mod tests { use crate::authenticated::{discovery::types, mailbox::UnboundedMailbox}; use commonware_cryptography::{secp256r1::standard::PrivateKey, Signer}; use commonware_runtime::{deterministic, Clock, Runner}; - use commonware_utils::NZU32; + use commonware_utils::{bitmap::BitMap, NZU32}; use std::net::SocketAddr; const NAMESPACE: &[u8] = b"test"; @@ -594,13 +592,13 @@ mod tests { let peer_set: OrderedSet<_> = [unknown_pk.clone()].into_iter().try_collect().unwrap(); directory.add_set(0, peer_set); - // Peer should now be in peers and blocked + // Peer should now be in peers and blocked (via blocked::Queue) assert!( directory.peers.contains_key(&unknown_pk), "Peer should be in peers after add_set" ); assert!( - directory.peers.get(&unknown_pk).unwrap().is_blocked(), + directory.blocked.contains(&unknown_pk), "Peer should be blocked after add_set" ); @@ -654,7 +652,8 @@ mod tests { let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); // Register a peer - let peer_set: OrderedSet<_> = [registered_pk.clone()].into_iter().try_collect().unwrap(); + let peer_set: OrderedSet<_> = + [registered_pk.clone()].into_iter().try_collect().unwrap(); directory.add_set(0, peer_set); assert_eq!(directory.metrics.blocked.get(), 0); @@ -695,4 +694,634 @@ mod tests { ); }); } + + #[test] + fn test_blocked_peer_remains_blocked_on_update() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let peer_signer = PrivateKey::from_seed(1); + let peer_pk = peer_signer.public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add peer to a set + let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap(); + directory.add_set(0, peer_set); + + // Block the peer + directory.block(&peer_pk); + assert!( + directory.blocked.contains(&peer_pk), + "Peer should be blocked after call to block" + ); + + // Update with peer info while blocked + let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200); + directory.update_peers(vec![peer_info.clone()]); + + // Peer should still be blocked + assert!( + directory.blocked.contains(&peer_pk), + "Peer should remain blocked after update" + ); + + // But info should be updated + let record = directory.peers.get(&peer_pk).unwrap(); + assert!( + record.ingress().is_some(), + "Peer info should be updated while blocked" + ); + + // Advance time past block duration and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Verify the peer is unblocked with the updated info + assert!( + !directory.blocked.contains(&peer_pk), + "Peer should be unblocked after expiry" + ); + let record = directory.peers.get(&peer_pk).unwrap(); + assert!( + record.ingress().is_some(), + "Unblocked peer should have the updated info" + ); + }); + } + + #[test] + fn test_unblock_expired() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let peer_pk = PrivateKey::from_seed(1).public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add peer to a set + let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap(); + directory.add_set(0, peer_set); + + // Block the peer + directory.block(&peer_pk); + assert!(directory.blocked.contains(&peer_pk)); + + // Verify next_unblock_deadline is set + let deadline = directory.next_unblock_deadline(); + assert!(deadline.is_some(), "Should have an unblock deadline"); + + // unblock_expired should do nothing before expiry + directory.unblock_expired(); + assert!( + directory.blocked.contains(&peer_pk), + "Peer should still be blocked before expiry" + ); + + // Advance time past block duration + context.sleep(block_duration + Duration::from_secs(1)).await; + + // Now unblock_expired should unblock the peer + directory.unblock_expired(); + assert!( + !directory.blocked.contains(&peer_pk), + "Peer should be unblocked after expiry" + ); + + // Verify next_unblock_deadline is now None + assert!( + directory.next_unblock_deadline().is_none(), + "No more blocked peers, no deadline" + ); + }); + } + + #[test] + fn test_unblock_expired_peer_removed_and_readded() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let pk_1 = PrivateKey::from_seed(1).public_key(); + let pk_2 = PrivateKey::from_seed(2).public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 1, // Only keep 1 set so we can evict peers + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Initially no blocked peers + assert_eq!(directory.metrics.blocked.get(), 0); + + // Add pk_1 and block it + let peer_set: OrderedSet<_> = [pk_1.clone()].into_iter().try_collect().unwrap(); + directory.add_set(0, peer_set); + directory.block(&pk_1); + assert!(directory.blocked.contains(&pk_1)); + assert_eq!(directory.metrics.blocked.get(), 1); + + // Add a new set that evicts pk_1 (max_sets=1) + // The blocked metric should remain 1 since the block persists + let peer_set_2: OrderedSet<_> = [pk_2.clone()].into_iter().try_collect().unwrap(); + directory.add_set(1, peer_set_2); + assert!( + !directory.peers.contains_key(&pk_1), + "pk_1 should be removed" + ); + assert_eq!( + directory.metrics.blocked.get(), + 1, + "blocked metric should still be 1 after peer removal" + ); + + // Re-add pk_1 - should still be blocked because block persists + let peer_set_3: OrderedSet<_> = [pk_1.clone()].into_iter().try_collect().unwrap(); + directory.add_set(2, peer_set_3); + assert!( + directory.blocked.contains(&pk_1), + "Re-added pk_1 should still be blocked" + ); + assert_eq!( + directory.metrics.blocked.get(), + 1, + "blocked metric should still be 1 after re-add" + ); + + // Advance time past block duration + context.sleep(block_duration + Duration::from_secs(1)).await; + + // Now unblock_expired should unblock pk_1 + directory.unblock_expired(); + assert!( + !directory.blocked.contains(&pk_1), + "pk_1 should no longer be blocked" + ); + assert_eq!( + directory.metrics.blocked.get(), + 0, + "blocked metric should be 0 after unblock" + ); + }); + } + + #[test] + fn test_blocked_metric_multiple_peers() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let pk_1 = PrivateKey::from_seed(1).public_key(); + let pk_2 = PrivateKey::from_seed(2).public_key(); + let pk_3 = PrivateKey::from_seed(3).public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add all peers + let peer_set: OrderedSet<_> = [pk_1.clone(), pk_2.clone(), pk_3.clone()] + .into_iter() + .try_collect() + .unwrap(); + directory.add_set(0, peer_set); + assert_eq!(directory.metrics.blocked.get(), 0); + + // Block all three peers + directory.block(&pk_1); + assert_eq!(directory.metrics.blocked.get(), 1); + directory.block(&pk_2); + assert_eq!(directory.metrics.blocked.get(), 2); + directory.block(&pk_3); + assert_eq!(directory.metrics.blocked.get(), 3); + + // Blocking again should not increment + directory.block(&pk_1); + assert_eq!(directory.metrics.blocked.get(), 3); + + // Advance time and unblock all + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + assert_eq!(directory.metrics.blocked.get(), 0); + }); + } + + #[test] + fn test_blocked_peer_not_dialable() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let peer_signer = PrivateKey::from_seed(1); + let peer_pk = peer_signer.public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add peer to a set + let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap(); + directory.add_set(0, peer_set); + + // Update with peer info so it has a dialable address + let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200); + directory.update_peers(vec![peer_info]); + + // Peer should be dialable before blocking + assert!( + directory.dialable().contains(&peer_pk), + "Peer should be dialable before blocking" + ); + + // Block the peer + directory.block(&peer_pk); + + // Peer should NOT be dialable while blocked + assert!( + !directory.dialable().contains(&peer_pk), + "Blocked peer should not be dialable" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Peer should be dialable again after unblock + assert!( + directory.dialable().contains(&peer_pk), + "Peer should be dialable after unblock" + ); + }); + } + + #[test] + fn test_blocked_peer_not_acceptable() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let peer_signer = PrivateKey::from_seed(1); + let peer_pk = peer_signer.public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add peer to a set + let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap(); + directory.add_set(0, peer_set); + + // Update with peer info + let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200); + directory.update_peers(vec![peer_info]); + + // Peer should be acceptable before blocking + assert!( + directory.acceptable(&peer_pk), + "Peer should be acceptable before blocking" + ); + + // Block the peer + directory.block(&peer_pk); + + // Peer should NOT be acceptable while blocked + assert!( + !directory.acceptable(&peer_pk), + "Blocked peer should not be acceptable" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Peer should be acceptable again after unblock + assert!( + directory.acceptable(&peer_pk), + "Peer should be acceptable after unblock" + ); + }); + } + + #[test] + fn test_blocked_peer_not_eligible() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let peer_pk = PrivateKey::from_seed(1).public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add peer to a set + let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap(); + directory.add_set(0, peer_set); + + // Peer should be eligible before blocking + assert!( + directory.eligible(&peer_pk), + "Peer should be eligible before blocking" + ); + + // Block the peer + directory.block(&peer_pk); + + // Peer should NOT be eligible while blocked + assert!( + !directory.eligible(&peer_pk), + "Blocked peer should not be eligible" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Peer should be eligible again after unblock + assert!( + directory.eligible(&peer_pk), + "Peer should be eligible after unblock" + ); + }); + } + + #[test] + fn test_blocked_peer_info_not_sharable() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let peer_signer = PrivateKey::from_seed(1); + let peer_pk = peer_signer.public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add peer to a set + let peer_set: OrderedSet<_> = [peer_pk.clone()].into_iter().try_collect().unwrap(); + directory.add_set(0, peer_set); + + // Update with peer info + let peer_info = types::Info::sign(&peer_signer, NAMESPACE, test_socket(), 200); + directory.update_peers(vec![peer_info]); + + // Reserve and connect to make peer Active (so info would be sharable) + let reservation = directory.dial(&peer_pk); + assert!(reservation.is_some(), "Should be able to dial peer"); + directory.connect(&peer_pk, true); + + // Verify info is sharable when connected + assert!( + directory.info(&peer_pk).is_some(), + "Connected peer's info should be sharable" + ); + + // Block the peer - this should trigger disconnect (making status Inert) + directory.block(&peer_pk); + + // Release the reservation to simulate the connection being killed + directory.release(Metadata::Dialer( + peer_pk.clone(), + Ingress::Socket(test_socket()), + )); + + // Now info should NOT be sharable (peer is Inert after block/disconnect) + assert!( + directory.info(&peer_pk).is_none(), + "Blocked peer's info should not be sharable after disconnect" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Info still not sharable because peer is not connected + assert!( + directory.info(&peer_pk).is_none(), + "Unblocked but disconnected peer's info should not be sharable" + ); + }); + } + + #[test] + fn test_bootstrapper_remains_persistent_after_blocking() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let bootstrapper_pk = PrivateKey::from_seed(1).public_key(); + let bootstrapper_ingress = Ingress::Socket(SocketAddr::from(([1, 2, 3, 4], 8080))); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + // Initialize with a bootstrapper + let mut directory = Directory::init( + context.clone(), + vec![(bootstrapper_pk.clone(), bootstrapper_ingress)], + my_info, + config, + releaser, + ); + + // Verify bootstrapper is not deletable (because it's persistent) + let record = directory.peers.get(&bootstrapper_pk).unwrap(); + assert!( + !record.deletable(), + "Bootstrapper should not be deletable (persistent)" + ); + + // Block the bootstrapper + directory.block(&bootstrapper_pk); + assert!( + directory.blocked.contains(&bootstrapper_pk), + "Bootstrapper should be blocked" + ); + + // Verify bootstrapper is STILL not deletable after blocking + // (blocking should NOT change persistence) + let record = directory.peers.get(&bootstrapper_pk).unwrap(); + assert!( + !record.deletable(), + "Bootstrapper should still not be deletable after blocking" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Verify bootstrapper is still not deletable after unblock + let record = directory.peers.get(&bootstrapper_pk).unwrap(); + assert!( + !record.deletable(), + "Bootstrapper should remain not deletable after unblock" + ); + }); + } + + #[test] + fn test_infos_excludes_blocked_peers() { + let runtime = deterministic::Runner::default(); + let signer = PrivateKey::from_seed(0); + let my_info = create_myself_info(&signer, test_socket(), 100); + let peer_signer_1 = PrivateKey::from_seed(1); + let peer_pk_1 = peer_signer_1.public_key(); + let peer_signer_2 = PrivateKey::from_seed(2); + let peer_pk_2 = peer_signer_2.public_key(); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = Config { + allow_private_ips: true, + allow_dns: true, + max_sets: 3, + dial_fail_limit: 1, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); + + // Add both peers to a set + let peer_set: OrderedSet<_> = [peer_pk_1.clone(), peer_pk_2.clone()] + .into_iter() + .try_collect() + .unwrap(); + directory.add_set(0, peer_set); + + // Update with peer info for both (use timestamp 0 to pass the epoch_millis filter) + let peer_info_1 = types::Info::sign(&peer_signer_1, NAMESPACE, test_socket(), 0); + let peer_info_2 = types::Info::sign( + &peer_signer_2, + NAMESPACE, + SocketAddr::from(([9, 9, 9, 9], 9090)), + 0, + ); + directory.update_peers(vec![peer_info_1, peer_info_2]); + + // Connect both peers to make them Active (sharable) + let reservation_1 = directory.dial(&peer_pk_1); + assert!(reservation_1.is_some()); + directory.connect(&peer_pk_1, true); + + let reservation_2 = directory.dial(&peer_pk_2); + assert!(reservation_2.is_some()); + directory.connect(&peer_pk_2, true); + + // Create a bit vector requesting info for both peers (bits = false means "want info") + let bit_vec = types::BitVec { + index: 0, + bits: BitMap::zeroes(2), + }; + + // Both peers' info should be returned + let infos = directory.infos(bit_vec.clone()).unwrap(); + assert_eq!(infos.len(), 2, "Should have info for both peers"); + + // Block peer 1 and release their connection + directory.block(&peer_pk_1); + directory.release(Metadata::Dialer( + peer_pk_1.clone(), + Ingress::Socket(test_socket()), + )); + + // Now only peer 2's info should be returned (peer 1 is Inert after disconnect) + let infos = directory.infos(bit_vec).unwrap(); + assert_eq!( + infos.len(), + 1, + "Should only have info for unblocked connected peer" + ); + assert_eq!( + infos[0].public_key, peer_pk_2, + "Returned info should be for peer 2" + ); + }); + } } diff --git a/p2p/src/authenticated/discovery/actors/tracker/record.rs b/p2p/src/authenticated/discovery/actors/tracker/record.rs index 4fe4a0efac..ed25ea317b 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/record.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/record.rs @@ -20,10 +20,6 @@ pub enum Address { /// /// The `usize` indicates the number of times dialing this record has failed. Discovered(Info, usize), - - /// Peer is blocked. - /// Info can be restored when unblocked. - Blocked, } /// Represents the connection status of a peer. @@ -98,7 +94,7 @@ impl Record { /// Returns true if the update was successful. pub fn update(&mut self, info: Info) -> bool { match &self.address { - Address::Myself(_) | Address::Blocked => false, + Address::Myself(_) => false, Address::Unknown | Address::Bootstrapper(_) => { self.address = Address::Discovered(info, 0); true @@ -123,23 +119,6 @@ impl Record { } } - /// Attempt to mark the peer as blocked. - /// - /// Returns `true` if the peer was newly blocked. - /// Returns `false` if the peer was already blocked or is the local node (unblockable). - /// - /// Address info is discarded when blocking so we serve info for honest peers - /// instead. - pub fn block(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself(_)) { - return false; - } - self.address = Address::Blocked; - // Misbehaving bootstrappers should be deletable once no longer in any peer sets. - self.persistent = false; - true - } - /// Increase the count of peer sets this peer is part of. pub const fn increment(&mut self) { self.sets = self.sets.checked_add(1).unwrap(); @@ -158,7 +137,7 @@ impl Record { /// /// Returns `true` if the reservation was successful, `false` otherwise. pub const fn reserve(&mut self) -> bool { - if matches!(self.address, Address::Blocked | Address::Myself(_)) { + if matches!(self.address, Address::Myself(_)) { return false; } if matches!(self.status, Status::Inert) { @@ -203,21 +182,14 @@ impl Record { } } - /// Clear the block on this peer, resetting to unknown address. - /// - /// # Panics - /// - /// Panics if the peer is not blocked. - pub fn clear_expired_block(&mut self) { - assert!(self.is_blocked()); - self.address = Address::Unknown; - } - // ---------- Getters ---------- - /// Returns `true` if the record is currently blocked. - pub const fn is_blocked(&self) -> bool { - matches!(self.address, Address::Blocked) + /// Returns `true` if this peer can be blocked. + /// + /// Only `Myself` cannot be blocked. Actual blocked status is tracked + /// by the Directory via blocked::Queue. + pub const fn is_blockable(&self) -> bool { + !matches!(self.address, Address::Myself(_)) } /// Returns the number of peer sets this peer is part of. @@ -258,7 +230,7 @@ impl Record { /// Return the ingress address of the peer, if known. pub const fn ingress(&self) -> Option<&Ingress> { match &self.address { - Address::Unknown | Address::Blocked => None, + Address::Unknown => None, Address::Myself(info) => Some(&info.ingress), Address::Bootstrapper(ingress) => Some(ingress), Address::Discovered(info, _) => Some(&info.ingress), @@ -267,11 +239,9 @@ impl Record { /// Get the peer information if it is sharable. The information is considered sharable if it is /// known and we are connected to the peer. - /// - /// Blocked peers are not sharable so we serve info for honest peers instead. pub fn sharable(&self) -> Option> { match &self.address { - Address::Unknown | Address::Blocked => None, + Address::Unknown => None, Address::Myself(info) => Some(info), Address::Bootstrapper(_) => None, Address::Discovered(info, _) => (self.status == Status::Active).then_some(info), @@ -287,19 +257,13 @@ impl Record { /// Returns `true` if we want to ask for updated peer information for this peer. /// - /// - Returns `false` for `Myself` addresses and blocked peers. + /// - Returns `false` for `Myself` addresses. /// - Returns `true` for addresses for which we don't have peer info. - /// - Returns true for addresses for which we do have peer info if-and-only-if we have failed to + /// - Returns `true` for addresses for which we do have peer info if-and-only-if we have failed to /// dial at least `min_fails` times. pub fn want(&self, min_fails: usize) -> bool { - // Ignore how many sets the peer is part of. - // If the peer is not in any sets, this function is not called anyway. - - // Return true if we either: - // - Don't have signed peer info - // - Are not connected to the peer and have failed dialing it match self.address { - Address::Myself(_) | Address::Blocked => false, + Address::Myself(_) => false, Address::Unknown | Address::Bootstrapper(_) => true, Address::Discovered(_, fails) => self.status != Status::Active && fails >= min_fails, } @@ -313,11 +277,11 @@ impl Record { /// Returns `true` if this peer is eligible for connection. /// /// A peer is eligible if: - /// - It is not blocked or ourselves + /// - It is not ourselves /// - It is part of at least one peer set (or is persistent, e.g., bootstrapper) pub const fn eligible(&self) -> bool { match self.address { - Address::Blocked | Address::Myself(_) => false, + Address::Myself(_) => false, Address::Bootstrapper(_) | Address::Unknown | Address::Discovered(_, _) => { self.sets > 0 || self.persistent } @@ -382,7 +346,6 @@ mod tests { assert!(!record.persistent); assert!(record.ingress().is_none()); assert!(record.sharable().is_none()); - assert!(!record.is_blocked()); assert!(!record.reserved()); assert!(record.want(0), "Should want info for unknown peer"); assert!(record.deletable()); @@ -404,7 +367,6 @@ mod tests { record.sharable().as_ref(), &my_info )); - assert!(!record.is_blocked()); assert!(!record.reserved()); assert!(!record.want(0), "Should not want info for myself"); assert!(!record.deletable()); @@ -422,7 +384,6 @@ mod tests { assert!(record.persistent); assert_eq!(record.ingress(), Some(&ingress)); assert!(record.sharable().is_none()); - assert!(!record.is_blocked()); assert!(!record.reserved()); assert!(record.want(0), "Should want info for bootstrapper"); assert!(!record.deletable()); @@ -503,27 +464,19 @@ mod tests { } #[test] - fn test_update_myself_and_blocked() { + fn test_update_myself() { let my_info = create_peer_info::(0, test_socket(), 100); let mut record_myself = Record::myself(my_info.clone()); let other_info = create_peer_info::(1, test_socket2(), 200); let newer_my_info = create_peer_info::(0, test_socket(), 300); // Cannot update Myself record with other info or newer self info - assert!(!record_myself.update(other_info.clone())); + assert!(!record_myself.update(other_info)); assert!(!record_myself.update(newer_my_info)); assert!( matches!(&record_myself.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info)), "Myself record should remain unchanged" ); - - // Cannot update a blocked record (address info is discarded when blocking) - let mut record_blocked = Record::::unknown(); - assert!(record_blocked.block()); - assert!(record_blocked.is_blocked()); - assert!(!record_blocked.update(other_info)); // Update fails - assert!(record_blocked.is_blocked()); // Still blocked - assert!(record_blocked.ingress().is_none()); // No address info } #[test] @@ -598,84 +551,26 @@ mod tests { } #[test] - fn test_block_behavior_and_persistence() { - let sample_peer_info = create_peer_info::(20, test_socket(), 1000); - - // Block an Unknown record - let mut record_unknown = Record::::unknown(); - assert!(!record_unknown.persistent); - assert!(record_unknown.block()); // Newly blocked - assert!(record_unknown.is_blocked()); - // Address is set to Blocked (discarded) - assert!(matches!(record_unknown.address, Address::Blocked)); - assert_eq!(record_unknown.status, Status::Inert); - assert!(!record_unknown.persistent, "Blocking sets persistent=false"); - assert!(!record_unknown.block()); // Already blocked (returns false) - - // Block a Bootstrapper record (initially persistent) - let mut record_boot = Record::::bootstrapper(test_socket()); - assert!(record_boot.persistent); - assert!(record_boot.block()); - assert!(record_boot.is_blocked()); - // Address is set to Blocked (discarded) - assert!(matches!(record_boot.address, Address::Blocked)); - assert!(!record_boot.persistent, "Blocking sets persistent=false"); - - // Block a Discovered record (initially not persistent) - let mut record_disc = Record::::unknown(); - assert!(record_disc.update(sample_peer_info.clone())); - assert!(!record_disc.persistent); - assert!(record_disc.block()); - assert!(record_disc.is_blocked()); - // Address is set to Blocked (discarded) - assert!(matches!(record_disc.address, Address::Blocked)); - assert!(record_disc.ingress().is_none()); // Ingress is discarded - assert!(!record_disc.persistent); - - // Block a Discovered record that came from a Bootstrapper (initially persistent) - let mut record_disc_from_boot = Record::::bootstrapper(test_socket()); - assert!(record_disc_from_boot.update(sample_peer_info.clone())); - assert!(record_disc_from_boot.persistent); - assert!(record_disc_from_boot.block()); - assert!(record_disc_from_boot.is_blocked()); - // Address is set to Blocked (discarded) - assert!(matches!(record_disc_from_boot.address, Address::Blocked)); - assert!( - !record_disc_from_boot.persistent, - "Blocking sets persistent=false" - ); + fn test_is_blockable() { + let my_info = create_peer_info::(0, test_socket(), 100); - // Check status remains unchanged when blocking - let mut record_reserved = Record::::unknown(); - assert!(record_reserved.update(sample_peer_info.clone())); - assert!(record_reserved.reserve()); - assert!(record_reserved.block()); - assert_eq!(record_reserved.status, Status::Reserved); + // Myself is not blockable + let record_myself = Record::myself(my_info); + assert!(!record_myself.is_blockable()); - let mut record_active = Record::::unknown(); - assert!(record_active.update(sample_peer_info)); - assert!(record_active.reserve()); - record_active.connect(); - assert!(record_active.block()); - assert_eq!(record_active.status, Status::Active); - } + // Bootstrapper is blockable + let record_boot = Record::::bootstrapper(test_socket()); + assert!(record_boot.is_blockable()); - #[test] - fn test_block_myself_and_already_blocked() { - let my_info = create_peer_info::(0, test_socket(), 100); - let mut record_myself = Record::myself(my_info.clone()); - assert!(!record_myself.block(), "Cannot block myself"); - assert!( - matches!(&record_myself.address, Address::Myself(info) if peer_info_contents_are_equal(info, &my_info)) - ); + // Unknown is blockable + let record_unknown = Record::::unknown(); + assert!(record_unknown.is_blockable()); - let mut record_to_be_blocked = Record::::unknown(); - assert!(record_to_be_blocked.block()); - assert!( - !record_to_be_blocked.block(), - "Cannot block already blocked peer (returns false)" - ); - assert!(record_to_be_blocked.is_blocked()); + // Discovered is blockable + let peer_info = create_peer_info::(1, test_socket(), 1000); + let mut record_disc = Record::::unknown(); + assert!(record_disc.update(peer_info)); + assert!(record_disc.is_blockable()); } #[test] @@ -750,13 +645,6 @@ mod tests { let record_boot = Record::::bootstrapper(socket); assert!(record_boot.sharable().is_none()); - // Blocked with Discovered address: Sharable if Active - let mut record_blocked = Record::::unknown(); - assert!(record_blocked.update(peer_info_data.clone())); - record_blocked.block(); - // Not sharable because not Active - assert!(record_blocked.sharable().is_none()); - // Discovered but not Active: Not sharable let mut record_disc = Record::::unknown(); assert!(record_disc.update(peer_info_data.clone())); @@ -845,11 +733,6 @@ mod tests { // Myself never wants info assert!(!Record::myself(peer_info.clone()).want(min_fails)); - // Blocked never wants info - let mut blocked = Record::::unknown(); - blocked.block(); - assert!(!blocked.want(min_fails)); - let mut record_disc = Record::::unknown(); assert!(record_disc.update(peer_info)); @@ -927,26 +810,13 @@ mod tests { record.decrement(); // sets = 0 assert!(record.deletable()); // sets = 0, !persistent, Inert - - // Blocking makes a record non-persistent, but deletability still depends on sets/status - let mut record_blocked = Record::::bootstrapper(test_socket()); - assert!(record_blocked.persistent); - record_blocked.increment(); // sets = 1 - assert!(record_blocked.block()); - assert!(!record_blocked.persistent); - assert!(!record_blocked.deletable()); // sets = 1 - record_blocked.decrement(); // sets = 0 - assert!(record_blocked.deletable()); // sets = 0, !persistent, Inert } #[test] fn test_eligible_logic() { let peer_info = create_peer_info::(16, test_socket(), 100); - // Blocked and Myself are never eligible - let mut record_blocked = Record::::unknown(); - record_blocked.block(); - assert!(!record_blocked.eligible()); + // Myself is never eligible assert!(!Record::myself(peer_info.clone()).eligible()); // Persistent records (Bootstrapper) are allowed even with sets=0 @@ -969,30 +839,4 @@ mod tests { record_disc.increment(); // sets = 1 assert!(record_disc.eligible()); // sets > 0 } - - #[test] - fn test_block_expiration() { - let mut record = Record::::unknown(); - - // Initially not blocked - assert!(!record.is_blocked()); - - // Block the peer - assert!(record.block()); - assert!(record.is_blocked()); - assert!(!record.eligible()); - - // After block is cleared - record.clear_expired_block(); - assert!(!record.is_blocked()); - record.increment(); // Need sets > 0 to be eligible - assert!(record.eligible()); - - // Test clear on another record - let mut record2 = Record::::unknown(); - record2.block(); - assert!(record2.is_blocked()); - record2.clear_expired_block(); - assert!(!record2.is_blocked()); - } } diff --git a/p2p/src/authenticated/lookup/actors/tracker/directory.rs b/p2p/src/authenticated/lookup/actors/tracker/directory.rs index 40918646de..9269dd80e8 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/directory.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/directory.rs @@ -165,12 +165,7 @@ impl Directory { } Entry::Vacant(entry) => { self.metrics.tracked.inc(); - let record = entry.insert(Record::known(addr.clone())); - // If peer is blocked (from before they were removed), mark the new record - if self.blocked.is_blocked(peer) { - record.block(); - } - record + entry.insert(Record::known(addr.clone())) } }; record.increment(); @@ -233,20 +228,20 @@ impl Directory { /// when they are added to a peer set via `add_set`. pub fn block(&mut self, peer: &C) { // Already blocked in queue - if self.blocked.is_blocked(peer) { + if self.blocked.contains(peer) { return; } - // If record exists, use record.block() which handles Myself/Blocked - if let Some(record) = self.peers.get_mut(peer) { - if !record.block() { + // If record exists, check if it's blockable (not Myself) + if let Some(record) = self.peers.get(peer) { + if !record.is_blockable() { return; } } let blocked_until = self.context.current() + self.block_duration; self.blocked.block(peer.clone(), blocked_until); - self.metrics.blocked.inc(); + let _ = self.metrics.blocked.try_set(self.blocked.len()); } // ---------- Getters ---------- @@ -267,16 +262,18 @@ impl Directory { /// This does NOT check IP validity - that is done separately for dialing (ingress) /// and accepting (egress). pub fn eligible(&self, peer: &C) -> bool { - self.peers.get(peer).is_some_and(|r| r.eligible()) + !self.blocked.contains(peer) && self.peers.get(peer).is_some_and(|r| r.eligible()) } /// Returns a vector of dialable peers. That is, unconnected peers for which we have a socket. pub fn dialable(&self) -> Vec { - // Collect peers with known addresses + // Collect peers with known addresses (excluding blocked peers) let mut result: Vec<_> = self .peers .iter() - .filter(|&(_, r)| r.dialable(self.allow_private_ips, self.allow_dns)) + .filter(|&(peer, r)| { + !self.blocked.contains(peer) && r.dialable(self.allow_private_ips, self.allow_dns) + }) .map(|(peer, _)| peer.clone()) .collect(); result.sort(); @@ -285,11 +282,14 @@ impl Directory { /// Returns true if this peer is acceptable (can accept an incoming connection from them). /// - /// Checks eligibility (peer set membership), egress IP match (if not bypass_ip_check), and connection status. + /// Checks eligibility (peer set membership), blocked status, egress IP match (if not bypass_ip_check), + /// and connection status. pub fn acceptable(&self, peer: &C, source_ip: IpAddr) -> bool { - self.peers - .get(peer) - .is_some_and(|record| record.acceptable(source_ip, self.bypass_ip_check)) + !self.blocked.contains(peer) + && self + .peers + .get(peer) + .is_some_and(|record| record.acceptable(source_ip, self.bypass_ip_check)) } /// Return egress IPs we should listen for (accept incoming connections from). @@ -299,29 +299,25 @@ impl Directory { /// - Have a valid egress IP (global, or private IPs are allowed) pub fn listenable(&self) -> HashSet { self.peers - .values() - .filter(|r| r.eligible()) - .filter_map(|r| r.egress_ip()) + .iter() + .filter(|(peer, r)| !self.blocked.contains(peer) && r.eligible()) + .filter_map(|(_, r)| r.egress_ip()) .filter(|ip| self.allow_private_ips || IpAddrExt::is_global(ip)) .collect() } /// Unblock all peers whose block has expired. /// - /// Returns the list of peers that were unblocked (for logging/debugging). + /// Returns `true` if any peers were unblocked. pub fn unblock_expired(&mut self) -> bool { let now = self.context.current(); let unblocked = self.blocked.unblock_expired(now); - - // Update metrics and clear blocks on records - for peer in &unblocked { - self.metrics.blocked.dec(); - if let Some(record) = self.peers.get_mut(peer) { - record.clear_expired_block(); - } + if !unblocked.is_empty() { + let _ = self.metrics.blocked.try_set(self.blocked.len()); + true + } else { + false } - - !unblocked.is_empty() } /// Get the next unblock deadline (earliest blocked_until time). @@ -584,28 +580,30 @@ mod tests { directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); directory.block(&pk_1); - let record = directory.peers.get(&pk_1).unwrap(); assert!( - record.is_blocked(), + directory.blocked.contains(&pk_1), "Peer should be blocked after call to block" ); - // Address is preserved in Blocked variant but ingress() returns None for blocked peers - assert!( - record.ingress().is_none(), - "Blocked peer should not expose ingress" + // Address is preserved (blocking is tracked in blocked::Queue) + let record = directory.peers.get(&pk_1).unwrap(); + assert_eq!( + record.ingress(), + Some(Ingress::Socket(addr_1)), + "Record still has address (blocking is at Directory level)" ); // Update the address while blocked directory.add_set(1, [(pk_1.clone(), addr(addr_2))].try_into().unwrap()); - let record = directory.peers.get(&pk_1).unwrap(); assert!( - record.is_blocked(), + directory.blocked.contains(&pk_1), "Blocked peer should remain blocked after update" ); - // Address is still preserved but not exposed - assert!( - record.ingress().is_none(), - "Blocked peer should not expose ingress" + // Address is updated + let record = directory.peers.get(&pk_1).unwrap(); + assert_eq!( + record.ingress(), + Some(Ingress::Socket(addr_2)), + "Record has updated address" ); // Advance time past block duration and unblock @@ -613,15 +611,15 @@ mod tests { directory.unblock_expired(); // Verify the peer is unblocked with the UPDATED address - let record = directory.peers.get(&pk_1).unwrap(); assert!( - !record.is_blocked(), + !directory.blocked.contains(&pk_1), "Peer should be unblocked after expiry" ); + let record = directory.peers.get(&pk_1).unwrap(); assert_eq!( record.ingress(), Some(Ingress::Socket(addr_2)), - "Unblocked peer should have the updated address, not the original" + "Unblocked peer should have the updated address" ); }); } @@ -1013,7 +1011,7 @@ mod tests { // Add pk_1 and block it directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); directory.block(&pk_1); - assert!(directory.peers.get(&pk_1).unwrap().is_blocked()); + assert!(directory.blocked.contains(&pk_1)); assert_eq!(directory.metrics.blocked.get(), 1); // Add a new set that evicts pk_1 (max_sets=1) @@ -1032,7 +1030,7 @@ mod tests { // Re-add pk_1 - should still be blocked because block persists directory.add_set(2, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); assert!( - directory.peers.get(&pk_1).unwrap().is_blocked(), + directory.blocked.contains(&pk_1), "Re-added pk_1 should still be blocked" ); assert_eq!( @@ -1047,7 +1045,7 @@ mod tests { // Now unblock_expired should unblock pk_1 assert!(directory.unblock_expired()); assert!( - !directory.peers.get(&pk_1).unwrap().is_blocked(), + !directory.blocked.contains(&pk_1), "pk_1 should no longer be blocked" ); assert_eq!( @@ -1216,7 +1214,7 @@ mod tests { "Peer should be in peers after add_set" ); assert!( - directory.peers.get(&unknown_pk).unwrap().is_blocked(), + directory.blocked.contains(&unknown_pk), "Peer should be blocked after add_set" ); @@ -1315,4 +1313,208 @@ mod tests { ); }); } + + #[test] + fn test_blocked_peer_not_dialable() { + let runtime = deterministic::Runner::default(); + let my_pk = ed25519::PrivateKey::from_seed(0).public_key(); + let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); + let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = super::Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = super::Config { + allow_private_ips: true, + allow_dns: true, + bypass_ip_check: false, + max_sets: 3, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), my_pk, config, releaser); + + // Add peer to a set + directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); + + // Peer should be dialable before blocking + assert!( + directory.dialable().contains(&pk_1), + "Peer should be dialable before blocking" + ); + + // Block the peer + directory.block(&pk_1); + + // Peer should NOT be dialable while blocked + assert!( + !directory.dialable().contains(&pk_1), + "Blocked peer should not be dialable" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Peer should be dialable again after unblock + assert!( + directory.dialable().contains(&pk_1), + "Peer should be dialable after unblock" + ); + }); + } + + #[test] + fn test_blocked_peer_not_acceptable() { + let runtime = deterministic::Runner::default(); + let my_pk = ed25519::PrivateKey::from_seed(0).public_key(); + let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); + let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = super::Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = super::Config { + allow_private_ips: true, + allow_dns: true, + bypass_ip_check: true, // Bypass IP check to simplify test + max_sets: 3, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), my_pk, config, releaser); + + // Add peer to a set + directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); + + // Peer should be acceptable before blocking + assert!( + directory.acceptable(&pk_1, addr_1.ip()), + "Peer should be acceptable before blocking" + ); + + // Block the peer + directory.block(&pk_1); + + // Peer should NOT be acceptable while blocked + assert!( + !directory.acceptable(&pk_1, addr_1.ip()), + "Blocked peer should not be acceptable" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Peer should be acceptable again after unblock + assert!( + directory.acceptable(&pk_1, addr_1.ip()), + "Peer should be acceptable after unblock" + ); + }); + } + + #[test] + fn test_blocked_peer_not_listenable() { + let runtime = deterministic::Runner::default(); + let my_pk = ed25519::PrivateKey::from_seed(0).public_key(); + let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); + let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = super::Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = super::Config { + allow_private_ips: true, + allow_dns: true, + bypass_ip_check: false, + max_sets: 3, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), my_pk, config, releaser); + + // Add peer to a set + directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); + + // Peer's IP should be listenable before blocking + assert!( + directory.listenable().contains(&addr_1.ip()), + "Peer's IP should be listenable before blocking" + ); + + // Block the peer + directory.block(&pk_1); + + // Peer's IP should NOT be listenable while blocked + assert!( + !directory.listenable().contains(&addr_1.ip()), + "Blocked peer's IP should not be listenable" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Peer's IP should be listenable again after unblock + assert!( + directory.listenable().contains(&addr_1.ip()), + "Peer's IP should be listenable after unblock" + ); + }); + } + + #[test] + fn test_blocked_peer_not_eligible() { + let runtime = deterministic::Runner::default(); + let my_pk = ed25519::PrivateKey::from_seed(0).public_key(); + let pk_1 = ed25519::PrivateKey::from_seed(1).public_key(); + let addr_1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 1235); + let (tx, _rx) = UnboundedMailbox::new(); + let releaser = super::Releaser::new(tx); + let block_duration = Duration::from_secs(100); + let config = super::Config { + allow_private_ips: true, + allow_dns: true, + bypass_ip_check: false, + max_sets: 3, + rate_limit: Quota::per_second(NZU32!(10)), + block_duration, + }; + + runtime.start(|context| async move { + let mut directory = Directory::init(context.clone(), my_pk, config, releaser); + + // Add peer to a set + directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); + + // Peer should be eligible before blocking + assert!( + directory.eligible(&pk_1), + "Peer should be eligible before blocking" + ); + + // Block the peer + directory.block(&pk_1); + + // Peer should NOT be eligible while blocked + assert!( + !directory.eligible(&pk_1), + "Blocked peer should not be eligible" + ); + + // Advance time and unblock + context.sleep(block_duration + Duration::from_secs(1)).await; + directory.unblock_expired(); + + // Peer should be eligible again after unblock + assert!( + directory.eligible(&pk_1), + "Peer should be eligible after unblock" + ); + }); + } } diff --git a/p2p/src/authenticated/lookup/actors/tracker/record.rs b/p2p/src/authenticated/lookup/actors/tracker/record.rs index e9e68caa1d..d7f2558284 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/record.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/record.rs @@ -9,9 +9,6 @@ pub enum Address { /// Address is provided when peer is registered. Known(types::Address), - - /// Peer is blocked. Address info is preserved so it can be restored when unblocked. - Blocked(types::Address), } /// Represents the connection status of a peer. @@ -75,36 +72,10 @@ impl Record { pub fn update(&mut self, addr: types::Address) { match &mut self.address { Address::Myself => {} - Address::Known(existing) | Address::Blocked(existing) => *existing = addr, - } - } - - /// Attempt to mark the peer as blocked. - /// - /// Returns `true` if the peer was newly blocked. - /// Returns `false` if the peer was already blocked or is the local node (unblockable). - pub fn block(&mut self) -> bool { - match &self.address { - Address::Myself | Address::Blocked(_) => false, - Address::Known(addr) => { - self.address = Address::Blocked(addr.clone()); - true - } + Address::Known(existing) => *existing = addr, } } - /// Clear the block on this peer, restoring the original address. - /// - /// # Panics - /// - /// Panics if the peer is not blocked. - pub fn clear_expired_block(&mut self) { - let Address::Blocked(addr) = &mut self.address else { - panic!("clear_expired_block called on non-blocked address"); - }; - self.address = Address::Known(addr.clone()); - } - /// Increase the count of peer sets this peer is part of. pub const fn increment(&mut self) { self.sets = self.sets.checked_add(1).unwrap(); @@ -123,7 +94,7 @@ impl Record { /// /// Returns `true` if the reservation was successful, `false` otherwise. pub const fn reserve(&mut self) -> bool { - if matches!(self.address, Address::Blocked(_) | Address::Myself) { + if matches!(self.address, Address::Myself) { return false; } if matches!(self.status, Status::Inert) { @@ -149,10 +120,12 @@ impl Record { // ---------- Getters ---------- - /// Returns `true` if the peer is currently blocked. - #[cfg(test)] - pub const fn is_blocked(&self) -> bool { - matches!(self.address, Address::Blocked(_)) + /// Returns `true` if this peer can be blocked. + /// + /// Only `Myself` cannot be blocked. Actual blocked status is tracked + /// by the Directory via blocked::Queue. + pub const fn is_blockable(&self) -> bool { + !matches!(self.address, Address::Myself) } /// Returns the number of peer sets this peer is part of. @@ -164,7 +137,7 @@ impl Record { /// /// A record is dialable if: /// - We have a known address of the peer - /// - It is not ourselves or blocked + /// - It is not ourselves /// - We are not already connected or reserved /// - The ingress address is allowed (DNS enabled, Socket IP is global or private IPs allowed) pub fn dialable(&self, allow_private_ips: bool, allow_dns: bool) -> bool { @@ -173,7 +146,7 @@ impl Record { } let ingress = match &self.address { Address::Known(addr) => addr.ingress(), - Address::Myself | Address::Blocked(_) => return false, + Address::Myself => return false, }; ingress.is_valid(allow_private_ips, allow_dns) } @@ -181,7 +154,7 @@ impl Record { /// Returns `true` if this peer is acceptable (can accept an incoming connection from them). /// /// A peer is acceptable if: - /// - The peer is eligible (in a peer set, not blocked, not ourselves) + /// - The peer is eligible (in a peer set, not ourselves) /// - The source IP matches the expected egress IP for this peer (if not bypass_ip_check) /// - We are not already connected or reserved pub fn acceptable(&self, source_ip: IpAddr, bypass_ip_check: bool) -> bool { @@ -193,14 +166,14 @@ impl Record { } match &self.address { Address::Known(addr) => addr.egress_ip() == source_ip, - Address::Myself | Address::Blocked(_) => false, + Address::Myself => false, } } /// Return the ingress address for dialing, if known. pub fn ingress(&self) -> Option { match &self.address { - Address::Myself | Address::Blocked(_) => None, + Address::Myself => None, Address::Known(addr) => Some(addr.ingress()), } } @@ -208,7 +181,7 @@ impl Record { /// Return the egress IP for filtering, if known. pub const fn egress_ip(&self) -> Option { match &self.address { - Address::Myself | Address::Blocked(_) => None, + Address::Myself => None, Address::Known(addr) => Some(addr.egress_ip()), } } @@ -227,15 +200,11 @@ impl Record { /// Returns `true` if this peer is eligible for connection. /// /// A peer is eligible if: - /// - It is not blocked or ourselves + /// - It is not ourselves /// - It is part of at least one peer set (or is persistent) - /// - /// This is the base check for reserving a connection. IP validity is checked - /// separately: ingress validity for dialing (in `dialable()`), egress validity - /// for the IP filter (in `Directory::eligible_egress_ips()`). pub const fn eligible(&self) -> bool { match &self.address { - Address::Myself | Address::Blocked(_) => false, + Address::Myself => false, Address::Known(_) => self.sets > 0 || self.persistent, } } @@ -261,17 +230,35 @@ mod tests { assert_eq!(record.sets, 0); assert!(record.persistent); assert!(record.ingress().is_none()); - assert!(!record.is_blocked()); + assert!(!record.is_blockable()); assert!(!record.reserved()); assert!(!record.deletable()); assert!(!record.eligible()); } #[test] - fn test_myself_blocked_to_known() { - let mut record = Record::myself(); - assert!(!record.block(), "Can't block myself"); - assert!(!record.is_blocked(), "Can't block myself"); + fn test_known_initial_state() { + let record = Record::known(test_address()); + assert!(matches!(record.address, Address::Known(_))); + assert_eq!(record.status, Status::Inert); + assert_eq!(record.sets, 0); + assert!(!record.persistent); + assert!(record.ingress().is_some()); + assert!(record.is_blockable()); + assert!(!record.reserved()); + assert!(record.deletable()); + assert!(!record.eligible()); + } + + #[test] + fn test_is_blockable() { + // Myself is not blockable + let record_myself = Record::myself(); + assert!(!record_myself.is_blockable()); + + // Known peers are blockable + let record_known = Record::known(test_address()); + assert!(record_known.is_blockable()); } #[test] @@ -299,41 +286,6 @@ mod tests { record.decrement(); } - #[test] - fn test_block_behavior_and_persistence() { - let mut record_known = Record::known(test_address()); - assert!(!record_known.persistent); - assert!(record_known.block()); - assert!(record_known.is_blocked()); - assert!(!record_known.persistent); - - let mut record_reserved = Record::known(test_address()); - assert!(record_reserved.reserve()); - assert!(record_reserved.block()); - assert_eq!(record_reserved.status, Status::Reserved); - - let mut record_active = Record::known(test_address()); - assert!(record_active.reserve()); - record_active.connect(); - assert!(record_active.block()); - assert_eq!(record_active.status, Status::Active); - } - - #[test] - fn test_block_myself_and_already_blocked() { - let mut record_myself = Record::myself(); - assert!(!record_myself.block(), "Cannot block myself"); - assert!(matches!(&record_myself.address, Address::Myself)); - - let mut record_to_be_blocked = Record::known(test_address()); - assert!(record_to_be_blocked.block()); - assert!( - !record_to_be_blocked.block(), - "Cannot block already blocked peer" - ); - assert!(record_to_be_blocked.is_blocked()); - } - #[test] fn test_status_transitions_reserve_connect_release() { let mut record = Record::known(test_address()); @@ -425,10 +377,7 @@ mod tests { #[test] fn test_eligible_logic() { - // Blocked and Myself are never eligible - let mut record_blocked = Record::known(test_address()); - record_blocked.block(); - assert!(!record_blocked.eligible()); + // Myself is never eligible assert!(!Record::myself().eligible()); // Known records are only eligible when in a peer set @@ -440,41 +389,6 @@ mod tests { assert!(!record_known.eligible(), "Not eligible when sets=0 again"); } - #[test] - fn test_block_expiration() { - let mut record = Record::known(test_address()); - record.increment(); - - // Block the peer - assert!(record.block()); - assert!(record.is_blocked()); - assert!(!record.eligible()); - assert!(!record.reserve()); - - // After block is cleared, peer should be eligible again - record.clear_expired_block(); - assert!(!record.is_blocked()); - assert!(record.eligible()); - assert!(record.reserve()); - - // Address should be preserved - assert!(matches!(record.address, Address::Known(_))); - } - - #[test] - fn test_clear_expired_block() { - let mut record = Record::known(test_address()); - record.increment(); - - // Block the peer - assert!(record.block()); - assert!(record.is_blocked()); - - // Clear should restore the address - record.clear_expired_block(); - assert!(!record.is_blocked()); - } - #[test] fn test_acceptable_checks_eligibility_status_and_ip() { use std::net::IpAddr; @@ -522,15 +436,6 @@ mod tests { !record_connected.acceptable(egress_ip, false), "Not acceptable when connected" ); - - // Blocked - not acceptable - let mut record_blocked = Record::known(types::Address::Symmetric(public_socket)); - record_blocked.increment(); - record_blocked.block(); - assert!( - !record_blocked.acceptable(egress_ip, false), - "Not acceptable when blocked" - ); } #[test] @@ -554,15 +459,6 @@ mod tests { "Not acceptable when not eligible (sets=0), even with bypass_ip_check=true" ); - // Still not acceptable when blocked - let mut record_blocked = Record::known(types::Address::Symmetric(public_socket)); - record_blocked.increment(); - record_blocked.block(); - assert!( - !record_blocked.acceptable(egress_ip, true), - "Not acceptable when blocked" - ); - // Still not acceptable when reserved let mut record_reserved = Record::known(types::Address::Symmetric(public_socket)); record_reserved.increment(); diff --git a/p2p/src/utils/blocked.rs b/p2p/src/utils/blocked.rs index ed7d65b40b..0ec5e338bd 100644 --- a/p2p/src/utils/blocked.rs +++ b/p2p/src/utils/blocked.rs @@ -20,14 +20,20 @@ pub struct Queue { queue: VecDeque<(SystemTime, K)>, } -impl Queue { - /// Create a new empty block queue. - pub fn new() -> Self { +impl Default for Queue { + fn default() -> Self { Self { blocked: HashMap::new(), queue: VecDeque::new(), } } +} + +impl Queue { + /// Create a new empty block queue. + pub fn new() -> Self { + Self::default() + } /// Block a peer until the given time. /// @@ -41,8 +47,18 @@ impl Queue { true } + /// Returns the number of currently blocked peers. + pub fn len(&self) -> usize { + self.blocked.len() + } + + /// Returns `true` if no peers are currently blocked. + pub fn is_empty(&self) -> bool { + self.blocked.is_empty() + } + /// Returns `true` if the peer is currently blocked. - pub fn is_blocked(&self, peer: &K) -> bool { + pub fn contains(&self, peer: &K) -> bool { self.blocked.contains_key(peer) } @@ -76,12 +92,6 @@ impl Queue { } } -impl Default for Queue { - fn default() -> Self { - Self::new() - } -} - /// Sleep until the next deadline, or wait forever if none. pub async fn wait_for(context: &E, deadline: Option) { match deadline { @@ -100,17 +110,19 @@ mod tests { } #[test] - fn test_block_and_is_blocked() { + fn test_block_and_contains() { let mut queue = Queue::new(); let peer = "peer1"; let until = now() + Duration::from_secs(100); - assert!(!queue.is_blocked(&peer)); + assert!(!queue.contains(&peer)); assert!(queue.block(peer, until)); - assert!(queue.is_blocked(&peer)); + assert!(queue.contains(&peer)); + assert_eq!(queue.len(), 1); // Blocking again returns false assert!(!queue.block(peer, until)); + assert_eq!(queue.len(), 1); } #[test] @@ -134,28 +146,32 @@ mod tests { queue.block(peer1, until1); queue.block(peer2, until2); + assert_eq!(queue.len(), 2); // Nothing expired yet let unblocked = queue.unblock_expired(now()); assert!(unblocked.is_empty()); - assert!(queue.is_blocked(&peer1)); - assert!(queue.is_blocked(&peer2)); + assert!(queue.contains(&peer1)); + assert!(queue.contains(&peer2)); + assert_eq!(queue.len(), 2); // Only peer1 expired let unblocked = queue.unblock_expired(until1 + Duration::from_secs(1)); assert_eq!(unblocked, vec![peer1]); - assert!(!queue.is_blocked(&peer1)); - assert!(queue.is_blocked(&peer2)); + assert!(!queue.contains(&peer1)); + assert!(queue.contains(&peer2)); + assert_eq!(queue.len(), 1); // peer2 expired let unblocked = queue.unblock_expired(until2 + Duration::from_secs(1)); assert_eq!(unblocked, vec![peer2]); - assert!(!queue.is_blocked(&peer2)); + assert!(!queue.contains(&peer2)); + assert_eq!(queue.len(), 0); } #[test] fn test_next_deadline() { - let mut queue = Queue::<&str>::new(); + let mut queue: Queue<&str> = Queue::new(); assert!(queue.next_deadline().is_none()); @@ -185,6 +201,6 @@ mod tests { // Calling unblock_expired before expiration should not affect the block queue.unblock_expired(now()); - assert!(queue.is_blocked(&peer)); + assert!(queue.contains(&peer)); } }