Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ futures-core = "0.3.0"
[dev-dependencies]
tokio = { version = "1.2.0", path = "../tokio", features = ["full"] }
futures-util = "0.3.0"
futures-test = "0.3.5"

[package.metadata.docs.rs]
all-features = true
Expand Down
130 changes: 123 additions & 7 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::sync::mpsc;
use tokio::time::{self, Duration, Instant, Sleep};
use tokio_stream::wrappers::UnboundedReceiverStream;

use core::panic;
use futures_core::Stream;
use std::collections::VecDeque;
use std::fmt;
Expand Down Expand Up @@ -59,11 +60,13 @@ pub struct Builder {
enum Action {
Read(Vec<u8>),
Write(Vec<u8>),
Shutdown,
Wait(Duration),
// Wrapped in Arc so that Builder can be cloned and Send.
// Mock is not cloned as does not need to check Rc for ref counts.
ReadError(Option<Arc<io::Error>>),
WriteError(Option<Arc<io::Error>>),
ShutdownError(Option<Arc<io::Error>>),
}

struct Inner {
Expand All @@ -75,6 +78,13 @@ struct Inner {
name: String,
}

enum Shutdown {
ShouldSuccess,
ShouldError(io::Error),
NeedWait,
NoActions,
}

impl Builder {
/// Return a new, empty `Builder`.
pub fn new() -> Self {
Expand Down Expand Up @@ -129,6 +139,25 @@ impl Builder {
self
}

/// Sequence a shutdown operation.
///
/// The next operation in the mock's script will be to expect a
/// [`AsyncWrite::poll_shutdown`] call.
pub fn shutdown(&mut self) -> &mut Self {
self.actions.push_back(Action::Shutdown);
self
}

/// Sequence a shutdown operation that produces an error.
///
/// The next operation in the mock's script will be to expect a
/// [`AsyncWrite::poll_shutdown`] call that returns `error`.
pub fn shutdown_error(&mut self, error: io::Error) -> &mut Self {
let error = Some(error.into());
self.actions.push_back(Action::ShutdownError(error));
self
}

/// Set name of the mock IO object to include in panic messages and debug output
pub fn name(&mut self, name: impl Into<String>) -> &mut Self {
self.name = name.into();
Expand Down Expand Up @@ -197,6 +226,10 @@ impl Inner {

let rx = UnboundedReceiverStream::new(rx);

// Actually, we should deny any write action after the shutdown action.
// However, since we currently doesn't check the write action after the error
// like BrokenPipe error, we ignore this case to keep the behavior consistent.

let inner = Inner {
actions,
sleep: None,
Expand Down Expand Up @@ -235,10 +268,15 @@ impl Inner {
let err = Arc::try_unwrap(err).expect("There are no other references.");
Err(err)
}
Some(_) => {
Some(&mut Action::Write(_))
| Some(&mut Action::WriteError(_))
| Some(&mut Action::Wait(_)) => {
// Either waiting or expecting a write
Err(io::ErrorKind::WouldBlock.into())
}
Some(&mut Action::Shutdown) | Some(&mut Action::ShutdownError(_)) => {
panic!("unexpected read, expect poll_shutdown");
}
None => Ok(()),
}
}
Expand Down Expand Up @@ -280,10 +318,11 @@ impl Inner {
Action::Wait(..) | Action::WriteError(..) => {
break;
}
_ => {}
Action::Read(_) | Action::ReadError(_) => (),
Action::Shutdown | Action::ShutdownError(_) => {
panic!("unexpected write, expect poll_shutdown");
}
}

// TODO: remove write
}

Ok(ret)
Expand All @@ -296,6 +335,25 @@ impl Inner {
}
}

fn shutdown(&mut self) -> Shutdown {
match self.action() {
Some(&mut Action::Shutdown) => Shutdown::ShouldSuccess,
Some(&mut Action::ShutdownError(ref mut err)) => {
let err = err.take().expect("Should have been removed from actions.");
let err = Arc::try_unwrap(err).expect("There are no other references.");
Shutdown::ShouldError(err)
}
Some(&mut Action::Read(_)) | Some(&mut Action::ReadError(_)) => {
panic!("unexpected poll_shutdown, expect read");
}
Some(&mut Action::Write(_)) | Some(&mut Action::WriteError(_)) => {
panic!("unexpected poll_shutdown, expect write");
}
Some(&mut Action::Wait(_)) => Shutdown::NeedWait,
None => Shutdown::NoActions,
}
}

fn action(&mut self) -> Option<&mut Action> {
loop {
if self.actions.is_empty() {
Expand Down Expand Up @@ -332,6 +390,12 @@ impl Inner {
break;
}
}
Action::Shutdown => break,
Action::ShutdownError(ref mut error) => {
if error.is_some() {
break;
}
}
}

let _action = self.actions.pop_front();
Expand Down Expand Up @@ -441,6 +505,7 @@ impl AsyncWrite for Mock {
panic!("unexpected WouldBlock {}", self.pmsg());
}
}
Ok(0) if buf.is_empty() => return Poll::Ready(Ok(0)),
Ok(0) => {
// TODO: Is this correct?
if !self.inner.actions.is_empty() {
Expand Down Expand Up @@ -470,8 +535,55 @@ impl AsyncWrite for Mock {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
loop {
if let Some(ref mut sleep) = self.inner.sleep {
ready!(Pin::new(sleep).poll(cx));
}

// If a sleep is set, it has already fired
self.inner.sleep = None;

match self.inner.shutdown() {
Shutdown::ShouldSuccess => {
assert!(matches!(
self.inner.actions.pop_front(),
Some(Action::Shutdown)
));
self.maybe_wakeup_reader();
return Poll::Ready(Ok(()));
}
Shutdown::ShouldError(e) => {
assert!(matches!(
self.inner.actions.pop_front(),
Some(Action::ShutdownError(_))
));
self.maybe_wakeup_reader();
return Poll::Ready(Err(e));
}
Shutdown::NeedWait => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(Box::pin(time::sleep_until(until)));
} else {
panic!(
"unexpected poll_shutdown, expect read or write {}",
self.pmsg()
);
}
}
Shutdown::NoActions => {
if let Some(action) = ready!(self.inner.poll_action(cx)) {
self.inner.actions.push_back(action);
} else {
panic!(
"unexpected poll_shutdown, but actually no more sequenced actions {}",
self.pmsg()
);
}
}
}
}
}
}

Expand All @@ -494,7 +606,11 @@ impl Drop for Mock {
"There is still data left to write. {}",
self.pmsg()
),
_ => (),
Action::Shutdown => panic!("AsyncWrite::poll_shutdown was not called. {}", self.pmsg()),
Action::ReadError(_) | Action::WriteError(_) | Action::Wait(_) => (),
// Since the existing implementation ignores the read/write error, so we also ignore the
// shutdown error here to keep the behavior consistent.
Action::ShutdownError(_) => (),
});
}
}
Expand Down
128 changes: 128 additions & 0 deletions tokio-test/tests/io.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#![warn(rust_2018_idioms)]

use futures_test::task::{noop_context, panic_waker};
use futures_util::pin_mut;
use std::future::Future;
use std::io;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::{Duration, Instant};
use tokio_test::io::Builder;
use tokio_test::{assert_pending, assert_ready};

#[tokio::test]
async fn read() {
Expand Down Expand Up @@ -170,3 +176,125 @@ async fn multiple_wait() {
start.elapsed().as_millis()
);
}

// No matter which usecase, it doesn't make sense for a read
// hang forever. However, currently, if there is no sequenced read
// action, it will hang forever.
//
// Since we want be aware of the fixing of this bug,
// no matter intentionally or unintentionally,
// we add this test to catch the behavior change.
//
// It looks like fixing it is not hard, but not sure the downstream
// impact, which might be a breaking change due to the
// `Mock::inner::read_wait` field, so we keep it as is for now.
//
// TODO: fix this bug
#[test]
fn should_hang_forever_on_read_but_no_sequenced_read_action() {
let mut mock = Builder::new()
.write_error(io::Error::new(io::ErrorKind::Other, "cruel"))
.build();

let mut buf = [0; 1];
let read_exact_fut = mock.read(&mut buf);
pin_mut!(read_exact_fut);
assert_pending!(read_exact_fut.poll(&mut Context::from_waker(&panic_waker())));
}

// The `Mock` is expected to always panic if there is an unconsumed error action,
// rather than silently ignoring it. However,
// currently it only panics on unconsumed read/write actions,
// not on error actions. Fixing this requires a breaking change.
//
// This test verifies that it does not panic yet,
// to prevent accidentally introducing the breaking change prematurely.
//
// TODO: fix this bug in the next major release
#[test]
fn do_not_panic_unconsumed_error() {
let _mock = Builder::new()
.read_error(io::Error::new(io::ErrorKind::Other, "cruel"))
.build();
}

// The `Mock` must never panic, even if cloned multiple times.
// However, at present, cloning the builder under certain
// conditions causes a panic.
//
// Fixing this would require making `Mock` non-`Clone`,
// which is a breaking change.
//
// Since we want be aware of the fixing of this bug,
// no matter intentionally or unintentionally,
// we add this test to catch the behavior change.
//
// TODO: fix this bug in the next major release
#[tokio::test]
#[should_panic = "There are no other references.: Custom { kind: Other, error: \"cruel\" }"]
async fn should_panic_if_clone_the_builder_with_error_action() {
let mut builder = Builder::new();
builder.write_error(io::Error::new(io::ErrorKind::Other, "cruel"));
let mut builder2 = builder.clone();

let mut mock = builder.build();
let _mock2 = builder2.build();

// this write_all will panic due to unwrapping the error from `Arc`
mock.write_all(b"hello").await.unwrap();
unreachable!();
}

#[tokio::test]
async fn should_not_hang_forever_on_zero_length_write() {
let mock = Builder::new().write(b"write").build();
pin_mut!(mock);
match mock.as_mut().poll_write(&mut noop_context(), &[0u8; 0]) {
// drain the remaining write action to avoid panic at drop of the `mock`
Poll::Ready(Ok(0)) => mock.write_all(b"write").await.unwrap(),
Poll::Ready(Ok(n)) => panic!("expected to write 0 bytes, wrote {n} bytes instead"),
Poll::Ready(Err(e)) => panic!("expected to write 0 bytes, got error {e} instead"),
Poll::Pending => panic!("expected to write 0 bytes immediately, but pending instead"),
}
}

#[tokio::test]
async fn shutdown() {
let mut mock = Builder::new().shutdown().build();
mock.shutdown().await.unwrap();
}

#[tokio::test]
async fn shutdown_error() {
let error = io::Error::new(io::ErrorKind::Other, "cruel");
let mut mock = Builder::new().shutdown_error(error).build();
let err = mock.shutdown().await.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
assert_eq!("cruel", format!("{err}"));
}

#[tokio::test(start_paused = true)]
async fn shutdown_wait() {
const WAIT: Duration = Duration::from_secs(1);

let mock = Builder::new().wait(WAIT).shutdown().build();
pin_mut!(mock);

assert_pending!(mock.as_mut().poll_shutdown(&mut noop_context()));

tokio::time::advance(WAIT).await;
let _ = assert_ready!(mock.as_mut().poll_shutdown(&mut noop_context()));
}

#[test]
#[should_panic = "AsyncWrite::poll_shutdown was not called. (1 actions remain)"]
fn should_panic_on_leftover_shutdown_action() {
let _mock = Builder::new().shutdown().build();
}

#[test]
fn should_not_panic_on_leftover_shutdown_error_action() {
let _mock = Builder::new()
.shutdown_error(io::Error::new(io::ErrorKind::Other, "cruel"))
.build();
}
Loading