Skip to content

Commit

Permalink
Don't buffer endpoint-generated datagrams
Browse files Browse the repository at this point in the history
Dropping these has low cost because they're not associated with any
connection.
  • Loading branch information
Ralith committed Dec 20, 2023
1 parent 269672f commit e1fb663
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 74 deletions.
92 changes: 26 additions & 66 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use udp::{RecvMeta, BATCH_SIZE};

use crate::{
connection::Connecting, work_limiter::WorkLimiter, ConnectionEvent, EndpointConfig, VarInt,
IO_LOOP_BOUND, MAX_TRANSMIT_QUEUE_CONTENTS_LEN, RECV_TIME_BOUND, SEND_TIME_BOUND,
IO_LOOP_BOUND, RECV_TIME_BOUND,
};

/// A QUIC endpoint.
Expand Down Expand Up @@ -216,7 +216,6 @@ impl Endpoint {
let socket = self.runtime.wrap_udp_socket(socket)?;
let mut inner = self.inner.state.lock().unwrap();
inner.socket = socket;
inner.io_poller = inner.socket.clone().create_io_poller();
inner.ipv6 = addr.is_ipv6();

// Update connection socket references
Expand Down Expand Up @@ -332,7 +331,6 @@ impl Future for EndpointDriver {
let mut keep_going = false;
keep_going |= endpoint.drive_recv(cx, now)?;
keep_going |= endpoint.handle_events(cx, &self.0.shared);
keep_going |= endpoint.drive_send(cx)?;

if !endpoint.incoming.is_empty() {
self.0.shared.incoming.notify_waiters();
Expand Down Expand Up @@ -374,7 +372,6 @@ pub(crate) struct EndpointInner {
pub(crate) struct State {
socket: Arc<dyn AsyncUdpSocket>,
inner: proto::Endpoint,
outgoing: VecDeque<udp::Transmit>,
incoming: VecDeque<Connecting>,
driver: Option<Waker>,
ipv6: bool,
Expand All @@ -385,10 +382,7 @@ pub(crate) struct State {
driver_lost: bool,
recv_limiter: WorkLimiter,
recv_buf: Box<[u8]>,
send_limiter: WorkLimiter,
runtime: Arc<dyn Runtime>,
/// The aggregateed contents length of the packets in the transmit queue
transmit_queue_contents_len: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -449,22 +443,31 @@ impl State {
.send(ConnectionEvent::Proto(event));
}
Some(DatagramEvent::Response(transmit)) => {
// Limiting the memory usage for items queued in the outgoing queue from endpoint
// generated packets. Otherwise, we may see a build-up of the queue under test with
// flood of initial packets against the endpoint. The sender with the sender-limiter
// may not keep up the pace of these packets queued into the queue.
if self.transmit_queue_contents_len
< MAX_TRANSMIT_QUEUE_CONTENTS_LEN
{
let contents_len = transmit.size;
self.outgoing.push_back(udp_transmit(
transmit,
buffer.split_to(contents_len).freeze(),
));
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_add(contents_len);
}
// Send if there's kernel buffer space; otherwise, drop it
//
// As an endpoint-generated packet, we know this is an
// immediate, stateless response to an unconnected peer,
// one of:
//
// - A version negotiation response due to an unknown version
// - A `CLOSE` due to a malformed or unwanted connection attempt
// - A stateless reset due to an unrecognized connection
// - A `Retry` packet due to a connection attempt when
// `use_retry` is set
//
// In each case, a well-behaved peer can be trusted to retry a
// few times, which is guaranteed to produce the same response
// from us. Repeated failures might at worst cause a peer's new
// connection attempt to time out, which is acceptable if we're
// under such heavy load that there's never room for this code
// to transmit. This is morally equivalent to the packet getting
// lost due to congestion further along the link, which
// similarly relies on peer retries for recovery.
let contents_len = transmit.size;
_ = self.socket.try_send(&[udp_transmit(
transmit,
buffer.split_to(contents_len).freeze(),
)]);
}
None => {}
}
Expand Down Expand Up @@ -493,46 +496,6 @@ impl State {
Ok(false)
}

fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
self.send_limiter.start_cycle();

let result = loop {
if self.outgoing.is_empty() {
break Ok(false);
}

if !self.send_limiter.allow_work() {
break Ok(true);
}

if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
break Ok(false);
}

match self.socket.try_send(self.outgoing.as_slices().0) {
Ok(n) => {
let contents_len: usize =
self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
self.transmit_queue_contents_len = self
.transmit_queue_contents_len
.saturating_sub(contents_len);
// We count transmits instead of `poll_send` calls since the cost
// of a `sendmmsg` still linearly increases with number of packets.
self.send_limiter.record_work(n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
break Err(e);
}
}
};

self.send_limiter.finish_cycle();
result
}

fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
for _ in 0..IO_LOOP_BOUND {
let (ch, event) = match self.events.poll_recv(cx) {
Expand Down Expand Up @@ -678,7 +641,6 @@ impl EndpointRef {
inner,
ipv6,
events,
outgoing: VecDeque::new(),
incoming: VecDeque::new(),
driver: None,
connections: ConnectionSet {
Expand All @@ -690,9 +652,7 @@ impl EndpointRef {
driver_lost: false,
recv_buf: recv_buf.into(),
recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
send_limiter: WorkLimiter::new(SEND_TIME_BOUND),
runtime,
transmit_queue_contents_len: 0,
}),
}))
}
Expand Down
8 changes: 0 additions & 8 deletions quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,6 @@ const IO_LOOP_BOUND: usize = 160;
/// batch of size 32 was observed to take 30us on some systems.
const RECV_TIME_BOUND: Duration = Duration::from_micros(50);

/// The maximum amount of time that should be spent in `sendmsg()` calls per endpoint iteration
const SEND_TIME_BOUND: Duration = Duration::from_micros(50);

/// The maximum size of content length of packets in the outgoing transmit queue. Transmit packets
/// generated from the endpoint (retry or initial close) can be dropped when this limit is being execeeded.
/// Chose to represent 100 MB of data.
const MAX_TRANSMIT_QUEUE_CONTENTS_LEN: usize = 100_000_000;

fn udp_transmit(t: proto::Transmit, buffer: Bytes) -> udp::Transmit {
udp::Transmit {
destination: t.destination,
Expand Down

0 comments on commit e1fb663

Please sign in to comment.