Skip to content

Commit 01c32f1

Browse files
Wrapping up async API
1 parent 6c869ba commit 01c32f1

File tree

3 files changed

+65
-59
lines changed

3 files changed

+65
-59
lines changed

dgram/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ edition = "2021"
88

99
[dependencies]
1010
libc = "0.2.76"
11-
nix = { version = "0.27", features = ["net", "socket", "uio"] }
11+
nix = { version = "0.26.2 features = ["net", "socket", "uio"] }
1212
smallvec = { version = "1.10", features = ["union"] }
1313
tokio = { version = "1.29", features = ["full", "test-util"], optional = true }

dgram/src/lib.rs

-7
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,3 @@ mod linux_imports {
173173
pub(super) use std::net::SocketAddrV6;
174174
pub(super) use std::os::fd::AsRawFd;
175175
}
176-
177-
#[cfg(feature = "async")]
178-
mod async_imports {
179-
pub(super) use std::io::ErrorKind;
180-
pub(super) use tokio::io::Interest;
181-
pub(super) use tokio::net::UdpSocket;
182-
}

dgram/src/tokio.rs

+64-51
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use crate::RecvData;
2+
use std::io::ErrorKind;
23
use std::io::Result;
4+
use std::task::Context;
5+
use std::task::Poll;
36

4-
use crate::async_imports::*;
7+
use tokio::io::Interest;
8+
use tokio::net::UdpSocket;
59

610
#[cfg(target_os = "linux")]
711
mod linux {
@@ -13,64 +17,73 @@ mod linux {
1317
use linux::*;
1418

1519
#[cfg(target_os = "linux")]
16-
pub async fn send_to(
17-
socket: &UdpSocket, send_buf: &[u8], send_msg_settings: SendMsgCmsgSettings,
18-
) -> Result<usize> {
19-
loop {
20-
// Important to use try_io so that Tokio can clear the socket's readiness
21-
// flag
22-
let res = socket.try_io(Interest::WRITABLE, || {
23-
let fd = socket.as_fd();
24-
send_msg(fd, send_buf, send_msg_settings).map_err(Into::into)
25-
});
26-
27-
match res {
28-
Err(e) if e.kind() == ErrorKind::WouldBlock =>
29-
socket.writable().await?,
30-
res => return res,
31-
}
20+
pub fn poll_send_to(
21+
socket: &UdpSocket, ctx: &mut Context<'_>, send_buf: &[u8],
22+
sendmsg_settings: SendMsgSettings,
23+
) -> Poll<Result<usize>> {
24+
// We manually poll the socket here to register interest in
25+
// Writable socket events for the given `ctx`.
26+
// Under the hood, tokio's implementation just checks for
27+
// EWOULDBLOCK and, if the socket is busy, registers the provided
28+
// waker to be invoked when the socket is free.
29+
match socket.poll_send_ready(ctx) {
30+
Poll::Ready(Ok(())) => {
31+
// Important to use try_io so that Tokio can clear the socket's
32+
// readiness flag
33+
match socket.try_io(Interest::WRITABLE, || {
34+
let fd = socket.as_fd();
35+
send_msg(fd, send_buf, sendmsg_settings).map_err(Into::into)
36+
}) {
37+
Ok(n) => Poll::Ready(Ok(n)),
38+
Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Pending,
39+
Err(e) => Poll::Ready(Err(e)),
40+
}
41+
},
42+
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
43+
Poll::Pending => Poll::Pending,
3244
}
3345
}
3446

3547
#[cfg(target_os = "linux")]
36-
pub async fn recv_from(
37-
socket: &UdpSocket, read_buf: &mut [u8], msg_flags: Option<MsgFlags>,
38-
store_cmsg_settings: &mut RecvMsgCmsgSettings,
39-
) -> Result<RecvData> {
40-
loop {
41-
// Important to use try_io so that Tokio can clear the socket's readiness
42-
// flag
43-
let res = socket.try_io(Interest::READABLE, || {
44-
let fd = socket.as_fd();
45-
recv_msg(
46-
fd,
47-
read_buf,
48-
msg_flags.unwrap_or(MsgFlags::empty()),
49-
store_cmsg_settings,
50-
)
51-
.map_err(Into::into)
52-
});
53-
54-
match res {
55-
Err(e) if e.kind() == ErrorKind::WouldBlock =>
56-
socket.readable().await?,
57-
_ => return res,
58-
}
59-
}
60-
}
61-
62-
#[cfg(not(target_os = "linux"))]
6348
pub async fn send_to(
64-
socket: &UdpSocket, client_addr: SocketAddr,
49+
socket: &UdpSocket, send_buf: &[u8], sendmsg_settings: SendMsgSettings,
6550
) -> Result<usize> {
66-
socket.send_to(send_buf, client_addr).await
51+
std::future::poll_fn(|mut cx| {
52+
poll_send_to(socket, &mut cx, send_buf, sendmsg_settings)
53+
})
54+
.await
6755
}
6856

69-
#[cfg(not(target_os = "linux"))]
57+
#[cfg(target_os = "linux")]
58+
pub fn poll_recv_from(
59+
socket: &UdpSocket, ctx: &mut Context<'_>, recv_buf: &mut [u8],
60+
recvmsg_settings: &mut RecvMsgSettings,
61+
) -> Poll<Result<RecvData>> {
62+
match socket.poll_recv_ready(ctx) {
63+
Poll::Ready(Ok(())) => {
64+
// Important to use try_io so that Tokio can clear the socket's
65+
// readiness flag
66+
match socket.try_io(Interest::READABLE, || {
67+
let fd = socket.as_fd();
68+
recv_msg(fd, recv_buf, recvmsg_settings).map_err(Into::into)
69+
}) {
70+
Ok(n) => Poll::Ready(Ok(n)),
71+
Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Pending,
72+
Err(e) => Poll::Ready(Err(e)),
73+
}
74+
},
75+
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
76+
Poll::Pending => Poll::Pending,
77+
}
78+
}
79+
80+
#[cfg(target_os = "linux")]
7081
pub async fn recv_from(
71-
socket: &UdpSocket, read_buf: &mut [u8],
82+
socket: &UdpSocket, recv_buf: &mut [u8],
83+
recvmsg_settings: &mut RecvMsgSettings,
7284
) -> Result<RecvData> {
73-
let recv = socket.recv(read_buf).await?;
74-
75-
Ok(RecvData::from_bytes(bytes))
85+
std::future::poll_fn(|mut ctx| {
86+
poll_recv_from(socket, &mut ctx, recv_buf, recvmsg_settings)
87+
})
88+
.await
7689
}

0 commit comments

Comments
 (0)