diff --git a/p2p/src/authenticated/discovery/actors/tracker/directory.rs b/p2p/src/authenticated/discovery/actors/tracker/directory.rs index 630b2edce9..8de350ddf0 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/directory.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/directory.rs @@ -302,7 +302,11 @@ impl Directory { let blocked_until = self.context.current() + self.block_duration; self.blocked.put(peer.clone(), blocked_until); - let _ = self.metrics.blocked.try_set(self.blocked.len()); + let _ = self + .metrics + .blocked + .get_or_create(&metrics::Peer::new(peer)) + .try_set(blocked_until.epoch_millis()); } // ---------- Getters ---------- @@ -398,6 +402,7 @@ impl Directory { } let (peer, _) = self.blocked.pop().unwrap(); debug!(?peer, "unblocked peer"); + self.metrics.blocked.remove(&metrics::Peer::new(&peer)); // Update knowledge bitmaps if let Some(record) = self.peers.get(&peer) { @@ -407,7 +412,6 @@ impl Directory { } } } - let _ = self.metrics.blocked.try_set(self.blocked.len()); } /// Waits until the next blocked peer should be unblocked. @@ -532,11 +536,14 @@ mod tests { // Blocking myself should be ignored (Myself is unblockable) directory.block(&my_pk); - // Metrics should not be incremented - assert_eq!( - directory.metrics.blocked.get(), - 0, - "Blocking myself should not increment metric" + // Metrics should not have an entry for myself + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&my_pk)) + .is_none(), + "Blocking myself should not create metric entry" ); // No peers should be blocked @@ -574,11 +581,14 @@ mod tests { // Block a peer that doesn't exist yet directory.block(&unknown_pk); - // Metrics should be incremented - assert_eq!( - directory.metrics.blocked.get(), - 1, - "Blocking nonexistent peer should increment metric" + // Metrics should have an entry for the blocked peer + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Blocking nonexistent peer should create metric entry" ); // Peer should be blocked @@ -616,11 +626,14 @@ mod tests { // Unblock the peer directory.unblock_expired(); - // Metrics should be decremented - assert_eq!( - directory.metrics.blocked.get(), - 0, - "Blocked metric should be 0 after unblock" + // Metrics entry should be removed for the unblocked peer + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_none(), + "Blocked metric should be removed after unblock" ); // Peer should now be eligible @@ -657,42 +670,75 @@ mod tests { let peer_set: OrderedSet<_> = [registered_pk.clone()].into_iter().try_collect().unwrap(); directory.add_set(0, peer_set); - assert_eq!(directory.metrics.blocked.get(), 0); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_none(), + "Peer should not be blocked initially" + ); // Block registered peer multiple times directory.block(®istered_pk); - assert_eq!(directory.metrics.blocked.get(), 1); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_some(), + "Registered peer should be marked blocked" + ); directory.block(®istered_pk); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "Blocking same registered peer twice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_some(), + "Blocking same registered peer twice should not change metric" ); directory.block(®istered_pk); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "Blocking same registered peer thrice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_some(), + "Blocking same registered peer thrice should not change metric" ); // Block a nonexistent peer multiple times directory.block(&unknown_pk); - assert_eq!(directory.metrics.blocked.get(), 2); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Unknown peer should be marked blocked" + ); directory.block(&unknown_pk); - assert_eq!( - directory.metrics.blocked.get(), - 2, - "Blocking same nonexistent peer twice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Blocking same nonexistent peer twice should not change metric" ); directory.block(&unknown_pk); - assert_eq!( - directory.metrics.blocked.get(), - 2, - "Blocking same nonexistent peer thrice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Blocking same nonexistent peer thrice should not change metric" ); }); } @@ -796,6 +842,12 @@ mod tests { // Verify peer is blocked assert_eq!(directory.blocked(), 1, "Should have one blocked peer"); + // Get first expiry time + let first_expiry = directory + .blocked + .get(&peer_pk) + .expect("peer should be blocked"); + // unblock_expired should do nothing before expiry directory.unblock_expired(); assert!( @@ -815,6 +867,20 @@ mod tests { // Verify no more blocked peers assert_eq!(directory.blocked(), 0, "No more blocked peers"); + + // Re-block the peer and verify expiry time increased + directory.block(&peer_pk); + assert_eq!(directory.blocked(), 1, "Should have one blocked peer again"); + + let second_expiry = directory + .blocked + .get(&peer_pk) + .expect("peer should be blocked again"); + + assert!( + second_expiry > first_expiry, + "Re-blocking should have a later expiry time" + ); }); } @@ -841,27 +907,44 @@ mod tests { let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser); // Initially no blocked peers - assert_eq!(directory.metrics.blocked.get(), 0); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_none(), + "pk_1 should not be blocked initially" + ); // Add pk_1 and block it let peer_set: OrderedSet<_> = [pk_1.clone()].into_iter().try_collect().unwrap(); directory.add_set(0, peer_set); directory.block(&pk_1); assert!(directory.blocked.contains(&pk_1)); - assert_eq!(directory.metrics.blocked.get(), 1); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some(), + "pk_1 should be marked blocked" + ); // Add a new set that evicts pk_1 (max_sets=1) - // The blocked metric should remain 1 since the block persists + // The blocked metric should remain since the block persists let peer_set_2: OrderedSet<_> = [pk_2.clone()].into_iter().try_collect().unwrap(); directory.add_set(1, peer_set_2); assert!( !directory.peers.contains_key(&pk_1), "pk_1 should be removed" ); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "blocked metric should still be 1 after peer removal" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some(), + "blocked metric should persist after peer removal" ); // Re-add pk_1 - should still be blocked because block persists @@ -871,10 +954,13 @@ mod tests { directory.blocked.contains(&pk_1), "Re-added pk_1 should still be blocked" ); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "blocked metric should still be 1 after re-add" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some(), + "blocked metric should persist after re-add" ); // Advance time past block duration @@ -886,10 +972,13 @@ mod tests { !directory.blocked.contains(&pk_1), "pk_1 should no longer be blocked" ); - assert_eq!( - directory.metrics.blocked.get(), - 0, - "blocked metric should be 0 after unblock" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_none(), + "blocked metric should be removed after unblock" ); }); } @@ -923,24 +1012,56 @@ mod tests { .try_collect() .unwrap(); directory.add_set(0, peer_set); - assert_eq!(directory.metrics.blocked.get(), 0); + assert_eq!(directory.blocked(), 0); // Block all three peers directory.block(&pk_1); - assert_eq!(directory.metrics.blocked.get(), 1); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some()); directory.block(&pk_2); - assert_eq!(directory.metrics.blocked.get(), 2); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_2)) + .is_some()); directory.block(&pk_3); - assert_eq!(directory.metrics.blocked.get(), 3); - - // Blocking again should not increment + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_3)) + .is_some()); + assert_eq!(directory.blocked(), 3); + + // Blocking again should not change anything directory.block(&pk_1); - assert_eq!(directory.metrics.blocked.get(), 3); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some()); // Advance time and unblock all context.sleep(block_duration + Duration::from_secs(1)).await; directory.unblock_expired(); - assert_eq!(directory.metrics.blocked.get(), 0); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_none()); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_2)) + .is_none()); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_3)) + .is_none()); + assert_eq!(directory.blocked(), 0); }); } diff --git a/p2p/src/authenticated/discovery/actors/tracker/metrics.rs b/p2p/src/authenticated/discovery/actors/tracker/metrics.rs index e0a2352373..65a08f9f72 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/metrics.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/metrics.rs @@ -10,8 +10,8 @@ pub struct Metrics { /// Does not include self, despite having a record for it. pub tracked: Gauge, - /// The total number of blocked peers. - pub blocked: Gauge, + /// Blocked peers (value = expiry time as epoch millis). + pub blocked: Family, /// The total number of outstanding reservations. pub reserved: Gauge, @@ -34,7 +34,7 @@ impl Metrics { ); context.register( "blocked", - "Total number of blocked peers", + "Blocked peers (value = expiry time as epoch millis)", metrics.blocked.clone(), ); context.register( diff --git a/p2p/src/authenticated/lookup/actors/tracker/directory.rs b/p2p/src/authenticated/lookup/actors/tracker/directory.rs index ed0f31c872..5d399abcaf 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/directory.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/directory.rs @@ -11,7 +11,7 @@ use commonware_runtime::{ }; use commonware_utils::{ ordered::{Map, Set}, - IpAddrExt, PrioritySet, TryCollect, + IpAddrExt, PrioritySet, SystemTimeExt, TryCollect, }; use rand::Rng; use std::{ @@ -240,7 +240,11 @@ impl Directory { let blocked_until = self.context.current() + self.block_duration; self.blocked.put(peer.clone(), blocked_until); - let _ = self.metrics.blocked.try_set(self.blocked.len()); + let _ = self + .metrics + .blocked + .get_or_create(&metrics::Peer::new(peer)) + .try_set(blocked_until.epoch_millis()); } // ---------- Getters ---------- @@ -317,9 +321,9 @@ impl Directory { } let (peer, _) = self.blocked.pop().unwrap(); debug!(?peer, "unblocked peer"); + self.metrics.blocked.remove(&metrics::Peer::new(&peer)); any_unblocked = true; } - let _ = self.metrics.blocked.try_set(self.blocked.len()); any_unblocked } @@ -399,7 +403,10 @@ impl Directory { #[cfg(test)] mod tests { use crate::{ - authenticated::{lookup::actors::tracker::directory::Directory, mailbox::UnboundedMailbox}, + authenticated::{ + lookup::{actors::tracker::directory::Directory, metrics}, + mailbox::UnboundedMailbox, + }, types::Address, Ingress, }; @@ -966,6 +973,12 @@ mod tests { // Verify peer is blocked assert_eq!(directory.blocked(), 1, "Should have one blocked peer"); + // Get first expiry time + let first_expiry = directory + .blocked + .get(&pk_1) + .expect("peer should be blocked"); + // unblock_expired should return false before expiry assert!( !directory.unblock_expired(), @@ -986,6 +999,20 @@ mod tests { // Verify no more blocked peers assert_eq!(directory.blocked(), 0, "No more blocked peers"); + + // Re-block the peer and verify expiry time increased + directory.block(&pk_1); + assert_eq!(directory.blocked(), 1, "Should have one blocked peer again"); + + let second_expiry = directory + .blocked + .get(&pk_1) + .expect("peer should be blocked again"); + + assert!( + second_expiry > first_expiry, + "Re-blocking should have a later expiry time" + ); }); } @@ -1014,25 +1041,42 @@ mod tests { let mut directory = Directory::init(context.clone(), my_pk, config, releaser); // Initially no blocked peers - assert_eq!(directory.metrics.blocked.get(), 0); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_none(), + "pk_1 should not be blocked initially" + ); // Add pk_1 and block it directory.add_set(0, [(pk_1.clone(), addr(addr_1))].try_into().unwrap()); directory.block(&pk_1); assert!(directory.blocked.contains(&pk_1)); - assert_eq!(directory.metrics.blocked.get(), 1); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some(), + "pk_1 should be marked blocked" + ); // Add a new set that evicts pk_1 (max_sets=1) - // The blocked metric should remain 1 since the block persists + // The blocked metric should remain since the block persists directory.add_set(1, [(pk_2.clone(), addr(addr_2))].try_into().unwrap()); assert!( !directory.peers.contains_key(&pk_1), "pk_1 should be removed" ); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "blocked metric should still be 1 after peer removal" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some(), + "blocked metric should persist after peer removal" ); // Re-add pk_1 - should still be blocked because block persists @@ -1041,10 +1085,13 @@ mod tests { directory.blocked.contains(&pk_1), "Re-added pk_1 should still be blocked" ); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "blocked metric should still be 1 after re-add" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some(), + "blocked metric should persist after re-add" ); // Advance time past block duration @@ -1056,10 +1103,13 @@ mod tests { !directory.blocked.contains(&pk_1), "pk_1 should no longer be blocked" ); - assert_eq!( - directory.metrics.blocked.get(), - 0, - "blocked metric should be 0 after unblock" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_none(), + "blocked metric should be removed after unblock" ); }); } @@ -1101,24 +1151,56 @@ mod tests { .try_into() .unwrap(), ); - assert_eq!(directory.metrics.blocked.get(), 0); + assert_eq!(directory.blocked(), 0); // Block all three peers directory.block(&pk_1); - assert_eq!(directory.metrics.blocked.get(), 1); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some()); directory.block(&pk_2); - assert_eq!(directory.metrics.blocked.get(), 2); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_2)) + .is_some()); directory.block(&pk_3); - assert_eq!(directory.metrics.blocked.get(), 3); - - // Blocking again should not increment + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_3)) + .is_some()); + assert_eq!(directory.blocked(), 3); + + // Blocking again should not change anything directory.block(&pk_1); - assert_eq!(directory.metrics.blocked.get(), 3); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_some()); // Advance time and unblock all context.sleep(block_duration + Duration::from_secs(1)).await; assert!(directory.unblock_expired()); - assert_eq!(directory.metrics.blocked.get(), 0); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_1)) + .is_none()); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_2)) + .is_none()); + assert!(directory + .metrics + .blocked + .get(&metrics::Peer::new(&pk_3)) + .is_none()); + assert_eq!(directory.blocked(), 0); }); } @@ -1144,11 +1226,14 @@ mod tests { // Blocking myself should be ignored (Myself is unblockable) directory.block(&my_pk); - // Metrics should not be incremented - assert_eq!( - directory.metrics.blocked.get(), - 0, - "Blocking myself should not increment metric" + // Metrics should not have an entry for myself + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&my_pk)) + .is_none(), + "Blocking myself should not create metric entry" ); // No peers should be blocked @@ -1186,11 +1271,14 @@ mod tests { // Block a peer that doesn't exist yet directory.block(&unknown_pk); - // Metrics should be incremented - assert_eq!( - directory.metrics.blocked.get(), - 1, - "Blocking nonexistent peer should increment metric" + // Metrics should have an entry for the blocked peer + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Blocking nonexistent peer should create metric entry" ); // Peer should be blocked @@ -1232,11 +1320,14 @@ mod tests { // Unblock the peer directory.unblock_expired(); - // Metrics should be decremented - assert_eq!( - directory.metrics.blocked.get(), - 0, - "Blocked metric should be 0 after unblock" + // Metrics entry should be removed for the unblocked peer + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_none(), + "Blocked metric should be removed after unblock" ); // Peer should now be eligible @@ -1276,42 +1367,75 @@ mod tests { .try_into() .unwrap(), ); - assert_eq!(directory.metrics.blocked.get(), 0); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_none(), + "Peer should not be blocked initially" + ); // Block registered peer multiple times directory.block(®istered_pk); - assert_eq!(directory.metrics.blocked.get(), 1); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_some(), + "Registered peer should be marked blocked" + ); directory.block(®istered_pk); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "Blocking same registered peer twice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_some(), + "Blocking same registered peer twice should not change metric" ); directory.block(®istered_pk); - assert_eq!( - directory.metrics.blocked.get(), - 1, - "Blocking same registered peer thrice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(®istered_pk)) + .is_some(), + "Blocking same registered peer thrice should not change metric" ); // Block a nonexistent peer multiple times directory.block(&unknown_pk); - assert_eq!(directory.metrics.blocked.get(), 2); + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Unknown peer should be marked blocked" + ); directory.block(&unknown_pk); - assert_eq!( - directory.metrics.blocked.get(), - 2, - "Blocking same nonexistent peer twice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Blocking same nonexistent peer twice should not change metric" ); directory.block(&unknown_pk); - assert_eq!( - directory.metrics.blocked.get(), - 2, - "Blocking same nonexistent peer thrice should not increment metric" + assert!( + directory + .metrics + .blocked + .get(&metrics::Peer::new(&unknown_pk)) + .is_some(), + "Blocking same nonexistent peer thrice should not change metric" ); }); } diff --git a/p2p/src/authenticated/lookup/actors/tracker/metrics.rs b/p2p/src/authenticated/lookup/actors/tracker/metrics.rs index 1dad93fa38..962b0c00e4 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/metrics.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/metrics.rs @@ -9,8 +9,8 @@ pub struct Metrics { /// Does not include self, despite having a record for it. pub tracked: Gauge, - /// The total number of blocked peers. - pub blocked: Gauge, + /// Blocked peers (value = expiry time as epoch millis). + pub blocked: Family, /// The total number of outstanding reservations. pub reserved: Gauge, @@ -33,7 +33,7 @@ impl Metrics { ); context.register( "blocked", - "Total number of blocked peers", + "Blocked peers (value = expiry time as epoch millis)", metrics.blocked.clone(), ); context.register(