Skip to content

Commit

Permalink
fix(udp): handle multiple datagrams through gro
Browse files Browse the repository at this point in the history
Previously `Socket::recv` would at most return a single `Datagram` (i.e. `->
Result<Option<Datagram>, io::Error>`). When supported by the OS, the underlying
`quinn-udp` can use both recvMmsg and GRO, each with the ability to return one
or more datagrams.

As of today, `neqo_common::udp` does not make use of recvmmsg, i.e. it only
provides a single `IoSliceMut` to write into. That said, that single
`IoSliceMut` might contain multiple `Datagram`s through GRO. Previously this
would have been provided as a single `Datagram` to the caller of `Socket::recv`.

This commit makes sure to handle the case where many `Datagram`s are retrieved
via GRO (see `meta.stride` flag). It updates `neqo_common::udp::Socket::recv`
and `neqo-server` and `neqo-client` accordingly.
  • Loading branch information
mxinden committed Mar 3, 2024
1 parent bc1659a commit 5db3ca8
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 92 deletions.
94 changes: 38 additions & 56 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use futures::{
future::{select, Either},
FutureExt, TryFutureExt,
};
use neqo_common::{
self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, udp, Datagram, Role,
};
use neqo_common::{self as common, event::Provider, hex, qdebug, qinfo, qlog::NeqoQlog, udp, Role};
use neqo_crypto::{
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
init, AuthenticationStatus, Cipher, ResumptionToken,
Expand Down Expand Up @@ -822,15 +820,29 @@ impl<'a> ClientRunner<'a> {
break;
}

self.process(None).await?;
match self.client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
continue;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
}
Output::None => {
qdebug!("Output::None");
}
}

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
self.client
.process_multiple_input(dgrams.iter(), Instant::now());
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Timeout => {
Expand All @@ -856,28 +868,6 @@ impl<'a> ClientRunner<'a> {
};
Ok(token)
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
loop {
match self.client.process(dgram.take(), Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
break;
}
Output::None => {
qdebug!("Output::None");
break;
}
}
}

Ok(())
}
}

fn create_http3_client(
Expand Down Expand Up @@ -1055,15 +1045,15 @@ mod old {
cell::RefCell,
collections::{HashMap, VecDeque},
fs::File,
io::{self, Write},
io::Write,
net::SocketAddr,
path::PathBuf,
pin::Pin,
rc::Rc,
time::Instant,
};

use neqo_common::{event::Provider, qdebug, qinfo, udp, Datagram};
use neqo_common::{event::Provider, qdebug, qinfo, udp};
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
use neqo_transport::{
Connection, ConnectionEvent, EmptyConnectionIdGenerator, Error, Output, State, StreamId,
Expand Down Expand Up @@ -1331,7 +1321,20 @@ mod old {
}
}

self.process(None).await?;
match self.client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
continue;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
}
Output::None => {
qdebug!("Output::None");
}
}

if let State::Closed(..) = self.client.state() {
return Ok(self.handler.token.take());
Expand All @@ -1340,10 +1343,11 @@ mod old {
match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
if dgram.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
self.client
.process_multiple_input(dgram.iter(), Instant::now());
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Timeout => {
Expand All @@ -1352,27 +1356,5 @@ mod old {
}
}
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
loop {
match self.client.process(dgram.take(), Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
}
Output::Callback(new_timeout) => {
qinfo!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
break;
}
Output::None => {
qdebug!("Output::None");
break;
}
}
}

Ok(())
}
}
}
59 changes: 26 additions & 33 deletions neqo-common/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Socket {
}

/// Receive a UDP datagram on the specified socket.
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Option<Datagram>, io::Error> {
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
let mut meta = RecvMeta::default();

match self.socket.try_io(Interest::READABLE, || {
Expand All @@ -89,7 +89,7 @@ impl Socket {
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(None)
return Ok(vec![])
}
Err(err) => {
return Err(err);
Expand All @@ -98,32 +98,32 @@ impl Socket {

if meta.len == 0 {
eprintln!("zero length datagram received?");
return Ok(None);
return Ok(vec![]);
}

if meta.len == self.recv_buf.len() {
eprintln!(
"Might have received more than {} bytes",
self.recv_buf.len()
);
}

Ok(Some(Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
&self.recv_buf[..meta.len],
)))
Ok(self.recv_buf[0..meta.len]
.chunks(meta.stride.min(self.recv_buf.len()))
.map(|d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
.collect())
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use tokio::time::timeout;

use super::*;
use crate::{IpTos, IpTosDscp, IpTosEcn};

Expand All @@ -148,6 +148,8 @@ mod tests {
let received_datagram = receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.next()
.expect("receive to yield datagram");

// Assert that the ECN is correct.
Expand All @@ -163,6 +165,8 @@ mod tests {
#[tokio::test]
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)]
async fn many_datagrams_through_gro() -> Result<(), io::Error> {
const SEGMENT_SIZE: usize = 128;

let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut receiver = Socket::bind(receiver_addr)?;
Expand All @@ -171,7 +175,6 @@ mod tests {
// (https://github.com/mozilla/neqo/issues/1693) support GSO. Use
// `quinn_udp` directly.
let max_segments = sender.state.max_gso_segments();
const SEGMENT_SIZE: usize = 128;
let msg = vec![0xAB; SEGMENT_SIZE * max_segments];
let transmit = Transmit {
destination: receiver.local_addr()?,
Expand All @@ -191,25 +194,15 @@ mod tests {
})?;
assert_eq!(n, 1, "only passed one slice");

for i in 0..max_segments {
// Wait for socket to become readable.
timeout(Duration::from_secs(1), receiver.readable())
.await
.map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
format!("timeout waiting for {i}. datagram"),
)
})??;
// Read from socket.
let received_datagram = receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.expect("receive to yield datagram");
receiver.readable().await?;
let received_datagrams = receiver.recv(&receiver_addr).expect("receive to succeed");

assert_eq!(received_datagrams.len(), max_segments);
for datagram in received_datagrams {
assert_eq!(
SEGMENT_SIZE,
received_datagram.len(),
"Expect received datagrams to have same length as sent datagrams."
datagram.len(),
"Expect received datagram to have same length as sent datagram."
);
}

Expand Down
8 changes: 5 additions & 3 deletions neqo-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,13 @@ impl ServersRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgram = socket.recv(host)?;
if dgram.is_none() {
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
for dgram in dgrams {
self.process(Some(&dgram)).await?;
}
},
Ready::Timeout => {
self.timeout = None;
Expand Down

0 comments on commit 5db3ca8

Please sign in to comment.