From c611b61ff980a392428b017c9a6dce2b1224815b Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Wed, 12 Jun 2024 13:13:21 -0500 Subject: [PATCH] More docs and test cleanup --- dgram/src/sync.rs | 4 +- dgram/src/syscalls.rs | 188 +++++++++++++----------------------------- 2 files changed, 59 insertions(+), 133 deletions(-) diff --git a/dgram/src/sync.rs b/dgram/src/sync.rs index 5a2426360b..c1a4dd48c7 100644 --- a/dgram/src/sync.rs +++ b/dgram/src/sync.rs @@ -11,7 +11,9 @@ use super::linux_imports::*; pub fn send_to( fd: &impl AsFd, send_buf: &[u8], sendmsg_settings: SendMsgSettings, ) -> Result { - // TODO: separate mio module that uses try_io? + // TODO: separate mio module that uses try_io? This works for stateless + // polling e.g. epoll/kqueue, but stateful polling (select(), poll()) will + // require re-registering the socket after an event. try_io() does that for us let sent = send_msg(fd, send_buf, sendmsg_settings); match sent { diff --git a/dgram/src/syscalls.rs b/dgram/src/syscalls.rs index d707b54f6f..a9da25c450 100644 --- a/dgram/src/syscalls.rs +++ b/dgram/src/syscalls.rs @@ -34,9 +34,11 @@ fn raw_send_to( } /// GSO-compatible convenience wrapper for the `sendmsg` syscall. +/// +/// It is the caller's responsibility to set any relevant socket options. #[cfg(target_os = "linux")] pub fn send_msg( - fd: impl AsFd, send_buf: &[u8], send_msg_cmsg_settings: SendMsgSettings, + fd: impl AsFd, send_buf: &[u8], send_msg_settings: SendMsgSettings, ) -> SyscallResult { use crate::IpPktInfo; @@ -45,7 +47,7 @@ pub fn send_msg( tx_time, dst, pkt_info, - } = send_msg_cmsg_settings; + } = send_msg_settings; let raw_time = tx_time .map(|t| t.duration_since(INSTANT_ZERO).as_nanos() as u64) @@ -58,8 +60,7 @@ pub fn send_msg( cmsgs.push(ControlMessage::UdpGsoSegments(ss)); } - let now = Instant::now(); - if tx_time.filter(|t| t > &now).is_some() { + if tx_time.filter(|t| *t > Instant::now()).is_some() { // Create cmsg for TXTIME. cmsgs.push(ControlMessage::TxTime(&raw_time)); } @@ -88,19 +89,21 @@ pub fn send_msg( /// /// # Note /// -/// It is the caller's responsibility to create and clear the cmsg space. `nix` -/// recommends that the space be created via the `cmsg_space!()` macro. +/// It is the caller's responsibility to create the cmsg space. `nix` recommends +/// that the space be created via the `cmsg_space!()` macro. Calling this +/// function will clear the cmsg buffer. It is also the caller's responsibility +/// to set any relevant socket options. #[cfg(target_os = "linux")] pub fn recv_msg( fd: impl AsFd, read_buf: &mut [u8], msg_flags: MsgFlags, - handle_cmsg_settings: &mut RecvMsgSettings, + recvmsg_settings: &mut RecvMsgSettings, ) -> SyscallResult { use crate::IpOrigDstAddr; let RecvMsgSettings { store_cmsgs, ref mut cmsg_space, - } = handle_cmsg_settings; + } = recvmsg_settings; cmsg_space.clear(); @@ -167,20 +170,6 @@ pub fn recv_msg( } } -// TODO: deletable depending on how the txtime fix works out -fn std_time_to_u64(time: &Instant) -> u64 { - const NANOS_PER_SEC: u64 = 1_000_000_000; - const INSTANT_ZERO: std::time::Instant = - unsafe { std::mem::transmute(std::time::UNIX_EPOCH) }; - - let raw_time = time.duration_since(INSTANT_ZERO); - - let sec = raw_time.as_secs(); - let nsec = raw_time.subsec_nanos(); - - sec * NANOS_PER_SEC + nsec as u64 -} - #[cfg(all(test, target_os = "linux", not(target_os = "android")))] mod tests { use nix::cmsg_space; @@ -190,13 +179,12 @@ mod tests { use nix::sys::time::TimeVal; use std::io::IoSliceMut; use std::io::Result; + use std::net::SocketAddr; use std::os::fd::OwnedFd; use std::str::FromStr; use super::*; - const UDP_MAX_GSO_PACKET_SIZE: u16 = 65507; - fn new_sockets() -> Result<(OwnedFd, OwnedFd)> { let recv = socket( AddressFamily::Inet, @@ -205,10 +193,8 @@ mod tests { None, ) .unwrap(); - setsockopt(&recv, ReceiveTimestampns, &true)?; - setsockopt(&recv, UdpGroSegment, &true)?; - let localhost = SockaddrIn::from_str("127.0.0.1:0").unwrap(); - bind(recv.as_raw_fd(), &localhost).unwrap(); + let recv_addr = SockaddrIn::from_str("127.0.0.1:0").unwrap(); + bind(recv.as_raw_fd(), &recv_addr).unwrap(); let send = socket( AddressFamily::Inet, @@ -217,13 +203,13 @@ mod tests { None, ) .unwrap(); - connect(send.as_raw_fd(), &localhost).unwrap(); + connect(send.as_raw_fd(), &recv_addr).unwrap(); Ok((send, recv)) } - fn fd_to_socket_addr(fd: &impl AsRawFd) -> Option { - SocketAddr::from_str( + fn fd_to_socket_addr(fd: &impl AsRawFd) -> Option { + SocketAddrV4::from_str( &getsockname::(fd.as_raw_fd()) .unwrap() .to_string(), @@ -232,17 +218,18 @@ mod tests { } #[test] - fn send_to_simple() -> Result<()> { + fn send_msg_simple() -> Result<()> { let (send, recv) = new_sockets()?; let send_buf = b"njd"; let addr = fd_to_socket_addr(&recv); - send_msg(send, send_buf, SendMsgSettings { - segment_size: Some(UDP_MAX_GSO_PACKET_SIZE), + let sent = send_msg(send, send_buf, SendMsgSettings { + segment_size: None, tx_time: None, - dst: addr, + dst: Some(SocketAddr::V4(addr.unwrap())), pkt_info: None, })?; + assert_eq!(sent, send_buf.len()); let mut buf = [0; 3]; let mut read_buf = [IoSliceMut::new(&mut buf)]; @@ -264,35 +251,25 @@ mod tests { } #[test] - fn send_to_invalid_tx_time() -> Result<()> { + fn recv_msg_simple() -> Result<()> { let (send, recv) = new_sockets()?; - let addr = fd_to_socket_addr(&recv); + let addr = getsockname::(recv.as_raw_fd()).unwrap(); - let send_buf = b"nyj"; - send_msg(send, send_buf, SendMsgSettings { - segment_size: Some(UDP_MAX_GSO_PACKET_SIZE), - // Invalid because we'll have already passed Instant::now by the time - // we send the message - tx_time: Some(Instant::now()), - dst: addr, - pkt_info: None, - })?; + let send_buf = b"jets"; + let iov = [IoSlice::new(send_buf)]; + sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?; - let mut buf = [0; 3]; - let mut read_buf = [IoSliceMut::new(&mut buf)]; - let recv = recvmsg::<()>( - recv.as_raw_fd(), + let mut read_buf = [0; 4]; + let recv_data = recv_msg( + recv, &mut read_buf, - None, MsgFlags::empty(), - ) - .unwrap(); + &mut RecvMsgSettings::default(), + )?; - assert_eq!(recv.bytes, 3); - assert_eq!( - String::from_utf8(buf.to_vec()).unwrap().as_bytes(), - send_buf - ); + assert_eq!(recv_data.bytes, 4); + assert_eq!(&read_buf, b"jets"); + assert!(recv_data.cmsgs().is_empty()); Ok(()) } @@ -300,110 +277,52 @@ mod tests { #[test] fn send_to_multiple_segments() -> Result<()> { let (send, recv) = new_sockets()?; - let addr = fd_to_socket_addr(&recv); + // TODO: determine why this has to be set before sendmsg + setsockopt(&recv, UdpGroSegment, &true).expect("couldn't set UDP_GRO"); + let addr = fd_to_socket_addr(&recv); let send_buf = b"devils"; - send_msg(send, send_buf, SendMsgSettings { + let sent = send_msg(send, send_buf, SendMsgSettings { segment_size: Some(1), tx_time: None, - dst: addr, + dst: Some(SocketAddr::V4(addr.unwrap())), pkt_info: None, })?; + assert_eq!(sent, send_buf.len()); let mut buf = [0; 6]; let mut read_buf = [IoSliceMut::new(&mut buf)]; - let mut x = cmsg_space!(u32); - let recv = recvmsg::<()>( + let mut cmsgs = cmsg_space!(i32); + let recv = recvmsg::( recv.as_raw_fd(), &mut read_buf, - Some(&mut x), + Some(&mut cmsgs), MsgFlags::empty(), ) .unwrap(); - assert_eq!(recv.bytes, 6); assert_eq!( String::from_utf8(buf.to_vec()).unwrap().as_bytes(), send_buf ); + // TODO: determine why no cmsg shows up + assert!(cmsgs.is_empty()); Ok(()) } #[test] - fn send_to_control_messages() -> Result<()> { + fn recvfrom_cmsgs() -> Result<()> { let (send, recv) = new_sockets()?; - let addr = fd_to_socket_addr(&recv); - - let send_buf = b"nyj"; - - send_msg(send, send_buf, SendMsgSettings { - segment_size: None, - tx_time: Some(Instant::now() + std::time::Duration::from_secs(5)), - dst: addr, - pkt_info: None, - })?; - - let mut buf = [0; 3]; - let mut read_buf = [IoSliceMut::new(&mut buf)]; - - let mut cmsg_space = cmsg_space!(TimeVal); - { - let recv = recvmsg::( - recv.as_raw_fd(), - &mut read_buf, - Some(&mut cmsg_space), - MsgFlags::empty(), - ) - .unwrap(); - - assert_eq!(recv.bytes, 3); - } - - assert!(!cmsg_space.is_empty()); - assert_eq!( - String::from_utf8(buf.to_vec()).unwrap().as_bytes(), - send_buf - ); - - Ok(()) - } - - #[test] - fn recv_from_simple() -> Result<()> { - let (send, recv) = new_sockets()?; - let addr = getsockname::(recv.as_raw_fd()).unwrap(); - - let send_buf = b"jets"; - let iov = [IoSlice::new(send_buf)]; - sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?; - - let mut read_buf = [0; 4]; - let recv_data = recv_msg( - recv, - &mut read_buf, - MsgFlags::empty(), - &mut RecvMsgSettings::default(), - )?; - - assert_eq!(recv_data.bytes, 4); - assert_eq!(&read_buf, b"jets"); - assert!(recv_data.cmsgs().is_empty()); - - Ok(()) - } + setsockopt(&recv, ReceiveTimestampns, &true)?; - #[test] - fn recv_from_cmsgs() -> Result<()> { - let (send, recv) = new_sockets()?; let addr = getsockname::(recv.as_raw_fd()).unwrap(); - let send_buf = b"jets"; let iov = [IoSlice::new(send_buf)]; sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?; let cmsg_space = cmsg_space!(TimeVal); - let mut store_cmsg_settings = RecvMsgSettings { + let mut recvmsg_settings = RecvMsgSettings { store_cmsgs: true, cmsg_space, }; @@ -413,12 +332,17 @@ mod tests { recv, &mut read_buf, MsgFlags::empty(), - &mut store_cmsg_settings, + &mut recvmsg_settings, )?; assert_eq!(recv_data.bytes, 4); assert_eq!(&read_buf, b"jets"); - assert!(!recv_data.cmsgs().is_empty()); + + assert_eq!(recv_data.cmsgs().len(), 1); + match recv_data.cmsgs()[0] { + ControlMessageOwned::ScmTimestampns(_) => {}, + _ => panic!("invalid cmsg received"), + }; Ok(()) }