Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 160 additions & 59 deletions p2p/src/authenticated/discovery/actors/tracker/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,11 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {

let blocked_until = self.context.current() + self.block_duration;
self.blocked.put(peer.clone(), blocked_until);
let _ = self.metrics.blocked.try_set(self.blocked.len());
let _ = self
.metrics
.blocked
.get_or_create(&metrics::Peer::new(peer))
.try_set(blocked_until.epoch_millis());
}

// ---------- Getters ----------
Expand Down Expand Up @@ -398,6 +402,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
}
let (peer, _) = self.blocked.pop().unwrap();
debug!(?peer, "unblocked peer");
self.metrics.blocked.remove(&metrics::Peer::new(&peer));

// Update knowledge bitmaps
if let Some(record) = self.peers.get(&peer) {
Expand All @@ -407,7 +412,6 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
}
}
}
let _ = self.metrics.blocked.try_set(self.blocked.len());
}

/// Waits until the next blocked peer should be unblocked.
Expand Down Expand Up @@ -532,11 +536,14 @@ mod tests {
// Blocking myself should be ignored (Myself is unblockable)
directory.block(&my_pk);

// Metrics should not be incremented
assert_eq!(
directory.metrics.blocked.get(),
0,
"Blocking myself should not increment metric"
// Metrics should not have an entry for myself
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&my_pk))
.is_none(),
"Blocking myself should not create metric entry"
);

// No peers should be blocked
Expand Down Expand Up @@ -574,11 +581,14 @@ mod tests {
// Block a peer that doesn't exist yet
directory.block(&unknown_pk);

// Metrics should be incremented
assert_eq!(
directory.metrics.blocked.get(),
1,
"Blocking nonexistent peer should increment metric"
// Metrics should have an entry for the blocked peer
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking nonexistent peer should create metric entry"
);

// Peer should be blocked
Expand Down Expand Up @@ -616,11 +626,14 @@ mod tests {
// Unblock the peer
directory.unblock_expired();

// Metrics should be decremented
assert_eq!(
directory.metrics.blocked.get(),
0,
"Blocked metric should be 0 after unblock"
// Metrics entry should be removed for the unblocked peer
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_none(),
"Blocked metric should be removed after unblock"
);

// Peer should now be eligible
Expand Down Expand Up @@ -657,42 +670,75 @@ mod tests {
let peer_set: OrderedSet<_> =
[registered_pk.clone()].into_iter().try_collect().unwrap();
directory.add_set(0, peer_set);
assert_eq!(directory.metrics.blocked.get(), 0);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&registered_pk))
.is_none(),
"Peer should not be blocked initially"
);

// Block registered peer multiple times
directory.block(&registered_pk);
assert_eq!(directory.metrics.blocked.get(), 1);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&registered_pk))
.is_some(),
"Registered peer should be marked blocked"
);

directory.block(&registered_pk);
assert_eq!(
directory.metrics.blocked.get(),
1,
"Blocking same registered peer twice should not increment metric"
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&registered_pk))
.is_some(),
"Blocking same registered peer twice should not change metric"
);

directory.block(&registered_pk);
assert_eq!(
directory.metrics.blocked.get(),
1,
"Blocking same registered peer thrice should not increment metric"
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&registered_pk))
.is_some(),
"Blocking same registered peer thrice should not change metric"
);

// Block a nonexistent peer multiple times
directory.block(&unknown_pk);
assert_eq!(directory.metrics.blocked.get(), 2);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Unknown peer should be marked blocked"
);

directory.block(&unknown_pk);
assert_eq!(
directory.metrics.blocked.get(),
2,
"Blocking same nonexistent peer twice should not increment metric"
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking same nonexistent peer twice should not change metric"
);

directory.block(&unknown_pk);
assert_eq!(
directory.metrics.blocked.get(),
2,
"Blocking same nonexistent peer thrice should not increment metric"
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&unknown_pk))
.is_some(),
"Blocking same nonexistent peer thrice should not change metric"
);
});
}
Expand Down Expand Up @@ -841,27 +887,44 @@ mod tests {
let mut directory = Directory::init(context.clone(), vec![], my_info, config, releaser);

// Initially no blocked peers
assert_eq!(directory.metrics.blocked.get(), 0);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none(),
"pk_1 should not be blocked initially"
);

// Add pk_1 and block it
let peer_set: OrderedSet<_> = [pk_1.clone()].into_iter().try_collect().unwrap();
directory.add_set(0, peer_set);
directory.block(&pk_1);
assert!(directory.blocked.contains(&pk_1));
assert_eq!(directory.metrics.blocked.get(), 1);
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"pk_1 should be marked blocked"
);

// Add a new set that evicts pk_1 (max_sets=1)
// The blocked metric should remain 1 since the block persists
// The blocked metric should remain since the block persists
let peer_set_2: OrderedSet<_> = [pk_2.clone()].into_iter().try_collect().unwrap();
directory.add_set(1, peer_set_2);
assert!(
!directory.peers.contains_key(&pk_1),
"pk_1 should be removed"
);
assert_eq!(
directory.metrics.blocked.get(),
1,
"blocked metric should still be 1 after peer removal"
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"blocked metric should persist after peer removal"
);

// Re-add pk_1 - should still be blocked because block persists
Expand All @@ -871,10 +934,13 @@ mod tests {
directory.blocked.contains(&pk_1),
"Re-added pk_1 should still be blocked"
);
assert_eq!(
directory.metrics.blocked.get(),
1,
"blocked metric should still be 1 after re-add"
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some(),
"blocked metric should persist after re-add"
);

// Advance time past block duration
Expand All @@ -886,10 +952,13 @@ mod tests {
!directory.blocked.contains(&pk_1),
"pk_1 should no longer be blocked"
);
assert_eq!(
directory.metrics.blocked.get(),
0,
"blocked metric should be 0 after unblock"
assert!(
directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none(),
"blocked metric should be removed after unblock"
);
});
}
Expand Down Expand Up @@ -923,24 +992,56 @@ mod tests {
.try_collect()
.unwrap();
directory.add_set(0, peer_set);
assert_eq!(directory.metrics.blocked.get(), 0);
assert_eq!(directory.blocked(), 0);

// Block all three peers
directory.block(&pk_1);
assert_eq!(directory.metrics.blocked.get(), 1);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some());
directory.block(&pk_2);
assert_eq!(directory.metrics.blocked.get(), 2);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_2))
.is_some());
directory.block(&pk_3);
assert_eq!(directory.metrics.blocked.get(), 3);

// Blocking again should not increment
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_3))
.is_some());
assert_eq!(directory.blocked(), 3);

// Blocking again should not change anything
directory.block(&pk_1);
assert_eq!(directory.metrics.blocked.get(), 3);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_some());

// Advance time and unblock all
context.sleep(block_duration + Duration::from_secs(1)).await;
directory.unblock_expired();
assert_eq!(directory.metrics.blocked.get(), 0);
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_1))
.is_none());
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_2))
.is_none());
assert!(directory
.metrics
.blocked
.get(&metrics::Peer::new(&pk_3))
.is_none());
assert_eq!(directory.blocked(), 0);
});
}

Expand Down
6 changes: 3 additions & 3 deletions p2p/src/authenticated/discovery/actors/tracker/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub struct Metrics {
/// Does not include self, despite having a record for it.
pub tracked: Gauge,

/// The total number of blocked peers.
pub blocked: Gauge,
/// Blocked peers (value = expiry time as epoch millis).
pub blocked: Family<metrics::Peer, Gauge>,

/// The total number of outstanding reservations.
pub reserved: Gauge,
Expand All @@ -34,7 +34,7 @@ impl Metrics {
);
context.register(
"blocked",
"Total number of blocked peers",
"Blocked peers (value = expiry time as epoch millis)",
metrics.blocked.clone(),
);
context.register(
Expand Down
Loading
Loading