Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions examples/sync/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ where
outgoing = response_receiver.next() => {
if let Some(response) = outgoing {
// We have a response to send to the client.
let response_data = response.encode().to_vec();
if let Err(err) = send_frame(&mut sink, &response_data, MAX_MESSAGE_SIZE).await {
let response_data = response.encode();
if let Err(err) = send_frame(&mut sink, response_data, MAX_MESSAGE_SIZE).await {
info!(client_addr = %client_addr, ?err, "send failed (client likely disconnected)");
state.error_counter.inc();
return Ok(());
Expand Down
4 changes: 2 additions & 2 deletions examples/sync/src/net/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ async fn run_loop<E, Si, St, M>(
Some(Request { request, response_tx }) => {
let request_id = request.request_id();
pending_requests.insert(request_id, response_tx);
let data = request.encode().to_vec();
if let Err(e) = send_frame(&mut sink, &data, MAX_MESSAGE_SIZE).await {
let data = request.encode();
if let Err(e) = send_frame(&mut sink, data, MAX_MESSAGE_SIZE).await {
if let Some(sender) = pending_requests.remove(&request_id) {
let _ = sender.send(Err(Error::Network(e)));
}
Expand Down
6 changes: 4 additions & 2 deletions p2p/src/simulated/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,9 @@ impl Link {
context.with_label("link").spawn(move |context| async move {
// Dial the peer and handshake by sending it the dialer's public key
let (mut sink, _) = context.dial(socket).await.unwrap();
if let Err(err) = send_frame(&mut sink, &dialer, max_size).await {
if let Err(err) =
send_frame(&mut sink, Bytes::from_owner(dialer.clone()), max_size).await
{
error!(?err, "failed to send public key to listener");
return;
}
Expand All @@ -1070,7 +1072,7 @@ impl Link {
data.extend_from_slice(&channel.to_be_bytes());
data.extend_from_slice(&message);
let data = data.freeze();
let _ = send_frame(&mut sink, &data, max_size).await;
let _ = send_frame(&mut sink, data, max_size).await;

// Bump received messages metric
received_messages
Expand Down
61 changes: 45 additions & 16 deletions runtime/src/iouring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
//! 3. If `shutdown_timeout` is configured, abandons remaining operations after the timeout
//! 4. Cleans up and exits

use bytes::Buf;
use commonware_utils::StableBuf;
use futures::{
channel::{mpsc, oneshot},
Expand All @@ -78,13 +79,17 @@ const TIMEOUT_WORK_ID: u64 = u64::MAX;
/// Active operations keyed by their work id.
///
/// Each entry keeps the caller's oneshot sender, the `StableBuf` that must stay
/// alive until the kernel finishes touching it, and when op_timeout is enabled,
/// the boxed `Timespec` used when we link in an IOSQE_IO_LINK timeout.
/// alive until the kernel finishes touching it, an optional boxed `Buf` that must
/// remain valid and is returned to the caller, an optional keepalive that must
/// stay alive but is not returned, and when op_timeout is enabled, the boxed
/// `Timespec` used when we link in an IOSQE_IO_LINK timeout.
type Waiters = HashMap<
u64,
(
oneshot::Sender<(i32, Option<StableBuf>)>,
oneshot::Sender<(i32, Option<StableBuf>, Option<Box<dyn Buf + Send>>)>,
Option<StableBuf>,
Option<Box<dyn Buf + Send>>,
Option<Box<dyn Send>>,
Option<Box<Timespec>>,
),
>;
Expand Down Expand Up @@ -205,14 +210,22 @@ pub struct Op {
/// The submission queue entry to be submitted to the ring.
/// Its user data field will be overwritten. Users shouldn't rely on it.
pub work: SqueueEntry,
/// Sends the result of the operation and `buffer`.
pub sender: oneshot::Sender<(i32, Option<StableBuf>)>,
/// Sends the result of the operation, the `buffer`, and the `buf`.
pub sender: oneshot::Sender<(i32, Option<StableBuf>, Option<Box<dyn Buf + Send>>)>,
/// The buffer used for the operation, if any.
/// E.g. For read, this is the buffer being read into.
/// If None, the operation doesn't use a buffer (e.g. a sync operation).
/// We hold the buffer here so it's guaranteed to live until the operation
/// completes, preventing write-after-free issues.
pub buffer: Option<StableBuf>,
/// A boxed `Buf` to keep alive for the duration of the operation and return
/// to the caller. This is useful for vectored I/O where the original buffer
/// must remain valid and the caller needs it back to call `advance()`.
pub buf: Option<Box<dyn Buf + Send>>,
/// Additional data to keep alive for the duration of the operation but not
/// returned to the caller. Useful for things like iovec arrays that the kernel
/// references during the operation.
pub keepalive: Option<Box<dyn Send>>,
Comment on lines 215 to +228
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kept the original StableBuf in here given that storage still uses it; If we proceeded, we'd likely want to do a full swap of StableBuf -> Bytes/Buf throughout the storage and network primitives.

}

// Returns false iff we received a shutdown timeout
Expand All @@ -235,8 +248,9 @@ fn handle_cqe(waiters: &mut Waiters, cqe: CqueueEntry, cfg: &Config) {
result
};

let (result_sender, buffer, _) = waiters.remove(&work_id).expect("missing sender");
let _ = result_sender.send((result, buffer));
let (result_sender, buffer, buf, _keepalive, _timespec) =
waiters.remove(&work_id).expect("missing sender");
let _ = result_sender.send((result, buffer, buf));
}
}
}
Expand Down Expand Up @@ -296,6 +310,8 @@ pub(crate) async fn run(cfg: Config, metrics: Arc<Metrics>, mut receiver: mpsc::
mut work,
sender,
buffer,
buf,
keepalive,
} = op;

// Assign a unique id
Expand Down Expand Up @@ -357,7 +373,7 @@ pub(crate) async fn run(cfg: Config, metrics: Arc<Metrics>, mut receiver: mpsc::
};

// We'll send the result of this operation to `sender`.
waiters.insert(work_id, (sender, buffer, timespec));
waiters.insert(work_id, (sender, buffer, buf, keepalive, timespec));
}

// Submit and wait for at least 1 item to be in the completion queue.
Expand Down Expand Up @@ -480,6 +496,8 @@ mod tests {
work: recv,
sender: recv_tx,
buffer: Some(buf.into()),
buf: None,
keepalive: None,
})
.await
.expect("failed to send work");
Expand All @@ -498,15 +516,17 @@ mod tests {
work: write,
sender: write_tx,
buffer: Some(msg.into()),
buf: None,
keepalive: None,
})
.await
.expect("failed to send work");

// Wait for the read and write operations to complete.
if should_succeed {
let (result, _) = recv_rx.await.expect("failed to receive result");
let (result, _, _) = recv_rx.await.expect("failed to receive result");
assert!(result > 0, "recv failed: {result}");
let (result, _) = write_rx.await.expect("failed to receive result");
let (result, _, _) = write_rx.await.expect("failed to receive result");
assert!(result > 0, "write failed: {result}");
} else {
let _ = recv_rx.await;
Expand Down Expand Up @@ -567,11 +587,13 @@ mod tests {
work,
sender: tx,
buffer: Some(buf.into()),
buf: None,
keepalive: None,
})
.await
.expect("failed to send work");
// Wait for the timeout
let (result, _) = rx.await.expect("failed to receive result");
let (result, _, _) = rx.await.expect("failed to receive result");
assert_eq!(result, -libc::ETIMEDOUT);
drop(submitter);
handle.await.unwrap();
Expand All @@ -597,6 +619,8 @@ mod tests {
work: timeout,
sender: tx,
buffer: None,
buf: None,
keepalive: None,
})
.await
.unwrap();
Expand All @@ -605,7 +629,7 @@ mod tests {
drop(submitter);

// Wait for the operation `timeout` to fire.
let (result, _) = rx.await.unwrap();
let (result, _, _) = rx.await.unwrap();
assert_eq!(result, -libc::ETIME);
handle.await.unwrap();
}
Expand All @@ -630,6 +654,8 @@ mod tests {
work: timeout,
sender: tx,
buffer: None,
buf: None,
keepalive: None,
})
.await
.unwrap();
Expand All @@ -642,8 +668,7 @@ mod tests {

// The event loop should shut down before the `timeout` fires,
// dropping `tx` and causing `rx` to return Canceled.
let err = rx.await.unwrap_err();
assert!(matches!(err, Canceled { .. }));
assert!(matches!(rx.await, Err(Canceled { .. })));
handle.await.unwrap();
}

Expand Down Expand Up @@ -673,6 +698,8 @@ mod tests {
work: nop,
sender: tx,
buffer: None,
buf: None,
keepalive: None,
})
.await
.unwrap();
Expand All @@ -681,7 +708,7 @@ mod tests {

// All NOPs should complete successfully
for rx in rxs {
let (res, _) = rx.await.unwrap();
let (res, _, _) = rx.await.unwrap();
assert_eq!(res, 0, "NOP op failed: {res}");
}

Expand Down Expand Up @@ -711,12 +738,14 @@ mod tests {
work: opcode::Nop::new().build(),
sender: tx,
buffer: None,
buf: None,
keepalive: None,
})
.await
.unwrap();

// Verify it completes successfully
let (result, _) = rx.await.unwrap();
let (result, _, _) = rx.await.unwrap();
assert_eq!(result, 0);

// Clean shutdown
Expand Down
15 changes: 11 additions & 4 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
html_favicon_url = "https://commonware.xyz/favicon.ico"
)]

use bytes::Buf;
use commonware_macros::select;
use commonware_utils::StableBuf;
use prometheus_client::registry::Metric;
Expand Down Expand Up @@ -436,10 +437,17 @@ pub trait Listener: Sync + Send + 'static {
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
/// Send a message to the sink.
/// Send messages to the sink using vectored I/O.
///
/// All buffers are written in order as if they were concatenated
/// into a single contiguous message. The implementation guarantees
/// that either all bytes are written or an error is returned.
///
/// Implementations restrict the maximum number of buffers that can be
/// written at once to `16`.
fn send(
&mut self,
msg: impl Into<StableBuf> + Send,
bufs: impl Buf + Send + 'static,
) -> impl Future<Output = Result<(), Error>> + Send;
}

Expand Down Expand Up @@ -545,7 +553,6 @@ pub trait Blob: Clone + Send + Sync + 'static {
mod tests {
use super::*;
use crate::telemetry::traces::collector::TraceStorage;
use bytes::Bytes;
use commonware_macros::{select, test_collect_traces};
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -2600,7 +2607,7 @@ mod tests {
let request = format!(
"GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
);
sink.send(Bytes::from(request).to_vec()).await.unwrap();
sink.send(bytes::Bytes::from(request)).await.unwrap();

// Read and verify the HTTP status line
let status_line = read_line(&mut stream).await.unwrap();
Expand Down
32 changes: 18 additions & 14 deletions runtime/src/mocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A mock implementation of a channel that implements the Sink and Stream traits.

use crate::{Error, Sink as SinkTrait, Stream as StreamTrait};
use bytes::Bytes;
use bytes::{Buf, Bytes};
use commonware_utils::StableBuf;
use futures::channel::oneshot;
use std::{
Expand Down Expand Up @@ -50,8 +50,7 @@ pub struct Sink {
}

impl SinkTrait for Sink {
async fn send(&mut self, msg: impl Into<StableBuf> + Send) -> Result<(), Error> {
let msg = msg.into();
async fn send(&mut self, mut buf: impl Buf + Send + 'static) -> Result<(), Error> {
let (os_send, data) = {
let mut channel = self.channel.lock().unwrap();

Expand All @@ -60,8 +59,15 @@ impl SinkTrait for Sink {
return Err(Error::Closed);
}

// Reserve memory for the upcoming write.
let total_size = buf.remaining();
let current_len = channel.buffer.len();
channel.buffer.resize(total_size + current_len, 0);

// Add the data to the buffer.
channel.buffer.extend(msg.as_ref());
buf.copy_to_slice(
&mut channel.buffer.make_contiguous()[current_len..current_len + total_size],
);

// If there is a waiter and the buffer is large enough,
// return the waiter (while clearing the waiter field).
Expand Down Expand Up @@ -153,11 +159,11 @@ mod tests {
#[test]
fn test_send_recv() {
let (mut sink, mut stream) = Channel::init();
let data = b"hello world".to_vec();
let data = b"hello world";

let executor = deterministic::Runner::default();
executor.start(|_| async move {
sink.send(data.clone()).await.unwrap();
sink.send(Bytes::from_static(data)).await.unwrap();
let buf = stream.recv(vec![0; data.len()]).await.unwrap();
assert_eq!(buf.as_ref(), data);
});
Expand All @@ -166,13 +172,11 @@ mod tests {
#[test]
fn test_send_recv_partial_multiple() {
let (mut sink, mut stream) = Channel::init();
let data = b"hello".to_vec();
let data2 = b" world".to_vec();

let executor = deterministic::Runner::default();
executor.start(|_| async move {
sink.send(data).await.unwrap();
sink.send(data2).await.unwrap();
sink.send(Bytes::from_static(b"hello")).await.unwrap();
sink.send(Bytes::from_static(b" world")).await.unwrap();
let buf = stream.recv(vec![0; 5]).await.unwrap();
assert_eq!(buf.as_ref(), b"hello");
let buf = stream.recv(buf).await.unwrap();
Expand All @@ -191,7 +195,7 @@ mod tests {
executor.start(|_| async move {
let (buf, _) = futures::try_join!(stream.recv(vec![0; data.len()]), async {
sleep(Duration::from_millis(50));
sink.send(data.to_vec()).await
sink.send(data.as_ref()).await
})
.unwrap();
assert_eq!(buf.as_ref(), data);
Expand Down Expand Up @@ -237,7 +241,7 @@ mod tests {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
// Send some bytes
assert!(sink.send(b"7 bytes".to_vec()).await.is_ok());
assert!(sink.send(b"7 bytes".as_slice()).await.is_ok());

// Spawn a task to initiate recv's where the first one will succeed and then will drop.
let handle = context.clone().spawn(|_| async move {
Expand All @@ -253,7 +257,7 @@ mod tests {
assert!(matches!(handle.await, Err(Error::Closed)));

// Try to send a message. The stream is dropped, so this should fail.
let result = sink.send(b"hello world".to_vec()).await;
let result = sink.send(b"hello world".as_slice()).await;
assert!(matches!(result, Err(Error::Closed)));
});
}
Expand All @@ -265,7 +269,7 @@ mod tests {

let executor = deterministic::Runner::default();
executor.start(|_| async move {
let result = sink.send(b"hello world".to_vec()).await;
let result = sink.send(b"hello world".as_slice()).await;
assert!(matches!(result, Err(Error::Closed)));
});
}
Expand Down
Loading
Loading