Skip to content

Commit 8bc8386

Browse files
Finish async API
1 parent 84e3ff4 commit 8bc8386

File tree

5 files changed

+88
-88
lines changed

5 files changed

+88
-88
lines changed

apps/src/common.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -629,10 +629,8 @@ impl HttpConn for Http09Conn {
629629
s
630630
);
631631

632-
// let body =
633-
// std::fs::read(path.as_path())
634-
// .unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
635-
let body = vec![0; 1_000_000];
632+
let body = std::fs::read(path.as_path())
633+
.unwrap_or_else(|_| b"Not Found!\r\n".to_vec());
636634

637635
info!(
638636
"{} sending response of size {} on stream {}",
@@ -1094,7 +1092,7 @@ impl Http3Conn {
10941092
match std::fs::read(file_path.as_path()) {
10951093
Ok(data) => (200, data),
10961094

1097-
Err(_) => (404, vec![57; 1_000_000]),
1095+
Err(_) => (404, b"Not Found!".to_vec()),
10981096
}
10991097
},
11001098

apps/src/recvfrom.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ pub fn recv_from(
3939
use dgram::RecvMsgSettings;
4040
use std::os::unix::io::AsRawFd;
4141

42-
let mut recvmsg_cmsg_settings = RecvMsgSettings::default();
42+
let mut recvmsg_cmsg_settings = RecvMsgSettings {
43+
store_cmsgs: false,
44+
cmsg_space: &mut vec![],
45+
};
4346
socket.try_io(|| {
4447
let fd =
4548
unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) };

dgram/src/lib.rs

+8-22
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,9 @@ pub struct SendMsgSettings {
3131

3232
/// Settings for handling control messages when receiving data.
3333
#[cfg(target_os = "linux")]
34-
#[derive(Clone)]
35-
pub struct RecvMsgSettings {
36-
// TODO(evanrittenhouse): deprecate store_cmsgs and only store based on what
37-
// cmsg_space can handle.
34+
pub struct RecvMsgSettings<'c> {
3835
/// If cmsgs should be stored when receiving a message. If set, cmsgs will
39-
/// be stored in the `cmsg_space` vector.
36+
/// be stored in the [`RecvData`]'s `cmsgs` field.
4037
pub store_cmsgs: bool,
4138
/// The vector where cmsgs will be stored, if store_cmsgs is set.
4239
///
@@ -45,19 +42,15 @@ pub struct RecvMsgSettings {
4542
/// [`cmsg_space`] macro.
4643
///
4744
/// [`cmsg_space`]: https://docs.rs/nix/latest/nix/macro.cmsg_space.html
48-
pub cmsg_space: Vec<u8>,
49-
/// Flags for [`recvmsg`]. See [MsgFlags] for more.
50-
///
51-
/// [`recvmsg`]: [nix::sys::socket::recvmsg]
52-
pub msg_flags: MsgFlags,
45+
pub cmsg_space: &'c mut Vec<u8>,
5346
}
5447

55-
impl Default for RecvMsgSettings {
56-
fn default() -> Self {
48+
impl<'c> RecvMsgSettings<'c> {
49+
// Convenience to avoid forcing a specific version of nix
50+
pub fn new(store_cmsgs: bool, cmsg_space: &'c mut Vec<u8>) -> Self {
5751
Self {
58-
msg_flags: MsgFlags::empty(),
59-
store_cmsgs: false,
60-
cmsg_space: vec![],
52+
store_cmsgs,
53+
cmsg_space,
6154
}
6255
}
6356
}
@@ -173,10 +166,3 @@ mod linux_imports {
173166
pub(super) use std::net::SocketAddrV6;
174167
pub(super) use std::os::fd::AsRawFd;
175168
}
176-
177-
#[cfg(feature = "async")]
178-
mod async_imports {
179-
pub(super) use std::io::ErrorKind;
180-
pub(super) use tokio::io::Interest;
181-
pub(super) use tokio::net::UdpSocket;
182-
}

dgram/src/syscalls.rs

+11-12
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ pub fn send_msg(
8989
///
9090
/// # Note
9191
///
92-
/// It is the caller's responsibility to create the cmsg space. `nix` recommends
93-
/// that the space be created via the `cmsg_space!()` macro. Calling this
94-
/// function will clear the cmsg buffer. It is also the caller's responsibility
95-
/// to set any relevant socket options.
92+
/// It is the caller's responsibility to create and clear the cmsg space.`nix`
93+
/// recommends that the space be created via the `cmsg_space!()` macro. Calling
94+
/// this function will clear the cmsg buffer. It is also the caller's
95+
/// responsibility to set any relevant socket options.
9696
#[cfg(target_os = "linux")]
9797
pub fn recv_msg(
9898
fd: impl AsFd, read_buf: &mut [u8], recvmsg_settings: &mut RecvMsgSettings,
@@ -102,7 +102,6 @@ pub fn recv_msg(
102102
let RecvMsgSettings {
103103
store_cmsgs,
104104
ref mut cmsg_space,
105-
msg_flags,
106105
} = recvmsg_settings;
107106

108107
cmsg_space.clear();
@@ -115,7 +114,7 @@ pub fn recv_msg(
115114
borrowed.as_raw_fd(),
116115
iov_s,
117116
Some(cmsg_space),
118-
*msg_flags,
117+
MsgFlags::empty(),
119118
) {
120119
Ok(r) => {
121120
let bytes = r.bytes;
@@ -137,7 +136,6 @@ pub fn recv_msg(
137136
};
138137

139138
let mut recv_data = RecvData::new(peer_addr, bytes, cmsg_space_len);
140-
141139
for msg in r.cmsgs() {
142140
match msg {
143141
ControlMessageOwned::ScmTimestampns(time) =>
@@ -260,8 +258,10 @@ mod tests {
260258
sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?;
261259

262260
let mut read_buf = [0; 4];
263-
let recv_data =
264-
recv_msg(recv, &mut read_buf, &mut RecvMsgSettings::default())?;
261+
let recv_data = recv_msg(recv, &mut read_buf, &mut RecvMsgSettings {
262+
store_cmsgs: false,
263+
cmsg_space: &mut vec![],
264+
})?;
265265

266266
assert_eq!(recv_data.bytes, 4);
267267
assert_eq!(&read_buf, b"jets");
@@ -317,11 +317,10 @@ mod tests {
317317
let iov = [IoSlice::new(send_buf)];
318318
sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?;
319319

320-
let cmsg_space = cmsg_space!(TimeVal);
320+
let mut cmsg_space = cmsg_space!(TimeVal);
321321
let mut recvmsg_settings = RecvMsgSettings {
322322
store_cmsgs: true,
323-
cmsg_space,
324-
msg_flags: MsgFlags::empty(),
323+
cmsg_space: &mut cmsg_space,
325324
};
326325

327326
let mut read_buf = [0; 4];

dgram/src/tokio.rs

+62-48
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use crate::RecvData;
2+
use std::io::ErrorKind;
23
use std::io::Result;
4+
use std::task::Context;
5+
use std::task::Poll;
36

4-
use crate::async_imports::*;
7+
use tokio::io::Interest;
8+
use tokio::net::UdpSocket;
59

610
#[cfg(target_os = "linux")]
711
mod linux {
@@ -13,64 +17,74 @@ mod linux {
1317
use linux::*;
1418

1519
#[cfg(target_os = "linux")]
16-
pub async fn send_to(
17-
socket: &UdpSocket, send_buf: &[u8], send_msg_settings: SendMsgCmsgSettings,
18-
) -> Result<usize> {
20+
pub fn poll_send_to(
21+
socket: &UdpSocket, ctx: &mut Context<'_>, send_buf: &[u8],
22+
sendmsg_settings: SendMsgSettings,
23+
) -> Poll<Result<usize>> {
1924
loop {
20-
// Important to use try_io so that Tokio can clear the socket's readiness
21-
// flag
22-
let res = socket.try_io(Interest::WRITABLE, || {
23-
let fd = socket.as_fd();
24-
send_msg(fd, send_buf, send_msg_settings).map_err(Into::into)
25-
});
26-
27-
match res {
28-
Err(e) if e.kind() == ErrorKind::WouldBlock =>
29-
socket.writable().await?,
30-
res => return res,
25+
match socket.poll_send_ready(ctx) {
26+
Poll::Ready(Ok(())) => {
27+
// Important to use try_io so that Tokio can clear the socket's
28+
// readiness flag
29+
match socket.try_io(Interest::WRITABLE, || {
30+
let fd = socket.as_fd();
31+
send_msg(fd, send_buf, sendmsg_settings).map_err(Into::into)
32+
}) {
33+
Err(e) if e.kind() == ErrorKind::WouldBlock => {},
34+
io_res => break Poll::Ready(io_res),
35+
}
36+
},
37+
Poll::Ready(Err(e)) => break Poll::Ready(Err(e)),
38+
Poll::Pending => break Poll::Pending,
3139
}
3240
}
3341
}
3442

3543
#[cfg(target_os = "linux")]
36-
pub async fn recv_from(
37-
socket: &UdpSocket, read_buf: &mut [u8], msg_flags: Option<MsgFlags>,
38-
store_cmsg_settings: &mut RecvMsgCmsgSettings,
39-
) -> Result<RecvData> {
40-
loop {
41-
// Important to use try_io so that Tokio can clear the socket's readiness
42-
// flag
43-
let res = socket.try_io(Interest::READABLE, || {
44-
let fd = socket.as_fd();
45-
recv_msg(
46-
fd,
47-
read_buf,
48-
msg_flags.unwrap_or(MsgFlags::empty()),
49-
store_cmsg_settings,
50-
)
51-
.map_err(Into::into)
52-
});
44+
pub async fn send_to(
45+
socket: &UdpSocket, send_buf: &[u8], sendmsg_settings: SendMsgSettings,
46+
) -> Result<usize> {
47+
std::future::poll_fn(|mut cx| {
48+
poll_send_to(socket, &mut cx, send_buf, sendmsg_settings)
49+
})
50+
.await
51+
}
5352

54-
match res {
55-
Err(e) if e.kind() == ErrorKind::WouldBlock =>
56-
socket.readable().await?,
57-
_ => return res,
53+
#[cfg(target_os = "linux")]
54+
pub fn poll_recv_from(
55+
socket: &UdpSocket, ctx: &mut Context<'_>, recv_buf: &mut [u8],
56+
recvmsg_settings: &mut RecvMsgSettings,
57+
) -> Poll<Result<RecvData>> {
58+
loop {
59+
match socket.poll_recv_ready(ctx) {
60+
Poll::Ready(Ok(())) => {
61+
// Important to use try_io so that Tokio can clear the socket's
62+
// readiness flag
63+
match socket.try_io(Interest::READABLE, || {
64+
let fd = socket.as_fd();
65+
recv_msg(fd, recv_buf, recvmsg_settings).map_err(Into::into)
66+
}) {
67+
// The `poll_recv_ready` future registers the ctx with Tokio.
68+
// We can only return Pending when that
69+
// future is Pending or we won't wake the
70+
// runtime properly
71+
Err(e) if e.kind() == ErrorKind::WouldBlock => {},
72+
io_res => break Poll::Ready(io_res),
73+
}
74+
},
75+
Poll::Ready(Err(e)) => break Poll::Ready(Err(e)),
76+
Poll::Pending => break Poll::Pending,
5877
}
5978
}
6079
}
6180

62-
#[cfg(not(target_os = "linux"))]
63-
pub async fn send_to(
64-
socket: &UdpSocket, client_addr: SocketAddr,
65-
) -> Result<usize> {
66-
socket.send_to(send_buf, client_addr).await
67-
}
68-
69-
#[cfg(not(target_os = "linux"))]
81+
#[cfg(target_os = "linux")]
7082
pub async fn recv_from(
71-
socket: &UdpSocket, read_buf: &mut [u8],
83+
socket: &UdpSocket, recv_buf: &mut [u8],
84+
recvmsg_settings: &mut RecvMsgSettings<'_>,
7285
) -> Result<RecvData> {
73-
let recv = socket.recv(read_buf).await?;
74-
75-
Ok(RecvData::from_bytes(bytes))
86+
std::future::poll_fn(|mut ctx| {
87+
poll_recv_from(socket, &mut ctx, recv_buf, recvmsg_settings)
88+
})
89+
.await
7690
}

0 commit comments

Comments
 (0)