From 8ea3dd9b79697b06fc3a3891aa37cdf3c5f34252 Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Thu, 2 May 2024 09:07:35 -0500 Subject: [PATCH] API changes 1) Remove Mio layer and replace it with a sync FD layer 2) Move RecvData, RecvMetrics, and StoreCmsgSettings to `lib`, since syscalls is private 3) Rework the signature of `recv_msg` to take a `StoreCmsgSettings` to allow applications to handle custom cmsgs --- apps/src/sendto.rs | 33 +++++++++---- dgram/src/lib.rs | 75 +++++++++++++++++++++++++++++ dgram/src/sync.rs | 81 ++++++++++--------------------- dgram/src/syscalls.rs | 109 +++++++++++++++++++++++------------------- dgram/src/tokio.rs | 16 ++----- 5 files changed, 189 insertions(+), 125 deletions(-) diff --git a/apps/src/sendto.rs b/apps/src/sendto.rs index 07f562f6ca..4573807ab3 100644 --- a/apps/src/sendto.rs +++ b/apps/src/sendto.rs @@ -55,15 +55,30 @@ fn send_to_gso_pacing( ) -> io::Result { use dgram::GsoSettings; - dgram::sync::send_to( - socket, - buf, - Some(GsoSettings { - segment_size: segment_size as u16, - }), - Some(send_info.at), - &send_info.to, - ) + loop { + // Important to use try_io so events keep coming even if we see + // EAGAIN/EWOULDBLOCK + let res = socket.try_io(|| { + // mio::net::UdpSocket doesn't implement AsFd (yet?). + let fd = unsafe { + std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) + }; + + dgram::sync::send_to( + socket, + buf, + Some(GsoSettings { + segment_size: segment_size as u16, + }), + Some(send_info.at), + &send_info.to, + ) + }); + + if let Ok(sent) = res { + return sent.map_err(Into::into); + } + } } /// For non-Linux platforms. diff --git a/dgram/src/lib.rs b/dgram/src/lib.rs index 17f4eaec17..e68db9361b 100644 --- a/dgram/src/lib.rs +++ b/dgram/src/lib.rs @@ -4,16 +4,91 @@ mod syscalls; #[cfg(feature = "async")] pub mod tokio; +use std::net::SocketAddr; +use std::time::SystemTime; + +use nix::sys::socket::ControlMessageOwned; + #[derive(Default, Copy, Clone)] pub struct GsoSettings { pub segment_size: u16, } +#[cfg(target_os = "linux")] +#[derive(Default)] +pub struct StoreCmsgSettings { + store_cmsgs: bool, + cmsg_space: Vec, +} + +/// Output of a `recvmsg` call. +#[derive(Default)] +pub struct RecvData { + /// The number of bytes which `recvmsg` returned. + pub bytes: usize, + /// The peer address for this message. + pub peer_addr: Option, + /// Metrics for this `recvmsg` call. + /// + /// If no valid metrics exist - for example, when the RXQOVFL sockopt is not + /// set - this will be `None`. + pub metrics: Option, + /// The `UDP_GRO_SEGMENTS` control message data from the result of + /// `recvmsg`, if it exist. + pub gro: Option, + /// The RX_TIME control message data from the result of `recvmsg`, if it + /// exists. + pub rx_time: Option, + cmsgs: Vec, +} + +impl RecvData { + pub fn new( + peer_addr: Option, bytes: usize, cmsg_space_len: usize, + ) -> Self { + Self { + peer_addr, + bytes, + metrics: None, + gro: None, + rx_time: None, + cmsgs: Vec::with_capacity(cmsg_space_len), + } + } + + pub fn from_bytes(bytes: usize) -> Self { + Self { + bytes, + ..Default::default() + } + } + + /// Returns the list of cmsgs which were returned from calling `recvmsg`. If + /// `recvmsg` was called with `recv_cmsgs` set to to `false`, this will + /// return an empty slice. + pub fn cmsgs(&self) -> &[ControlMessageOwned] { + &self.cmsgs + } +} + +/// Metrics for `recvmsg` calls. +#[derive(Default)] +pub struct RecvMetrics { + /// The number of packets dropped between the last received packet and this + /// one. + /// + /// See SO_RXQOVFL for more. + pub udp_packets_dropped: u64, +} + #[cfg(target_os = "linux")] mod linux_imports { pub(super) use crate::syscalls::recv_msg; pub(super) use crate::syscalls::send_msg; pub(super) use crate::GsoSettings; + pub(super) use crate::RecvData; + pub(super) use crate::RecvMetrics; + pub(super) use crate::StoreCmsgSettings; pub(super) use nix::errno::Errno; pub(super) use nix::sys::socket::getsockopt; pub(super) use nix::sys::socket::recvmsg; diff --git a/dgram/src/sync.rs b/dgram/src/sync.rs index 091a319f8c..266ad162fc 100644 --- a/dgram/src/sync.rs +++ b/dgram/src/sync.rs @@ -1,7 +1,8 @@ -use crate::syscalls::RecvData; -use mio::net::UdpSocket; +use crate::RecvData; +use crate::StoreCmsgSettings; use std::io::Result; use std::net::SocketAddr; +use std::os::fd::AsFd; use std::time::Instant; #[cfg(target_os = "linux")] @@ -9,68 +10,38 @@ use super::linux_imports::*; #[cfg(target_os = "linux")] pub fn send_to( - socket: &UdpSocket, send_buf: &[u8], gso_settings: Option, + fd: &impl AsFd, send_buf: &[u8], gso_settings: Option, tx_time: Option, client_addr: &SocketAddr, ) -> Result { - loop { - // Important to use try_io so events keep coming even if we see - // EAGAIN/EWOULDBLOCK - let res = socket.try_io(|| { - // mio::net::UdpSocket doesn't implement AsFd (yet?). - let fd = unsafe { - std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) - }; + let sent = send_msg( + fd, + send_buf, + gso_settings, + tx_time, + &SockaddrStorage::from(*client_addr), + ); - let sent = send_msg( - fd, - send_buf, - gso_settings, - tx_time, - &SockaddrStorage::from(*client_addr), - ); - - match sent { - Err(Errno::EAGAIN) => Err(std::io::Error::last_os_error()), - _ => Ok(sent), - } - }); - - if let Ok(sent) = res { - return sent.map_err(Into::into); - } + match sent { + Err(Errno::EAGAIN) => Err(std::io::Error::last_os_error()), + _ => Ok(sent?), } } #[cfg(target_os = "linux")] -pub async fn recv_from( - socket: &UdpSocket, read_buf: &mut [u8], cmsg_space: &mut Vec, - msg_flags: Option, +pub fn recv_from( + fd: &impl AsFd, read_buf: &mut [u8], msg_flags: Option, + store_cmsg_settings: &mut StoreCmsgSettings, ) -> Result { - loop { - // Important to use try_io so events keep coming even if we see - // EAGAIN/EWOULDBLOCK - let res = socket.try_io(|| { - // mio::net::UdpSocket doesn't implement AsFd (yet?). - let fd = unsafe { - std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) - }; - - let recvd = recv_msg( - fd, - read_buf, - cmsg_space, - msg_flags.unwrap_or(MsgFlags::empty()), - ); - - match recvd { - Err(Errno::EAGAIN) => Err(std::io::Error::last_os_error()), - _ => Ok(recvd), - } - }); + let recvd = recv_msg( + fd, + read_buf, + msg_flags.unwrap_or(MsgFlags::empty()), + store_cmsg_settings, + ); - if let Ok(recvd) = res { - return recvd.map_err(Into::into); - } + match recvd { + Err(Errno::EAGAIN) => Err(std::io::Error::last_os_error()), + _ => Ok(recvd?), } } diff --git a/dgram/src/syscalls.rs b/dgram/src/syscalls.rs index 7b79ce68aa..bc7fc27aee 100644 --- a/dgram/src/syscalls.rs +++ b/dgram/src/syscalls.rs @@ -1,4 +1,3 @@ -use std::net::SocketAddr; use std::time::SystemTime; #[cfg(target_os = "linux")] @@ -19,6 +18,7 @@ const INSTANT_ZERO: Instant = #[cfg(target_os = "linux")] pub(crate) type SyscallResult = std::result::Result; +/// GSO-compatible convenience wrapper for the `sendmsg` syscall. #[cfg(target_os = "linux")] pub fn send_msg( fd: impl AsFd, send_buf: &[u8], gso_settings: Option, @@ -53,7 +53,9 @@ pub fn send_msg( ) } -/// Receive a message via `recvmsg`. +/// Receive a message via `recvmsg`. The returned `RecvData` will contain data +/// from supported cmsgs regardless of if the passed [`StoreCmsgSettings`] +/// indicates that we should store the cmsgs. /// /// # Note /// @@ -61,12 +63,18 @@ pub fn send_msg( /// recommends that the space be created via the `cmsg_space!()` macro. #[cfg(target_os = "linux")] pub fn recv_msg( - fd: impl AsFd, read_buf: &mut [u8], cmsg_space: &mut Vec, - msg_flags: MsgFlags, + fd: impl AsFd, read_buf: &mut [u8], msg_flags: MsgFlags, + store_cmsg_settings: &mut StoreCmsgSettings, ) -> SyscallResult { + let StoreCmsgSettings { + store_cmsgs, + ref mut cmsg_space, + } = store_cmsg_settings; + cmsg_space.clear(); let iov_s = &mut [IoSliceMut::new(read_buf)]; + let cmsg_space_len = cmsg_space.len(); let borrowed = fd.as_fd(); match recvmsg::( @@ -94,7 +102,7 @@ pub fn recv_msg( _ => None, }; - let mut recv_data = RecvData::new(peer_addr, bytes); + let mut recv_data = RecvData::new(peer_addr, bytes, cmsg_space_len); for msg in r.cmsgs() { match msg { @@ -112,6 +120,10 @@ pub fn recv_msg( }, _ => return Err(Errno::EINVAL), } + + if *store_cmsgs { + recv_data.cmsgs.push(msg); + } } Ok(recv_data) @@ -120,46 +132,6 @@ pub fn recv_msg( } } -/// Output of a `recvmsg` call. -pub struct RecvData { - /// The number of bytes which `recvmsg` returned. - pub bytes: usize, - /// The peer address for this message. - pub peer_addr: Option, - /// Metrics for this `recvmsg` call. - /// - /// If no valid metrics exist - for example, when the RXQOVFL sockopt is not - /// set - this will be `None` to prevent confusion when parsing metrics. - pub metrics: Option, - /// The `UDP_GRO_SEGMENTS` control message from the result of `recvmsg`, if - /// it exist. - pub gro: Option, - /// The RX_TIME control message from the result of `recvmsg`, if it exists. - pub rx_time: Option, -} - -impl RecvData { - pub fn new(peer_addr: Option, bytes: usize) -> Self { - Self { - peer_addr, - bytes, - metrics: None, - gro: None, - rx_time: None, - } - } -} - -/// Metrics for `recvmsg` calls. -#[derive(Default)] -pub struct RecvMetrics { - /// The number of packets dropped between the last received packet and this - /// one. - /// - /// See SO_RXQOVFL for more. - pub udp_packets_dropped: u64, -} - #[cfg(all(test, target_os = "linux", not(target_os = "android")))] mod tests { use nix::cmsg_space; @@ -312,15 +284,19 @@ mod tests { let (send, recv) = new_sockets()?; let addr = getsockname::(recv.as_raw_fd()).unwrap(); + // Invalid because time advances past this Instant by the time we + // call sendmsg() + let invalid_time = Some(Instant::now()); + let send_buf = b"nyj"; + send_msg( send, send_buf, Some(GsoSettings { segment_size: UDP_MAX_GSO_PACKET_SIZE, }), - // Invalid because we pass this Instant by the time we call sendmsg() - Some(Instant::now()), + invalid_time, &addr, )?; @@ -353,14 +329,47 @@ mod tests { let iov = [IoSlice::new(send_buf)]; sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?; - let mut cmsg_space = cmsg_space!(TimeVal); let mut read_buf = [0; 4]; + let recv_data = recv_msg( + recv, + &mut read_buf, + MsgFlags::empty(), + &mut StoreCmsgSettings::default(), + )?; + + assert_eq!(recv_data.bytes, 4); + assert_eq!(&read_buf, b"jets"); + assert!(recv_data.cmsgs().is_empty()); + + Ok(()) + } + + #[test] + fn recv_from_cmsgs() -> Result<()> { + let (send, recv) = new_sockets()?; + let addr = getsockname::(recv.as_raw_fd()).unwrap(); - let recv_data = - recv_msg(recv, &mut read_buf, &mut cmsg_space, MsgFlags::empty())?; + let send_buf = b"jets"; + let iov = [IoSlice::new(send_buf)]; + sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?; + + let cmsg_space = cmsg_space!(TimeVal); + let mut store_cmsg_settings = StoreCmsgSettings { + store_cmsgs: true, + cmsg_space, + }; + + let mut read_buf = [0; 4]; + let recv_data = recv_msg( + recv, + &mut read_buf, + MsgFlags::empty(), + &mut store_cmsg_settings, + )?; assert_eq!(recv_data.bytes, 4); assert_eq!(&read_buf, b"jets"); + assert!(!recv_data.cmsgs().is_empty()); Ok(()) } diff --git a/dgram/src/tokio.rs b/dgram/src/tokio.rs index 6d23f04c25..b727938cda 100644 --- a/dgram/src/tokio.rs +++ b/dgram/src/tokio.rs @@ -1,4 +1,4 @@ -use crate::syscalls::RecvData; +use crate::RecvData; use std::io::Result; use std::net::SocketAddr; use std::time::Instant; @@ -44,8 +44,8 @@ pub async fn send_to( #[cfg(target_os = "linux")] pub async fn recv_from( - socket: &UdpSocket, read_buf: &mut [u8], cmsg_space: &mut Vec, - msg_flags: Option, + socket: &UdpSocket, read_buf: &mut [u8], msg_flags: Option, + store_cmsg_settings: &mut StoreCmsgSettings, ) -> Result { loop { // Important to use try_io so that Tokio can clear the socket's readiness @@ -55,8 +55,8 @@ pub async fn recv_from( recv_msg( fd, read_buf, - cmsg_space, msg_flags.unwrap_or(MsgFlags::empty()), + store_cmsg_settings, ) .map_err(Into::into) }); @@ -84,11 +84,5 @@ pub async fn recv_from( ) -> Result { let recv = socket.recv(read_buf).await?; - Ok(RecvData { - bytes: recv, - peer_addr: None, - metrics: None, - gro: None, - rx_time: None, - }) + Ok(RecvData::from_bytes(bytes)) }