Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: extend Buf length only after having read into it #7054

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use crate::sync::Mutex;

use std::cmp;
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::future::Future;
Expand Down Expand Up @@ -600,11 +601,14 @@ impl AsyncRead for File {
return Poll::Ready(Ok(()));
}

buf.ensure_capacity_for(dst, me.max_buf_size);
let std = me.std.clone();

let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
inner.state = State::Busy(spawn_blocking(move || {
let res = buf.read_from(&mut &*std);
// SAFETY: the `Read` implementation of `std` does not
// read from the buffer it is borrowing and correctly
// reports the length of the data written into the buffer.
let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
(Operation::Read(res), buf)
}));
}
Expand Down
48 changes: 30 additions & 18 deletions tokio/src/io/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::cmp;
use std::future::Future;
use std::io;
use std::io::prelude::*;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

Expand Down Expand Up @@ -33,8 +34,13 @@ enum State<T> {

cfg_io_blocking! {
impl<T> Blocking<T> {
/// # Safety
///
/// The `Read` implementation of `inner` must never read from the buffer
/// it is borrowing and must correctly report the length of the data
/// written into the buffer.
#[cfg_attr(feature = "fs", allow(dead_code))]
pub(crate) fn new(inner: T) -> Blocking<T> {
pub(crate) unsafe fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
state: State::Idle(Some(Buf::with_capacity(0))),
Expand Down Expand Up @@ -64,11 +70,12 @@ where
return Poll::Ready(Ok(()));
}

buf.ensure_capacity_for(dst, DEFAULT_MAX_BUF_SIZE);
let mut inner = self.inner.take().unwrap();

let max_buf_size = cmp::min(dst.remaining(), DEFAULT_MAX_BUF_SIZE);
self.state = State::Busy(sys::run(move || {
let res = buf.read_from(&mut inner);
// SAFETY: the requirements are satisfied by `Blocking::new`.
let res = unsafe { buf.read_from(&mut inner, max_buf_size) };
paolobarbolini marked this conversation as resolved.
Show resolved Hide resolved
(res, buf, inner)
}));
}
Expand Down Expand Up @@ -227,25 +234,30 @@ impl Buf {
&self.buf[self.pos..]
}

pub(crate) fn ensure_capacity_for(&mut self, bytes: &ReadBuf<'_>, max_buf_size: usize) {
/// # Safety
///
/// `rd` must not read from the buffer `read` is borrowing and must correctly
/// report the length of the data written into the buffer.
pub(crate) unsafe fn read_from<T: Read>(
&mut self,
rd: &mut T,
max_buf_size: usize,
) -> io::Result<usize> {
assert!(self.is_empty());
self.buf.reserve(max_buf_size);

let len = cmp::min(bytes.remaining(), max_buf_size);

if self.buf.len() < len {
self.buf.reserve(len - self.buf.len());
}

unsafe {
self.buf.set_len(len);
}
}

pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> {
let res = uninterruptibly!(rd.read(&mut self.buf));
let buf = &mut self.buf.spare_capacity_mut()[..max_buf_size];
// SAFETY: The memory may be uninitialized, but `rd.read` will only write to the buffer.
let buf = unsafe { &mut *(buf as *mut [MaybeUninit<u8>] as *mut [u8]) };
paolobarbolini marked this conversation as resolved.
Show resolved Hide resolved
let res = uninterruptibly!(rd.read(buf));

if let Ok(n) = res {
self.buf.truncate(n);
// SAFETY: the caller promises that `rd.read` initializes
// a section of `buf` and correctly reports that length.
// The `self.is_empty()` assertion verifies that `n`
// equals the length of the `buf` capacity that was written
// to (and that `buf` isn't being shrunk).
unsafe { self.buf.set_len(n) }
paolobarbolini marked this conversation as resolved.
Show resolved Hide resolved
} else {
self.buf.clear();
}
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/io/stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ cfg_io_std! {
/// ```
pub fn stderr() -> Stderr {
let std = io::stderr();
// SAFETY: The `Read` implementation of `std` does not read from the
// buffer it is borrowing and correctly reports the length of the data
// written into the buffer.
let blocking = unsafe { Blocking::new(std) };
Stderr {
std: SplitByUtf8BoundaryIfWindows::new(Blocking::new(std)),
std: SplitByUtf8BoundaryIfWindows::new(blocking),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/io/stdin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ cfg_io_std! {
/// user input and use blocking IO directly in that thread.
pub fn stdin() -> Stdin {
let std = io::stdin();
// SAFETY: The `Read` implementation of `std` does not read from the
// buffer it is borrowing and correctly reports the length of the data
// written into the buffer.
let std = unsafe { Blocking::new(std) };
Stdin {
std: Blocking::new(std),
std,
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/io/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ cfg_io_std! {
/// ```
pub fn stdout() -> Stdout {
let std = io::stdout();
// SAFETY: The `Read` implementation of `std` does not read from the
// buffer it is borrowing and correctly reports the length of the data
// written into the buffer.
let blocking = unsafe { Blocking::new(std) };
Stdout {
std: SplitByUtf8BoundaryIfWindows::new(Blocking::new(std)),
std: SplitByUtf8BoundaryIfWindows::new(blocking),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/process/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,11 @@ where
use std::os::windows::prelude::FromRawHandle;

let raw = Arc::new(unsafe { StdFile::from_raw_handle(io.into_raw_handle()) });
let io = Blocking::new(ArcFile(raw.clone()));
let io = ArcFile(raw.clone());
// SAFETY: the `Read` implementation of `io` does not
// read from the buffer it is borrowing and correctly
// reports the length of the data written into the buffer.
let io = unsafe { Blocking::new(io) };
Ok(ChildStdio { raw, io })
}

Expand Down
Loading