Skip to content

Commit

Permalink
feat(s2n-quic-dc): Replace requested_handshakes map with immediate request (#2469)
Browse files Browse the repository at this point in the history

This avoids issues with bounding the map, and allows us to schedule
handshakes immediately. We avoid a thundering herd of re-requested
handshakes by rescheduling each handshake one rehandshake period ahead,
just after our attempt to handshake.
  • Loading branch information
Mark-Simulacrum authored Feb 6, 2025
1 parent 21464fc commit a5d8422
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 65 deletions.
9 changes: 2 additions & 7 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,8 @@ impl Map {
self.store.contains(peer)
}

/// Check whether we would like to (re-)handshake with this peer.
///
/// Note that this is distinct from `contains`, we may already have *some* credentials for a
/// peer but still be interested in handshaking (e.g., due to periodic refresh of the
/// credentials).
pub fn needs_handshake(&self, peer: &SocketAddr) -> bool {
self.store.needs_handshake(peer)
pub fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
self.store.register_request_handshake(cb);
}

/// Gets the [`Peer`] entry for the given address
Expand Down
27 changes: 4 additions & 23 deletions dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl Cleaner {
let mut address_entries_initial = 0usize;
let mut address_entries_retired = 0usize;
let mut address_entries_active = 0usize;
let mut handshake_requests = 0usize;

// For non-retired entries, if it's time for them to handshake again, request a
// handshake to happen. This handshake will currently happen on the next request for this
Expand All @@ -115,7 +116,9 @@ impl Cleaner {
current_epoch.saturating_sub(retired_at) < eviction_cycles
} else {
if entry.rehandshake_time() <= now {
handshake_requests += 1;
state.request_handshake(*entry.peer());
entry.rehandshake_time_reschedule(state.rehandshake_period());
}

// always retain
Expand Down Expand Up @@ -152,28 +155,6 @@ impl Cleaner {
retained
});

// Iteration order should be effectively random, so this effectively just prunes the list
// periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note
// that peers the application is actively interested in will typically bypass this list, so
// this is mostly a risk of delaying regular re-handshaking with very large cardinalities.
//
// FIXME: Long or mid-term it likely makes sense to replace this data structure with a
// fuzzy set of some kind and/or just moving to immediate background handshake attempts.
const MAX_REQUESTED_HANDSHAKES: usize = 5000;

let mut handshake_requests = 0usize;
let mut handshake_requests_retired = 0usize;
state.requested_handshakes.pin().retain(|_| {
handshake_requests += 1;
let retain = handshake_requests < MAX_REQUESTED_HANDSHAKES;

if !retain {
handshake_requests_retired += 1;
}

retain
});

let id_entries = id_entries_initial - id_entries_retired;
let address_entries = address_entries_initial - address_entries_retired;

Expand All @@ -192,7 +173,7 @@ impl Cleaner {
address_entries_initial_utilization: utilization(address_entries_initial),
address_entries_retired,
handshake_requests,
handshake_requests_retired,
handshake_requests_retired: 0,
},
);
}
Expand Down
37 changes: 28 additions & 9 deletions dc/s2n-quic-dc/src/path/secret/map/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use s2n_quic_core::{dc, varint::VarInt};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU8, Ordering},
atomic::{AtomicU32, AtomicU8, Ordering},
Arc,
},
time::{Duration, Instant},
Expand All @@ -34,7 +34,7 @@ mod tests;
#[derive(Debug)]
pub(super) struct Entry {
creation_time: Instant,
rehandshake_delta_secs: u32,
rehandshake_delta_secs: AtomicU32,
peer: SocketAddr,
secret: schedule::Secret,
retired: IsRetired,
Expand Down Expand Up @@ -72,6 +72,7 @@ impl SizeOf for Entry {
}

impl SizeOf for AtomicU8 {}
impl SizeOf for AtomicU32 {}

impl Entry {
pub fn new(
Expand All @@ -88,20 +89,19 @@ impl Entry {
.fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed);

assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
Self {
let entry = Self {
creation_time: Instant::now(),
// Schedule another handshake sometime in [5 minutes, rehandshake_time] from now.
rehandshake_delta_secs: rand::thread_rng().gen_range(
std::cmp::min(rehandshake_time.as_secs(), 360)..rehandshake_time.as_secs(),
) as u32,
rehandshake_delta_secs: AtomicU32::new(0),
peer,
secret,
retired: Default::default(),
sender,
receiver,
parameters,
accessed: AtomicU8::new(0),
}
};
entry.rehandshake_time_reschedule(rehandshake_time);
entry
}

#[cfg(any(test, feature = "testing"))]
Expand Down Expand Up @@ -246,7 +246,26 @@ impl Entry {
}

pub fn rehandshake_time(&self) -> Instant {
self.creation_time + Duration::from_secs(u64::from(self.rehandshake_delta_secs))
self.creation_time
+ Duration::from_secs(u64::from(
self.rehandshake_delta_secs.load(Ordering::Relaxed),
))
}

/// Reschedule the handshake some time into the future.
pub fn rehandshake_time_reschedule(&self, rehandshake_period: Duration) {
// The goal of rescheduling is to avoid continuously re-handshaking for N (possibly stale)
// peers every cleaner loop, instead we defer handshakes out again. This effectively acts
// as a (slow) retry mechanism.
let delta = rand::thread_rng().gen_range(
std::cmp::min(rehandshake_period.as_secs(), 360)..rehandshake_period.as_secs(),
) as u32;
// This can't practically overflow -- each time we add we push out the next add by at least
// that much time. The fastest this loops is then running once every 360 seconds and adding
// 360 each time. That takes (2**32/360)*360 to fill u32, which happens after 136 years of
// continuous execution.
self.rehandshake_delta_secs
.fetch_add(delta, Ordering::Relaxed);
}

pub fn age(&self) -> Duration {
Expand Down
60 changes: 36 additions & 24 deletions dc/s2n-quic-dc/src/path/secret/map/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use s2n_quic_core::{
use std::{
hash::{BuildHasherDefault, Hasher},
net::{Ipv4Addr, SocketAddr},
sync::{Arc, Mutex, Weak},
sync::{Arc, Mutex, RwLock, Weak},
time::Duration,
};

Expand Down Expand Up @@ -64,11 +64,6 @@ where
// needed.
pub(super) peers: fixed_map::Map<SocketAddr, Arc<Entry>>,

// Stores the set of SocketAddr for which we received a UnknownPathSecret packet.
// When handshake_with is called we will allow a new handshake if this contains a socket, this
// is a temporary solution until we implement proper background handshaking.
pub(super) requested_handshakes: flurry::HashSet<SocketAddr>,

// All known entries.
pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,

Expand All @@ -78,6 +73,9 @@ where
// FIXME: This will get replaced with sending on a handshake socket associated with the map.
pub(super) control_socket: Arc<std::net::UdpSocket>,

#[allow(clippy::type_complexity)]
pub(super) request_handshake: RwLock<Option<Box<dyn Fn(SocketAddr) + Send + Sync>>>,

cleaner: Cleaner,

init_time: Timestamp,
Expand Down Expand Up @@ -131,13 +129,13 @@ where
rehandshake_period: Duration::from_secs(3600 * 24),
peers: fixed_map::Map::with_capacity(capacity, Default::default()),
ids: fixed_map::Map::with_capacity(capacity, Default::default()),
requested_handshakes: Default::default(),
cleaner: Cleaner::new(),
signer,
control_socket,
init_time,
clock,
subscriber,
request_handshake: RwLock::new(None),
};

let state = Arc::new(state);
Expand All @@ -152,19 +150,36 @@ where
}

pub fn request_handshake(&self, peer: SocketAddr) {
// The length is reset as part of cleanup to 5000.
let handshakes = self.requested_handshakes.pin();
if handshakes.len() <= 6000 {
handshakes.insert(peer);
self.subscriber()
.on_path_secret_map_background_handshake_requested(
event::builder::PathSecretMapBackgroundHandshakeRequested {
peer_address: SocketAddress::from(peer).into_event(),
},
);
self.subscriber()
.on_path_secret_map_background_handshake_requested(
event::builder::PathSecretMapBackgroundHandshakeRequested {
peer_address: SocketAddress::from(peer).into_event(),
},
);

// Normally we'd expect callers to use the Subscriber to register interest in this, but the
// Map is typically created *before* the s2n_quic::Client with the dc provider registered.
//
// Users of the state tracker typically register the callback when creating a new s2n-quic
// client to handshake into this map.
if let Some(callback) = self
.request_handshake
.read()
.unwrap_or_else(|e| e.into_inner())
.as_deref()
{
(callback)(peer);
}
}

fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
// FIXME: Maybe panic if already initialized?
*self
.request_handshake
.write()
.unwrap_or_else(|e| e.into_inner()) = Some(cb);
}

fn handle_unknown_secret(
&self,
packet: &control::unknown_path_secret::Packet,
Expand Down Expand Up @@ -370,17 +385,10 @@ where
self.peers.contains_key(peer)
}

fn needs_handshake(&self, peer: &SocketAddr) -> bool {
self.requested_handshakes.pin().contains(peer)
}

fn on_new_path_secrets(&self, entry: Arc<Entry>) {
let id = *entry.id();
let peer = entry.peer();

// On insert clear our interest in a handshake.
self.requested_handshakes.pin().remove(peer);

let (same, other) = self.ids.insert(id, entry.clone());
if same.is_some() {
// FIXME: Make insertion fallible and fail handshakes instead?
Expand Down Expand Up @@ -445,6 +453,10 @@ where
});
}

fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
self.register_request_handshake(cb);
}

fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>> {
self.peers.get_by_key(peer)
}
Expand Down
4 changes: 2 additions & 2 deletions dc/s2n-quic-dc/src/path/secret/map/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub trait Store: 'static + Send + Sync {

fn contains(&self, peer: &SocketAddr) -> bool;

fn needs_handshake(&self, peer: &SocketAddr) -> bool;

fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;

fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;
Expand All @@ -47,6 +45,8 @@ pub trait Store: 'static + Send + Sync {

fn rehandshake_period(&self) -> Duration;

fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>);

fn check_dedup(
&self,
entry: &Entry,
Expand Down

0 comments on commit a5d8422

Please sign in to comment.