From ed778179e7b50ee203bfbebb595e4b678e90ac4c Mon Sep 17 00:00:00 2001 From: clabby Date: Sat, 20 Dec 2025 14:28:05 -0500 Subject: [PATCH 1/4] [runtime/network/tokio] Use `BufReader` for read buffering --- runtime/src/network/tokio.rs | 118 ++++++++++++++++++++++++++++++++--- 1 file changed, 110 insertions(+), 8 deletions(-) diff --git a/runtime/src/network/tokio.rs b/runtime/src/network/tokio.rs index b42ed2f954..799611dee1 100644 --- a/runtime/src/network/tokio.rs +++ b/runtime/src/network/tokio.rs @@ -2,7 +2,7 @@ use crate::Error; use commonware_utils::StableBuf; use std::{net::SocketAddr, time::Duration}; use tokio::{ - io::{AsyncReadExt as _, AsyncWriteExt as _}, + io::{AsyncReadExt as _, AsyncWriteExt as _, BufReader}, net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, @@ -29,9 +29,12 @@ impl crate::Sink for Sink { } /// Implementation of [crate::Stream] for the [tokio] runtime. +/// +/// Uses a [`BufReader`] to reduce syscall overhead. Multiple small reads +/// can be satisfied from the buffer without additional network operations. pub struct Stream { read_timeout: Duration, - stream: OwnedReadHalf, + stream: BufReader, } impl crate::Stream for Stream { @@ -73,7 +76,7 @@ impl crate::Listener for Listener { } // Return the sink and stream - let (stream, sink) = stream.into_split(); + let (read_half, sink) = stream.into_split(); Ok(( addr, Sink { @@ -82,7 +85,7 @@ impl crate::Listener for Listener { }, Stream { read_timeout: self.cfg.read_timeout, - stream, + stream: BufReader::with_capacity(self.cfg.read_buffer_size, read_half), }, )) } @@ -110,6 +113,11 @@ pub struct Config { read_timeout: Duration, /// Write timeout for connections, after which the connection will be closed write_timeout: Duration, + /// Size of the read buffer for batching network reads. + /// + /// A larger buffer reduces syscall overhead by reading more data per call, + /// but uses more memory per connection. Defaults to 64 KB. + read_buffer_size: usize, } #[cfg_attr(feature = "iouring-network", allow(dead_code))] @@ -130,6 +138,11 @@ impl Config { self.write_timeout = write_timeout; self } + /// See [Config] + pub const fn with_read_buffer_size(mut self, read_buffer_size: usize) -> Self { + self.read_buffer_size = read_buffer_size; + self + } // Getters /// See [Config] @@ -144,6 +157,10 @@ impl Config { pub const fn write_timeout(&self) -> Duration { self.write_timeout } + /// See [Config] + pub const fn read_buffer_size(&self) -> usize { + self.read_buffer_size + } } impl Default for Config { @@ -152,6 +169,7 @@ impl Default for Config { tcp_nodelay: None, read_timeout: Duration::from_secs(60), write_timeout: Duration::from_secs(30), + read_buffer_size: 64 * 1024, // 64 KB } } } @@ -204,7 +222,7 @@ impl crate::Network for Network { } // Return the sink and stream - let (stream, sink) = stream.into_split(); + let (read_half, sink) = stream.into_split(); Ok(( Sink { write_timeout: self.cfg.write_timeout, @@ -212,7 +230,7 @@ impl crate::Network for Network { }, Stream { read_timeout: self.cfg.read_timeout, - stream, + stream: BufReader::with_capacity(self.cfg.read_buffer_size, read_half), }, )) } @@ -220,9 +238,12 @@ impl crate::Network for Network { #[cfg(test)] mod tests { - use crate::network::{tests, tokio as TokioNetwork}; + use crate::{ + network::{tests, tokio as TokioNetwork}, + Listener as _, Network as _, Sink as _, Stream as _, + }; use commonware_macros::test_group; - use std::time::Duration; + use std::time::{Duration, Instant}; #[tokio::test] async fn test_trait() { @@ -248,4 +269,85 @@ mod tests { }) .await; } + + #[tokio::test] + async fn test_small_send_read_quickly() { + // Use a long read timeout to ensure we're not just waiting for timeout + let read_timeout = Duration::from_secs(30); + let network = TokioNetwork::Network::from( + TokioNetwork::Config::default() + .with_read_timeout(read_timeout) + .with_write_timeout(Duration::from_secs(5)), + ); + + // Bind a listener + let mut listener = network.bind("127.0.0.1:0".parse().unwrap()).await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Spawn a task to accept and read + let reader = tokio::spawn(async move { + let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); + + // Read a small message (much smaller than the 64KB buffer) + let start = Instant::now(); + let buf = stream.recv(vec![0u8; 10]).await.unwrap(); + let elapsed = start.elapsed(); + + (buf, elapsed) + }); + + // Connect and send a small message + let (mut sink, _stream) = network.dial(addr).await.unwrap(); + let msg = vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + sink.send(msg.clone()).await.unwrap(); + + // Wait for the reader to complete + let (received, elapsed) = reader.await.unwrap(); + + // Verify we got the right data + assert_eq!(received.as_ref(), &msg[..]); + + // Verify it completed quickly (well under the read timeout) + // Should complete in milliseconds, not seconds + assert!(elapsed < read_timeout); + } + + #[tokio::test] + async fn test_read_timeout_with_partial_data() { + // Use a short read timeout to make the test fast + let read_timeout = Duration::from_millis(100); + let network = TokioNetwork::Network::from( + TokioNetwork::Config::default() + .with_read_timeout(read_timeout) + .with_write_timeout(Duration::from_secs(5)), + ); + + // Bind a listener + let mut listener = network.bind("127.0.0.1:0".parse().unwrap()).await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let reader = tokio::spawn(async move { + let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); + + // Try to read 100 bytes, but only 5 will be sent + let start = Instant::now(); + let result = stream.recv(vec![0u8; 100]).await; + let elapsed = start.elapsed(); + + (result, elapsed) + }); + + // Connect and send only partial data + let (mut sink, _stream) = network.dial(addr).await.unwrap(); + sink.send(vec![1u8, 2, 3, 4, 5]).await.unwrap(); + + // Wait for the reader to complete + let (result, elapsed) = reader.await.unwrap(); + assert!(matches!(result, Err(crate::Error::Timeout))); + + // Verify the timeout occurred around the expected time + assert!(elapsed >= read_timeout); + // Allow some margin for timing variance + assert!(elapsed < read_timeout + Duration::from_millis(10)); + } } From 74f08f004cd00acd3afef10aa860f58097921d23 Mon Sep 17 00:00:00 2001 From: clabby Date: Sat, 20 Dec 2025 15:00:46 -0500 Subject: [PATCH 2/4] lost data test --- runtime/src/network/tokio.rs | 69 ++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/runtime/src/network/tokio.rs b/runtime/src/network/tokio.rs index 799611dee1..ee2e49a195 100644 --- a/runtime/src/network/tokio.rs +++ b/runtime/src/network/tokio.rs @@ -350,4 +350,73 @@ mod tests { // Allow some margin for timing variance assert!(elapsed < read_timeout + Duration::from_millis(10)); } + + #[tokio::test] + async fn test_timeout_discards_partial_read() { + use tokio::sync::oneshot; + + let read_timeout = Duration::from_millis(50); + let network = TokioNetwork::Network::from( + TokioNetwork::Config::default() + .with_read_timeout(read_timeout) + .with_write_timeout(Duration::from_secs(5)), + ); + + let mut listener = network.bind("127.0.0.1:0".parse().unwrap()).await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Channel to coordinate between sender and receiver + let (first_msg_sent_tx, first_msg_sent_rx) = oneshot::channel::<()>(); + let (timeout_occurred_tx, timeout_occurred_rx) = oneshot::channel::<()>(); + + let reader = tokio::spawn(async move { + let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); + + // Wait for first message to be sent + first_msg_sent_rx.await.unwrap(); + + // Give time for the data to arrive and be buffered + tokio::time::sleep(Duration::from_millis(10)).await; + + // Try to read 100 bytes, but only 5 were sent - this will timeout. + // The 5 bytes that were partially read are discarded. + let result = stream.recv(vec![0u8; 100]).await; + assert!(result.is_err()); // Should timeout + + // Signal that timeout occurred + timeout_occurred_tx.send(()).unwrap(); + + // Read the second message - the first message's bytes are gone + let result = stream.recv(vec![0u8; 5]).await; + + (stream, result) + }); + + // Connect and send first message + let (mut sink, _stream) = network.dial(addr).await.unwrap(); + let first_msg = vec![b'A'; 5]; + sink.send(first_msg.clone()).await.unwrap(); + first_msg_sent_tx.send(()).unwrap(); + + // Wait for the timeout to occur + timeout_occurred_rx.await.unwrap(); + + // Send second message + let second_msg = vec![b'B'; 5]; + sink.send(second_msg.clone()).await.unwrap(); + + // Get results + let (_stream, result) = reader.await.unwrap(); + let received = result.expect("second read should succeed"); + + // The first message was discarded due to timeout - we get the second. + // This is correct: after a timeout, the stream is in an undefined state + // and should be closed. Continuing to read is undefined behavior. + assert_eq!(received.as_ref(), &second_msg[..], "Should have lost data"); + assert_ne!( + received.as_ref(), + &first_msg[..], + "Should not have received the first message" + ); + } } From 742f33e993e63e3bf86a9cde1e54fa2730c3959a Mon Sep 17 00:00:00 2001 From: clabby Date: Sat, 20 Dec 2025 15:09:56 -0500 Subject: [PATCH 3/4] @patrick-ogrady review --- runtime/src/network/tokio.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/runtime/src/network/tokio.rs b/runtime/src/network/tokio.rs index ee2e49a195..596975a9f5 100644 --- a/runtime/src/network/tokio.rs +++ b/runtime/src/network/tokio.rs @@ -76,7 +76,7 @@ impl crate::Listener for Listener { } // Return the sink and stream - let (read_half, sink) = stream.into_split(); + let (stream, sink) = stream.into_split(); Ok(( addr, Sink { @@ -85,7 +85,7 @@ impl crate::Listener for Listener { }, Stream { read_timeout: self.cfg.read_timeout, - stream: BufReader::with_capacity(self.cfg.read_buffer_size, read_half), + stream: BufReader::with_capacity(self.cfg.read_buffer_size, stream), }, )) } @@ -222,7 +222,7 @@ impl crate::Network for Network { } // Return the sink and stream - let (read_half, sink) = stream.into_split(); + let (stream, sink) = stream.into_split(); Ok(( Sink { write_timeout: self.cfg.write_timeout, @@ -230,7 +230,7 @@ impl crate::Network for Network { }, Stream { read_timeout: self.cfg.read_timeout, - stream: BufReader::with_capacity(self.cfg.read_buffer_size, read_half), + stream: BufReader::with_capacity(self.cfg.read_buffer_size, stream), }, )) } @@ -244,6 +244,7 @@ mod tests { }; use commonware_macros::test_group; use std::time::{Duration, Instant}; + use tokio::sync::oneshot; #[tokio::test] async fn test_trait() { @@ -348,13 +349,11 @@ mod tests { // Verify the timeout occurred around the expected time assert!(elapsed >= read_timeout); // Allow some margin for timing variance - assert!(elapsed < read_timeout + Duration::from_millis(10)); + assert!(elapsed < read_timeout * 2); } #[tokio::test] async fn test_timeout_discards_partial_read() { - use tokio::sync::oneshot; - let read_timeout = Duration::from_millis(50); let network = TokioNetwork::Network::from( TokioNetwork::Config::default() From 474535dbf399cc8b7d43f7e6a6545f357cff5544 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 20 Dec 2025 12:38:16 -0800 Subject: [PATCH 4/4] nit --- runtime/src/lib.rs | 8 +++++ runtime/src/network/tokio.rs | 68 ------------------------------------ 2 files changed, 8 insertions(+), 68 deletions(-) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 6eb07d8366..2720ee5455 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -471,6 +471,10 @@ pub trait Listener: Sync + Send + 'static { /// messages over a network connection. pub trait Sink: Sync + Send + 'static { /// Send a message to the sink. + /// + /// # Warning + /// + /// If the sink returns an error, part of the message may still be delivered. fn send( &mut self, msg: impl Into + Send, @@ -482,6 +486,10 @@ pub trait Sink: Sync + Send + 'static { pub trait Stream: Sync + Send + 'static { /// Receive a message from the stream, storing it in the given buffer. /// Reads exactly the number of bytes that fit in the buffer. + /// + /// # Warning + /// + /// If the stream returns an error, partially read data may be discarded. fn recv( &mut self, buf: impl Into + Send, diff --git a/runtime/src/network/tokio.rs b/runtime/src/network/tokio.rs index 596975a9f5..683c2ceca0 100644 --- a/runtime/src/network/tokio.rs +++ b/runtime/src/network/tokio.rs @@ -244,7 +244,6 @@ mod tests { }; use commonware_macros::test_group; use std::time::{Duration, Instant}; - use tokio::sync::oneshot; #[tokio::test] async fn test_trait() { @@ -351,71 +350,4 @@ mod tests { // Allow some margin for timing variance assert!(elapsed < read_timeout * 2); } - - #[tokio::test] - async fn test_timeout_discards_partial_read() { - let read_timeout = Duration::from_millis(50); - let network = TokioNetwork::Network::from( - TokioNetwork::Config::default() - .with_read_timeout(read_timeout) - .with_write_timeout(Duration::from_secs(5)), - ); - - let mut listener = network.bind("127.0.0.1:0".parse().unwrap()).await.unwrap(); - let addr = listener.local_addr().unwrap(); - - // Channel to coordinate between sender and receiver - let (first_msg_sent_tx, first_msg_sent_rx) = oneshot::channel::<()>(); - let (timeout_occurred_tx, timeout_occurred_rx) = oneshot::channel::<()>(); - - let reader = tokio::spawn(async move { - let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); - - // Wait for first message to be sent - first_msg_sent_rx.await.unwrap(); - - // Give time for the data to arrive and be buffered - tokio::time::sleep(Duration::from_millis(10)).await; - - // Try to read 100 bytes, but only 5 were sent - this will timeout. - // The 5 bytes that were partially read are discarded. - let result = stream.recv(vec![0u8; 100]).await; - assert!(result.is_err()); // Should timeout - - // Signal that timeout occurred - timeout_occurred_tx.send(()).unwrap(); - - // Read the second message - the first message's bytes are gone - let result = stream.recv(vec![0u8; 5]).await; - - (stream, result) - }); - - // Connect and send first message - let (mut sink, _stream) = network.dial(addr).await.unwrap(); - let first_msg = vec![b'A'; 5]; - sink.send(first_msg.clone()).await.unwrap(); - first_msg_sent_tx.send(()).unwrap(); - - // Wait for the timeout to occur - timeout_occurred_rx.await.unwrap(); - - // Send second message - let second_msg = vec![b'B'; 5]; - sink.send(second_msg.clone()).await.unwrap(); - - // Get results - let (_stream, result) = reader.await.unwrap(); - let received = result.expect("second read should succeed"); - - // The first message was discarded due to timeout - we get the second. - // This is correct: after a timeout, the stream is in an undefined state - // and should be closed. Continuing to read is undefined behavior. - assert_eq!(received.as_ref(), &second_msg[..], "Should have lost data"); - assert_ne!( - received.as_ref(), - &first_msg[..], - "Should not have received the first message" - ); - } }