Skip to content

Commit

Permalink
Socket rebind: drain old socket
Browse files Browse the repository at this point in the history
During a planned/active connection migration allow the client to
receive trafic via the old, abandoned socket until the first packet
arrives on the socket.
  • Loading branch information
nemethf authored and Ralith committed Apr 23, 2024
1 parent 1a475a5 commit fa155ea
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
future::Future,
io,
io::IoSliceMut,
mem,
net::{SocketAddr, SocketAddrV6},
pin::Pin,
str,
Expand Down Expand Up @@ -213,7 +214,7 @@ impl Endpoint {
let addr = socket.local_addr()?;
let socket = self.runtime.wrap_udp_socket(socket)?;
let mut inner = self.inner.state.lock().unwrap();
inner.socket = socket;
inner.prev_socket = Some(mem::replace(&mut inner.socket, socket));
inner.ipv6 = addr.is_ipv6();

// Generate some activity so peers notice the rebind
Expand Down Expand Up @@ -407,6 +408,9 @@ impl EndpointInner {
#[derive(Debug)]
pub(crate) struct State {
socket: Arc<dyn AsyncUdpSocket>,
/// During an active migration, abandoned_socket receives traffic
/// until the first packet arrives on the new socket.
prev_socket: Option<Arc<dyn AsyncUdpSocket>>,
inner: proto::Endpoint,
transmit_state: TransmitState,
recv_state: RecvState,
Expand All @@ -429,6 +433,19 @@ pub(crate) struct Shared {
impl State {
fn drive_recv(&mut self, cx: &mut Context, now: Instant) -> Result<bool, io::Error> {
self.recv_state.recv_limiter.start_cycle();
if let Some(socket) = &self.prev_socket {
// We don't care about the `PollProgress` from old sockets.
let poll_res = self.recv_state.poll_socket(
cx,
&mut self.inner,
&mut self.transmit_state,
&**socket,
now,
);
if poll_res.is_err() {
self.prev_socket = None;
}
};
let poll_res = self.recv_state.poll_socket(
cx,
&mut self.inner,
Expand All @@ -437,7 +454,13 @@ impl State {
now,
);
self.recv_state.recv_limiter.finish_cycle();
poll_res.map(|x| x.keep_going)
let poll_res = poll_res?;
if poll_res.received_connection_packet {
// Traffic has arrived on self.socket, therefore there is no need for the abandoned
// one anymore. TODO: Account for multiple outgoing connections.
self.prev_socket = None;
}
Ok(poll_res.keep_going)
}

fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
Expand Down Expand Up @@ -679,6 +702,7 @@ impl EndpointRef {
},
state: Mutex::new(State {
socket,
prev_socket: None,
inner,
transmit_state: TransmitState::default(),
ipv6,
Expand Down Expand Up @@ -765,6 +789,7 @@ impl RecvState {
socket: &dyn AsyncUdpSocket,
now: Instant,
) -> Result<PollProgress, io::Error> {
let mut received_connection_packet = false;
let mut metas = [RecvMeta::default(); BATCH_SIZE];
let mut iovs: [IoSliceMut; BATCH_SIZE] = {
let mut bufs = self
Expand Down Expand Up @@ -807,6 +832,7 @@ impl RecvState {
}
Some(DatagramEvent::ConnectionEvent(handle, event)) => {
// Ignoring errors from dropped connections that haven't yet been cleaned up
received_connection_packet = true;
let _ = self
.connections
.senders
Expand All @@ -823,7 +849,10 @@ impl RecvState {
}
}
Poll::Pending => {
return Ok(PollProgress { keep_going: false });
return Ok(PollProgress {
received_connection_packet,
keep_going: false,
});
}
// Ignore ECONNRESET as it's undefined in QUIC and may be injected by an
// attacker
Expand All @@ -835,14 +864,19 @@ impl RecvState {
}
}
if !self.recv_limiter.allow_work() {
return Ok(PollProgress { keep_going: true });
return Ok(PollProgress {
received_connection_packet,
keep_going: true,
});
}
}
}
}

#[derive(Default)]
struct PollProgress {
/// Whether a datagram was routed to an existing connection
received_connection_packet: bool,
/// Whether datagram handling was interrupted early by the work limiter for fairness
keep_going: bool,
}

0 comments on commit fa155ea

Please sign in to comment.