Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions p2p/src/authenticated/discovery/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::authenticated::{
discovery::actors::{spawner, tracker},
mailbox::UnboundedMailbox,
Mailbox,
Attempt, Mailbox,
};
use commonware_cryptography::Signer;
use commonware_macros::select_loop;
Expand Down Expand Up @@ -105,7 +105,11 @@ impl<E: Spawner + Clock + Network + CryptoRngCore + Metrics, C: Signer> Actor<E,
) {
let (peer, send, recv) = match listen(
context,
|peer| tracker.acceptable(peer),
|peer| async {
// Check if peer is acceptable and convert to bool for the stream crate.
// Metrics and detailed logging are handled inside tracker.acceptable().
tracker.acceptable(peer).await == Attempt::Ok
},
stream_cfg,
stream,
sink,
Expand Down Expand Up @@ -300,7 +304,7 @@ mod tests {
while let Some(message) = tracker_rx.next().await {
match message {
tracker::Message::Acceptable { responder, .. } => {
let _ = responder.send(true);
let _ = responder.send(Attempt::Ok);
}
tracker::Message::Listen { reservation, .. } => {
let _ = reservation.send(None);
Expand Down Expand Up @@ -443,7 +447,7 @@ mod tests {
while let Some(message) = tracker_rx.next().await {
match message {
tracker::Message::Acceptable { responder, .. } => {
let _ = responder.send(true);
let _ = responder.send(Attempt::Ok);
}
tracker::Message::Listen { reservation, .. } => {
let _ = reservation.send(None);
Expand Down
32 changes: 19 additions & 13 deletions p2p/src/authenticated/discovery/actors/tracker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ mod tests {
config::Bootstrapper,
types,
},
Mailbox,
Attempt, Mailbox,
},
Blocker, Ingress, Manager,
};
Expand Down Expand Up @@ -799,22 +799,28 @@ mod tests {
..
} = setup_actor(context.clone(), cfg_initial);

// None listenable because not registered
assert!(!mailbox.acceptable(peer_pk.clone()).await);
assert!(!mailbox.acceptable(peer_pk2.clone()).await);
assert!(!mailbox.acceptable(peer_pk3.clone()).await);
// peer_pk is ourselves, others not registered
assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Attempt::Myself);
assert_eq!(
mailbox.acceptable(peer_pk2.clone()).await,
Attempt::Unregistered
);
assert_eq!(
mailbox.acceptable(peer_pk3.clone()).await,
Attempt::Unregistered
);

oracle
.update(0, [peer_pk.clone(), peer_pk2.clone()].try_into().unwrap())
.await;
context.sleep(Duration::from_millis(10)).await;

// Not listenable because self
assert!(!mailbox.acceptable(peer_pk).await);
// Listenable because registered
assert!(mailbox.acceptable(peer_pk2).await);
// Not listenable because not registered
assert!(!mailbox.acceptable(peer_pk3).await);
// Not acceptable because self
assert_eq!(mailbox.acceptable(peer_pk).await, Attempt::Myself);
// Acceptable because registered
assert_eq!(mailbox.acceptable(peer_pk2).await, Attempt::Ok);
// Not acceptable because not registered
assert_eq!(mailbox.acceptable(peer_pk3).await, Attempt::Unregistered);
});
}

Expand All @@ -839,12 +845,12 @@ mod tests {
.await;
context.sleep(Duration::from_millis(10)).await; // Allow register to process

assert!(mailbox.acceptable(peer_pk.clone()).await);
assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Attempt::Ok);

let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());

assert!(!mailbox.acceptable(peer_pk.clone()).await);
assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Attempt::Reserved);

let failed_reservation = mailbox.listen(peer_pk.clone()).await;
assert!(failed_reservation.is_none());
Expand Down
60 changes: 48 additions & 12 deletions p2p/src/authenticated/discovery/actors/tracker/directory.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use super::{metrics::Metrics, record::Record, set::Set, Metadata, Reservation};
use crate::{
authenticated::discovery::{
actors::tracker::ingress::Releaser,
metrics,
types::{self, Info},
authenticated::{
discovery::{
actors::tracker::ingress::Releaser,
metrics,
types::{self, Info},
},
Attempt,
},
Ingress,
};
Expand Down Expand Up @@ -384,9 +387,39 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
result
}

/// Returns true if this peer is acceptable (can accept an incoming connection from them).
pub fn acceptable(&self, peer: &C) -> bool {
!self.blocked.contains(peer) && self.peers.get(peer).is_some_and(|r| r.acceptable())
/// Returns the acceptance status for this peer (can accept an incoming connection from them).
///
/// Checks blocked status first, then delegates to record for eligibility and connection status.
/// Increments metrics for rejections.
pub fn acceptable(&self, peer: &C) -> Attempt {
if self.blocked.contains(peer) {
self.metrics.rejected_blocked.inc();
debug!(?peer, "peer rejected: blocked");
return Attempt::Blocked;
}
let result = self
.peers
.get(peer)
.map(|r| r.acceptable())
.unwrap_or(Attempt::Unregistered);
match result {
Attempt::Ok => {}
Attempt::Blocked => unreachable!("record does not return Blocked"),
Attempt::Unregistered => {
self.metrics.rejected_unregistered.inc();
debug!(?peer, "peer rejected: unregistered or ineligible");
}
Attempt::Reserved => {
self.metrics.rejected_reserved.inc();
debug!(?peer, "peer rejected: already connected");
}
Attempt::Mismatch => unreachable!("discovery record does not return Mismatch"),
Attempt::Myself => {
self.metrics.rejected_myself.inc();
debug!(?peer, "peer rejected: is ourselves");
}
}
result
}

/// Unblock all peers whose block has expired and update the knowledge bitmap.
Expand Down Expand Up @@ -485,7 +518,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
#[cfg(test)]
mod tests {
use super::*;
use crate::authenticated::{discovery::types, mailbox::UnboundedMailbox};
use crate::authenticated::{discovery::types, mailbox::UnboundedMailbox, Attempt};
use commonware_cryptography::{secp256r1::standard::PrivateKey, Signer};
use commonware_runtime::{deterministic, Clock, Runner};
use commonware_utils::{bitmap::BitMap, NZU32};
Expand Down Expand Up @@ -1032,17 +1065,19 @@ mod tests {
directory.update_peers(vec![peer_info]);

// Peer should be acceptable before blocking
assert!(
assert_eq!(
directory.acceptable(&peer_pk),
Attempt::Ok,
"Peer should be acceptable before blocking"
);

// Block the peer
directory.block(&peer_pk);

// Peer should NOT be acceptable while blocked
assert!(
!directory.acceptable(&peer_pk),
assert_eq!(
directory.acceptable(&peer_pk),
Attempt::Blocked,
"Blocked peer should not be acceptable"
);

Expand All @@ -1051,8 +1086,9 @@ mod tests {
directory.unblock_expired();

// Peer should be acceptable again after unblock
assert!(
assert_eq!(
directory.acceptable(&peer_pk),
Attempt::Ok,
"Peer should be acceptable after unblock"
);
});
Expand Down
8 changes: 4 additions & 4 deletions p2p/src/authenticated/discovery/actors/tracker/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::authenticated::{
types,
},
mailbox::UnboundedMailbox,
Mailbox,
Attempt, Mailbox,
};
use commonware_cryptography::PublicKey;
use commonware_utils::ordered::Set;
Expand Down Expand Up @@ -110,8 +110,8 @@ pub enum Message<C: PublicKey> {
/// The public key of the peer to check.
public_key: C,

/// The sender to respond with whether the peer is acceptable.
responder: oneshot::Sender<bool>,
/// The sender to respond with the acceptance result.
responder: oneshot::Sender<Attempt>,
},

/// Request a reservation for a particular peer.
Expand Down Expand Up @@ -185,7 +185,7 @@ impl<C: PublicKey> UnboundedMailbox<Message<C>> {
}

/// Send an `Acceptable` message to the tracker.
pub async fn acceptable(&mut self, public_key: C) -> bool {
pub async fn acceptable(&mut self, public_key: C) -> Attempt {
let (tx, rx) = oneshot::channel();
self.send(Message::Acceptable {
public_key,
Expand Down
32 changes: 32 additions & 0 deletions p2p/src/authenticated/discovery/actors/tracker/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ pub struct Metrics {

/// A count of the number of updates for each peer.
pub updates: Family<metrics::Peer, Counter>,

/// Number of times a peer was rejected because they were blocked.
pub rejected_blocked: Counter,

/// Number of times a peer was rejected because they were unregistered.
pub rejected_unregistered: Counter,

/// Number of times a peer was rejected because they were already connected.
pub rejected_reserved: Counter,

/// Number of times a peer was rejected because they are ourselves.
pub rejected_myself: Counter,
}

impl Metrics {
Expand Down Expand Up @@ -52,6 +64,26 @@ impl Metrics {
"Count of the number of updates for each peer",
metrics.updates.clone(),
);
context.register(
"rejected_blocked",
"Number of times a peer was rejected because they were blocked",
metrics.rejected_blocked.clone(),
);
context.register(
"rejected_unregistered",
"Number of times a peer was rejected because they were unregistered",
metrics.rejected_unregistered.clone(),
);
context.register(
"rejected_reserved",
"Number of times a peer was rejected because they were already connected",
metrics.rejected_reserved.clone(),
);
context.register(
"rejected_myself",
"Number of times a peer was rejected because they are ourselves",
metrics.rejected_myself.clone(),
);
metrics
}
}
24 changes: 17 additions & 7 deletions p2p/src/authenticated/discovery/actors/tracker/record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{authenticated::discovery::types::Info, Ingress};
use crate::{
authenticated::{discovery::types::Info, Attempt},
Ingress,
};
use commonware_cryptography::PublicKey;
use tracing::trace;

Expand Down Expand Up @@ -218,13 +221,20 @@ impl<C: PublicKey> Record<C> {
ingress.is_valid(allow_private_ips, allow_dns)
}

/// Returns `true` if this peer is acceptable (can accept an incoming connection from them).
/// Returns the acceptance status for this peer (can accept an incoming connection from them).
///
/// A peer is acceptable if:
/// - The peer is eligible (in a peer set, not ourselves)
/// - We are not already connected or reserved
pub fn acceptable(&self) -> bool {
self.eligible() && self.status == Status::Inert
/// Checks for self, eligibility (peer set membership), and connection status.
pub const fn acceptable(&self) -> Attempt {
if matches!(self.address, Address::Myself(_)) {
return Attempt::Myself;
}
if !self.eligible() {
return Attempt::Unregistered;
}
if self.reserved() {
return Attempt::Reserved;
}
Attempt::Ok
}

/// Return the ingress address of the peer, if known.
Expand Down
Loading
Loading