Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions examples/bridge/src/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,14 @@ fn main() {
let (peer, mut sender, mut receiver) = match listen(
context.with_label("listener"),
|peer| {
let out = validators.position(&peer).is_some();
async move { out }
let valid = validators.position(&peer).is_some();
async move {
if valid {
Ok(())
} else {
Err(())
}
}
},
config.clone(),
stream,
Expand Down
32 changes: 28 additions & 4 deletions p2p/src/authenticated/discovery/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use commonware_runtime::{
spawn_cell, Clock, ContextCell, Handle, KeyedRateLimiter, Listener, Metrics, Network, Quota,
SinkOf, Spawner, StreamOf,
};
use commonware_stream::{listen, Config as StreamConfig};
use commonware_stream::{listen, Config as StreamConfig, Error as StreamError};
use commonware_utils::{concurrency::Limiter, net::SubnetMask, IpAddrExt};
use prometheus_client::metrics::counter::Counter;
use rand::{CryptoRng, Rng};
Expand Down Expand Up @@ -105,14 +105,38 @@ impl<E: Spawner + Clock + Network + Rng + CryptoRng + Metrics, C: Signer> Actor<
) {
let (peer, send, recv) = match listen(
context,
|peer| tracker.acceptable(peer),
|peer| {
let fut = tracker.acceptable(peer);
async move {
let status = fut.await;
if status == tracker::Acceptable::Yes {
Ok(())
} else {
Err(status)
}
}
},
stream_cfg,
stream,
sink,
)
.await
{
Ok(x) => x,
Err(StreamError::PeerRejected(reason)) => {
match reason {
tracker::Acceptable::Blocked => {
debug!(?address, "peer is blocked");
}
tracker::Acceptable::Unknown => {
debug!(?address, "peer unknown (not in peer set)");
}
tracker::Acceptable::Rejected | tracker::Acceptable::Yes => {
debug!(?address, "peer rejected");
}
}
return;
}
Err(err) => {
debug!(?err, "failed to complete handshake");
return;
Expand Down Expand Up @@ -300,7 +324,7 @@ mod tests {
while let Some(message) = tracker_rx.next().await {
match message {
tracker::Message::Acceptable { responder, .. } => {
let _ = responder.send(true);
let _ = responder.send(tracker::Acceptable::Yes);
}
tracker::Message::Listen { reservation, .. } => {
let _ = reservation.send(None);
Expand Down Expand Up @@ -443,7 +467,7 @@ mod tests {
while let Some(message) = tracker_rx.next().await {
match message {
tracker::Message::Acceptable { responder, .. } => {
let _ = responder.send(true);
let _ = responder.send(tracker::Acceptable::Yes);
}
tracker::Message::Listen { reservation, .. } => {
let _ = reservation.send(None);
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/authenticated/discovery/actors/peer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ mod tests {
move |ctx| async move {
commonware_stream::listen(
ctx,
|_| async { true },
|_| async { Ok::<(), ()>(()) },
remote_config,
remote_stream,
remote_sink,
Expand Down Expand Up @@ -526,7 +526,7 @@ mod tests {
move |ctx| async move {
commonware_stream::listen(
ctx,
|_| async { true },
|_| async { Ok::<(), ()>(()) },
remote_config,
remote_stream,
remote_sink,
Expand Down Expand Up @@ -631,7 +631,7 @@ mod tests {
move |ctx| async move {
commonware_stream::listen(
ctx,
|_| async { true },
|_| async { Ok::<(), ()>(()) },
remote_config,
remote_stream,
remote_sink,
Expand Down
35 changes: 26 additions & 9 deletions p2p/src/authenticated/discovery/actors/tracker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: Signer> Actor<E, C> {
max_sets: cfg.tracked_peer_sets,
dial_fail_limit: cfg.dial_fail_limit,
rate_limit: cfg.allowed_connection_rate_per_peer,
block_duration: cfg.block_duration,
};

// Create the mailboxes
Expand Down Expand Up @@ -278,7 +279,10 @@ mod tests {
use crate::{
authenticated::{
discovery::{
actors::{peer, tracker},
actors::{
peer,
tracker::{self, Acceptable},
},
config::Bootstrapper,
types,
},
Expand Down Expand Up @@ -319,6 +323,7 @@ mod tests {
peer_gossip_max_count: 5,
max_peer_set_size: 128,
dial_fail_limit: 1,
block_duration: Duration::from_secs(60),
}
}

Expand Down Expand Up @@ -795,21 +800,30 @@ mod tests {
} = setup_actor(context.clone(), cfg_initial);

// None listenable because not registered
assert!(!mailbox.acceptable(peer_pk.clone()).await);
assert!(!mailbox.acceptable(peer_pk2.clone()).await);
assert!(!mailbox.acceptable(peer_pk3.clone()).await);
assert_eq!(
mailbox.acceptable(peer_pk.clone()).await,
Acceptable::Unknown
);
assert_eq!(
mailbox.acceptable(peer_pk2.clone()).await,
Acceptable::Unknown
);
assert_eq!(
mailbox.acceptable(peer_pk3.clone()).await,
Acceptable::Unknown
);

oracle
.update(0, [peer_pk.clone(), peer_pk2.clone()].try_into().unwrap())
.await;
context.sleep(Duration::from_millis(10)).await;

// Not listenable because self
assert!(!mailbox.acceptable(peer_pk).await);
assert_eq!(mailbox.acceptable(peer_pk).await, Acceptable::Rejected);
// Listenable because registered
assert!(mailbox.acceptable(peer_pk2).await);
assert_eq!(mailbox.acceptable(peer_pk2).await, Acceptable::Yes);
// Not listenable because not registered
assert!(!mailbox.acceptable(peer_pk3).await);
assert_eq!(mailbox.acceptable(peer_pk3).await, Acceptable::Unknown);
});
}

Expand All @@ -834,12 +848,15 @@ mod tests {
.await;
context.sleep(Duration::from_millis(10)).await; // Allow register to process

assert!(mailbox.acceptable(peer_pk.clone()).await);
assert_eq!(mailbox.acceptable(peer_pk.clone()).await, Acceptable::Yes);

let reservation = mailbox.listen(peer_pk.clone()).await;
assert!(reservation.is_some());

assert!(!mailbox.acceptable(peer_pk.clone()).await);
assert_eq!(
mailbox.acceptable(peer_pk.clone()).await,
Acceptable::Rejected
);

let failed_reservation = mailbox.listen(peer_pk.clone()).await;
assert!(failed_reservation.is_none());
Expand Down
58 changes: 44 additions & 14 deletions p2p/src/authenticated/discovery/actors/tracker/directory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::{metrics::Metrics, record::Record, set::Set, Metadata, Reservation};
use super::{
ingress::Acceptable, metrics::Metrics, record::Record, set::Set, Metadata, Reservation,
};
use crate::{
authenticated::discovery::{
actors::tracker::ingress::Releaser,
Expand All @@ -17,6 +19,7 @@ use rand::{seq::IteratorRandom, Rng};
use std::{
collections::{BTreeMap, HashMap},
ops::Deref,
time::Duration,
};
use tracing::{debug, warn};

Expand All @@ -37,6 +40,9 @@ pub struct Config {

/// The rate limit for allowing reservations per-peer.
pub rate_limit: Quota,

/// Duration after which blocked peers are automatically unblocked.
pub block_duration: Duration,
}

/// Represents a collection of records for all peers.
Expand All @@ -57,6 +63,9 @@ pub struct Directory<E: Rng + Clock + RuntimeMetrics, C: PublicKey> {
/// peers for its peer info again.
dial_fail_limit: usize,

/// Duration after which blocked peers are automatically unblocked.
block_duration: Duration,

// ---------- State ----------
/// The records of all peers.
peers: HashMap<C, Record<C>>,
Expand Down Expand Up @@ -107,6 +116,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
allow_dns: cfg.allow_dns,
max_sets: cfg.max_sets,
dial_fail_limit: cfg.dial_fail_limit,
block_duration: cfg.block_duration,
peers,
sets: BTreeMap::new(),
rate_limiter,
Expand All @@ -131,7 +141,8 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
record.dial_failure(ingress);
}

let want = record.want(self.dial_fail_limit);
let now = self.context.current();
let want = record.want(now, self.dial_fail_limit);
for set in self.sets.values_mut() {
set.update(peer, !want);
}
Expand All @@ -152,14 +163,16 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
record.connect();

// We may have to update the sets.
let want = record.want(self.dial_fail_limit);
let now = self.context.current();
let want = record.want(now, self.dial_fail_limit);
for set in self.sets.values_mut() {
set.update(peer, !want);
}
}

/// Using a list of (already-validated) peer information, update the records.
pub fn update_peers(&mut self, infos: Vec<types::Info<C>>) {
let now = self.context.current();
for info in infos {
// Update peer address
//
Expand All @@ -170,7 +183,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
let Some(record) = self.peers.get_mut(&peer) else {
continue;
};
if !record.update(info) {
if !record.update(now, info) {
continue;
}
self.metrics
Expand All @@ -179,7 +192,7 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
.inc();

// We may have to update the sets.
let want = record.want(self.dial_fail_limit);
let want = record.want(now, self.dial_fail_limit);
for set in self.sets.values_mut() {
set.update(&peer, !want);
}
Expand All @@ -204,14 +217,15 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
}

// Create and store new peer set
let now = self.context.current();
let mut set = Set::new(peers.clone());
for peer in peers.iter() {
let record = self.peers.entry(peer.clone()).or_insert_with(|| {
self.metrics.tracked.inc();
Record::unknown()
});
record.increment();
set.update(peer, !record.want(self.dial_fail_limit));
set.update(peer, !record.want(now, self.dial_fail_limit));
}
self.sets.insert(index, set);

Expand Down Expand Up @@ -271,7 +285,15 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {

/// Attempt to block a peer, updating the metrics accordingly.
pub fn block(&mut self, peer: &C) {
if self.peers.get_mut(peer).is_some_and(|r| r.block()) {
let now = self.context.current();
let blocked_until = now
.checked_add(self.block_duration)
.unwrap_or(now + Duration::from_secs(365 * 24 * 60 * 60 * 100)); // ~100 years
if self
.peers
.get_mut(peer)
.is_some_and(|r| r.block(blocked_until))
{
self.metrics.blocked.inc();
}
}
Expand Down Expand Up @@ -337,25 +359,31 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
///
/// A peer is eligible if it is in a peer set (or is persistent), not blocked, and not ourselves.
pub fn eligible(&self, peer: &C) -> bool {
self.peers.get(peer).is_some_and(|r| r.eligible())
let now = self.context.current();
self.peers.get(peer).is_some_and(|r| r.eligible(now))
}

/// Returns a vector of dialable peers. That is, unconnected peers for which we have an ingress.
pub fn dialable(&self) -> Vec<C> {
// Collect peers with known addresses
let now = self.context.current();
let mut result: Vec<_> = self
.peers
.iter()
.filter(|&(_, r)| r.dialable(self.allow_private_ips, self.allow_dns))
.filter(|&(_, r)| r.dialable(now, self.allow_private_ips, self.allow_dns))
.map(|(peer, _)| peer.clone())
.collect();
result.sort();
result
}

/// Returns true if this peer is acceptable (can accept an incoming connection from them).
pub fn acceptable(&self, peer: &C) -> bool {
self.peers.get(peer).is_some_and(|r| r.acceptable())
/// Returns the acceptance status for a peer.
pub fn acceptable(&self, peer: &C) -> Acceptable {
let now = self.context.current();
let Some(record) = self.peers.get(peer) else {
return Acceptable::Unknown;
};
record.acceptable(now)
}

// --------- Helpers ----------
Expand Down Expand Up @@ -387,7 +415,8 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
}

// Reserve
if record.reserve() {
let now = self.context.current();
if record.reserve(now) {
self.metrics.reserved.inc();
return Some(Reservation::new(metadata, self.releaser.clone()));
}
Expand All @@ -404,7 +433,8 @@ impl<E: Spawner + Rng + Clock + RuntimeMetrics, C: PublicKey> Directory<E, C> {
if !record.deletable() {
return false;
}
if record.blocked() {
let now = self.context.current();
if record.blocked(now) {
self.metrics.blocked.dec();
}
self.peers.remove(peer);
Expand Down
Loading
Loading