Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 151 additions & 14 deletions quinn-proto/src/cid_queue.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use std::ops::Range;
use std::{fmt::Debug, ops::Range};

use crate::{ConnectionId, ResetToken, frame::NewConnectionId};

/// DataType stored in CidQueue buffer
type CidData = (ConnectionId, Option<ResetToken>);
/// A connection ID together with an optional reset token, as kept in each `CidQueue` slot.
#[derive(Debug, Clone, Copy)]
struct CidData(ConnectionId, Option<ResetToken>);

/// Sliding window of active Connection IDs
///
/// May contain gaps due to packet loss or reordering
/// This represents a circular buffer that can contain gaps due to packet loss or reordering.
/// The buffer has three regions:
/// - Exactly one active CID at `self.buffer[self.cursor]`.
/// - Zero to `Self::LEN - 1` reserved CIDs from `self.cursor` up to `self.cursor_reserved`.
/// - More "available"/"ready" CIDs after `self.cursor_reserved`.
///
/// You grow the range of reserved CIDs by calling [`CidQueue::next_reserved`], which takes one
/// of the available ones and returns the CID that was reserved.
/// You add available/ready CIDs by calling [`CidQueue::insert`].
#[derive(Debug)]
pub(crate) struct CidQueue {
/// Ring buffer indexed by `self.cursor`
Expand All @@ -18,16 +27,20 @@ pub(crate) struct CidQueue {
///
/// The sequence number of the active CID; must be the smallest among CIDs in `buffer`.
offset: u64,
/// Circular index for the last reserved CID, i.e. a CID that is
/// not active, but was used for probing packets on a different remote address.
cursor_reserved: usize,
}

impl CidQueue {
/// Create a queue whose only entry is `cid`, stored in slot 0 as the active CID
/// (sequence number 0) without a reset token.
pub(crate) fn new(cid: ConnectionId) -> Self {
let mut buffer = [None; Self::LEN];
buffer[0] = Some((cid, None));
buffer[0] = Some(CidData(cid, None));
Self {
buffer,
cursor: 0,
offset: 0,
// Nothing is reserved yet, so the reserved cursor sits on the active CID.
cursor_reserved: 0,
}
}

Expand All @@ -40,9 +53,8 @@ impl CidQueue {
cid: NewConnectionId,
) -> Result<Option<(Range<u64>, ResetToken)>, InsertError> {
// Position of new CID wrt. the current active CID
let index = match cid.sequence.checked_sub(self.offset) {
None => return Err(InsertError::Retired),
Some(x) => x,
let Some(index) = cid.sequence.checked_sub(self.offset) else {
return Err(InsertError::Retired);
};

let retired_count = cid.retire_prior_to.saturating_sub(self.offset);
Expand All @@ -57,7 +69,7 @@ impl CidQueue {

// Record the new CID
let index = ((self.cursor as u64 + index) % Self::LEN as u64) as usize;
self.buffer[index] = Some((cid.id, Some(cid.reset_token)));
self.buffer[index] = Some(CidData(cid.id, Some(cid.reset_token)));

if retired_count == 0 {
return Ok(None);
Expand All @@ -67,11 +79,12 @@ impl CidQueue {
// retire_prior_to, and inform the caller that all prior CIDs have been retired, and of
// the new CID's reset token.
self.cursor = ((self.cursor as u64 + retired_count) % Self::LEN as u64) as usize;
let (i, (_, token)) = self
.iter()
let (i, CidData(_, token)) = self
.iter_from_active()
.next()
.expect("it is impossible to retire a CID without supplying a new one");
self.cursor = (self.cursor + i) % Self::LEN;
self.cursor_reserved = self.cursor;
let orig_offset = self.offset;
self.offset = cid.retire_prior_to + i as u64;
// We don't immediately retire CIDs in the range (orig_offset +
Expand All @@ -89,27 +102,58 @@ impl CidQueue {
/// Switch to next active CID if possible, return
/// 1) the corresponding ResetToken and 2) a non-empty range preceding it to retire
///
/// The new active CID is the first available CID after the reserved section. The previously
/// active CID and every reserved CID are removed from the buffer and are covered by the
/// returned range. Returns `None` and leaves the queue untouched when no available CID exists.
pub(crate) fn next(&mut self) -> Option<(ResetToken, Range<u64>)> {
    // `step` is measured from `self.cursor_reserved`, not from `self.cursor`.
    let (step, cid_data) = self.iter_from_reserved().nth(1)?;
    let reserved = self.reserved_len();

    // Drop the active CID and all reserved CIDs. The reserved section may wrap around the
    // end of the ring buffer, so every index is reduced modulo `Self::LEN`.
    for i in 0..=reserved {
        self.buffer[(self.cursor + i) % Self::LEN] = None;
    }

    // Distance from the old active CID to the new one: skip the reserved section first,
    // then the `step` slots found past `cursor_reserved`. Advancing `offset` by the same
    // amount keeps it equal to the sequence number of the active CID.
    let advance = reserved + step;
    let orig_offset = self.offset;
    self.offset += advance as u64;
    self.cursor = (self.cursor + advance) % Self::LEN;
    self.cursor_reserved = self.cursor;

    let token = cid_data
        .1
        .expect("CIDs ahead of the active one are stored with a reset token");
    Some((token, orig_offset..self.offset))
}

/// Returns a CID from the available ones and marks it as reserved.
///
/// If there's no more CIDs in the ready set, this will return None.
/// CIDs marked as reserved will be skipped when the active one advances.
pub(crate) fn next_reserved(&mut self) -> Option<ConnectionId> {
let (i, cid_data) = self.iter_from_reserved().nth(1)?;

self.cursor_reserved = (self.cursor_reserved + i) % Self::LEN;
Some(cid_data.0)
}

/// Iterate CIDs in CidQueue that are not `None`, including the active CID
///
/// Yields `(step, data)` pairs, where `step` is the slot's distance from `self.cursor` in
/// the ring buffer; all `Self::LEN` slots are visited once, starting at `self.cursor`.
fn iter(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
fn iter_from_active(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
(0..Self::LEN).filter_map(move |step| {
// Wrap around the end of the ring buffer.
let index = (self.cursor + step) % Self::LEN;
self.buffer[index].map(|cid_data| (step, cid_data))
})
}

/// Iterate CIDs in CidQueue that are not `None`, starting at `self.cursor_reserved`
///
/// Yields `(step, data)` pairs, where `step` is the slot's distance from
/// `self.cursor_reserved` (not from `self.cursor`). The slot at `self.cursor_reserved` itself
/// comes first: the last reserved CID, or the active CID when nothing is reserved. The walk
/// covers `Self::LEN - self.reserved_len()` slots, i.e. it ends on the slot just before
/// `self.cursor`.
fn iter_from_reserved(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
    let start = self.cursor_reserved;
    let span = Self::LEN - self.reserved_len();
    (0..span).filter_map(move |step| {
        let cid_data = self.buffer[(start + step) % Self::LEN]?;
        Some((step, cid_data))
    })
}

/// Number of slots between the active CID (`self.cursor`) and the last reserved CID
/// (`self.cursor_reserved`), measured forwards around the ring buffer.
fn reserved_len(&self) -> usize {
    match self.cursor_reserved.checked_sub(self.cursor) {
        Some(distance) => distance,
        // The reserved cursor has wrapped past the end of the buffer.
        None => self.cursor_reserved + Self::LEN - self.cursor,
    }
}

/// Replace the initial CID
///
/// Overwrites the slot at `self.cursor` with `cid` and no reset token. Debug builds assert
/// that `self.offset` is still 0.
pub(crate) fn update_initial_cid(&mut self, cid: ConnectionId) {
debug_assert_eq!(self.offset, 0);
self.buffer[self.cursor] = Some((cid, None));
self.buffer[self.cursor] = Some(CidData(cid, None));
}

/// Return active remote CID itself
Expand Down Expand Up @@ -167,6 +211,7 @@ mod tests {
}
assert!(q.next().is_none());
}

#[test]
fn next_sparse() {
let mut q = CidQueue::new(initial_cid());
Expand Down Expand Up @@ -301,4 +346,96 @@ mod tests {
assert_eq!(q.active(), initial_cid());
assert_eq!(q.active_seq(), 0);
}

/// Walks a queue through reserve, retire-via-insert, and reserve again.
#[test]
fn reserved_smoke() {
    let mut q = CidQueue::new(initial_cid());
    // Only the active CID exists, so nothing can be reserved.
    assert_eq!(q.next_reserved(), None);

    let one = cid(1, 0);
    q.insert(one).unwrap();
    assert_eq!(q.next_reserved(), Some(one.id));

    // This insert retires sequence numbers 0..2, which includes the CID reserved above.
    let two = cid(2, 2);
    let (retired_range, reset_token) = q.insert(two).unwrap().unwrap();
    assert_eq!(reset_token, two.reset_token);
    assert_eq!(retired_range, 0..2);

    // `two` is now active and nothing else is stored.
    assert_eq!(q.next_reserved(), None);

    let four = cid(4, 2);
    q.insert(four).unwrap();
    assert_eq!(q.next_reserved(), Some(four.id));
    assert_eq!(q.active(), two.id);

    // The only non-active CID is reserved, so the active CID cannot advance.
    assert_eq!(q.next(), None);
}

/// Consecutive CIDs are reserved in insertion order until none are left.
#[test]
fn reserve_multiple() {
    let mut queue = CidQueue::new(initial_cid());
    let incoming = [cid(1, 0), cid(2, 0)];
    for new_cid in incoming {
        queue.insert(new_cid).unwrap();
    }

    for expected in incoming {
        assert_eq!(queue.next_reserved(), Some(expected.id));
    }
    assert_eq!(queue.next_reserved(), None);
}

/// Reservation skips over gaps in the sequence numbers.
#[test]
fn reserve_multiple_sparse() {
    let mut queue = CidQueue::new(initial_cid());
    let incoming = [cid(2, 0), cid(4, 0)];
    for new_cid in incoming {
        queue.insert(new_cid).unwrap();
    }

    for expected in incoming {
        assert_eq!(queue.next_reserved(), Some(expected.id));
    }
    assert_eq!(queue.next_reserved(), None);
}

/// With all but one of the inserted CIDs reserved, the active CID can advance exactly once.
#[test]
fn reserve_many_next_clears() {
    let mut q = CidQueue::new(initial_cid());
    for i in 1..CidQueue::LEN {
        q.insert(cid(i as u64, 0)).unwrap();
    }

    // Reserve every inserted CID except the last one.
    for _ in 0..CidQueue::LEN - 2 {
        assert!(q.next_reserved().is_some());
    }

    // One available CID is left, so the first advance must succeed...
    assert!(q.next().is_some());
    // ...and afterwards there is nothing left to advance to.
    assert_eq!(q.next(), None);
}

/// Once every non-active CID is reserved, further reservations yield `None`.
#[test]
fn reserve_many_next_reserved_none() {
    let mut queue = CidQueue::new(initial_cid());
    (1..CidQueue::LEN).for_each(|seq| {
        queue.insert(cid(seq as u64, 0)).unwrap();
    });

    // Each of the `LEN - 1` inserted CIDs can be reserved.
    assert!((0..CidQueue::LEN - 1).all(|_| queue.next_reserved().is_some()));

    assert_eq!(queue.next_reserved(), None);
}

/// Once every non-active CID is reserved, the active CID cannot advance.
#[test]
fn one_active_all_else_reserved_next_none() {
    let mut queue = CidQueue::new(initial_cid());
    (1..CidQueue::LEN).for_each(|seq| {
        queue.insert(cid(seq as u64, 0)).unwrap();
    });

    // Each of the `LEN - 1` inserted CIDs can be reserved.
    assert!((0..CidQueue::LEN - 1).all(|_| queue.next_reserved().is_some()));

    assert_eq!(queue.next(), None);
}
}
64 changes: 44 additions & 20 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -892,26 +892,6 @@ impl Connection {
max_datagrams: usize,
buf: &mut Vec<u8>,
) -> Option<Transmit> {
if let Some(probing) = self
.iroh_hp
.server_side_mut()
.ok()
.and_then(iroh_hp::ServerState::next_probe)
{
let destination = probing.remote();
trace!(%destination, "RAND_DATA packet");
let token: u64 = self.rng.random();
buf.put_u64(token);
probing.finish(token);
return Some(Transmit {
destination,
ecn: None,
size: 8,
segment_size: None,
src_ip: None,
});
}

assert!(max_datagrams != 0);
let max_datagrams = match self.config.enable_segmentation_offload {
false => 1,
Expand Down Expand Up @@ -1118,6 +1098,47 @@ impl Connection {
break;
}

if transmit.is_empty() {
// Nothing to send on this path, and nothing yet built; try to send hole punching
// path challenges.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: see the suggestion i make above to address these things.

I know we've tried to cover this bit about where to put this before, but upon reading this again I wondered:

  • are we doing this on the right side of the congestion controller?
    • do we need to have a congestion controller for each 4-tuple we're sending off-path PATH_CHALLENGEs for?
  • this branch is only taken if path_should_send is false. But since holepunching is time-sensitive, it should probably be prioritised over any other data to be sent on this path; otherwise application data can starve out the holepunching.

I think the congestion controller bit is fine to ignore at first. We can not count the off-path path challenges to a congestion controller for now (but should create an issue).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another problem with this location that I hadn't realised before is that this is now prioritised over a CONNECTION_CLOSE frame, which it shouldn't.

if let Some(probing) = self
.iroh_hp
.server_side_mut()
.ok()
.and_then(iroh_hp::ServerState::next_probe)
{
if let Some(new_cid) = self
.rem_cids
.get_mut(&path_id)
.and_then(CidQueue::next_reserved)
{
let destination = probing.remote();
let token: u64 = self.rng.random();
trace!(%destination, cid=%new_cid, token=format!("{:08x}", token), "Nat traversal: PATH_CHALLENGE packet");
// TODO(@divma): abstract writing path challenges, this logic should
// not be here
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How far do you want to go? I was thinking that at some point we have a generic function that writes frames (takes frame::Frame enum) and takes care of all the things we want for writing them:

  • log the FRAME_TYPE being written with appropriate fields (please add that to the code here 😄 )
  • increment the frame stats
  • records the frame with qlog
  • writes the frame to the packet/buffer

That's probably better as a separate refactor though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

completely agree on all accounts, this current state is gross

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, the log line is already there friend :p


buf.write(frame::FrameType::PATH_CHALLENGE);
buf.write(token);
// TODO(@divma): can I just create a new qlog?
// qlog.frame(&Frame::PathChallenge(token));
// TODO(@divma): separate stat?
self.stats.frame_tx.path_challenge += 1;

// Remember the token sent to this remote
probing.finish(token);
// TODO(@divma): figure out padding if any
return Some(Transmit {
destination,
ecn: None,
size: 8,
segment_size: None,
src_ip: None,
});
}
}
}

match self.paths.keys().find(|&&next| next > path_id) {
Some(next_path_id) => {
// See if this next path can send anything.
Expand Down Expand Up @@ -4261,6 +4282,9 @@ impl Connection {
self.ping_path(path_id).ok();
}
}
} else if self.iroh_hp.client_side().is_ok() {
// TODO(@divma): temp log. This might be too much
debug!("Potential Nat traversal PATH_CHALLENGE received");
}
}
Frame::PathResponse(token) => {
Expand Down
Loading