diff --git a/quinn-proto/proptest-regressions/tests/proptest.txt b/quinn-proto/proptest-regressions/tests/proptest.txt index 967a07c0f..32d7c5c6f 100644 --- a/quinn-proto/proptest-regressions/tests/proptest.txt +++ b/quinn-proto/proptest-regressions/tests/proptest.txt @@ -18,3 +18,4 @@ cc 4e409ceeb023bc639c6095f657b320a4092f50cfac1430e6a496671717e2d523 # shrinks to cc 65bf05f654c33cd7c51a20cba759c23876eee689cb4f4f6a58d410a5a895e853 # shrinks to input = _RandomInteractionWithMultipathComplexRoutingArgs { seed: [140, 230, 19, 141, 169, 227, 178, 69, 74, 147, 156, 55, 95, 139, 162, 4, 10, 216, 65, 170, 204, 226, 236, 148, 67, 42, 222, 95, 125, 25, 72, 177], interactions: [OpenPath(Client, Available, 0), ClosePath(Client, 1, 0), Drive(Client), AdvanceTime, Drive(Server), CloseConn(Client, 0), Drive(Client), DropInbound(Server)], routes: RoutingTable { client_routes: [([::1]:44433, 0)], server_routes: [([::1]:4433, 0)] } } cc 8097f9984cdf32de70860f9937b634238806f927d9049e0885fa4ce850736677 # shrinks to input = _RandomInteractionWithMultipathComplexRoutingArgs { seed: [107, 155, 165, 131, 109, 32, 159, 34, 208, 118, 134, 109, 75, 210, 123, 192, 194, 129, 67, 62, 178, 22, 70, 40, 248, 190, 76, 30, 220, 90, 56, 211], interactions: [OpenPath(Client, Available, 0), Drive(Server), ClosePath(Client, 1, 0), DropInbound(Server)], routes: RoutingTable { client_routes: [([::1]:44433, 0)], server_routes: [([::1]:4433, 0)] } } cc 1564b6bdcfac5091ec82355328cf6ec990238c15e3e18ba8a415f69e291112f8 # shrinks to input = _RandomInteractionWithMultipathComplexRoutingArgs { seed: [238, 126, 150, 148, 168, 209, 105, 125, 53, 117, 5, 135, 177, 42, 115, 17, 218, 246, 104, 20, 44, 90, 242, 68, 129, 206, 15, 97, 145, 16, 226, 228], interactions: [OpenPath(Client, Available, 0), Drive(Client), ClosePath(Client, 1, 0)], routes: RoutingTable { client_routes: [([::1]:44433, 0)], server_routes: [([::1]:4433, 0)] } } +cc e2f4cb531c73ebf9e324ad607f0add9f194268d80938cd9715f99b78fd1e6b02 diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index e61ec5ac7..bf3bd4181 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -18,9 +18,9 @@ use thiserror::Error; use tracing::{debug, error, trace, trace_span, warn}; use crate::{ - Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT, - MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError, - TransportErrorCode, VarInt, + Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, + MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, + TransportError, TransportErrorCode, VarInt, cid_generator::ConnectionIdGenerator, cid_queue::CidQueue, coding::{BufMutExt, Encodable}, @@ -159,9 +159,6 @@ pub struct Connection { handshake_cid: ConnectionId, /// The CID the peer initially chose, for use during the handshake rem_handshake_cid: ConnectionId, - /// The "real" local IP address which was was used to receive the initial packet. - /// This is only populated for the server case, and if known - local_ip: Option, /// The [`PathData`] for each path /// /// This needs to be ordered because [`Connection::poll_transmit`] needs to @@ -326,8 +323,7 @@ impl Connection { init_cid: ConnectionId, loc_cid: ConnectionId, rem_cid: ConnectionId, - remote: SocketAddr, - local_ip: Option, + network_path: FourTuple, crypto: Box, cid_gen: &dyn ConnectionIdGenerator, now: Instant, @@ -371,7 +367,7 @@ impl Connection { ), )]); - let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config); + let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config); // TODO(@divma): consider if we want to delay this until the path is validated path.open = true; let mut this = Self { @@ -389,7 +385,6 @@ impl Connection { )]), path_counter: 0, allow_mtud, - local_ip, state, side: connection_side, zero_rtt_enabled: false, @@ -469,7 +464,8 @@ impl Connection { this.write_crypto(); this.init_0rtt(now); } - this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now); + this.qlog + .emit_tuple_assigned(PathId::ZERO, network_path, now); this } @@ -545,28 +541,38 @@ impl Connection { } } - /// Opens a new path only if no path to the remote address exists so far + /// Opens a new path only if no path on the same network path currently exists. /// - /// See [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id, + /// This comparison will use [`FourTuple::is_probably_same_path`] on the given `network_path` + /// and pass it existing path's network paths. + /// + /// This means that you can pass `local_ip: None` to make the comparison only compare + /// remote addresses. + /// + /// This avoids having to guess which local interface will be used to communicate with the + /// remote, should it not be known yet. We assume that if we already have a path to the remote, + /// the OS is likely to use the same interface to talk to said remote. + /// + /// See also [`open_path`]. Returns `(path_id, true)` if the path already existed. `(path_id, /// false)` if was opened. /// /// [`open_path`]: Connection::open_path pub fn open_path_ensure( &mut self, - remote: SocketAddr, + network_path: FourTuple, initial_status: PathStatus, now: Instant, ) -> Result<(PathId, bool), PathError> { - match self - .paths - .iter() - .find(|(_id, path)| path.data.remote == remote) - { - Some((path_id, _state)) => Ok((*path_id, true)), - None => self - .open_path(remote, initial_status, now) - .map(|id| (id, false)), - } + Ok( + match self + .paths + .iter() + .find(|(_id, path)| network_path.is_probably_same_path(&path.data.network_path)) + { + Some((path_id, _state)) => (*path_id, true), + None => (self.open_path(network_path, initial_status, now)?, false), + }, + ) } /// Opens a new path @@ -575,7 +581,7 @@ impl Connection { /// When the path is opened it will be reported as an [`PathEvent::Opened`]. pub fn open_path( &mut self, - remote: SocketAddr, + network_path: FourTuple, initial_status: PathStatus, now: Instant, ) -> Result { @@ -608,7 +614,7 @@ impl Connection { return Err(PathError::RemoteCidsExhausted); } - let path = self.ensure_path(path_id, remote, now, None); + let path = self.ensure_path(path_id, network_path, now, None); path.status.local_update(initial_status); Ok(path_id) @@ -738,10 +744,10 @@ impl Connection { .ok_or(ClosedPath { _private: () }) } - /// Returns the path's remote socket address - pub fn path_remote_address(&self, path_id: PathId) -> Result { + /// Returns the path's network path represented as a 4-tuple. + pub fn network_path(&self, path_id: PathId) -> Result { self.path(path_id) - .map(|path| path.remote) + .map(|path| path.network_path) .ok_or(ClosedPath { _private: () }) } @@ -822,13 +828,32 @@ impl Connection { &mut self.paths.get_mut(&path_id).expect("known path").data } + /// Check if the 4-tuple path (as in RFC9000 Path, not multipath path) had already been validated. + fn find_open_path_on_network_path( + &self, + network_path: FourTuple, + ) -> Option<(&PathId, &PathState)> { + self.paths.iter().find(|(path_id, path_state)| { + path_state.data.validated + // Would this use the same network path, if network_path were used to send right now? + && network_path.is_probably_same_path(&path_state.data.network_path) + && !self.abandoned_paths.contains(path_id) + }) + // TODO(@divma): we might want to ensure the path has been recently active to consider the + // address validated + // matheus23: Perhaps looking at !self.abandoned_paths.contains(path_id) is enough, given keep-alives? + } + fn ensure_path( &mut self, path_id: PathId, - remote: SocketAddr, + network_path: FourTuple, now: Instant, pn: Option, ) -> &mut PathData { + let valid_path = self.find_open_path_on_network_path(network_path); + let validated = valid_path.is_some(); + let initial_rtt = valid_path.map(|(_, state)| state.data.rtt.conservative()); let vacant_entry = match self.paths.entry(path_id) { btree_map::Entry::Vacant(vacant_entry) => vacant_entry, btree_map::Entry::Occupied(occupied_entry) => { @@ -836,14 +861,12 @@ impl Connection { } }; - // TODO(matheus23): Add back short-circuiting path.validated = true, if we know that the - // path's four-tuple was already validated. - debug!(%path_id, ?remote, "path added"); + debug!(%validated, %path_id, %network_path, "path added"); let peer_max_udp_payload_size = u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX); self.path_counter = self.path_counter.wrapping_add(1); let mut data = PathData::new( - remote, + network_path, self.allow_mtud, Some(peer_max_udp_payload_size), self.path_counter, @@ -851,6 +874,11 @@ impl Connection { &self.config, ); + data.validated = validated; + if let Some(initial_rtt) = initial_rtt { + data.rtt.reset_initial_rtt(initial_rtt); + } + let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base(); self.timers.set( Timer::PerPath(path_id, PathTimer::PathOpen), @@ -871,7 +899,7 @@ impl Connection { self.spaces[SpaceId::Data] .number_spaces .insert(path_id, pn_space); - self.qlog.emit_tuple_assigned(path_id, remote, now); + self.qlog.emit_tuple_assigned(path_id, network_path, now); &mut path.data } @@ -1321,7 +1349,9 @@ impl Connection { // path validation can occur while the link is saturated. if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { let path = self.path_data_mut(path_id); - if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) { + if let Some((token, network_path)) = + path.path_responses.pop_off_path(path.network_path) + { // TODO(flub): We need to use the right CID! We shouldn't use the same // CID as the current active one for the path. Though see also // https://github.com/quinn-rs/quinn/issues/2184 @@ -1343,11 +1373,11 @@ impl Connection { ); self.stats.udp_tx.on_sent(1, transmit.len()); return Some(Transmit { - destination: remote, + destination: network_path.remote, size: transmit.len(), ecn: None, segment_size: None, - src_ip: self.local_ip, + src_ip: network_path.local_ip, }); } } @@ -1580,11 +1610,11 @@ impl Connection { return None; } - let destination = self.path_data(path_id).remote; + let network_path = self.path_data(path_id).network_path; trace!( segment_size = transmit.segment_size(), last_datagram_len = transmit.len() % transmit.segment_size(), - ?destination, + %network_path, "sending {} bytes in {} datagrams", transmit.len(), transmit.num_datagrams() @@ -1597,7 +1627,7 @@ impl Connection { .on_sent(transmit.num_datagrams() as u64, transmit.len()); Some(Transmit { - destination, + destination: network_path.remote, size: transmit.len(), ecn: if self.path_data(path_id).sending_ecn { Some(EcnCodepoint::Ect0) @@ -1608,7 +1638,7 @@ impl Connection { 1 => None, _ => Some(transmit.segment_size()), }, - src_ip: self.local_ip, + src_ip: network_path.local_ip, }) } @@ -1713,11 +1743,11 @@ impl Connection { return None; }; prev_path.send_new_challenge = false; - let destination = prev_path.remote; + let network_path = prev_path.network_path; let token = self.rng.random(); let info = paths::SentChallengeInfo { sent_instant: now, - remote: destination, + network_path, }; prev_path.challenges_sent.insert(token, info); debug_assert_eq!( @@ -1760,11 +1790,11 @@ impl Connection { self.stats.udp_tx.on_sent(1, buf.len()); Some(Transmit { - destination, + destination: network_path.remote, size: buf.len(), ecn: None, segment_size: None, - src_ip: self.local_ip, + src_ip: network_path.local_ip, }) } @@ -1811,7 +1841,7 @@ impl Connection { match event.0 { Datagram(DatagramConnectionEvent { now, - remote, + network_path, path_id, ecn, first_decode, @@ -1819,19 +1849,10 @@ impl Connection { }) => { let span = trace_span!("pkt", %path_id); let _guard = span.enter(); - // If this packet could initiate a migration and we're a client or a server that - // forbids migration, drop the datagram. This could be relaxed to heuristically - // permit NAT-rebinding-like migration. - if let Some(known_remote) = self.path(path_id).map(|path| path.remote) { - if remote != known_remote && !self.side.remote_may_migrate(&self.state) { - trace!( - %path_id, - ?remote, - path_remote = ?self.path(path_id).map(|p| p.remote), - "discarding packet from unrecognized peer" - ); - return; - } + + if self.update_network_path_or_discard(network_path, path_id) { + // A return value of true indicates we should discard this packet. + return; } let was_anti_amplification_blocked = self @@ -1844,7 +1865,7 @@ impl Connection { self.stats.udp_rx.bytes += first_decode.len() as u64; let data_len = first_decode.len(); - self.handle_decode(now, remote, path_id, ecn, first_decode); + self.handle_decode(now, network_path, path_id, ecn, first_decode); // The current `path` might have changed inside `handle_decode` since the packet // could have triggered a migration. The packet might also belong to an unknown // path and have been rejected. Make sure the data received is accounted for the @@ -1855,7 +1876,7 @@ impl Connection { if let Some(data) = remaining { self.stats.udp_rx.bytes += data.len() as u64; - self.handle_coalesced(now, remote, path_id, ecn, data); + self.handle_coalesced(now, network_path, path_id, ecn, data); } if let Some(path) = self.paths.get_mut(&path_id) { @@ -1888,6 +1909,68 @@ impl Connection { } } + /// Updates the network path for `path_id` and returns false, or returns true if a packet + /// coming in for this `path_id` over given `network_path` should be discarded. + fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool { + let remote_may_migrate = self.side.remote_may_migrate(&self.state); + let local_ip_may_migrate = self.side.is_client(); + // If this packet could initiate a migration and we're a client or a server that + // forbids migration, drop the datagram. This could be relaxed to heuristically + // permit NAT-rebinding-like migration. + if let Some(known_path) = self.path_mut(path_id) { + if network_path.remote != known_path.network_path.remote && !remote_may_migrate { + trace!( + %path_id, + %network_path, + %known_path.network_path, + "discarding packet from unrecognized peer" + ); + return true; + } + + if known_path.network_path.local_ip.is_some() + && network_path.local_ip.is_some() + && known_path.network_path.local_ip != network_path.local_ip + && !local_ip_may_migrate + { + trace!( + %path_id, + %network_path, + %known_path.network_path, + "discarding packet sent to incorrect interface" + ); + return true; + } + // If the datagram indicates that we've changed our local IP, we update it. + // This is alluded to in Section 5.2 of the Multipath RFC draft 18: + // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#name-using-multiple-paths-on-the + // > Client receives the packet, recognizes a path migration, updates the source address of path 2 to 192.0.2.1. + if let Some(local_ip) = network_path.local_ip { + // If we already had a local_ip, but it changed, then we need to re-trigger path validation. + if known_path + .network_path + .local_ip + .is_some_and(|ip| ip != local_ip) + { + debug!( + %path_id, + %network_path, + %known_path.network_path, + "path's local address seemingly migrated" + ); + } + // We update the address without path validation on the client side. + // https://www.ietf.org/archive/id/draft-ietf-quic-multipath-18.html#section-5.1 + // > Servers observing a 4-tuple change will perform path validation (see Section 9 of [QUIC-TRANSPORT]). + // This sounds like it's *only* the server endpoints that do this. + // TODO(matheus23): We should still consider doing a proper migration on the client side in the future. + // For now, this preserves the behavior of this code pre 4-tuple tracking. + known_path.network_path.local_ip = Some(local_ip); + } + } + false + } + /// Process timer expirations /// /// Executes protocol logic, potentially preparing signals (including application `Event`s, @@ -1991,11 +2074,15 @@ impl Connection { path.data.send_new_challenge = true; } PathTimer::PathOpen => { - let Some(path) = self.path_mut(path_id) else { + let Some(path) = self.paths.get_mut(&path_id) else { continue; }; - path.challenges_sent.clear(); - path.send_new_challenge = false; + path.data.challenges_sent.clear(); + path.data.send_new_challenge = false; + self.timers.stop( + Timer::PerPath(path_id, PathTimer::PathChallengeLost), + self.qlog.with_time(now), + ); debug!("new path validation failed"); if let Err(err) = self.close_path( now, @@ -2202,19 +2289,6 @@ impl Connection { .ok_or(ClosedPath { _private: () }) } - /// The local IP address which was used when the peer established - /// the connection - /// - /// This can be different from the address the endpoint is bound to, in case - /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`. - /// - /// This will return `None` for clients, or when no `local_ip` was passed to - /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this - /// connection. - pub fn local_ip(&self) -> Option { - self.local_ip - } - /// Current best estimate of this connection's latency (round-trip-time) pub fn rtt(&self, path_id: PathId) -> Option { self.path(path_id).map(|d| d.rtt.get()) @@ -3286,7 +3360,7 @@ impl Connection { pub(crate) fn handle_first_packet( &mut self, now: Instant, - remote: SocketAddr, + network_path: FourTuple, ecn: Option, packet_number: u64, packet: InitialPacket, @@ -3323,7 +3397,7 @@ impl Connection { self.process_decrypted_packet( now, - remote, + network_path, path_id, Some(packet_number), packet, @@ -3331,7 +3405,7 @@ impl Connection { )?; self.qlog.emit_packet_received(qlog, now); if let Some(data) = remaining { - self.handle_coalesced(now, remote, path_id, ecn, data); + self.handle_coalesced(now, network_path, path_id, ecn, data); } self.qlog.emit_recovery_metrics( @@ -3521,7 +3595,7 @@ impl Connection { fn handle_coalesced( &mut self, now: Instant, - remote: SocketAddr, + network_path: FourTuple, path_id: PathId, ecn: Option, data: BytesMut, @@ -3544,7 +3618,7 @@ impl Connection { ) { Ok((partial_decode, rest)) => { remaining = rest; - self.handle_decode(now, remote, path_id, ecn, partial_decode); + self.handle_decode(now, network_path, path_id, ecn, partial_decode); } Err(e) => { trace!("malformed header: {}", e); @@ -3557,7 +3631,7 @@ impl Connection { fn handle_decode( &mut self, now: Instant, - remote: SocketAddr, + network_path: FourTuple, path_id: PathId, ecn: Option, partial_decode: PartialDecode, @@ -3571,7 +3645,7 @@ impl Connection { ) { self.handle_packet( now, - remote, + network_path, path_id, ecn, decoded.packet, @@ -3584,7 +3658,7 @@ impl Connection { fn handle_packet( &mut self, now: Instant, - remote: SocketAddr, + network_path: FourTuple, path_id: PathId, ecn: Option, packet: Option, @@ -3597,22 +3671,22 @@ impl Connection { "got {:?} packet ({} bytes) from {} using id {}", packet.header.space(), packet.payload.len() + packet.header_data.len(), - remote, + network_path, packet.header.dst_cid(), ); } if self.is_handshaking() { if path_id != PathId::ZERO { - debug!(%remote, %path_id, "discarding multipath packet during handshake"); + debug!(%network_path, %path_id, "discarding multipath packet during handshake"); return; } - if remote != self.path_data_mut(path_id).remote { + if network_path != self.path_data_mut(path_id).network_path { if let Some(hs) = self.state.as_handshake() { if hs.allow_server_migration { - trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote"); - self.path_data_mut(path_id).remote = remote; - self.qlog.emit_tuple_assigned(path_id, remote, now); + trace!(%network_path, prev = ?self.path_data(path_id).network_path, "server migrated to new remote"); + self.path_data_mut(path_id).network_path = network_path; + self.qlog.emit_tuple_assigned(path_id, network_path, now); } else { debug!("discarding packet with unexpected remote during handshake"); return; @@ -3700,7 +3774,7 @@ impl Connection { if self.side().is_server() && !self.abandoned_paths.contains(&path_id) { // Only the client is allowed to open paths - self.ensure_path(path_id, remote, now, number); + self.ensure_path(path_id, network_path, now, number); } if self.paths.contains_key(&path_id) { self.on_packet_authenticated( @@ -3715,8 +3789,14 @@ impl Connection { } } - let res = self - .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog); + let res = self.process_decrypted_packet( + now, + network_path, + path_id, + number, + packet, + &mut qlog, + ); self.qlog.emit_packet_received(qlog, now); res @@ -3777,16 +3857,16 @@ impl Connection { let path_remote = self .paths .get(&path_id) - .map(|p| p.data.remote) - .unwrap_or(remote); - self.close = remote == path_remote; + .map(|p| p.data.network_path) + .unwrap_or(network_path); + self.close = network_path == path_remote; } } fn process_decrypted_packet( &mut self, now: Instant, - remote: SocketAddr, + network_path: FourTuple, path_id: PathId, number: Option, packet: Packet, @@ -3802,9 +3882,14 @@ impl Connection { let state = match self.state.as_type() { StateType::Established => { match packet.header.space() { - SpaceId::Data => { - self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)? - } + SpaceId::Data => self.process_payload( + now, + network_path, + path_id, + number.unwrap(), + packet, + qlog, + )?, _ if packet.header.has_frames() => { self.process_early_payload(now, path_id, packet, qlog)? } @@ -3998,7 +4083,8 @@ impl Connection { } } if let Some(token) = params.stateless_reset_token { - let remote = self.path_data(path_id).remote; + // TODO(matheus23): Reset token for a remote, or for a 4-tuple? + let remote = self.path_data(path_id).network_path.remote; self.endpoint_events .push_back(EndpointEventInner::ResetToken(path_id, remote, token)); } @@ -4069,7 +4155,7 @@ impl Connection { ty: LongType::ZeroRtt, .. } => { - self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?; + self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?; Ok(()) } Header::VersionNegotiate { .. } => { @@ -4169,7 +4255,7 @@ impl Connection { fn process_payload( &mut self, now: Instant, - remote: SocketAddr, + network_path: FourTuple, path_id: PathId, number: u64, packet: Packet, @@ -4264,8 +4350,8 @@ impl Connection { let path = &mut self .path_mut(path_id) .expect("payload is processed only after the path becomes known"); - path.path_responses.push(number, challenge.0, remote); - if remote == path.remote { + path.path_responses.push(number, challenge.0, network_path); + if network_path == path.network_path { // PATH_CHALLENGE on active path, possible off-path packet forwarding // attack. Send a non-probing packet to recover the active path. // TODO(flub): No longer true! We now path_challege also to validate @@ -4291,7 +4377,10 @@ impl Connection { use PathTimer::*; use paths::OnPathResponseReceived::*; - match path.data.on_path_response_received(now, response.0, remote) { + match path + .data + .on_path_response_received(now, response.0, network_path) + { OnPath { was_open } => { let qlog = self.qlog.with_time(now); @@ -4339,7 +4428,7 @@ impl Connection { ); } Invalid { expected } => { - debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE") + debug!(%response, %network_path, %expected, "ignoring invalid PATH_RESPONSE") } Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"), } @@ -4490,7 +4579,8 @@ impl Connection { )); } pending_retired.extend(retired.map(|seq| (path_id, seq))); - self.set_reset_token(path_id, remote, reset_token); + // TODO(matheus23): Reset token for a remote or a full 4-tuple? + self.set_reset_token(path_id, network_path.remote, reset_token); } Err(InsertError::ExceedsLimit) => { return Err(TransportError::CONNECTION_ID_LIMIT_ERROR("")); @@ -4609,7 +4699,7 @@ impl Connection { } let path = self.path_data_mut(path_id); - if remote == path.remote { + if network_path == path.network_path { if let Some(updated) = path.update_observed_addr_report(observed) { if path.open { self.events.push_back(Event::Path(PathEvent::ObservedAddr { @@ -4845,7 +4935,7 @@ impl Connection { if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet && !is_probing_packet - && remote != self.path_data(path_id).remote + && network_path != self.path_data(path_id).network_path { let ConnectionSide::Server { ref server_config } = self.side else { panic!("packets from unknown remote should be dropped by clients"); @@ -4854,7 +4944,7 @@ impl Connection { server_config.migration, "migration-initiating packets should have been dropped immediately" ); - self.migrate(path_id, now, remote, migration_observed_addr); + self.migrate(path_id, now, network_path, migration_observed_addr); // Break linkability, if possible self.update_rem_cid(path_id); self.spin = false; @@ -4867,10 +4957,10 @@ impl Connection { &mut self, path_id: PathId, now: Instant, - remote: SocketAddr, + network_path: FourTuple, observed_addr: Option, ) { - trace!(%remote, %path_id, "migration initiated"); + trace!(%network_path, %path_id, "migration initiated"); self.path_counter = self.path_counter.wrapping_add(1); // TODO(@divma): conditions for path migration in multipath are very specific, check them // again to prevent path migrations that should actually create a new path @@ -4881,14 +4971,16 @@ impl Connection { let prev_pto = self.pto(SpaceId::Data, path_id); let known_path = self.paths.get_mut(&path_id).expect("known path"); let path = &mut known_path.data; - let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() { - PathData::from_previous(remote, path, self.path_counter, now) + let mut new_path = if network_path.remote.is_ipv4() + && network_path.remote.ip() == path.network_path.remote.ip() + { + PathData::from_previous(network_path, path, self.path_counter, now) } else { let peer_max_udp_payload_size = u16::try_from(self.peer_params.max_udp_payload_size.into_inner()) .unwrap_or(u16::MAX); PathData::new( - remote, + network_path, self.allow_mtud, Some(peer_max_udp_payload_size), self.path_counter, @@ -4919,7 +5011,7 @@ impl Connection { } // We need to re-assign the correct remote to this path in qlog - self.qlog.emit_tuple_assigned(path_id, remote, now); + self.qlog.emit_tuple_assigned(path_id, network_path, now); self.timers.set( Timer::PerPath(path_id, PathTimer::PathValidation), @@ -4948,7 +5040,7 @@ impl Connection { .pending .retire_cids .extend(retired.map(|seq| (path_id, seq))); - let remote = self.path_data(path_id).remote; + let remote = self.path_data(path_id).network_path.remote; self.set_reset_token(path_id, remote, reset_token); } @@ -5090,7 +5182,8 @@ impl Connection { .should_report(&self.peer_params.address_discovery_role) && (!path.observed_addr_sent || space.pending.observed_addr) { - let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no); + let frame = + frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no); if buf.remaining_mut() > frame.size() { trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS"); frame.encode(buf); @@ -5199,7 +5292,7 @@ impl Connection { let token = self.rng.random(); let info = paths::SentChallengeInfo { sent_instant: now, - remote: path.remote, + network_path: path.network_path, }; path.challenges_sent.insert(token, info); sent.non_retransmits = true; @@ -5229,7 +5322,10 @@ impl Connection { .address_discovery_role .should_report(&self.peer_params.address_discovery_role) { - let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no); + let frame = frame::ObservedAddr::new( + path.network_path.remote, + self.next_observed_addr_seq_no, + ); if buf.remaining_mut() > frame.size() { frame.encode(buf); qlog.frame(&Frame::ObservedAddr(frame)); @@ -5247,7 +5343,7 @@ impl Connection { // PATH_RESPONSE if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data { - if let Some(token) = path.path_responses.pop_on_path(path.remote) { + if let Some(token) = path.path_responses.pop_on_path(path.network_path) { sent.non_retransmits = true; sent.requires_padding = true; let response = frame::PathResponse(token); @@ -5265,8 +5361,10 @@ impl Connection { .address_discovery_role .should_report(&self.peer_params.address_discovery_role) { - let frame = - frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no); + let frame = frame::ObservedAddr::new( + path.network_path.remote, + self.next_observed_addr_seq_no, + ); if buf.remaining_mut() > frame.size() { frame.encode(buf); qlog.frame(&Frame::ObservedAddr(frame)); @@ -5558,7 +5656,7 @@ impl Connection { let path = &mut self.paths.get_mut(&path_id).expect("known path").data; // NEW_TOKEN - while let Some(remote_addr) = space.pending.new_tokens.pop() { + while let Some(network_path) = space.pending.new_tokens.pop() { if path_exclusive_only { break; } @@ -5567,7 +5665,7 @@ impl Connection { panic!("NEW_TOKEN frames should not be enqueued by clients"); }; - if remote_addr != path.remote { + if !network_path.is_probably_same_path(&path.network_path) { // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only // useful if used from the same IP address. Thus, we abandon enqueued NEW_TOKEN // frames upon an path change. Instead, when the new path becomes validated, @@ -5577,7 +5675,7 @@ impl Connection { let token = Token::new( TokenPayload::Validation { - ip: remote_addr.ip(), + ip: network_path.remote.ip(), issued: server_config.time_source.now(), }, &mut self.rng, @@ -5587,7 +5685,7 @@ impl Connection { }; if buf.remaining_mut() < new_token.size() { - space.pending.new_tokens.push(remote_addr); + space.pending.new_tokens.push(network_path); break; } @@ -5597,7 +5695,7 @@ impl Connection { sent.retransmits .get_or_create() .new_tokens - .push(remote_addr); + .push(network_path); self.stats.frame_tx.new_token += 1; } @@ -5776,7 +5874,7 @@ impl Connection { .expect( "preferred address CID is the first received, and hence is guaranteed to be legal", ); - let remote = self.path_data(PathId::ZERO).remote; + let remote = self.path_data(PathId::ZERO).network_path.remote; self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token); } self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms); @@ -6183,11 +6281,11 @@ impl Connection { let ConnectionSide::Server { server_config } = &self.side else { return; }; - let remote_addr = self.path_data(path_id).remote; + let network_path = self.path_data(path_id).network_path; let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens; new_tokens.clear(); for _ in 0..server_config.validation_token.sent { - new_tokens.push(remote_addr); + new_tokens.push(network_path); } } @@ -6283,7 +6381,15 @@ impl Connection { return Ok(None); } }; - match self.open_path_ensure(remote, PathStatus::Backup, now) { + // TODO(matheus23): Probe the correct 4-tuple, instead of only a remote address? + // By specifying None, we do two things: 1. open_path_ensure won't generate two + // paths to the same remote and 2. we let the OS choose which interface to use for + // sending on that path. + let network_path = FourTuple { + remote, + local_ip: None, + }; + match self.open_path_ensure(network_path, PathStatus::Backup, now) { Ok((path_id, path_was_known)) => { if path_was_known { trace!(%path_id, %remote, "nat traversal: path existed for remote"); @@ -6345,7 +6451,10 @@ impl Connection { let mut path_ids = Vec::with_capacity(addresses_to_probe.len()); let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len()); - let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6()); + let ipv6 = self + .paths + .values() + .any(|p| p.data.network_path.remote.is_ipv6()); for (id, address) in addresses_to_probe { match self.open_nat_traversal_path(now, address, ipv6) { @@ -6388,7 +6497,10 @@ impl Connection { fn continue_nat_traversal_round(&mut self, now: Instant) -> Option { let client_state = self.iroh_hp.client_side_mut().ok()?; let (id, address) = client_state.continue_nat_traversal_round()?; - let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6()); + let ipv6 = self + .paths + .values() + .any(|p| p.data.network_path.remote.is_ipv6()); let open_result = self.open_nat_traversal_path(now, address, ipv6); let client_state = self.iroh_hp.client_side_mut().expect("validated"); match open_result { diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index d522f4a20..c15018292 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -11,7 +11,7 @@ use super::{ spaces::{PacketNumberSpace, SentPacket}, }; use crate::{ - ConnectionId, Duration, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, + ConnectionId, Duration, FourTuple, Instant, TIMER_GRANULARITY, TransportConfig, VarInt, coding::{self, Decodable, Encodable}, congestion, frame::ObservedAddr, @@ -129,14 +129,14 @@ impl PathState { pub(super) struct SentChallengeInfo { /// When was the challenge sent on the wire. pub(super) sent_instant: Instant, - /// The remote to which this path challenge was sent. - pub(super) remote: SocketAddr, + /// The 4-tuple on which this path challenge was sent. + pub(super) network_path: FourTuple, } /// Description of a particular network path #[derive(Debug)] pub(super) struct PathData { - pub(super) remote: SocketAddr, + pub(super) network_path: FourTuple, pub(super) rtt: RttEstimator, /// Whether we're enabling ECN on outgoing packets pub(super) sending_ecn: bool, @@ -230,7 +230,7 @@ pub(super) struct PathData { impl PathData { pub(super) fn new( - remote: SocketAddr, + network_path: FourTuple, allow_mtud: bool, peer_max_udp_payload_size: Option, generation: u64, @@ -242,7 +242,7 @@ impl PathData { .clone() .build(now, config.get_initial_mtu()); Self { - remote, + network_path, rtt: RttEstimator::new(config.initial_rtt), sending_ecn: true, pacing: Pacer::new( @@ -294,7 +294,7 @@ impl PathData { /// /// This should only be called when migrating paths. pub(super) fn from_previous( - remote: SocketAddr, + network_path: FourTuple, prev: &Self, generation: u64, now: Instant, @@ -302,7 +302,7 @@ impl PathData { let congestion = prev.congestion.clone_box(); let smoothed_rtt = prev.rtt.get(); Self { - remote, + network_path, rtt: prev.rtt, pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now), sending_ecn: true, @@ -373,7 +373,7 @@ impl PathData { self.total_sent = self.total_sent.saturating_add(inc); if !self.validated { trace!( - remote = %self.remote, + network_path = %self.network_path, anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent), "anti amplification budget decreased" ); @@ -385,7 +385,7 @@ impl PathData { self.total_recvd = self.total_recvd.saturating_add(inc); if !self.validated { trace!( - remote = %self.remote, + network_path = %self.network_path, anti_amplification_budget = %(self.total_recvd * 3).saturating_sub(self.total_sent), "anti amplification budget increased" ); @@ -410,18 +410,22 @@ impl PathData { &mut self, now: Instant, token: u64, - remote: SocketAddr, + network_path: FourTuple, ) -> OnPathResponseReceived { match self.challenges_sent.get(&token) { // Response to an on-path PathChallenge - Some(info) if info.remote == remote && self.remote == remote => { + Some(info) + if info.network_path.is_probably_same_path(&network_path) + && self.network_path.is_probably_same_path(&network_path) => + { + self.network_path.update_local_if_same_remote(&network_path); let sent_instant = info.sent_instant; if !std::mem::replace(&mut self.validated, true) { trace!("new path validated"); } // Clear any other on-path sent challenge. self.challenges_sent - .retain(|_token, info| info.remote != remote); + .retain(|_token, info| !info.network_path.is_probably_same_path(&network_path)); self.send_new_challenge = false; @@ -434,14 +438,14 @@ impl PathData { OnPathResponseReceived::OnPath { was_open } } // Response to an off-path PathChallenge - Some(info) if info.remote == remote => { + Some(info) if info.network_path.is_probably_same_path(&network_path) => { self.challenges_sent - .retain(|_token, info| info.remote != remote); + .retain(|_token, info| !info.network_path.is_probably_same_path(&network_path)); OnPathResponseReceived::OffPath } // Response to a PathChallenge we recognize, but from an invalid remote Some(info) => OnPathResponseReceived::Invalid { - expected: info.remote, + expected: info.network_path, }, // Response to an unknown PathChallenge None => OnPathResponseReceived::Unknown, @@ -541,8 +545,8 @@ pub(super) enum OnPathResponseReceived { Unknown, /// The response is invalid. Invalid { - /// The remote that was expected for this token. - expected: SocketAddr, + /// The 4-tuple that was expected for this token. + expected: FourTuple, }, } @@ -715,15 +719,18 @@ pub(crate) struct PathResponses { } impl PathResponses { - pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) { + pub(crate) fn push(&mut self, packet: u64, token: u64, network_path: FourTuple) { /// Arbitrary permissive limit to prevent abuse const MAX_PATH_RESPONSES: usize = 16; let response = PathResponse { packet, token, - remote, + network_path, }; - let existing = self.pending.iter_mut().find(|x| x.remote == remote); + let existing = self + .pending + .iter_mut() + .find(|x| x.network_path.remote == network_path.remote); if let Some(existing) = existing { // Update a queued response if existing.packet <= packet { @@ -740,20 +747,24 @@ impl PathResponses { } } - pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> { + pub(crate) fn pop_off_path(&mut self, network_path: FourTuple) -> Option<(u64, FourTuple)> { let response = *self.pending.last()?; - if response.remote == remote { + // We use an exact comparison here, because once we've received for the first time, + // we really should either already have a local_ip, or we will never get one + // (because our OS doesn't support it). + if response.network_path == network_path { // We don't bother searching further because we expect that the on-path response will // get drained in the immediate future by a call to `pop_on_path` return None; } self.pending.pop(); - Some((response.token, response.remote)) + Some((response.token, response.network_path)) } - pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option { + pub(crate) fn pop_on_path(&mut self, network_path: FourTuple) -> Option { let response = *self.pending.last()?; - if response.remote != remote { + // Using an exact comparison. See explanation in `pop_off_path`. + if response.network_path != network_path { // We don't bother searching further because we expect that the off-path response will // get drained in the immediate future by a call to `pop_off_path` return None; @@ -773,8 +784,8 @@ struct PathResponse { packet: u64, /// The token of the PATH_CHALLENGE token: u64, - /// The address the corresponding PATH_CHALLENGE was received from - remote: SocketAddr, + /// The path the corresponding PATH_CHALLENGE was received from + network_path: FourTuple, } /// Summary statistics of packets that have been sent on a particular path, but which have not yet diff --git a/quinn-proto/src/connection/qlog.rs b/quinn-proto/src/connection/qlog.rs index 47d97d965..975392256 100644 --- a/quinn-proto/src/connection/qlog.rs +++ b/quinn-proto/src/connection/qlog.rs @@ -37,7 +37,7 @@ use qlog::{ use tracing::warn; use crate::{ - Connection, ConnectionId, Frame, Instant, PathId, + Connection, ConnectionId, FourTuple, Frame, Instant, PathId, connection::{PathData, SentPacket, timer::Timer}, frame::{EcnCounts, StreamMeta}, packet::{Header, SpaceId}, @@ -105,10 +105,15 @@ impl QlogStream { self.emit_event_with_tuple_id(event, now, None); } - fn emit_event_with_tuple_id(&self, event: EventData, now: Instant, tuple: Option) { + fn emit_event_with_tuple_id( + &self, + event: EventData, + now: Instant, + network_path: Option, + ) { // Time will be overwritten by `add_event_with_instant` let mut event = Event::with_time(0.0, event); - event.tuple = tuple; + event.tuple = network_path; let mut qlog_streamer = self.0.lock().unwrap(); if let Err(e) = qlog_streamer.add_event_with_instant(event, now) { warn!("could not emit qlog event: {e}"); @@ -243,7 +248,7 @@ impl QlogSink { } } - pub(super) fn emit_tuple_assigned(&self, path_id: PathId, remote: SocketAddr, now: Instant) { + pub(super) fn emit_tuple_assigned(&self, path_id: PathId, tuple: FourTuple, now: Instant) { #[cfg(feature = "qlog")] { let Some(stream) = self.stream.as_ref() else { @@ -252,10 +257,12 @@ impl QlogSink { let tuple_id = fmt_tuple_id(path_id.as_u32() as u64); let event = TupleAssigned { tuple_id, - tuple_local: None, + tuple_local: tuple + .local_ip + .map(|local_ip| tuple_endpoint_info(Some(local_ip), None, None)), tuple_remote: Some(tuple_endpoint_info( - Some(remote.ip()), - Some(remote.port()), + Some(tuple.remote.ip()), + Some(tuple.remote.port()), None, )), }; diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index 85d7f8392..4c3dc7aaf 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -13,7 +13,7 @@ use tracing::trace; use super::{PathId, assembler::Assembler}; use crate::{ - Dir, Duration, Instant, SocketAddr, StreamId, TransportError, TransportErrorCode, VarInt, + Dir, Duration, FourTuple, Instant, StreamId, TransportError, TransportErrorCode, VarInt, connection::StreamsState, crypto::Keys, frame::{self, AddAddress, RemoveAddress}, @@ -525,10 +525,10 @@ pub struct Retransmits { /// /// It is true that a QUIC endpoint will only want to effectively have NEW_TOKEN frames /// enqueued for its current path at a given point in time. Based on that, we could conceivably - /// change this from a vector to an `Option<(SocketAddr, usize)>` or just a `usize` or + /// change this from a vector to an `Option<(FourTuple, usize)>` or just a `usize` or /// something. However, due to the architecture of Quinn, it is considerably simpler to not do /// that; consider what such a change would mean for implementing `BitOrAssign` on Self. - pub(super) new_tokens: Vec, + pub(super) new_tokens: Vec, /// Paths which need to be abandoned pub(super) path_abandon: BTreeMap, /// If a [`frame::PathStatusAvailable`] and [`frame::PathStatusBackup`] need to be sent for a path diff --git a/quinn-proto/src/endpoint.rs b/quinn-proto/src/endpoint.rs index ce94c926a..77991f29f 100644 --- a/quinn-proto/src/endpoint.rs +++ b/quinn-proto/src/endpoint.rs @@ -15,8 +15,8 @@ use thiserror::Error; use tracing::{debug, error, trace, warn}; use crate::{ - Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId, RESET_TOKEN_SIZE, - ResetToken, Side, Transmit, TransportConfig, TransportError, + Duration, FourTuple, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId, + RESET_TOKEN_SIZE, ResetToken, Side, Transmit, TransportConfig, TransportError, cid_generator::ConnectionIdGenerator, coding::{BufMutExt, Encodable}, config::{ClientConfig, EndpointConfig, ServerConfig}, @@ -152,8 +152,7 @@ impl Endpoint { pub fn handle( &mut self, now: Instant, - remote: SocketAddr, - local_ip: Option, + network_path: FourTuple, ecn: Option, data: BytesMut, buf: &mut Vec, @@ -168,7 +167,7 @@ impl Endpoint { ) { Ok((first_decode, remaining)) => DatagramConnectionEvent { now, - remote, + network_path, path_id: PathId::ZERO, // Corrected later for existing paths ecn, first_decode, @@ -200,11 +199,11 @@ impl Endpoint { buf.write(version); } return Some(DatagramEvent::Response(Transmit { - destination: remote, + destination: network_path.remote, ecn: None, size: buf.len(), segment_size: None, - src_ip: local_ip, + src_ip: network_path.local_ip, })); } Err(e) => { @@ -213,10 +212,9 @@ impl Endpoint { } }; - let addresses = FourTuple { remote, local_ip }; let dst_cid = event.first_decode.dst_cid(); - if let Some(route_to) = self.index.get(&addresses, &event.first_decode) { + if let Some(route_to) = self.index.get(&network_path, &event.first_decode) { event.path_id = match route_to { RouteDatagramTo::Incoming(_) => PathId::ZERO, RouteDatagramTo::Connection(_, path_id) => path_id, @@ -250,7 +248,7 @@ impl Endpoint { } else if event.first_decode.initial_header().is_some() { // Potentially create a new connection - self.handle_first_packet(datagram_len, event, addresses, buf) + self.handle_first_packet(datagram_len, event, network_path, buf) } else if event.first_decode.has_long_header() { debug!( "ignoring non-initial packet for unknown connection {}", @@ -268,7 +266,7 @@ impl Endpoint { } else { // If we got this far, we're receiving a seemingly valid packet for an unknown // connection. Send a stateless reset if possible. - self.stateless_reset(now, datagram_len, addresses, dst_cid, buf) + self.stateless_reset(now, datagram_len, network_path, dst_cid, buf) .map(DatagramEvent::Response) } } @@ -278,7 +276,7 @@ impl Endpoint { &mut self, now: Instant, inciting_dgram_len: usize, - addresses: FourTuple, + network_path: FourTuple, dst_cid: ConnectionId, buf: &mut Vec, ) -> Option { @@ -308,7 +306,7 @@ impl Endpoint { debug!( "sending stateless reset for {} to {}", - dst_cid, addresses.remote + dst_cid, network_path.remote ); self.last_stateless_reset = Some(now); // Resets with at least this much padding can't possibly be distinguished from real packets @@ -328,11 +326,11 @@ impl Endpoint { debug_assert!(buf.len() < inciting_dgram_len); Some(Transmit { - destination: addresses.remote, + destination: network_path.remote, ecn: None, size: buf.len(), segment_size: None, - src_ip: addresses.local_ip, + src_ip: network_path.local_ip, }) } @@ -443,7 +441,7 @@ impl Endpoint { &mut self, datagram_len: usize, event: DatagramConnectionEvent, - addresses: FourTuple, + network_path: FourTuple, buf: &mut Vec, ) -> Option { let dst_cid = event.first_decode.dst_cid(); @@ -452,7 +450,7 @@ impl Endpoint { let Some(server_config) = &self.server_config else { debug!("packet for unrecognized connection {}", dst_cid); return self - .stateless_reset(event.now, datagram_len, addresses, dst_cid, buf) + .stateless_reset(event.now, datagram_len, network_path, dst_cid, buf) .map(DatagramEvent::Response); }; @@ -477,7 +475,7 @@ impl Endpoint { if let Err(reason) = self.early_validate_first_packet(header) { return Some(DatagramEvent::Response(self.initial_close( header.version, - addresses, + network_path, &crypto, header.src_cid, reason, @@ -504,13 +502,13 @@ impl Endpoint { let server_config = self.server_config.as_ref().unwrap().clone(); - let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) { + let token = match IncomingToken::from_header(&header, &server_config, network_path.remote) { Ok(token) => token, Err(InvalidRetryTokenError) => { debug!("rejecting invalid retry token"); return Some(DatagramEvent::Response(self.initial_close( header.version, - addresses, + network_path, &crypto, header.src_cid, TransportError::INVALID_TOKEN(""), @@ -525,7 +523,7 @@ impl Endpoint { Some(DatagramEvent::NewConnection(Incoming { received_at: event.now, - addresses, + network_path, ecn: event.ecn, packet: InitialPacket { header, @@ -586,7 +584,7 @@ impl Endpoint { cause: ConnectionError::CidsExhausted, response: Some(self.initial_close( version, - incoming.addresses, + incoming.network_path, &incoming.crypto, src_cid, TransportError::CONNECTION_REFUSED(""), @@ -648,7 +646,7 @@ impl Endpoint { dst_cid, loc_cid, src_cid, - incoming.addresses, + incoming.network_path, incoming.received_at, tls, transport_config, @@ -663,7 +661,7 @@ impl Endpoint { match conn.handle_first_packet( incoming.received_at, - incoming.addresses.remote, + incoming.network_path, incoming.ecn, packet_number, incoming.packet, @@ -684,7 +682,7 @@ impl Endpoint { let response = match e { ConnectionError::TransportError(ref e) => Some(self.initial_close( version, - incoming.addresses, + incoming.network_path, &incoming.crypto, src_cid, e.clone(), @@ -734,7 +732,7 @@ impl Endpoint { self.initial_close( incoming.packet.header.version, - incoming.addresses, + incoming.network_path, &incoming.crypto, incoming.packet.header.src_cid, TransportError::CONNECTION_REFUSED(""), @@ -764,7 +762,7 @@ impl Endpoint { let loc_cid = self.local_cid_generator.generate_cid(); let payload = TokenPayload::Retry { - address: incoming.addresses.remote, + address: incoming.network_path.remote, orig_dst_cid: incoming.packet.header.dst_cid, issued: server_config.time_source.now(), }; @@ -786,11 +784,11 @@ impl Endpoint { encode.finish(buf, &*incoming.crypto.header.local, None); Ok(Transmit { - destination: incoming.addresses.remote, + destination: incoming.network_path.remote, ecn: None, size: buf.len(), segment_size: None, - src_ip: incoming.addresses.local_ip, + src_ip: incoming.network_path.local_ip, }) } @@ -817,7 +815,7 @@ impl Endpoint { init_cid: ConnectionId, loc_cid: ConnectionId, rem_cid: ConnectionId, - addresses: FourTuple, + network_path: FourTuple, now: Instant, tls: Box, transport_config: Arc, @@ -831,14 +829,14 @@ impl Endpoint { let pref_addr_cid = side_args.pref_addr_cid(); let qlog = - transport_config.create_qlog_sink(side_args.side(), addresses.remote, init_cid, now); + transport_config.create_qlog_sink(side_args.side(), network_path.remote, init_cid, now); qlog.emit_connection_started( now, loc_cid, rem_cid, - addresses.remote, - addresses.local_ip, + network_path.remote, + network_path.local_ip, params, ); @@ -848,8 +846,7 @@ impl Endpoint { init_cid, loc_cid, rem_cid, - addresses.remote, - addresses.local_ip, + network_path, tls, self.local_cid_generator.as_ref(), now, @@ -873,13 +870,13 @@ impl Endpoint { let id = self.connections.insert(ConnectionMeta { init_cid, loc_cids: FxHashMap::from_iter([(PathId::ZERO, path_cids)]), - addresses, + network_path, side, reset_token: Default::default(), }); debug_assert_eq!(id, ch.0, "connection handle allocation out of sync"); - self.index.insert_conn(addresses, loc_cid, ch, side); + self.index.insert_conn(network_path, loc_cid, ch, side); conn } @@ -887,7 +884,7 @@ impl Endpoint { fn initial_close( &mut self, version: u32, - addresses: FourTuple, + network_path: FourTuple, crypto: &Keys, remote_id: ConnectionId, reason: TransportError, @@ -917,11 +914,11 @@ impl Endpoint { Some((0, Default::default(), &*crypto.packet.local)), ); Transmit { - destination: addresses.remote, + destination: network_path.remote, ecn: None, size: buf.len(), segment_size: None, - src_ip: addresses.local_ip, + src_ip: network_path.local_ip, } } @@ -1023,11 +1020,12 @@ struct ConnectionIndex { /// Identifies outgoing connections with zero-length CIDs /// /// We don't yet support explicit source addresses for client connections, and zero-length CIDs - /// require a unique four-tuple, so at most one client connection with zero-length local CIDs + /// require a unique 4-tuple, so at most one client connection with zero-length local CIDs /// may be established per remote. We must omit the local address from the key because we don't /// necessarily know what address we're sending from, and hence receiving at. /// /// Uses a standard `HashMap` to protect against hash collision attacks. + // TODO(matheus23): It's possible this could be changed now that we track the full 4-tuple on the client side, too. outgoing_connection_remotes: HashMap, /// Reset tokens provided by the peer for the CID each connection is currently sending to /// @@ -1070,7 +1068,7 @@ impl ConnectionIndex { /// its current 4-tuple fn insert_conn( &mut self, - addresses: FourTuple, + network_path: FourTuple, dst_cid: ConnectionId, connection: ConnectionHandle, side: Side, @@ -1079,11 +1077,11 @@ impl ConnectionIndex { 0 => match side { Side::Server => { self.incoming_connection_remotes - .insert(addresses, connection); + .insert(network_path, connection); } Side::Client => { self.outgoing_connection_remotes - .insert(addresses.remote, connection); + .insert(network_path.remote, connection); } }, _ => { @@ -1106,16 +1104,16 @@ impl ConnectionIndex { for cid in conn.loc_cids.values().flat_map(|pcids| pcids.cids.values()) { self.connection_ids.remove(cid); } - self.incoming_connection_remotes.remove(&conn.addresses); + self.incoming_connection_remotes.remove(&conn.network_path); self.outgoing_connection_remotes - .remove(&conn.addresses.remote); + .remove(&conn.network_path.remote); for (remote, token) in conn.reset_token.values() { self.connection_reset_tokens.remove(*remote, *token); } } /// Find the existing connection that `datagram` should be routed to, if any - fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option { + fn get(&self, network_path: &FourTuple, datagram: &PartialDecode) -> Option { if !datagram.dst_cid().is_empty() { if let Some(&(ch, path_id)) = self.connection_ids.get(&datagram.dst_cid()) { return Some(RouteDatagramTo::Connection(ch, path_id)); @@ -1127,12 +1125,12 @@ impl ConnectionIndex { } } if datagram.dst_cid().is_empty() { - if let Some(&ch) = self.incoming_connection_remotes.get(addresses) { + if let Some(&ch) = self.incoming_connection_remotes.get(network_path) { // Never multipath because QUIC-MULTIPATH 1.1 mandates the use of non-zero // length CIDs. So this is always PathId::ZERO. return Some(RouteDatagramTo::Connection(ch, PathId::ZERO)); } - if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) { + if let Some(&ch) = self.outgoing_connection_remotes.get(&network_path.remote) { // Like above, QUIC-MULTIPATH 1.1 mandates the use of non-zero length CIDs. return Some(RouteDatagramTo::Connection(ch, PathId::ZERO)); } @@ -1144,7 +1142,7 @@ impl ConnectionIndex { // For stateless resets the PathId is meaningless since it closes the entire // connection regardless of path. So use PathId::ZERO. self.connection_reset_tokens - .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..]) + .get(network_path.remote, &data[data.len() - RESET_TOKEN_SIZE..]) .cloned() .map(|ch| RouteDatagramTo::Connection(ch, PathId::ZERO)) } @@ -1159,7 +1157,7 @@ pub(crate) struct ConnectionMeta { /// /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't /// bother keeping it up to date. - addresses: FourTuple, + network_path: FourTuple, side: Side, /// Reset tokens provided by the peer for CIDs we're currently sending to /// @@ -1219,7 +1217,7 @@ pub enum DatagramEvent { /// An incoming connection for which the server has not yet begun its part of the handshake. pub struct Incoming { received_at: Instant, - addresses: FourTuple, + network_path: FourTuple, ecn: Option, packet: InitialPacket, rest: Option, @@ -1234,12 +1232,12 @@ impl Incoming { /// /// This has the same behavior as [`Connection::local_ip`]. pub fn local_ip(&self) -> Option { - self.addresses.local_ip + self.network_path.local_ip } /// The peer's UDP address pub fn remote_address(&self) -> SocketAddr { - self.addresses.remote + self.network_path.remote } /// Whether the socket address that is initiating this connection has been validated @@ -1270,7 +1268,7 @@ impl Incoming { impl fmt::Debug for Incoming { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Incoming") - .field("addresses", &self.addresses) + .field("network_path", &self.network_path) .field("ecn", &self.ecn) // packet doesn't implement debug // rest is too big and not meaningful enough @@ -1386,14 +1384,3 @@ impl ResetTokenTable { self.0.get(&remote)?.get(&token) } } - -/// Identifies a connection by the combination of remote and local addresses -/// -/// Including the local ensures good behavior when the host has multiple IP addresses on the same -/// subnet and zero-length connection IDs are in use. -#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)] -struct FourTuple { - remote: SocketAddr, - // A single socket can only listen on a single port, so no need to store it explicitly - local_ip: Option, -} diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index e74549dd0..42a9cdc0b 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -343,3 +343,69 @@ const MAX_UDP_PAYLOAD: u16 = 65527; const TIMER_GRANULARITY: Duration = Duration::from_millis(1); /// Maximum number of streams that can be uniquely identified by a stream ID const MAX_STREAM_COUNT: u64 = 1 << 60; + +/// Identifies a network path by the combination of remote and local addresses +/// +/// Including the local ensures good behavior when the host has multiple IP addresses on the same +/// subnet and zero-length connection IDs are in use or when multipath is enabled and multiple +/// paths exist with the same remote, but different local IP interfaces. +#[derive(Hash, Eq, PartialEq, Copy, Clone)] +pub struct FourTuple { + /// The remote side of this tuple + pub remote: SocketAddr, + /// The local side of this tuple. + /// + /// The socket is irrelevant for our intents and purposes: + /// When we send, we can only specify the `src_ip`, not the source port. + /// So even if we track the port, we won't be able to make use of it. + pub local_ip: Option, +} + +impl FourTuple { + /// Returns whether we think the other address probably represents the same path + /// as ours. + /// + /// If we have a local IP set, then we're exact and only match if the 4-tuples are + /// exactly equal. + /// If we don't have a local IP set, then we only check the remote addresses for equality. + pub fn is_probably_same_path(&self, other: &Self) -> bool { + self.remote == other.remote && (self.local_ip.is_none() || self.local_ip == other.local_ip) + } + + /// Updates this tuple's local address iff + /// - it was unset before, + /// - the other tuple has the same remote, and + /// - the other tuple has a local address set. + /// + /// Returns whether this and the other remote are now fully equal. + pub fn update_local_if_same_remote(&mut self, other: &Self) -> bool { + if self.remote != other.remote { + return false; + } + if self.local_ip.is_some() && self.local_ip != other.local_ip { + return false; + } + self.local_ip = other.local_ip; + true + } +} + +impl fmt::Display for FourTuple { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("(")?; + if let Some(local_ip) = &self.local_ip { + local_ip.fmt(f)?; + f.write_str(", ")?; + } else { + f.write_str(", ")?; + } + self.remote.fmt(f)?; + f.write_str(")") + } +} + +impl fmt::Debug for FourTuple { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self, f) + } +} diff --git a/quinn-proto/src/shared.rs b/quinn-proto/src/shared.rs index 34bf68414..040c35495 100644 --- a/quinn-proto/src/shared.rs +++ b/quinn-proto/src/shared.rs @@ -2,6 +2,7 @@ use std::{fmt, net::SocketAddr}; use bytes::{Buf, BufMut, BytesMut}; +use crate::FourTuple; use crate::PathId; use crate::{Duration, Instant, MAX_CID_SIZE, ResetToken, coding::BufExt, packet::PartialDecode}; @@ -21,7 +22,7 @@ pub(crate) enum ConnectionEventInner { #[derive(Debug)] pub(crate) struct DatagramConnectionEvent { pub(crate) now: Instant, - pub(crate) remote: SocketAddr, + pub(crate) network_path: FourTuple, pub(crate) path_id: PathId, pub(crate) ecn: Option, pub(crate) first_decode: PartialDecode, diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index fef7df2c7..0c512fead 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -26,7 +26,7 @@ use tracing::info; use super::*; use crate::{ - Duration, Instant, + Duration, FourTuple, Instant, cid_generator::{ConnectionIdGenerator, RandomConnectionIdGenerator}, crypto::rustls::QuicServerConfig, frame::FrameStruct, @@ -64,8 +64,10 @@ fn version_negotiate_server() { let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize); let event = server.handle( now, - client_addr, - None, + FourTuple { + remote: client_addr, + local_ip: None, + }, None, // Long-header packet with reserved version number hex!("80 0a1a2a3a 04 00000000 04 00000000 00")[..].into(), @@ -106,8 +108,10 @@ fn version_negotiate_client() { let mut buf = Vec::with_capacity(client.config().get_max_udp_payload_size() as usize); let opt_event = client.handle( now, - server_addr, - None, + FourTuple { + remote: server_addr, + local_ip: None, + }, None, // Version negotiation packet for reserved version, with empty DCID hex!( @@ -268,14 +272,17 @@ fn stateless_reset_limit() { ); let time = Instant::now(); let mut buf = Vec::new(); - let event = endpoint.handle(time, remote, None, None, [0u8; 1024][..].into(), &mut buf); + let network_path = FourTuple { + remote, + local_ip: None, + }; + let event = endpoint.handle(time, network_path, None, [0u8; 1024][..].into(), &mut buf); assert!(matches!(event, Some(DatagramEvent::Response(_)))); - let event = endpoint.handle(time, remote, None, None, [0u8; 1024][..].into(), &mut buf); + let event = endpoint.handle(time, network_path, None, [0u8; 1024][..].into(), &mut buf); assert!(event.is_none()); let event = endpoint.handle( time + endpoint_config.min_reset_interval - Duration::from_nanos(1), - remote, - None, + network_path, None, [0u8; 1024][..].into(), &mut buf, @@ -283,8 +290,7 @@ fn stateless_reset_limit() { assert!(event.is_none()); let event = endpoint.handle( time + endpoint_config.min_reset_interval, - remote, - None, + network_path, None, [0u8; 1024][..].into(), &mut buf, @@ -1352,7 +1358,8 @@ fn migration() { assert_matches!(pair.client_conn_mut(client_ch).poll(), None); assert_eq!( pair.server_conn_mut(server_ch) - .path_remote_address(PathId::ZERO), + .network_path(PathId::ZERO) + .map(|addrs| addrs.remote), Ok(pair.client.addr) ); @@ -2400,8 +2407,10 @@ fn malformed_token_len() { let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize); server.handle( Instant::now(), - client_addr, - None, + FourTuple { + remote: client_addr, + local_ip: None, + }, None, hex!("8900 0000 0101 0000 1b1b 841b 0000 0000 3f00")[..].into(), &mut buf, @@ -2542,7 +2551,8 @@ fn migrate_detects_new_mtu_and_respects_original_peer_max_udp_payload_size() { // Sanity check: the server saw that the client address was updated assert_eq!( pair.server_conn_mut(server_ch) - .path_remote_address(PathId::ZERO), + .network_path(PathId::ZERO) + .map(|addrs| addrs.remote), Ok(pair.client.addr) ); @@ -3829,6 +3839,10 @@ fn address_discovery_rebind_retransmission() { fn reject_short_idcid() { let _guard = subscribe(); let client_addr = "[::2]:7890".parse().unwrap(); + let network_path = FourTuple { + remote: client_addr, + local_ip: None, + }; let mut server = Endpoint::new( Default::default(), Some(Arc::new(server_config())), @@ -3840,7 +3854,7 @@ fn reject_short_idcid() { // Initial header that has an empty DCID but is otherwise well-formed let mut initial = BytesMut::from(hex!("c4 00000001 00 00 00 3f").as_ref()); initial.resize(MIN_INITIAL_SIZE.into(), 0); - let event = server.handle(now, client_addr, None, None, initial, &mut buf); + let event = server.handle(now, network_path, None, initial, &mut buf); let Some(DatagramEvent::Response(Transmit { .. })) = event else { panic!("expected an initial close"); }; diff --git a/quinn-proto/src/tests/multipath.rs b/quinn-proto/src/tests/multipath.rs index 79ca0fcc2..a63a8b0d9 100644 --- a/quinn-proto/src/tests/multipath.rs +++ b/quinn-proto/src/tests/multipath.rs @@ -11,8 +11,8 @@ use tracing::info; use crate::tests::util::{CLIENT_PORTS, SERVER_PORTS}; use crate::{ ClientConfig, ClosePathError, ConnectionHandle, ConnectionId, ConnectionIdGenerator, Endpoint, - EndpointConfig, LOC_CID_COUNT, PathId, PathStatus, RandomConnectionIdGenerator, ServerConfig, - TransportConfig, cid_queue::CidQueue, + EndpointConfig, FourTuple, LOC_CID_COUNT, PathId, PathStatus, RandomConnectionIdGenerator, + ServerConfig, TransportConfig, cid_queue::CidQueue, }; use crate::{Event, PathError, PathEvent}; @@ -460,7 +460,7 @@ fn open_path() { let (mut pair, client_ch, _server_ch) = multipath_pair(); let now = pair.time; - let server_addr = pair.server.addr; + let server_addr = pair.addrs_to_server(); let path_id = pair .client_conn_mut(client_ch) .open_path(server_addr, PathStatus::Available, now) @@ -485,7 +485,7 @@ fn open_path_key_update() { let (mut pair, client_ch, _server_ch) = multipath_pair(); let now = pair.time; - let server_addr = pair.server.addr; + let server_addr = pair.addrs_to_server(); let path_id = pair .client_conn_mut(client_ch) .open_path(server_addr, PathStatus::Available, now) @@ -516,10 +516,13 @@ fn open_path_validation_fails_server_side() { let _guard = subscribe(); let (mut pair, client_ch, _server_ch) = multipath_pair(); - let different_addr = SocketAddr::new( - [9, 8, 7, 6].into(), - SERVER_PORTS.lock().unwrap().next().unwrap(), - ); + let different_addr = FourTuple { + remote: SocketAddr::new( + [9, 8, 7, 6].into(), + SERVER_PORTS.lock().unwrap().next().unwrap(), + ), + local_ip: None, + }; let now = pair.time; let path_id = pair .client_conn_mut(client_ch) @@ -554,9 +557,13 @@ fn open_path_validation_fails_client_side() { let now = pair.time; let addr = pair.server.addr; + let network_path = FourTuple { + remote: addr, + local_ip: None, + }; let path_id = pair .client_conn_mut(client_ch) - .open_path(addr, PathStatus::Available, now) + .open_path(network_path, PathStatus::Available, now) .unwrap(); // block the client from receiving anything @@ -574,7 +581,7 @@ fn close_path() { let (mut pair, client_ch, _server_ch) = multipath_pair(); let now = pair.time; - let server_addr = pair.server.addr; + let server_addr = pair.addrs_to_server(); let path_id = pair .client_conn_mut(client_ch) .open_path(server_addr, PathStatus::Available, now) @@ -610,7 +617,7 @@ fn close_last_path() { let (mut pair, client_ch, server_ch) = multipath_pair(); let now = pair.time; - let server_addr = pair.server.addr; + let server_addr = pair.addrs_to_server(); let path_id = pair .client_conn_mut(client_ch) .open_path(server_addr, PathStatus::Available, now) @@ -684,14 +691,16 @@ fn per_path_observed_address() { // open a second path let now = pair.time; - let remote = pair.server.addr; + let network_path = pair.addrs_to_server(); let conn = pair.client_conn_mut(client_ch); - let _new_path_id = conn.open_path(remote, PathStatus::Available, now).unwrap(); + let _new_path_id = conn + .open_path(network_path, PathStatus::Available, now) + .unwrap(); pair.drive(); let conn = pair.client_conn_mut(client_ch); // check the migration related event - assert_matches!(conn.poll(), Some(Event::Path(PathEvent::ObservedAddr{id: PathId::ZERO, addr})) if addr == our_addr); + assert_matches!(conn.poll(), Some(Event::Path(PathEvent::ObservedAddr{ id: PathId::ZERO, addr })) if addr == our_addr); // wait for the open event let mut opened = false; while let Some(ev) = conn.poll() { @@ -736,7 +745,7 @@ fn mtud_on_two_paths() { // Open a 2nd path. let now = pair.time; - let server_addr = pair.server.addr; + let server_addr = pair.addrs_to_server(); let path_id = pair .client_conn_mut(client_ch) .open_path(server_addr, PathStatus::Available, now) diff --git a/quinn-proto/src/tests/proptest.rs b/quinn-proto/src/tests/proptest.rs index 592f134f5..cca1dddf8 100644 --- a/quinn-proto/src/tests/proptest.rs +++ b/quinn-proto/src/tests/proptest.rs @@ -512,3 +512,47 @@ fn regression_path_validation() { pair.server_conn_mut(server_ch) ))); } + +/// This regression test used to fail with the client never becoming idle. +/// It kept sending PATH_CHALLENGEs forever. +/// +/// The situation in which that happened was this: +/// 1. The server closes the connection, but the close frame is lost. +/// 2. The client opens another path on the same 4-tuple (thus that path is immediately validated). +/// 3. It immediately closes path 0 afterwards. +/// +/// At this point, the server is already fully checked out and not responding anymore. +/// The client however thinks the connection is still ongoing and continues sending (that's fine). +/// However, it never stops sending path challenges, because of a bug where only when the +/// path validation timer times out, the path challenge lost timer was stopped. This means +/// the client would keep re-sending path challenges infinitely (never getting a response, +/// which would also stop the challenge lost timer). +/// +/// Correctly stopping the path challenge lost timer fixes this. +#[test] +fn regression_never_idle3() { + let prefix = "regression_never_idle3"; + let seed = [0u8; 32]; + let interactions = vec![ + TestOp::CloseConn(Side::Server, 0), + TestOp::Drive(Side::Server), + TestOp::DropInbound(Side::Client), + TestOp::OpenPath(Side::Client, PathStatus::Available, 0), + TestOp::ClosePath(Side::Client, 0, 0), + TestOp::AdvanceTime, + ]; + + let _guard = subscribe(); + let routes = RoutingTable::simple_symmetric([CLIENT_ADDRS[0]], [SERVER_ADDRS[0]]); + let mut pair = setup_deterministic_with_multipath(seed, routes, prefix); + let (client_ch, server_ch) = + run_random_interaction(&mut pair, interactions, multipath_transport_config(prefix)); + + assert!(!pair.drive_bounded(1000), "connection never became idle"); + assert!(allowed_error(poll_to_close( + pair.client_conn_mut(client_ch) + ))); + assert!(allowed_error(poll_to_close( + pair.server_conn_mut(server_ch) + ))); +} diff --git a/quinn-proto/src/tests/random_interaction.rs b/quinn-proto/src/tests/random_interaction.rs index a4f83d5be..68dde54b9 100644 --- a/quinn-proto/src/tests/random_interaction.rs +++ b/quinn-proto/src/tests/random_interaction.rs @@ -9,7 +9,7 @@ use test_strategy::Arbitrary; use tracing::{debug, trace}; use crate::{ - Connection, ConnectionHandle, Dir, PathId, PathStatus, StreamId, TransportConfig, + Connection, ConnectionHandle, Dir, FourTuple, PathId, PathStatus, StreamId, TransportConfig, tests::{Pair, TestEndpoint, client_config}, }; @@ -123,7 +123,11 @@ impl TestOp { Side::Server => server, }; let conn = state.conn(pair)?; - conn.open_path(remote, initial_status, now).ok(); + let network_path = FourTuple { + remote, + local_ip: None, + }; + conn.open_path(network_path, initial_status, now).ok(); } Self::ClosePath(side, path_idx, error_code) => { let state = match side { diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index 1a3ac0ade..a3906411c 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -364,6 +364,20 @@ impl Pair { pub(super) fn server_datagrams(&mut self, ch: ConnectionHandle) -> Datagrams<'_> { self.server_conn_mut(ch).datagrams() } + + pub(super) fn addrs_to_server(&self) -> FourTuple { + FourTuple { + remote: self.server.addr, + local_ip: Some(self.client.addr.ip()), + } + } + + pub(super) fn addrs_to_client(&self) -> FourTuple { + FourTuple { + remote: self.client.addr, + local_ip: Some(self.server.addr.ip()), + } + } } impl Default for Pair { @@ -469,9 +483,13 @@ impl TestEndpoint { remote, dst_ip, } = self.inbound.pop_front().unwrap(); - if let Some(event) = self - .endpoint - .handle(recv_time, remote, dst_ip, ecn, packet, &mut buf) + let network_path = FourTuple { + remote, + local_ip: dst_ip, + }; + if let Some(event) = + self.endpoint + .handle(recv_time, network_path, ecn, packet, &mut buf) { match event { DatagramEvent::NewConnection(incoming) => { diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 04a48e561..c1291b574 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -28,9 +28,9 @@ use crate::{ udp_transmit, }; use proto::{ - ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, PathError, PathEvent, - PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError, TransportErrorCode, - congestion::Controller, iroh_hp, + ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, FourTuple, PathError, + PathEvent, PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError, + TransportErrorCode, congestion::Controller, iroh_hp, }; /// In-progress connection attempt future @@ -171,25 +171,7 @@ impl Connecting { }) } - /// The local IP address which was used when the peer established - /// the connection - /// - /// This can be different from the address the endpoint is bound to, in case - /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`. - /// - /// This will return `None` for clients, or when the platform does not expose this - /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of - /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default. - /// - /// Will panic if called after `poll` has returned `Ready`. - pub fn local_ip(&self) -> Option { - let conn = self.conn.as_ref().unwrap(); - let inner = conn.state.lock("local_ip"); - - inner.inner.local_ip() - } - - /// The peer's UDP address + /// The peer's UDP addresses /// /// Will panic if called after `poll` has returned `Ready`. pub fn remote_address(&self) -> SocketAddr { @@ -199,8 +181,9 @@ impl Connecting { .state .lock("remote_address") .inner - .path_remote_address(PathId::ZERO) + .network_path(PathId::ZERO) .expect("path exists when connecting") + .remote } } @@ -391,8 +374,8 @@ impl Connection { .filter_map(|id| { state .inner - .path_remote_address(*id) - .map(|ip| ip.is_ipv6()) + .network_path(*id) + .map(|addrs| addrs.remote.is_ipv6()) .ok() }) .next() @@ -407,7 +390,13 @@ impl Connection { }; let now = state.runtime.now(); - let open_res = state.inner.open_path_ensure(addr, initial_status, now); + // TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently. + // However, changing that would mean changing the API. + let addrs = FourTuple { + remote: addr, + local_ip: None, + }; + let open_res = state.inner.open_path_ensure(addrs, initial_status, now); state.wake(); match open_res { Ok((path_id, existed)) if existed => { @@ -454,8 +443,8 @@ impl Connection { .filter_map(|id| { state .inner - .path_remote_address(*id) - .map(|ip| ip.is_ipv6()) + .network_path(*id) + .map(|addrs| addrs.remote.is_ipv6()) .ok() }) .next() @@ -471,7 +460,11 @@ impl Connection { let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(())); let now = state.runtime.now(); - let open_res = state.inner.open_path(addr, initial_status, now); + let addrs = FourTuple { + remote: addr, + local_ip: None, + }; + let open_res = state.inner.open_path(addrs, initial_status, now); state.wake(); match open_res { Ok(path_id) => { @@ -725,9 +718,10 @@ impl Connection { .inner .paths() .iter() - .filter_map(|id| state.inner.path_remote_address(*id).ok()) + .filter_map(|id| state.inner.network_path(*id).ok()) .next() .unwrap() + .remote } /// The local IP address which was used when the peer established @@ -740,7 +734,16 @@ impl Connection { /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default. pub fn local_ip(&self) -> Option { - self.0.state.lock("local_ip").inner.local_ip() + // TODO: an unwrap again + let state = self.0.state.lock("remote_address"); + state + .inner + .paths() + .iter() + .filter_map(|id| state.inner.network_path(*id).ok()) + .next() + .unwrap() + .local_ip } /// Current best estimate of this connection's latency (round-trip-time) diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index b62aeb98c..36b6e68e7 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -27,7 +27,7 @@ use bytes::{Bytes, BytesMut}; use pin_project_lite::pin_project; use proto::{ self as proto, ClientConfig, ConnectError, ConnectionError, ConnectionHandle, DatagramEvent, - EndpointEvent, ServerConfig, + EndpointEvent, FourTuple, ServerConfig, }; use rustc_hash::FxHashMap; #[cfg(all( @@ -851,10 +851,13 @@ impl RecvState { while !data.is_empty() { let buf = data.split_to(meta.stride.min(data.len())); let mut response_buffer = Vec::new(); + let addresses = FourTuple { + remote: meta.addr, + local_ip: meta.dst_ip, + }; match endpoint.handle( now, - meta.addr, - meta.dst_ip, + addresses, meta.ecn.map(proto_ecn), buf, &mut response_buffer, diff --git a/quinn/src/path.rs b/quinn/src/path.rs index 1d7bf4a5f..1a7b68b19 100644 --- a/quinn/src/path.rs +++ b/quinn/src/path.rs @@ -209,7 +209,7 @@ impl Path { /// The peer's UDP address for this path. pub fn remote_address(&self) -> Result { let state = self.conn.state.lock("per_path_remote_address"); - state.inner.path_remote_address(self.id) + Ok(state.inner.network_path(self.id)?.remote) } /// Ping the remote endpoint over this path.