Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions p2p/src/authenticated/discovery/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use commonware_runtime::{
spawn_cell, Clock, ContextCell, Handle, KeyedRateLimiter, Listener, Metrics, Network, Quota,
SinkOf, Spawner, StreamOf,
};
use commonware_stream::{listen, Config as StreamConfig};
use commonware_stream::{listen, Config as StreamConfig, Error as StreamError};
use commonware_utils::{concurrency::Limiter, net::SubnetMask, IpAddrExt};
use prometheus_client::metrics::counter::Counter;
use rand::{CryptoRng, Rng};
Expand Down Expand Up @@ -103,16 +103,44 @@ impl<E: Spawner + Clock + Network + Rng + CryptoRng + Metrics, C: Signer> Actor<
mut tracker: UnboundedMailbox<tracker::Message<C::PublicKey>>,
mut supervisor: Mailbox<spawner::Message<SinkOf<E>, StreamOf<E>, C::PublicKey>>,
) {
// Track the rejection reason from the bouncer
let rejection_reason = std::sync::Arc::new(std::sync::Mutex::new(None));
let rejection_reason_clone = rejection_reason.clone();

let (peer, send, recv) = match listen(
context,
|peer| tracker.acceptable(peer),
|peer| {
let mut tracker = tracker.clone();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is jank but technically this the way you'd need to do this given the interface of listen

let rejection_reason = rejection_reason_clone.clone();
async move {
let result = tracker.acceptable(peer).await;
if result != tracker::Acceptable::Yes {
*rejection_reason.lock().unwrap() = Some(result);
}
result == tracker::Acceptable::Yes
}
},
stream_cfg,
stream,
sink,
)
.await
{
Ok(x) => x,
Err(StreamError::PeerRejected(_)) => {
// The bouncer returned false - check the captured reason
let reason = rejection_reason.lock().unwrap().take();
match reason {
Some(tracker::Acceptable::Blocked) => {
debug!(?address, "peer is blocked");
}
Some(tracker::Acceptable::Unknown) | None => {
debug!(?address, "peer not acceptable (unknown or not in peer set)");
}
Some(tracker::Acceptable::Yes) => unreachable!(),
}
return;
}
Err(err) => {
debug!(?err, "failed to complete handshake");
return;
Expand Down Expand Up @@ -300,7 +328,7 @@ mod tests {
while let Some(message) = tracker_rx.next().await {
match message {
tracker::Message::Acceptable { responder, .. } => {
let _ = responder.send(true);
let _ = responder.send(tracker::Acceptable::Yes);
}
tracker::Message::Listen { reservation, .. } => {
let _ = reservation.send(None);
Expand Down Expand Up @@ -443,7 +471,7 @@ mod tests {
while let Some(message) = tracker_rx.next().await {
match message {
tracker::Message::Acceptable { responder, .. } => {
let _ = responder.send(true);
let _ = responder.send(tracker::Acceptable::Yes);
}
tracker::Message::Listen { reservation, .. } => {
let _ = reservation.send(None);
Expand Down
32 changes: 24 additions & 8 deletions p2p/src/authenticated/discovery/actors/tracker/actor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(test)]
use super::ingress::Acceptable;
use super::{
directory::{self, Directory},
ingress::{Message, Oracle},
Expand Down Expand Up @@ -79,6 +81,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> Actor<E, C> {
max_sets: cfg.tracked_peer_sets,
dial_fail_limit: cfg.dial_fail_limit,
rate_limit: cfg.allowed_connection_rate_per_peer,
block_duration: cfg.block_duration,
};

// Create the mailboxes
Expand Down Expand Up @@ -319,6 +322,7 @@ mod tests {
peer_gossip_max_count: 5,
max_peer_set_size: 128,
dial_fail_limit: 1,
block_duration: Duration::from_secs(60),
}
}

Expand Down Expand Up @@ -795,21 +799,30 @@ mod tests {
} = setup_actor(context.clone(), cfg_initial);

// None listenable because not registered
assert!(!mailbox.acceptable(peer_pk.clone()).await);
assert!(!mailbox.acceptable(peer_pk2.clone()).await);
assert!(!mailbox.acceptable(peer_pk3.clone()).await);
assert_eq!(
mailbox.acceptable(peer_pk.clone()).await,
Acceptable::Unknown
);
assert_eq!(
mailbox.acceptable(peer_pk2.clone()).await,
Acceptable::Unknown
);
assert_eq!(
mailbox.acceptable(peer_pk3.clone()).await,
Acceptable::Unknown
);

oracle
.update(0, [peer_pk.clone(), peer_pk2.clone()].try_into().unwrap())
.await;
context.sleep(Duration::from_millis(10)).await;

// Not listenable because self
assert!(!mailbox.acceptable(peer_pk).await);
assert_eq!(mailbox.acceptable(peer_pk).await, Acceptable::Unknown);
// Listenable because registered
assert!(mailbox.acceptable(peer_pk2).await);
assert_eq!(mailbox.acceptable(peer_pk2).await, Acceptable::Yes);
// Not listenable because not registered
assert!(!mailbox.acceptable(peer_pk3).await);
assert_eq!(mailbox.acceptable(peer_pk3).await, Acceptable::Unknown);
});
}

Expand All @@ -834,12 +847,15 @@ mod tests {
.await;
context.sleep(Duration::from_millis(10)).await; // Allow register to process

assert!(mailbox.acceptable(peer_pk.clone()).await);
assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Acceptable::Yes);

let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());

assert!(!mailbox.acceptable(peer_pk.clone()).await);
assert_eq!(
mailbox.acceptable(peer_pk.clone()).await,
Acceptable::Unknown
);

let failed_reservation = mailbox.listen(peer_pk.clone()).await;
assert!(failed_reservation.is_none());
Expand Down
59 changes: 46 additions & 13 deletions p2p/src/authenticated/discovery/actors/tracker/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use rand::{seq::IteratorRandom, Rng};
use std::{
collections::{BTreeMap, HashMap},
ops::Deref,
time::Duration,
};
use tracing::{debug, warn};

Expand All @@ -37,6 +38,9 @@ pub struct Config {

/// The rate limit for allowing reservations per-peer.
pub rate_limit: Quota,

/// Duration after which blocked peers are automatically unblocked.
pub block_duration: Duration,
}

/// Represents a collection of records for all peers.
Expand All @@ -57,6 +61,9 @@ pub struct Directory<E: Rng + Clock + RuntimeMetrics, C: PublicKey> {
/// peers for its peer info again.
dial_fail_limit: usize,

/// Duration after which blocked peers are automatically unblocked.
block_duration: Duration,

// ---------- State ----------
/// The records of all peers.
peers: HashMap<C, Record<C>>,
Expand Down Expand Up @@ -107,6 +114,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
allow_dns: cfg.allow_dns,
max_sets: cfg.max_sets,
dial_fail_limit: cfg.dial_fail_limit,
block_duration: cfg.block_duration,
peers,
sets: BTreeMap::new(),
rate_limiter,
Expand All @@ -131,7 +139,8 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
record.dial_failure(ingress);
}

let want = record.want(self.dial_fail_limit);
let now = self.context.current();
let want = record.want(now, self.dial_fail_limit);
for set in self.sets.values_mut() {
set.update(peer, !want);
}
Expand All @@ -152,14 +161,16 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
record.connect();

// We may have to update the sets.
let want = record.want(self.dial_fail_limit);
let now = self.context.current();
let want = record.want(now, self.dial_fail_limit);
for set in self.sets.values_mut() {
set.update(peer, !want);
}
}

/// Using a list of (already-validated) peer information, update the records.
pub fn update_peers(&mut self, infos: Vec<types::Info<C>>) {
let now = self.context.current();
for info in infos {
// Update peer address
//
Expand All @@ -170,7 +181,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
let Some(record) = self.peers.get_mut(&peer) else {
continue;
};
if !record.update(info) {
if !record.update(now, info) {
continue;
}
self.metrics
Expand All @@ -179,7 +190,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
.inc();

// We may have to update the sets.
let want = record.want(self.dial_fail_limit);
let want = record.want(now, self.dial_fail_limit);
for set in self.sets.values_mut() {
set.update(&peer, !want);
}
Expand All @@ -204,14 +215,15 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
}

// Create and store new peer set
let now = self.context.current();
let mut set = Set::new(peers.clone());
for peer in peers.iter() {
let record = self.peers.entry(peer.clone()).or_insert_with(|| {
self.metrics.tracked.inc();
Record::unknown()
});
record.increment();
set.update(peer, !record.want(self.dial_fail_limit));
set.update(peer, !record.want(now, self.dial_fail_limit));
}
self.sets.insert(index, set);

Expand Down Expand Up @@ -268,7 +280,12 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {

/// Attempt to block a peer, updating the metrics accordingly.
pub fn block(&mut self, peer: &C) {
if self.peers.get_mut(peer).is_some_and(|r| r.block()) {
let blocked_until = self.context.current() + self.block_duration;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saturating add

if self
.peers
.get_mut(peer)
.is_some_and(|r| r.block(blocked_until))
{
self.metrics.blocked.inc();
}
}
Expand Down Expand Up @@ -334,25 +351,39 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
///
/// A peer is eligible if it is in a peer set (or is persistent), not blocked, and not ourselves.
pub fn eligible(&self, peer: &C) -> bool {
self.peers.get(peer).is_some_and(|r| r.eligible())
let now = self.context.current();
self.peers.get(peer).is_some_and(|r| r.eligible(now))
}

/// Returns a vector of dialable peers. That is, unconnected peers for which we have an ingress.
pub fn dialable(&self) -> Vec<C> {
// Collect peers with known addresses
let now = self.context.current();
let mut result: Vec<_> = self
.peers
.iter()
.filter(|&(_, r)| r.dialable(self.allow_private_ips, self.allow_dns))
.filter(|&(_, r)| r.dialable(now, self.allow_private_ips, self.allow_dns))
.map(|(peer, _)| peer.clone())
.collect();
result.sort();
result
}

/// Returns true if this peer is acceptable (can accept an incoming connection from them).
pub fn acceptable(&self, peer: &C) -> bool {
self.peers.get(peer).is_some_and(|r| r.acceptable())
/// Returns the acceptance status for a peer.
pub fn acceptable(&self, peer: &C) -> super::ingress::Acceptable {
use super::ingress::Acceptable;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove inline import

let now = self.context.current();
let Some(record) = self.peers.get(peer) else {
return Acceptable::Unknown;
};
if record.blocked(now) {
return Acceptable::Blocked;
}
if record.acceptable(now) {
Acceptable::Yes
} else {
Acceptable::Unknown
}
}

// --------- Helpers ----------
Expand Down Expand Up @@ -384,7 +415,8 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
}

// Reserve
if record.reserve() {
let now = self.context.current();
if record.reserve(now) {
self.metrics.reserved.inc();
return Some(Reservation::new(metadata, self.releaser.clone()));
}
Expand All @@ -401,7 +433,8 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
if !record.deletable() {
return false;
}
if record.blocked() {
let now = self.context.current();
if record.blocked(now) {
self.metrics.blocked.dec();
}
self.peers.remove(peer);
Expand Down
17 changes: 14 additions & 3 deletions p2p/src/authenticated/discovery/actors/tracker/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ use commonware_cryptography::PublicKey;
use commonware_utils::ordered::Set;
use futures::channel::{mpsc, oneshot};

/// Result of checking if a peer is acceptable for connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Acceptable {
/// Peer is acceptable for connection.
Yes,
/// Peer is blocked.
Blocked,
/// Peer is unknown (not in any peer set).
Unknown,
}

/// Messages that can be sent to the tracker actor.
#[derive(Debug)]
pub enum Message<C: PublicKey> {
Expand Down Expand Up @@ -110,8 +121,8 @@ pub enum Message<C: PublicKey> {
/// The public key of the peer to check.
public_key: C,

/// The sender to respond with whether the peer is acceptable.
responder: oneshot::Sender<bool>,
/// The sender to respond with the acceptance status.
responder: oneshot::Sender<Acceptable>,
},

/// Request a reservation for a particular peer.
Expand Down Expand Up @@ -185,7 +196,7 @@ impl<C: PublicKey> UnboundedMailbox<Message<C>> {
}

/// Send an `Acceptable` message to the tracker.
pub async fn acceptable(&mut self, public_key: C) -> bool {
pub async fn acceptable(&mut self, public_key: C) -> Acceptable {
let (tx, rx) = oneshot::channel();
self.send(Message::Acceptable {
public_key,
Expand Down
3 changes: 2 additions & 1 deletion p2p/src/authenticated/discovery/actors/tracker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod reservation;
mod set;

pub use actor::Actor;
pub use ingress::{Message, Oracle};
pub use ingress::{Acceptable, Message, Oracle};
pub use metadata::Metadata;
pub use reservation::Reservation;

Expand All @@ -31,6 +31,7 @@ pub struct Config<C: Signer> {
pub tracked_peer_sets: usize,
pub max_peer_set_size: u64,
pub allowed_connection_rate_per_peer: Quota,
pub block_duration: Duration,
pub peer_gossip_max_count: usize,
pub dial_fail_limit: usize,
}
Loading
Loading