Skip to content

Commit

Permalink
io: extend Buf length only after having read into it
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Jan 8, 2025
1 parent bd3e857 commit be3cc8c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 20 deletions.
8 changes: 6 additions & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use crate::sync::Mutex;

use std::cmp;
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::future::Future;
Expand Down Expand Up @@ -600,11 +601,14 @@ impl AsyncRead for File {
return Poll::Ready(Ok(()));
}

buf.ensure_capacity_for(dst, me.max_buf_size);
let std = me.std.clone();

let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
inner.state = State::Busy(spawn_blocking(move || {
let res = buf.read_from(&mut &*std);
// SAFETY: `std` does not read from the buffer
// borrowed to `read` and correctly reports the
// length of the written section.
let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
(Operation::Read(res), buf)
}));
}
Expand Down
48 changes: 30 additions & 18 deletions tokio/src/io/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::cmp;
use std::future::Future;
use std::io;
use std::io::prelude::*;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

Expand Down Expand Up @@ -33,8 +34,13 @@ enum State<T> {

cfg_io_blocking! {
impl<T> Blocking<T> {
/// # Safety
///
/// The `Read` implementation of `inner` must never read the
/// contents of the buffer borrowed to it and must correctly
/// report the length of the written section.
#[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
pub(crate) unsafe fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
state: State::Idle(Some(Buf::with_capacity(0))),
Expand Down Expand Up @@ -64,11 +70,12 @@ where
return Poll::Ready(Ok(()));
}

buf.ensure_capacity_for(dst, DEFAULT_MAX_BUF_SIZE);
let mut inner = self.inner.take().unwrap();

let max_buf_size = cmp::min(dst.remaining(), DEFAULT_MAX_BUF_SIZE);
self.state = State::Busy(sys::run(move || {
let res = buf.read_from(&mut inner);
// SAFETY: the requirements are satisfied by `Blocking::new`.
let res = unsafe { buf.read_from(&mut inner, max_buf_size) };
(res, buf, inner)
}));
}
Expand Down Expand Up @@ -227,25 +234,30 @@ impl Buf {
&self.buf[self.pos..]
}

pub(crate) fn ensure_capacity_for(&mut self, bytes: &ReadBuf<'_>, max_buf_size: usize) {
/// # Safety
///
/// `rd` must not read from the buffer borrowed to `read` and must
/// correctly report the length of the written section.
pub(crate) unsafe fn read_from<T: Read>(
&mut self,
rd: &mut T,
max_buf_size: usize,
) -> io::Result<usize> {
assert!(self.is_empty());
self.buf.reserve(max_buf_size);

let len = cmp::min(bytes.remaining(), max_buf_size);

if self.buf.len() < len {
self.buf.reserve(len - self.buf.len());
}

unsafe {
self.buf.set_len(len);
}
}

pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> {
let res = uninterruptibly!(rd.read(&mut self.buf));
let buf = &mut self.buf.spare_capacity_mut()[..max_buf_size];
// SAFETY: The memory may be uninitialized, but `rd.read` will only write to the buffer.
let buf = unsafe { &mut *(buf as *mut [MaybeUninit<u8>] as *mut [u8]) };
let res = uninterruptibly!(rd.read(buf));

if let Ok(n) = res {
self.buf.truncate(n);
// SAFETY: the caller promises that `rd.read` initializes
// a section of `buf` and correctly reports that length.
// The `self.is_empty()` assertion verifies that `n`
// equals the length of the `buf` capacity that was written
// to (and that `buf` isn't being shrunk).
unsafe { self.buf.set_len(n) }
} else {
self.buf.clear();
}
Expand Down

0 comments on commit be3cc8c

Please sign in to comment.