Skip to content

Commit

Permalink
Socket rebind: drain old socket
Browse files Browse the repository at this point in the history
During a planned/active connection migration allow the client to
receive trafic via the old, abandoned socket until the first packet
arrives on the socket.
  • Loading branch information
nemethf authored and Ralith committed Apr 20, 2024
1 parent 27d8b7d commit 8193b11
Showing 1 changed file with 52 additions and 11 deletions.
63 changes: 52 additions & 11 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
future::Future,
io,
io::IoSliceMut,
mem,
net::{SocketAddr, SocketAddrV6},
pin::Pin,
str,
Expand Down Expand Up @@ -215,7 +216,7 @@ impl Endpoint {
let addr = socket.local_addr()?;
let socket = self.runtime.wrap_udp_socket(socket)?;
let mut inner = self.inner.state.lock().unwrap();
inner.socket = socket;
inner.abandoned_socket = Some(mem::replace(&mut inner.socket, socket));
inner.ipv6 = addr.is_ipv6();

// Generate some activity so peers notice the rebind
Expand Down Expand Up @@ -419,6 +420,9 @@ impl EndpointInner {
pub(crate) struct State {
socket: Arc<dyn AsyncUdpSocket>,
inner: proto::Endpoint,
/// During an active migration, abandoned_socket receives traffic
/// until the first packet arrives on the new socket.
abandoned_socket: Option<Arc<dyn AsyncUdpSocket>>,
recv_state: RecvState,
driver: Option<Waker>,
ipv6: bool,
Expand All @@ -439,11 +443,27 @@ pub(crate) struct Shared {
impl State {
fn drive_recv(&mut self, cx: &mut Context, now: Instant) -> Result<bool, io::Error> {
self.recv_state.recv_limiter.start_cycle();
let poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &*self.socket, now)?;
let mut poll_res = PollResult::default();
if let Some(socket) = &self.abandoned_socket {
poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &**socket, now);
};
if !poll_res.keep_going {
poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &*self.socket, now);
if poll_res.received_connection_packet {
// Traffic has arrived on self.socket, therefore
// there is no need for the abandoned one anymore.
self.abandoned_socket = None;
}
}
self.recv_state.recv_limiter.finish_cycle();
Ok(poll_res)
match poll_res.error {
None => Ok(poll_res.keep_going),
Some(err) => Err(err),
}
}

fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
Expand Down Expand Up @@ -689,6 +709,7 @@ impl EndpointRef {
state: Mutex::new(State {
socket,
inner,
abandoned_socket: None,
ipv6,
events,
driver: None,
Expand Down Expand Up @@ -731,6 +752,7 @@ impl std::ops::Deref for EndpointRef {
&self.0
}
}

/// State directly involved in handling incoming packets
#[derive(Debug)]
struct RecvState {
Expand Down Expand Up @@ -772,7 +794,8 @@ impl RecvState {
endpoint: &mut proto::Endpoint,
socket: &dyn AsyncUdpSocket,
now: Instant,
) -> Result<bool, io::Error> {
) -> PollResult {
let mut received_connection_packet = false;
let mut metas = [RecvMeta::default(); BATCH_SIZE];
let mut iovs: [IoSliceMut; BATCH_SIZE] = {
let mut bufs = self
Expand Down Expand Up @@ -813,6 +836,7 @@ impl RecvState {
}
Some(DatagramEvent::ConnectionEvent(handle, event)) => {
// Ignoring errors from dropped connections that haven't yet been cleaned up
received_connection_packet = true;
let _ = self
.connections
.senders
Expand All @@ -829,22 +853,39 @@ impl RecvState {
}
}
Poll::Pending => {
break;
return PollResult {
received_connection_packet,
keep_going: false,
error: None,
};
}
// Ignore ECONNRESET as it's undefined in QUIC and may be injected by an
// attacker
Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::ConnectionReset => {
continue;
}
Poll::Ready(Err(e)) => {
return Err(e);
return PollResult {
received_connection_packet,
keep_going: false,
error: Some(e),
};
}
}
if !self.recv_limiter.allow_work() {
return Ok(true);
return PollResult {
received_connection_packet,
keep_going: true,
error: None,
};
}
}

Ok(false)
}
}

#[derive(Default)]
struct PollResult {
received_connection_packet: bool,
keep_going: bool,
error: Option<io::Error>,
}

0 comments on commit 8193b11

Please sign in to comment.