-
Notifications
You must be signed in to change notification settings - Fork 171
[runtime] Vectorized sink #2518
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
runtime/src/lib.rs
Outdated
| /// | ||
| /// Implementations restrict the maximum number of buffers that can be | ||
| /// written at once to `16`. | ||
| fn send(&mut self, bufs: &[&[u8]]) -> impl Future<Output = Result<(), Error>> + Send; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to support StableBuf for iouring safety
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(unless we were doing something very dumb?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do need to keep the buffer alive until the op finishes; still drafting the initial changes, PR is up to run tests on Linux.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now (to retain cancel safety,) I've just gone ahead and allocated a stable buffer underneath the Sink::send impl for io_uring.
I'm not 100% sure if I love this, but the alternative would be to have Sink::sink's signature be:
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
/// Send messages to the sink using vectored I/O.
///
/// All buffers are written in order as if they were concatenated
/// into a single contiguous message. The implementation guarantees
/// that either all bytes are written or an error is returned.
///
/// Implementations restrict the maximum number of buffers that can be
/// written at once to `16`.
fn send(&mut self, bufs: Vec<StableBuf>) -> impl Future<Output = Result<(), Error>> + Send;
}which forces taking ownership over the buffers, even though that's only required for io_uring (given that tokio's write_vectored function operates using the synchronous writev/pwritev syscalls.)
I'll think about this a bit. The current changes yield a big benefit for the base tokio network, but we still force a copy of the whole sent buffer(s) w/ io_uring.
We could potentially go all the way up the stack and require Sender::send + send_frame to take ownership over the data:
Lines 288 to 307 in 2be92d6
| /// Sends encrypted messages to a peer. | |
| pub struct Sender<O> { | |
| cipher: SendCipher, | |
| sink: O, | |
| max_message_size: usize, | |
| } | |
| impl<O: Sink> Sender<O> { | |
| /// Encrypts and sends a message to the peer. | |
| pub async fn send(&mut self, msg: &[u8]) -> Result<(), Error> { | |
| let c = self.cipher.send(msg)?; | |
| send_frame( | |
| &mut self.sink, | |
| &c, | |
| self.max_message_size + CIPHERTEXT_OVERHEAD, | |
| ) | |
| .await?; | |
| Ok(()) | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, I think taking StableBuf all the way at the top works (if it means we avoid all these copies)!
Deploying monorepo with
|
| Latest commit: |
871532f
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://a21a4425.monorepo-eu0.pages.dev |
| Branch Preview URL: | https://cl-vectorized-sink.monorepo-eu0.pages.dev |
0f04a34 to
ec4d212
Compare
bbee56a to
f168651
Compare
f168651 to
f47a043
Compare
d5830bf to
871532f
Compare
| /// The buffer used for the operation, if any. | ||
| /// E.g. For read, this is the buffer being read into. | ||
| /// If None, the operation doesn't use a buffer (e.g. a sync operation). | ||
| /// We hold the buffer here so it's guaranteed to live until the operation | ||
| /// completes, preventing write-after-free issues. | ||
| pub buffer: Option<StableBuf>, | ||
| /// A boxed `Buf` to keep alive for the duration of the operation and return | ||
| /// to the caller. This is useful for vectored I/O where the original buffer | ||
| /// must remain valid and the caller needs it back to call `advance()`. | ||
| pub buf: Option<Box<dyn Buf + Send>>, | ||
| /// Additional data to keep alive for the duration of the operation but not | ||
| /// returned to the caller. Useful for things like iovec arrays that the kernel | ||
| /// references during the operation. | ||
| pub keepalive: Option<Box<dyn Send>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kept the original StableBuf in here given that storage still uses it; If we proceeded, we'd likely want to do a full swap of StableBuf -> Bytes/Buf throughout the storage and network primitives.
| prefixed_buf.extend_from_slice(buf); | ||
| sink.send(prefixed_buf).await.map_err(Error::SendFailed) | ||
| let len_bytes = len.to_be_bytes(); | ||
| sink.send(Bytes::from_owner(len_bytes).chain(buf)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, this is the benefit of non-contiguous bytes here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Buf operates over chunks, so you can have a reader that spans non-contiguous memory. Really useful in cases like here where we want to not re-allocate a large slab. There's a few other areas of the codebase that this pattern would be nice to use in as well.
In general, anywhere you would use Vec<u8> / Bytes as an input argument, make it impl Buf, and always return Bytes > Vec<u8>.
Codecov Report❌ Patch coverage is
@@ Coverage Diff @@
## main #2518 +/- ##
==========================================
- Coverage 92.50% 92.49% -0.01%
==========================================
Files 340 345 +5
Lines 97412 98319 +907
==========================================
+ Hits 90110 90944 +834
- Misses 7302 7375 +73
... and 85 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Overview
Adjusts the interface of
runtime::Sinkto accept aBuf, allowing implementations to use vectorized write primitives (e.g.tokio'swrite_vectorized, oriouring'swritevop.)closes #784