-
Notifications
You must be signed in to change notification settings - Fork 176
[runtime/network/tokio] add read buffering #2593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
208c34e
8d6d71d
d1dc202
e2e2c05
0411e68
a759505
e2fdf8c
d9003bc
d9a87c5
f479216
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,23 +29,121 @@ impl crate::Sink for Sink { | |
| } | ||
|
|
||
| /// Implementation of [crate::Stream] for the [tokio] runtime. | ||
| /// | ||
| /// This stream uses an internal read buffer to reduce syscall overhead. | ||
| /// Multiple small reads can be satisfied from the buffer without | ||
| /// additional network operations. | ||
| pub struct Stream { | ||
| read_timeout: Duration, | ||
| stream: OwnedReadHalf, | ||
| /// Internal buffer for batching reads. | ||
| buffer: Vec<u8>, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #2595 may be a better soln |
||
| /// Start position of valid data in the buffer. | ||
| start: usize, | ||
| /// End position of valid data in the buffer (exclusive). | ||
| end: usize, | ||
| } | ||
|
|
||
| impl Stream { | ||
| /// Returns the number of buffered bytes available. | ||
| #[inline] | ||
| const fn buffered(&self) -> usize { | ||
| self.end - self.start | ||
| } | ||
|
|
||
| /// Compacts the buffer by moving valid data to the front. | ||
| /// Only called when we need space and start > 0. | ||
| #[inline] | ||
| fn compact(&mut self) { | ||
| if self.start > 0 { | ||
| self.buffer.copy_within(self.start..self.end, 0); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure we ever need to actually do this. Whenever we need to fill the buffer, there is nothing of interest left (should I think be good to just start at 0 in buffered read). |
||
| self.end -= self.start; | ||
| self.start = 0; | ||
| } | ||
| } | ||
|
|
||
| /// Reads at least `min_bytes` into the internal buffer, up to available capacity. | ||
| /// Returns the total number of bytes read, or an error. | ||
| async fn fill_buffer(&mut self, min_bytes: usize) -> Result<usize, Error> { | ||
| if min_bytes == 0 { | ||
| return Ok(0); | ||
| } | ||
|
|
||
| // Compact if we need space | ||
| self.compact(); | ||
|
|
||
| let mut total_read = 0; | ||
patrick-ogrady marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // Read at least min_bytes, up to buffer capacity | ||
| while total_read < min_bytes { | ||
| let bytes_read = timeout( | ||
| self.read_timeout, | ||
patrick-ogrady marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.stream.read(&mut self.buffer[self.end..]), | ||
| ) | ||
| .await | ||
| .map_err(|_| Error::Timeout)? | ||
| .map_err(|_| Error::RecvFailed)?; | ||
|
|
||
| if bytes_read == 0 { | ||
| return Err(Error::RecvFailed); // EOF | ||
| } | ||
|
|
||
| self.end += bytes_read; | ||
| total_read += bytes_read; | ||
| } | ||
|
|
||
| Ok(total_read) | ||
| } | ||
|
|
||
| /// Copies bytes from the internal buffer to the output. | ||
| /// Returns the number of bytes copied. | ||
| #[inline] | ||
| fn copy_from_buffer(&mut self, output: &mut [u8]) -> usize { | ||
| let to_copy = output.len().min(self.buffered()); | ||
| output[..to_copy].copy_from_slice(&self.buffer[self.start..self.start + to_copy]); | ||
| self.start += to_copy; | ||
| to_copy | ||
| } | ||
| } | ||
|
|
||
| impl crate::Stream for Stream { | ||
| async fn recv(&mut self, buf: impl Into<StableBuf> + Send) -> Result<StableBuf, Error> { | ||
| let mut buf = buf.into(); | ||
| if buf.is_empty() { | ||
| let needed = buf.len(); | ||
| if needed == 0 { | ||
| return Ok(buf); | ||
| } | ||
|
|
||
| // Time out if we take too long to read | ||
| timeout(self.read_timeout, self.stream.read_exact(buf.as_mut())) | ||
| let mut filled = 0; | ||
|
|
||
| // First, drain any buffered data | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tokio has some built-ins but AFAICT nothing quite right |
||
| if self.buffered() > 0 { | ||
| filled = self.copy_from_buffer(&mut buf.as_mut()[..needed]); | ||
| if filled == needed { | ||
| return Ok(buf); | ||
| } | ||
| } | ||
|
|
||
| // Need more data. If the remaining request is large (>= buffer capacity), | ||
| // read directly into the output buffer to avoid extra copies. | ||
| let remaining = needed - filled; | ||
| if remaining >= self.buffer.len() { | ||
| // Read directly into output buffer | ||
| timeout( | ||
| self.read_timeout, | ||
| self.stream.read_exact(&mut buf.as_mut()[filled..]), | ||
| ) | ||
| .await | ||
| .map_err(|_| Error::Timeout)? | ||
| .map_err(|_| Error::RecvFailed)?; | ||
| return Ok(buf); | ||
| } | ||
|
|
||
| // For smaller remaining requests, fill the buffer with at least | ||
| // the remaining bytes needed (but opportunistically read more), | ||
| // then copy out what we need. | ||
| self.fill_buffer(remaining).await?; | ||
| self.copy_from_buffer(&mut buf.as_mut()[filled..needed]); | ||
|
|
||
| Ok(buf) | ||
| } | ||
|
|
@@ -73,16 +171,19 @@ impl crate::Listener for Listener { | |
| } | ||
|
|
||
| // Return the sink and stream | ||
| let (stream, sink) = stream.into_split(); | ||
| let (read_half, write_half) = stream.into_split(); | ||
patrick-ogrady marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Ok(( | ||
| addr, | ||
| Sink { | ||
| write_timeout: self.cfg.write_timeout, | ||
| sink, | ||
| sink: write_half, | ||
| }, | ||
| Stream { | ||
| read_timeout: self.cfg.read_timeout, | ||
| stream, | ||
| stream: read_half, | ||
| buffer: vec![0u8; self.cfg.read_buffer_size], | ||
| start: 0, | ||
| end: 0, | ||
| }, | ||
| )) | ||
| } | ||
|
|
@@ -92,6 +193,9 @@ impl crate::Listener for Listener { | |
| } | ||
| } | ||
|
|
||
| /// Default read buffer size (64 KB). | ||
| const DEFAULT_READ_BUFFER_SIZE: usize = 64 * 1024; | ||
|
|
||
| /// Configuration for the tokio [Network] implementation of the [crate::Network] trait. | ||
| #[derive(Clone, Debug)] | ||
| pub struct Config { | ||
|
|
@@ -110,6 +214,11 @@ pub struct Config { | |
| read_timeout: Duration, | ||
| /// Write timeout for connections, after which the connection will be closed | ||
| write_timeout: Duration, | ||
| /// Size of the read buffer for batching network reads. | ||
| /// | ||
| /// A larger buffer reduces syscall overhead by reading more data per call, | ||
| /// but uses more memory per connection. Defaults to 64 KB. | ||
| read_buffer_size: usize, | ||
| } | ||
|
|
||
| #[cfg_attr(feature = "iouring-network", allow(dead_code))] | ||
|
|
@@ -130,6 +239,11 @@ impl Config { | |
| self.write_timeout = write_timeout; | ||
| self | ||
| } | ||
| /// See [Config] | ||
| pub const fn with_read_buffer_size(mut self, read_buffer_size: usize) -> Self { | ||
| self.read_buffer_size = read_buffer_size; | ||
| self | ||
| } | ||
|
|
||
| // Getters | ||
| /// See [Config] | ||
|
|
@@ -144,6 +258,10 @@ impl Config { | |
| pub const fn write_timeout(&self) -> Duration { | ||
| self.write_timeout | ||
| } | ||
| /// See [Config] | ||
| pub const fn read_buffer_size(&self) -> usize { | ||
| self.read_buffer_size | ||
| } | ||
| } | ||
|
|
||
| impl Default for Config { | ||
|
|
@@ -152,6 +270,7 @@ impl Default for Config { | |
| tcp_nodelay: None, | ||
| read_timeout: Duration::from_secs(60), | ||
| write_timeout: Duration::from_secs(30), | ||
| read_buffer_size: DEFAULT_READ_BUFFER_SIZE, | ||
patrick-ogrady marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| } | ||
|
|
@@ -204,25 +323,31 @@ impl crate::Network for Network { | |
| } | ||
|
|
||
| // Return the sink and stream | ||
| let (stream, sink) = stream.into_split(); | ||
| let (read_half, write_half) = stream.into_split(); | ||
patrick-ogrady marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Ok(( | ||
| Sink { | ||
| write_timeout: self.cfg.write_timeout, | ||
| sink, | ||
| sink: write_half, | ||
| }, | ||
| Stream { | ||
| read_timeout: self.cfg.read_timeout, | ||
| stream, | ||
| stream: read_half, | ||
| buffer: vec![0u8; self.cfg.read_buffer_size], | ||
| start: 0, | ||
| end: 0, | ||
| }, | ||
| )) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use crate::network::{tests, tokio as TokioNetwork}; | ||
| use crate::{ | ||
| network::{tests, tokio as TokioNetwork}, | ||
| Listener as _, Network as _, Sink as _, Stream as _, | ||
| }; | ||
| use commonware_macros::test_group; | ||
| use std::time::Duration; | ||
| use std::time::{Duration, Instant}; | ||
|
|
||
| #[tokio::test] | ||
| async fn test_trait() { | ||
|
|
@@ -248,4 +373,46 @@ mod tests { | |
| }) | ||
| .await; | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_small_send_read_quickly() { | ||
| // Use a long read timeout to ensure we're not just waiting for timeout | ||
| let read_timeout = Duration::from_secs(30); | ||
| let network = TokioNetwork::Network::from( | ||
| TokioNetwork::Config::default() | ||
| .with_read_timeout(read_timeout) | ||
| .with_write_timeout(Duration::from_secs(5)), | ||
| ); | ||
|
|
||
| // Bind a listener | ||
| let mut listener = network.bind("127.0.0.1:0".parse().unwrap()).await.unwrap(); | ||
| let addr = listener.local_addr().unwrap(); | ||
|
|
||
| // Spawn a task to accept and read | ||
| let reader = tokio::spawn(async move { | ||
| let (_addr, _sink, mut stream) = listener.accept().await.unwrap(); | ||
|
|
||
| // Read a small message (much smaller than the 64KB buffer) | ||
| let start = Instant::now(); | ||
| let buf = stream.recv(vec![0u8; 10]).await.unwrap(); | ||
| let elapsed = start.elapsed(); | ||
|
|
||
| (buf, elapsed) | ||
| }); | ||
|
|
||
| // Connect and send a small message | ||
| let (mut sink, _stream) = network.dial(addr).await.unwrap(); | ||
| let msg = vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10]; | ||
| sink.send(msg.clone()).await.unwrap(); | ||
|
|
||
| // Wait for the reader to complete | ||
| let (received, elapsed) = reader.await.unwrap(); | ||
|
|
||
| // Verify we got the right data | ||
| assert_eq!(received.as_ref(), &msg[..]); | ||
|
|
||
| // Verify it completed quickly (well under the read timeout) | ||
| // Should complete in milliseconds, not seconds | ||
| assert!(elapsed < read_timeout); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.