Skip to content

Commit

Permalink
More docs and test cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
evanrittenhouse committed Jun 12, 2024
1 parent 4bc7d91 commit c611b61
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 133 deletions.
4 changes: 3 additions & 1 deletion dgram/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use super::linux_imports::*;
pub fn send_to(
fd: &impl AsFd, send_buf: &[u8], sendmsg_settings: SendMsgSettings,
) -> Result<usize> {
// TODO: separate mio module that uses try_io?
// TODO: separate mio module that uses try_io? This works for stateless
// polling e.g. epoll/kqueue, but stateful polling (select(), poll()) will
// require re-registering the socket after an event. try_io() does that for us
let sent = send_msg(fd, send_buf, sendmsg_settings);

match sent {
Expand Down
188 changes: 56 additions & 132 deletions dgram/src/syscalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ fn raw_send_to(
}

/// GSO-compatible convenience wrapper for the `sendmsg` syscall.
///
/// It is the caller's responsibility to set any relevant socket options.
#[cfg(target_os = "linux")]
pub fn send_msg(
fd: impl AsFd, send_buf: &[u8], send_msg_cmsg_settings: SendMsgSettings,
fd: impl AsFd, send_buf: &[u8], send_msg_settings: SendMsgSettings,
) -> SyscallResult<usize> {
use crate::IpPktInfo;

Expand All @@ -45,7 +47,7 @@ pub fn send_msg(
tx_time,
dst,
pkt_info,
} = send_msg_cmsg_settings;
} = send_msg_settings;

let raw_time = tx_time
.map(|t| t.duration_since(INSTANT_ZERO).as_nanos() as u64)
Expand All @@ -58,8 +60,7 @@ pub fn send_msg(
cmsgs.push(ControlMessage::UdpGsoSegments(ss));
}

let now = Instant::now();
if tx_time.filter(|t| t > &now).is_some() {
if tx_time.filter(|t| *t > Instant::now()).is_some() {
// Create cmsg for TXTIME.
cmsgs.push(ControlMessage::TxTime(&raw_time));
}
Expand Down Expand Up @@ -88,19 +89,21 @@ pub fn send_msg(
///
/// # Note
///
/// It is the caller's responsibility to create and clear the cmsg space. `nix`
/// recommends that the space be created via the `cmsg_space!()` macro.
/// It is the caller's responsibility to create the cmsg space. `nix` recommends
/// that the space be created via the `cmsg_space!()` macro. Calling this
/// function will clear the cmsg buffer. It is also the caller's responsibility
/// to set any relevant socket options.
#[cfg(target_os = "linux")]
pub fn recv_msg(
fd: impl AsFd, read_buf: &mut [u8], msg_flags: MsgFlags,
handle_cmsg_settings: &mut RecvMsgSettings,
recvmsg_settings: &mut RecvMsgSettings,
) -> SyscallResult<RecvData> {
use crate::IpOrigDstAddr;

let RecvMsgSettings {
store_cmsgs,
ref mut cmsg_space,
} = handle_cmsg_settings;
} = recvmsg_settings;

cmsg_space.clear();

Expand Down Expand Up @@ -167,20 +170,6 @@ pub fn recv_msg(
}
}

// TODO: deletable depending on how the txtime fix works out
fn std_time_to_u64(time: &Instant) -> u64 {
const NANOS_PER_SEC: u64 = 1_000_000_000;
const INSTANT_ZERO: std::time::Instant =
unsafe { std::mem::transmute(std::time::UNIX_EPOCH) };

let raw_time = time.duration_since(INSTANT_ZERO);

let sec = raw_time.as_secs();
let nsec = raw_time.subsec_nanos();

sec * NANOS_PER_SEC + nsec as u64
}

#[cfg(all(test, target_os = "linux", not(target_os = "android")))]
mod tests {
use nix::cmsg_space;
Expand All @@ -190,13 +179,12 @@ mod tests {
use nix::sys::time::TimeVal;
use std::io::IoSliceMut;
use std::io::Result;
use std::net::SocketAddr;
use std::os::fd::OwnedFd;
use std::str::FromStr;

use super::*;

const UDP_MAX_GSO_PACKET_SIZE: u16 = 65507;

fn new_sockets() -> Result<(OwnedFd, OwnedFd)> {
let recv = socket(
AddressFamily::Inet,
Expand All @@ -205,10 +193,8 @@ mod tests {
None,
)
.unwrap();
setsockopt(&recv, ReceiveTimestampns, &true)?;
setsockopt(&recv, UdpGroSegment, &true)?;
let localhost = SockaddrIn::from_str("127.0.0.1:0").unwrap();
bind(recv.as_raw_fd(), &localhost).unwrap();
let recv_addr = SockaddrIn::from_str("127.0.0.1:0").unwrap();
bind(recv.as_raw_fd(), &recv_addr).unwrap();

let send = socket(
AddressFamily::Inet,
Expand All @@ -217,13 +203,13 @@ mod tests {
None,
)
.unwrap();
connect(send.as_raw_fd(), &localhost).unwrap();
connect(send.as_raw_fd(), &recv_addr).unwrap();

Ok((send, recv))
}

fn fd_to_socket_addr(fd: &impl AsRawFd) -> Option<SocketAddr> {
SocketAddr::from_str(
fn fd_to_socket_addr(fd: &impl AsRawFd) -> Option<SocketAddrV4> {
SocketAddrV4::from_str(
&getsockname::<SockaddrStorage>(fd.as_raw_fd())
.unwrap()
.to_string(),
Expand All @@ -232,17 +218,18 @@ mod tests {
}

#[test]
fn send_to_simple() -> Result<()> {
fn send_msg_simple() -> Result<()> {
let (send, recv) = new_sockets()?;
let send_buf = b"njd";
let addr = fd_to_socket_addr(&recv);

send_msg(send, send_buf, SendMsgSettings {
segment_size: Some(UDP_MAX_GSO_PACKET_SIZE),
let sent = send_msg(send, send_buf, SendMsgSettings {
segment_size: None,
tx_time: None,
dst: addr,
dst: Some(SocketAddr::V4(addr.unwrap())),
pkt_info: None,
})?;
assert_eq!(sent, send_buf.len());

let mut buf = [0; 3];
let mut read_buf = [IoSliceMut::new(&mut buf)];
Expand All @@ -264,146 +251,78 @@ mod tests {
}

#[test]
fn send_to_invalid_tx_time() -> Result<()> {
fn recv_msg_simple() -> Result<()> {
let (send, recv) = new_sockets()?;
let addr = fd_to_socket_addr(&recv);
let addr = getsockname::<SockaddrStorage>(recv.as_raw_fd()).unwrap();

let send_buf = b"nyj";
send_msg(send, send_buf, SendMsgSettings {
segment_size: Some(UDP_MAX_GSO_PACKET_SIZE),
// Invalid because we'll have already passed Instant::now by the time
// we send the message
tx_time: Some(Instant::now()),
dst: addr,
pkt_info: None,
})?;
let send_buf = b"jets";
let iov = [IoSlice::new(send_buf)];
sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?;

let mut buf = [0; 3];
let mut read_buf = [IoSliceMut::new(&mut buf)];
let recv = recvmsg::<()>(
recv.as_raw_fd(),
let mut read_buf = [0; 4];
let recv_data = recv_msg(
recv,
&mut read_buf,
None,
MsgFlags::empty(),
)
.unwrap();
&mut RecvMsgSettings::default(),
)?;

assert_eq!(recv.bytes, 3);
assert_eq!(
String::from_utf8(buf.to_vec()).unwrap().as_bytes(),
send_buf
);
assert_eq!(recv_data.bytes, 4);
assert_eq!(&read_buf, b"jets");
assert!(recv_data.cmsgs().is_empty());

Ok(())
}

#[test]
fn send_to_multiple_segments() -> Result<()> {
let (send, recv) = new_sockets()?;
let addr = fd_to_socket_addr(&recv);
// TODO: determine why this has to be set before sendmsg
setsockopt(&recv, UdpGroSegment, &true).expect("couldn't set UDP_GRO");

let addr = fd_to_socket_addr(&recv);
let send_buf = b"devils";
send_msg(send, send_buf, SendMsgSettings {
let sent = send_msg(send, send_buf, SendMsgSettings {
segment_size: Some(1),
tx_time: None,
dst: addr,
dst: Some(SocketAddr::V4(addr.unwrap())),
pkt_info: None,
})?;
assert_eq!(sent, send_buf.len());

let mut buf = [0; 6];
let mut read_buf = [IoSliceMut::new(&mut buf)];
let mut x = cmsg_space!(u32);
let recv = recvmsg::<()>(
let mut cmsgs = cmsg_space!(i32);
let recv = recvmsg::<SockaddrStorage>(
recv.as_raw_fd(),
&mut read_buf,
Some(&mut x),
Some(&mut cmsgs),
MsgFlags::empty(),
)
.unwrap();

assert_eq!(recv.bytes, 6);
assert_eq!(
String::from_utf8(buf.to_vec()).unwrap().as_bytes(),
send_buf
);
// TODO: determine why no cmsg shows up
assert!(cmsgs.is_empty());

Ok(())
}

#[test]
fn send_to_control_messages() -> Result<()> {
fn recvfrom_cmsgs() -> Result<()> {
let (send, recv) = new_sockets()?;
let addr = fd_to_socket_addr(&recv);

let send_buf = b"nyj";

send_msg(send, send_buf, SendMsgSettings {
segment_size: None,
tx_time: Some(Instant::now() + std::time::Duration::from_secs(5)),
dst: addr,
pkt_info: None,
})?;

let mut buf = [0; 3];
let mut read_buf = [IoSliceMut::new(&mut buf)];

let mut cmsg_space = cmsg_space!(TimeVal);
{
let recv = recvmsg::<SockaddrStorage>(
recv.as_raw_fd(),
&mut read_buf,
Some(&mut cmsg_space),
MsgFlags::empty(),
)
.unwrap();

assert_eq!(recv.bytes, 3);
}

assert!(!cmsg_space.is_empty());
assert_eq!(
String::from_utf8(buf.to_vec()).unwrap().as_bytes(),
send_buf
);

Ok(())
}

#[test]
fn recv_from_simple() -> Result<()> {
let (send, recv) = new_sockets()?;
let addr = getsockname::<SockaddrStorage>(recv.as_raw_fd()).unwrap();

let send_buf = b"jets";
let iov = [IoSlice::new(send_buf)];
sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?;

let mut read_buf = [0; 4];
let recv_data = recv_msg(
recv,
&mut read_buf,
MsgFlags::empty(),
&mut RecvMsgSettings::default(),
)?;

assert_eq!(recv_data.bytes, 4);
assert_eq!(&read_buf, b"jets");
assert!(recv_data.cmsgs().is_empty());

Ok(())
}
setsockopt(&recv, ReceiveTimestampns, &true)?;

#[test]
fn recv_from_cmsgs() -> Result<()> {
let (send, recv) = new_sockets()?;
let addr = getsockname::<SockaddrStorage>(recv.as_raw_fd()).unwrap();

let send_buf = b"jets";
let iov = [IoSlice::new(send_buf)];
sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?;

let cmsg_space = cmsg_space!(TimeVal);
let mut store_cmsg_settings = RecvMsgSettings {
let mut recvmsg_settings = RecvMsgSettings {
store_cmsgs: true,
cmsg_space,
};
Expand All @@ -413,12 +332,17 @@ mod tests {
recv,
&mut read_buf,
MsgFlags::empty(),
&mut store_cmsg_settings,
&mut recvmsg_settings,
)?;

assert_eq!(recv_data.bytes, 4);
assert_eq!(&read_buf, b"jets");
assert!(!recv_data.cmsgs().is_empty());

assert_eq!(recv_data.cmsgs().len(), 1);
match recv_data.cmsgs()[0] {
ControlMessageOwned::ScmTimestampns(_) => {},
_ => panic!("invalid cmsg received"),
};

Ok(())
}
Expand Down

0 comments on commit c611b61

Please sign in to comment.