Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quinn-proto/proptest-regressions/tests/proptest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ cc 4e409ceeb023bc639c6095f657b320a4092f50cfac1430e6a496671717e2d523 # shrinks to
cc 65bf05f654c33cd7c51a20cba759c23876eee689cb4f4f6a58d410a5a895e853 # shrinks to input = _RandomInteractionWithMultipathComplexRoutingArgs { seed: [140, 230, 19, 141, 169, 227, 178, 69, 74, 147, 156, 55, 95, 139, 162, 4, 10, 216, 65, 170, 204, 226, 236, 148, 67, 42, 222, 95, 125, 25, 72, 177], interactions: [OpenPath(Client, Available, 0), ClosePath(Client, 1, 0), Drive(Client), AdvanceTime, Drive(Server), CloseConn(Client, 0), Drive(Client), DropInbound(Server)], routes: RoutingTable { client_routes: [([::1]:44433, 0)], server_routes: [([::1]:4433, 0)] } }
cc 8097f9984cdf32de70860f9937b634238806f927d9049e0885fa4ce850736677 # shrinks to input = _RandomInteractionWithMultipathComplexRoutingArgs { seed: [107, 155, 165, 131, 109, 32, 159, 34, 208, 118, 134, 109, 75, 210, 123, 192, 194, 129, 67, 62, 178, 22, 70, 40, 248, 190, 76, 30, 220, 90, 56, 211], interactions: [OpenPath(Client, Available, 0), Drive(Server), ClosePath(Client, 1, 0), DropInbound(Server)], routes: RoutingTable { client_routes: [([::1]:44433, 0)], server_routes: [([::1]:4433, 0)] } }
cc 1564b6bdcfac5091ec82355328cf6ec990238c15e3e18ba8a415f69e291112f8 # shrinks to input = _RandomInteractionWithMultipathComplexRoutingArgs { seed: [238, 126, 150, 148, 168, 209, 105, 125, 53, 117, 5, 135, 177, 42, 115, 17, 218, 246, 104, 20, 44, 90, 242, 68, 129, 206, 15, 97, 145, 16, 226, 228], interactions: [OpenPath(Client, Available, 0), Drive(Client), ClosePath(Client, 1, 0)], routes: RoutingTable { client_routes: [([::1]:44433, 0)], server_routes: [([::1]:4433, 0)] } }
cc e2f4cb531c73ebf9e324ad607f0add9f194268d80938cd9715f99b78fd1e6b02
354 changes: 214 additions & 140 deletions quinn-proto/src/connection/mod.rs

Large diffs are not rendered by default.

67 changes: 39 additions & 28 deletions quinn-proto/src/connection/paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
spaces::{PacketNumberSpace, SentPacket},
};
use crate::{
ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt,
ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig, VarInt,
coding::{self, Decodable, Encodable},
congestion,
frame::ObservedAddr,
Expand Down Expand Up @@ -129,14 +129,14 @@ impl PathState {
pub(super) struct SentChallengeInfo {
/// When was the challenge sent on the wire.
pub(super) sent_instant: Instant,
/// The remote to which this path challenge was sent.
pub(super) remote: SocketAddr,
/// The 4-tuple on which this path challenge was sent.
pub(super) addresses: FourTuple,
}

/// Description of a particular network path
#[derive(Debug)]
pub(super) struct PathData {
pub(super) remote: SocketAddr,
pub(super) addresses: FourTuple,
pub(super) rtt: RttEstimator,
/// Whether we're enabling ECN on outgoing packets
pub(super) sending_ecn: bool,
Expand Down Expand Up @@ -230,7 +230,7 @@ pub(super) struct PathData {

impl PathData {
pub(super) fn new(
remote: SocketAddr,
addresses: FourTuple,
allow_mtud: bool,
peer_max_udp_payload_size: Option<u16>,
generation: u64,
Expand All @@ -242,7 +242,7 @@ impl PathData {
.clone()
.build(now, config.get_initial_mtu());
Self {
remote,
addresses,
rtt: RttEstimator::new(config.initial_rtt),
sending_ecn: true,
pacing: Pacer::new(
Expand Down Expand Up @@ -294,15 +294,15 @@ impl PathData {
///
/// This should only be called when migrating paths.
pub(super) fn from_previous(
remote: SocketAddr,
addresses: FourTuple,
prev: &Self,
generation: u64,
now: Instant,
) -> Self {
let congestion = prev.congestion.clone_box();
let smoothed_rtt = prev.rtt.get();
Self {
remote,
addresses,
rtt: prev.rtt,
pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
sending_ecn: true,
Expand Down Expand Up @@ -373,7 +373,7 @@ impl PathData {
self.total_sent = self.total_sent.saturating_add(inc);
if !self.validated {
trace!(
remote = %self.remote,
addresses = %self.addresses,
anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
"anti amplification budget decreased"
);
Expand All @@ -385,7 +385,7 @@ impl PathData {
self.total_recvd = self.total_recvd.saturating_add(inc);
if !self.validated {
trace!(
remote = %self.remote,
addresses = %self.addresses,
anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent),
"anti amplification budget increased"
);
Expand All @@ -410,18 +410,22 @@ impl PathData {
&mut self,
now: Instant,
token: u64,
remote: SocketAddr,
addresses: FourTuple,
) -> OnPathResponseReceived {
match self.challenges_sent.get(&token) {
// Response to an on-path PathChallenge
Some(info) if info.remote == remote && self.remote == remote => {
Some(info)
if info.addresses.is_probably_same_path(&addresses)
&& self.addresses.is_probably_same_path(&addresses) =>
{
self.addresses.update_local_if_same_remote(&addresses);
let sent_instant = info.sent_instant;
if !std::mem::replace(&mut self.validated, true) {
trace!("new path validated");
}
// Clear any other on-path sent challenge.
self.challenges_sent
.retain(|_token, info| info.remote != remote);
.retain(|_token, info| !info.addresses.is_probably_same_path(&addresses));

self.send_new_challenge = false;

Expand All @@ -434,14 +438,14 @@ impl PathData {
OnPathResponseReceived::OnPath { was_open }
}
// Response to an off-path PathChallenge
Some(info) if info.remote == remote => {
Some(info) if info.addresses.is_probably_same_path(&addresses) => {
self.challenges_sent
.retain(|_token, info| info.remote != remote);
.retain(|_token, info| !info.addresses.is_probably_same_path(&addresses));
OnPathResponseReceived::OffPath
}
// Response to a PathChallenge we recognize, but from an invalid remote
Some(info) => OnPathResponseReceived::Invalid {
expected: info.remote,
expected: info.addresses,
},
// Response to an unknown PathChallenge
None => OnPathResponseReceived::Unknown,
Expand Down Expand Up @@ -541,8 +545,8 @@ pub(super) enum OnPathResponseReceived {
Unknown,
/// The response is invalid.
Invalid {
/// The remote that was expected for this token.
expected: SocketAddr,
/// The 4-tuple that was expected for this token.
expected: FourTuple,
},
}

Expand Down Expand Up @@ -715,15 +719,18 @@ pub(crate) struct PathResponses {
}

impl PathResponses {
pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
pub(crate) fn push(&mut self, packet: u64, token: u64, addresses: FourTuple) {
/// Arbitrary permissive limit to prevent abuse
const MAX_PATH_RESPONSES: usize = 16;
let response = PathResponse {
packet,
token,
remote,
addresses,
};
let existing = self.pending.iter_mut().find(|x| x.remote == remote);
let existing = self
.pending
.iter_mut()
.find(|x| x.addresses.remote == addresses.remote);
if let Some(existing) = existing {
// Update a queued response
if existing.packet <= packet {
Expand All @@ -740,20 +747,24 @@ impl PathResponses {
}
}

pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
pub(crate) fn pop_off_path(&mut self, addresses: FourTuple) -> Option<(u64, FourTuple)> {
let response = *self.pending.last()?;
if response.remote == remote {
// We use an exact comparison here, because once we've received for the first time,
// we really should either already have a local_ip, or we will never get one
// (because our OS doesn't support it).
if response.addresses == addresses {
// We don't bother searching further because we expect that the on-path response will
// get drained in the immediate future by a call to `pop_on_path`
return None;
}
self.pending.pop();
Some((response.token, response.remote))
Some((response.token, response.addresses))
}

pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
pub(crate) fn pop_on_path(&mut self, addresses: FourTuple) -> Option<u64> {
let response = *self.pending.last()?;
if response.remote != remote {
// Using an exact comparison. See explanation in `pop_off_path`.
if response.addresses != addresses {
// We don't bother searching further because we expect that the off-path response will
// get drained in the immediate future by a call to `pop_off_path`
return None;
Expand All @@ -773,8 +784,8 @@ struct PathResponse {
packet: u64,
/// The token of the PATH_CHALLENGE
token: u64,
/// The address the corresponding PATH_CHALLENGE was received from
remote: SocketAddr,
/// The path the corresponding PATH_CHALLENGE was received from
addresses: FourTuple,
}

/// Summary statistics of packets that have been sent on a particular path, but which have not yet
Expand Down
16 changes: 9 additions & 7 deletions quinn-proto/src/connection/qlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use qlog::{
use tracing::warn;

use crate::{
Connection, ConnectionId, Frame, Instant, PathId,
Connection, ConnectionId, FourTuple, Frame, Instant, PathId,
connection::{PathData, SentPacket, timer::Timer},
frame::{EcnCounts, StreamMeta},
packet::{Header, SpaceId},
Expand Down Expand Up @@ -105,10 +105,10 @@ impl QlogStream {
self.emit_event_with_tuple_id(event, now, None);
}

fn emit_event_with_tuple_id(&self, event: EventData, now: Instant, tuple: Option<String>) {
fn emit_event_with_tuple_id(&self, event: EventData, now: Instant, addresses: Option<String>) {
// Time will be overwritten by `add_event_with_instant`
let mut event = Event::with_time(0.0, event);
event.tuple = tuple;
event.tuple = addresses;
let mut qlog_streamer = self.0.lock().unwrap();
if let Err(e) = qlog_streamer.add_event_with_instant(event, now) {
warn!("could not emit qlog event: {e}");
Expand Down Expand Up @@ -243,7 +243,7 @@ impl QlogSink {
}
}

pub(super) fn emit_tuple_assigned(&self, path_id: PathId, remote: SocketAddr, now: Instant) {
pub(super) fn emit_tuple_assigned(&self, path_id: PathId, tuple: FourTuple, now: Instant) {
#[cfg(feature = "qlog")]
{
let Some(stream) = self.stream.as_ref() else {
Expand All @@ -252,10 +252,12 @@ impl QlogSink {
let tuple_id = fmt_tuple_id(path_id.as_u32() as u64);
let event = TupleAssigned {
tuple_id,
tuple_local: None,
tuple_local: tuple
.local_ip
.map(|local_ip| tuple_endpoint_info(Some(local_ip), None, None)),
tuple_remote: Some(tuple_endpoint_info(
Some(remote.ip()),
Some(remote.port()),
Some(tuple.remote.ip()),
Some(tuple.remote.port()),
None,
)),
};
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/connection/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::trace;

use super::{PathId, assembler::Assembler};
use crate::{
Dir, Duration, Instant, SocketAddr, StreamId, TransportError, TransportErrorCode, VarInt,
Dir, Duration, FourTuple, Instant, StreamId, TransportError, TransportErrorCode, VarInt,
connection::StreamsState,
crypto::Keys,
frame::{self, AddAddress, RemoveAddress},
Expand Down Expand Up @@ -525,10 +525,10 @@ pub struct Retransmits {
///
/// It is true that a QUIC endpoint will only want to effectively have NEW_TOKEN frames
/// enqueued for its current path at a given point in time. Based on that, we could conceivably
/// change this from a vector to an `Option<(SocketAddr, usize)>` or just a `usize` or
/// change this from a vector to an `Option<(FourTuple, usize)>` or just a `usize` or
/// something. However, due to the architecture of Quinn, it is considerably simpler to not do
/// that; consider what such a change would mean for implementing `BitOrAssign` on Self.
pub(super) new_tokens: Vec<SocketAddr>,
pub(super) new_tokens: Vec<FourTuple>,
/// Paths which need to be abandoned
pub(super) path_abandon: BTreeMap<PathId, TransportErrorCode>,
/// If a [`frame::PathStatusAvailable`] and [`frame::PathStatusBackup`] need to be sent for a path
Expand Down
32 changes: 9 additions & 23 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use thiserror::Error;
use tracing::{debug, error, trace, warn};

use crate::{
Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId, RESET_TOKEN_SIZE,
ResetToken, Side, Transmit, TransportConfig, TransportError,
Duration, FourTuple, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId,
RESET_TOKEN_SIZE, ResetToken, Side, Transmit, TransportConfig, TransportError,
cid_generator::ConnectionIdGenerator,
coding::BufMutExt,
config::{ClientConfig, EndpointConfig, ServerConfig},
Expand Down Expand Up @@ -152,8 +152,7 @@ impl Endpoint {
pub fn handle(
&mut self,
now: Instant,
remote: SocketAddr,
local_ip: Option<IpAddr>,
addresses: FourTuple,
ecn: Option<EcnCodepoint>,
data: BytesMut,
buf: &mut Vec<u8>,
Expand All @@ -168,7 +167,7 @@ impl Endpoint {
) {
Ok((first_decode, remaining)) => DatagramConnectionEvent {
now,
remote,
addresses,
path_id: PathId::ZERO, // Corrected later for existing paths
ecn,
first_decode,
Expand Down Expand Up @@ -200,11 +199,11 @@ impl Endpoint {
buf.write(version);
}
return Some(DatagramEvent::Response(Transmit {
destination: remote,
destination: addresses.remote,
ecn: None,
size: buf.len(),
segment_size: None,
src_ip: local_ip,
src_ip: addresses.local_ip,
}));
}
Err(e) => {
Expand All @@ -213,7 +212,6 @@ impl Endpoint {
}
};

let addresses = FourTuple { remote, local_ip };
let dst_cid = event.first_decode.dst_cid();

if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
Expand Down Expand Up @@ -663,7 +661,7 @@ impl Endpoint {

match conn.handle_first_packet(
incoming.received_at,
incoming.addresses.remote,
incoming.addresses,
incoming.ecn,
packet_number,
incoming.packet,
Expand Down Expand Up @@ -848,8 +846,7 @@ impl Endpoint {
init_cid,
loc_cid,
rem_cid,
addresses.remote,
addresses.local_ip,
addresses,
tls,
self.local_cid_generator.as_ref(),
now,
Expand Down Expand Up @@ -1023,7 +1020,7 @@ struct ConnectionIndex {
/// Identifies outgoing connections with zero-length CIDs
///
/// We don't yet support explicit source addresses for client connections, and zero-length CIDs
/// require a unique four-tuple, so at most one client connection with zero-length local CIDs
/// require a unique 4-tuple, so at most one client connection with zero-length local CIDs
/// may be established per remote. We must omit the local address from the key because we don't
/// necessarily know what address we're sending from, and hence receiving at.
///
Expand Down Expand Up @@ -1386,14 +1383,3 @@ impl ResetTokenTable {
self.0.get(&remote)?.get(&token)
}
}

/// Identifies a connection by the combination of remote and local addresses
///
/// Including the local ensures good behavior when the host has multiple IP addresses on the same
/// subnet and zero-length connection IDs are in use.
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
remote: SocketAddr,
// A single socket can only listen on a single port, so no need to store it explicitly
local_ip: Option<IpAddr>,
}
Loading
Loading