Skip to content

Commit

Permalink
Wrapping up async API
Browse files Browse the repository at this point in the history
  • Loading branch information
evanrittenhouse committed Jun 14, 2024
1 parent 6c869ba commit ee70777
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 58 deletions.
7 changes: 0 additions & 7 deletions dgram/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,3 @@ mod linux_imports {
pub(super) use std::net::SocketAddrV6;
pub(super) use std::os::fd::AsRawFd;
}

#[cfg(feature = "async")]
mod async_imports {
pub(super) use std::io::ErrorKind;
pub(super) use tokio::io::Interest;
pub(super) use tokio::net::UdpSocket;
}
115 changes: 64 additions & 51 deletions dgram/src/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::RecvData;
use std::io::ErrorKind;
use std::io::Result;
use std::task::Context;
use std::task::Poll;

use crate::async_imports::*;
use tokio::io::Interest;
use tokio::net::UdpSocket;

#[cfg(target_os = "linux")]
mod linux {
Expand All @@ -13,64 +17,73 @@ mod linux {
use linux::*;

#[cfg(target_os = "linux")]
pub async fn send_to(
socket: &UdpSocket, send_buf: &[u8], send_msg_settings: SendMsgCmsgSettings,
) -> Result<usize> {
loop {
// Important to use try_io so that Tokio can clear the socket's readiness
// flag
let res = socket.try_io(Interest::WRITABLE, || {
let fd = socket.as_fd();
send_msg(fd, send_buf, send_msg_settings).map_err(Into::into)
});

match res {
Err(e) if e.kind() == ErrorKind::WouldBlock =>
socket.writable().await?,
res => return res,
}
pub fn poll_send_to(
socket: &UdpSocket, ctx: &mut Context<'_>, send_buf: &[u8],
sendmsg_settings: SendMsgSettings,
) -> Poll<Result<usize>> {
// We manually poll the socket here to register interest in
// Writable socket events for the given `ctx`.
// Under the hood, tokio's implementation just checks for
// EWOULDBLOCK and, if the socket is busy, registers the provided
// waker to be invoked when the socket is free.
match socket.poll_send_ready(ctx) {
Poll::Ready(Ok(())) => {
// Important to use try_io so that Tokio can clear the socket's
// readiness flag
match socket.try_io(Interest::WRITABLE, || {
let fd = socket.as_fd();
send_msg(fd, send_buf, sendmsg_settings).map_err(Into::into)
}) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}

#[cfg(target_os = "linux")]
pub async fn recv_from(
socket: &UdpSocket, read_buf: &mut [u8], msg_flags: Option<MsgFlags>,
store_cmsg_settings: &mut RecvMsgCmsgSettings,
) -> Result<RecvData> {
loop {
// Important to use try_io so that Tokio can clear the socket's readiness
// flag
let res = socket.try_io(Interest::READABLE, || {
let fd = socket.as_fd();
recv_msg(
fd,
read_buf,
msg_flags.unwrap_or(MsgFlags::empty()),
store_cmsg_settings,
)
.map_err(Into::into)
});

match res {
Err(e) if e.kind() == ErrorKind::WouldBlock =>
socket.readable().await?,
_ => return res,
}
}
}

#[cfg(not(target_os = "linux"))]
pub async fn send_to(
socket: &UdpSocket, client_addr: SocketAddr,
socket: &UdpSocket, send_buf: &[u8], sendmsg_settings: SendMsgSettings,
) -> Result<usize> {
socket.send_to(send_buf, client_addr).await
std::future::poll_fn(|mut cx| {
poll_send_to(socket, &mut cx, send_buf, sendmsg_settings)
})
.await
}

#[cfg(not(target_os = "linux"))]
#[cfg(target_os = "linux")]
pub fn poll_recv_from(
socket: &UdpSocket, ctx: &mut Context<'_>, recv_buf: &mut [u8],
recvmsg_settings: &mut RecvMsgSettings,
) -> Poll<Result<RecvData>> {
match socket.poll_recv_ready(ctx) {
Poll::Ready(Ok(())) => {
// Important to use try_io so that Tokio can clear the socket's
// readiness flag
match socket.try_io(Interest::READABLE, || {
let fd = socket.as_fd();
recv_msg(fd, recv_buf, recvmsg_settings).map_err(Into::into)
}) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}

#[cfg(target_os = "linux")]
pub async fn recv_from(
socket: &UdpSocket, read_buf: &mut [u8],
socket: &UdpSocket, recv_buf: &mut [u8],
recvmsg_settings: &mut RecvMsgSettings,
) -> Result<RecvData> {
let recv = socket.recv(read_buf).await?;

Ok(RecvData::from_bytes(bytes))
std::future::poll_fn(|mut ctx| {
poll_recv_from(socket, &mut ctx, recv_buf, recvmsg_settings)
})
.await
}

0 comments on commit ee70777

Please sign in to comment.