diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 1a29979aa2..fa1ec93665 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -39,7 +39,7 @@ jobs: - stable - beta - nightly - - 1.39.0 + - 1.45.2 os: - ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 497b891ec0..d148078083 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,4 @@ + [package] name = "hyper" version = "0.14.0-dev" # don't forget to update html_root_url @@ -30,12 +31,12 @@ http = "0.2" http-body = "0.3.1" httpdate = "0.3" httparse = "1.0" -h2 = "0.2.2" +h2 = { git = "https://github.com/hyperium/h2" } itoa = "0.4.1" tracing = { version = "0.1", default-features = false, features = ["log", "std"] } pin-project = "1.0" tower-service = "0.3" -tokio = { version = "0.2.11", features = ["sync"] } +tokio = { version = "0.3", features = ["sync", "stream"] } want = "0.3" # Optional @@ -51,9 +52,18 @@ spmc = "0.3" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -tokio = { version = "0.2.2", features = ["fs", "macros", "io-std", "rt-util", "sync", "time", "test-util"] } -tokio-test = "0.2" -tokio-util = { version = "0.3", features = ["codec"] } +tokio = { version = "0.3", features = [ + "fs", + "macros", + "io-std", + "rt", + "rt-multi-thread", # so examples can use #[tokio::main] + "sync", + "time", + "test-util", +] } +tokio-test = "0.3" +tokio-util = { version = "0.4", features = ["codec"] } tower-util = "0.3" url = "1.0" @@ -67,12 +77,12 @@ default = [ ] runtime = [ "tcp", - "tokio/rt-core", + "tokio/rt", ] tcp = [ "socket2", - "tokio/blocking", - "tokio/tcp", + "tokio/net", + "tokio/rt", "tokio/time", ] @@ -219,4 +229,3 @@ required-features = ["runtime", "stream"] name = "server" path = "tests/server.rs" required-features = ["runtime"] - diff --git a/benches/body.rs b/benches/body.rs index 6c25dfbe2c..255914d7a8 100644 --- a/benches/body.rs +++ b/benches/body.rs @@ -10,8 +10,7 @@ use hyper::body::Body; macro_rules! bench_stream { ($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{ - let mut rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .build() .expect("rt build"); diff --git a/benches/connect.rs b/benches/connect.rs index c8ca68a769..eafb8fd34d 100644 --- a/benches/connect.rs +++ b/benches/connect.rs @@ -12,12 +12,11 @@ use tokio::net::TcpListener; #[bench] fn http_connector(b: &mut test::Bencher) { let _ = pretty_env_logger::try_init(); - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .expect("rt build"); - let mut listener = rt + let listener = rt .block_on(TcpListener::bind(&SocketAddr::from(([127, 0, 0, 1], 0)))) .expect("bind"); let addr = listener.local_addr().expect("local_addr"); diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index f6ff2cfda4..6376697afc 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -270,14 +270,16 @@ impl Opts { } fn bench(self, b: &mut test::Bencher) { + use std::sync::Arc; let _ = pretty_env_logger::try_init(); // Create a runtime of current thread. - let mut rt = tokio::runtime::Builder::new() - .enable_all() - .basic_scheduler() - .build() - .expect("rt build"); - let exec = rt.handle().clone(); + let rt = Arc::new( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("rt build"), + ); + let exec = rt.clone(); let req_len = self.request_body.map(|b| b.len()).unwrap_or(0) as u64; let req_len = if self.request_chunks > 0 { @@ -288,7 +290,7 @@ impl Opts { let bytes_per_iter = (req_len + self.response_body.len() as u64) * self.parallel_cnt as u64; b.bytes = bytes_per_iter; - let addr = spawn_server(&mut rt, &self); + let addr = spawn_server(&rt, &self); let connector = HttpConnector::new(); let client = hyper::Client::builder() @@ -351,7 +353,7 @@ impl Opts { } } -fn spawn_server(rt: &mut tokio::runtime::Runtime, opts: &Opts) -> SocketAddr { +fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr { use hyper::service::{make_service_fn, service_fn}; let addr = "127.0.0.1:0".parse().unwrap(); diff --git a/benches/pipeline.rs b/benches/pipeline.rs index 2d23b4ad96..40b0b31d22 100644 --- a/benches/pipeline.rs +++ b/benches/pipeline.rs @@ -31,9 +31,8 @@ fn hello_world(b: &mut test::Bencher) { })) }); - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .expect("rt build"); let srv = rt.block_on(async move { diff --git a/benches/server.rs b/benches/server.rs index 8bc1bbb8fa..7ca0d0896a 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -34,9 +34,8 @@ macro_rules! bench_server { })) }); - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .expect("rt build"); @@ -185,6 +184,7 @@ fn raw_tcp_throughput_large_payload(b: &mut test::Bencher) { let mut buf = [0u8; 8192]; while rx.try_recv().is_err() { let r = sock.read(&mut buf).unwrap(); + extern crate test; if r == 0 { break; } diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs index 38a7052ca1..89f4953c71 100644 --- a/examples/single_threaded.rs +++ b/examples/single_threaded.rs @@ -10,15 +10,14 @@ fn main() { pretty_env_logger::init(); // Configure a runtime that runs everything on the current thread - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .expect("build runtime"); // Combine it with a `LocalSet, which means it can spawn !Send futures... let local = tokio::task::LocalSet::new(); - local.block_on(&mut rt, run()); + local.block_on(&rt, run()); } async fn run() { diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index c1cdf4e129..639a21acf7 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -12,8 +12,8 @@ use std::time::Duration; use futures_util::future::Either; use http::uri::{Scheme, Uri}; use pin_project::pin_project; -use tokio::net::TcpStream; -use tokio::time::Delay; +use tokio::net::{TcpSocket, TcpStream}; +use tokio::time::Sleep; use super::dns::{self, resolve, GaiResolver, Resolve}; use super::{Connected, Connection}; @@ -331,34 +331,9 @@ where dns::IpAddrs::new(addrs) }; - let c = ConnectingTcp::new( - config.local_address_ipv4, - config.local_address_ipv6, - addrs, - config.connect_timeout, - config.happy_eyeballs_timeout, - config.reuse_address, - ); - - let sock = c - .connect() - .await - .map_err(ConnectError::m("tcp connect error"))?; - - if let Some(dur) = config.keep_alive_timeout { - sock.set_keepalive(Some(dur)) - .map_err(ConnectError::m("tcp set_keepalive error"))?; - } - - if let Some(size) = config.send_buffer_size { - sock.set_send_buffer_size(size) - .map_err(ConnectError::m("tcp set_send_buffer_size error"))?; - } + let c = ConnectingTcp::new(addrs, config); - if let Some(size) = config.recv_buffer_size { - sock.set_recv_buffer_size(size) - .map_err(ConnectError::m("tcp set_recv_buffer_size error"))?; - } + let sock = c.connect().await?; sock.set_nodelay(config.nodelay) .map_err(ConnectError::m("tcp set_nodelay error"))?; @@ -475,60 +450,45 @@ impl StdError for ConnectError { } } -struct ConnectingTcp { - local_addr_ipv4: Option, - local_addr_ipv6: Option, +struct ConnectingTcp<'a> { preferred: ConnectingTcpRemote, fallback: Option, - reuse_address: bool, + config: &'a Config, } -impl ConnectingTcp { - fn new( - local_addr_ipv4: Option, - local_addr_ipv6: Option, - remote_addrs: dns::IpAddrs, - connect_timeout: Option, - fallback_timeout: Option, - reuse_address: bool, - ) -> ConnectingTcp { - if let Some(fallback_timeout) = fallback_timeout { - let (preferred_addrs, fallback_addrs) = - remote_addrs.split_by_preference(local_addr_ipv4, local_addr_ipv6); +impl<'a> ConnectingTcp<'a> { + fn new(remote_addrs: dns::IpAddrs, config: &'a Config) -> Self { + if let Some(fallback_timeout) = config.happy_eyeballs_timeout { + let (preferred_addrs, fallback_addrs) = remote_addrs + .split_by_preference(config.local_address_ipv4, config.local_address_ipv6); if fallback_addrs.is_empty() { return ConnectingTcp { - local_addr_ipv4, - local_addr_ipv6, - preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout), + preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), fallback: None, - reuse_address, + config, }; } ConnectingTcp { - local_addr_ipv4, - local_addr_ipv6, - preferred: ConnectingTcpRemote::new(preferred_addrs, connect_timeout), + preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), fallback: Some(ConnectingTcpFallback { - delay: tokio::time::delay_for(fallback_timeout), - remote: ConnectingTcpRemote::new(fallback_addrs, connect_timeout), + delay: tokio::time::sleep(fallback_timeout), + remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout), }), - reuse_address, + config, } } else { ConnectingTcp { - local_addr_ipv4, - local_addr_ipv6, - preferred: ConnectingTcpRemote::new(remote_addrs, connect_timeout), + preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout), fallback: None, - reuse_address, + config, } } } } struct ConnectingTcpFallback { - delay: Delay, + delay: Sleep, remote: ConnectingTcpRemote, } @@ -549,24 +509,11 @@ impl ConnectingTcpRemote { } impl ConnectingTcpRemote { - async fn connect( - &mut self, - local_addr_ipv4: &Option, - local_addr_ipv6: &Option, - reuse_address: bool, - ) -> io::Result { + async fn connect(&mut self, config: &Config) -> Result { let mut err = None; for addr in &mut self.addrs { debug!("connecting to {}", addr); - match connect( - &addr, - local_addr_ipv4, - local_addr_ipv6, - reuse_address, - self.connect_timeout, - )? - .await - { + match connect(&addr, config, self.connect_timeout)?.await { Ok(tcp) => { debug!("connected to {}", addr); return Ok(tcp); @@ -580,9 +527,9 @@ impl ConnectingTcpRemote { match err { Some(e) => Err(e), - None => Err(std::io::Error::new( - std::io::ErrorKind::NotConnected, - "Network unreachable", + None => Err(ConnectError::new( + "tcp connect error", + std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable"), )), } } @@ -618,30 +565,79 @@ fn bind_local_address( fn connect( addr: &SocketAddr, - local_addr_ipv4: &Option, - local_addr_ipv6: &Option, - reuse_address: bool, + config: &Config, connect_timeout: Option, -) -> io::Result>> { +) -> Result>, ConnectError> { + // TODO(eliza): if Tokio's `TcpSocket` gains support for setting the + // keepalive timeout and send/recv buffer size, it would be nice to use that + // instead of socket2, and avoid the unsafe `into_raw_fd`/`from_raw_fd` + // dance... use socket2::{Domain, Protocol, Socket, Type}; let domain = match *addr { SocketAddr::V4(_) => Domain::ipv4(), SocketAddr::V6(_) => Domain::ipv6(), }; - let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?; - - if reuse_address { - socket.set_reuse_address(true)?; - } - - bind_local_address(&socket, addr, local_addr_ipv4, local_addr_ipv6)?; - - let addr = *addr; - - let std_tcp = socket.into_tcp_stream(); - + let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp())) + .map_err(ConnectError::m("tcp open error"))?; + + if config.reuse_address { + socket + .set_reuse_address(true) + .map_err(ConnectError::m("tcp set_reuse_address error"))?; + } + + // When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is + // responsible for ensuring O_NONBLOCK is set. + socket + .set_nonblocking(true) + .map_err(ConnectError::m("tcp set_nonblocking error"))?; + + bind_local_address( + &socket, + addr, + &config.local_address_ipv4, + &config.local_address_ipv6, + ) + .map_err(ConnectError::m("tcp bind local error"))?; + + if let Some(dur) = config.keep_alive_timeout { + socket + .set_keepalive(Some(dur)) + .map_err(ConnectError::m("tcp set_keepalive error"))?; + } + + if let Some(size) = config.send_buffer_size { + socket + .set_send_buffer_size(size) + .map_err(ConnectError::m("tcp set_send_buffer_size error"))?; + } + + if let Some(size) = config.recv_buffer_size { + socket + .set_recv_buffer_size(size) + .map_err(ConnectError::m("tcp set_recv_buffer_size error"))?; + } + + #[cfg(unix)] + let socket = unsafe { + // Safety: `from_raw_fd` is only safe to call if ownership of the raw + // file descriptor is transferred. Since we call `into_raw_fd` on the + // socket2 socket, it gives up ownership of the fd and will not close + // it, so this is safe. + use std::os::unix::io::{FromRawFd, IntoRawFd}; + TcpSocket::from_raw_fd(socket.into_raw_fd()) + }; + #[cfg(windows)] + let socket = unsafe { + // Safety: `from_raw_socket` is only safe to call if ownership of the raw + // Windows SOCKET is transferred. Since we call `into_raw_socket` on the + // socket2 socket, it gives up ownership of the SOCKET and will not close + // it, so this is safe. + use std::os::windows::io::{FromRawSocket, IntoRawSocket}; + TcpSocket::from_raw_socket(socket.into_raw_socket()) + }; + let connect = socket.connect(*addr); Ok(async move { - let connect = TcpStream::connect_std(std_tcp, &addr); match connect_timeout { Some(dur) => match tokio::time::timeout(dur, connect).await { Ok(Ok(s)) => Ok(s), @@ -650,33 +646,19 @@ fn connect( }, None => connect.await, } + .map_err(ConnectError::m("tcp connect error")) }) } -impl ConnectingTcp { - async fn connect(mut self) -> io::Result { - let Self { - ref local_addr_ipv4, - ref local_addr_ipv6, - reuse_address, - .. - } = self; +impl ConnectingTcp<'_> { + async fn connect(mut self) -> Result { match self.fallback { - None => { - self.preferred - .connect(local_addr_ipv4, local_addr_ipv6, reuse_address) - .await - } + None => self.preferred.connect(self.config).await, Some(mut fallback) => { - let preferred_fut = - self.preferred - .connect(local_addr_ipv4, local_addr_ipv6, reuse_address); + let preferred_fut = self.preferred.connect(self.config); futures_util::pin_mut!(preferred_fut); - let fallback_fut = - fallback - .remote - .connect(local_addr_ipv4, local_addr_ipv6, reuse_address); + let fallback_fut = fallback.remote.connect(self.config); futures_util::pin_mut!(fallback_fut); let (result, future) = @@ -711,7 +693,7 @@ mod tests { use ::http::Uri; use super::super::sealed::{Connect, ConnectSvc}; - use super::HttpConnector; + use super::{Config, ConnectError, HttpConnector}; async fn connect( connector: C, @@ -773,6 +755,7 @@ mod tests { #[tokio::test] async fn local_address() { use std::net::{IpAddr, TcpListener}; + let _ = pretty_env_logger::try_init(); let (bind_ip_v4, bind_ip_v6) = get_local_ips(); let server4 = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -818,10 +801,8 @@ mod tests { let server4 = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server4.local_addr().unwrap(); let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap(); - let mut rt = tokio::runtime::Builder::new() - .enable_io() - .enable_time() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() .build() .unwrap(); @@ -925,16 +906,21 @@ mod tests { .iter() .map(|host| (host.clone(), addr.port()).into()) .collect(); - let connecting_tcp = ConnectingTcp::new( - None, - None, - dns::IpAddrs::new(addrs), - None, - Some(fallback_timeout), - false, - ); + let cfg = Config { + local_address_ipv4: None, + local_address_ipv6: None, + connect_timeout: None, + keep_alive_timeout: None, + happy_eyeballs_timeout: Some(fallback_timeout), + nodelay: false, + reuse_address: false, + enforce_http: false, + send_buffer_size: None, + recv_buffer_size: None, + }; + let connecting_tcp = ConnectingTcp::new(dns::IpAddrs::new(addrs), &cfg); let start = Instant::now(); - Ok::<_, io::Error>((start, connecting_tcp.connect().await?)) + Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?)) }) .unwrap(); let res = if stream.peer_addr().unwrap().is_ipv4() { diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 9a580f85ac..88b2daac95 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,4 +1,5 @@ use futures_util::future; +use tokio::stream::Stream; use tokio::sync::{mpsc, oneshot}; use crate::common::{task, Future, Pin, Poll}; @@ -131,22 +132,25 @@ impl Clone for UnboundedSender { } } +#[pin_project::pin_project(PinnedDrop)] pub struct Receiver { + #[pin] inner: mpsc::UnboundedReceiver>, taker: want::Taker, } impl Receiver { pub(crate) fn poll_next( - &mut self, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll)>> { - match self.inner.poll_recv(cx) { + let this = self.project(); + match this.inner.poll_next(cx) { Poll::Ready(item) => { Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped"))) } Poll::Pending => { - self.taker.want(); + this.taker.want(); Poll::Pending } } @@ -165,11 +169,12 @@ impl Receiver { } } -impl Drop for Receiver { - fn drop(&mut self) { +#[pin_project::pinned_drop] +impl PinnedDrop for Receiver { + fn drop(mut self: Pin<&mut Self>) { // Notify the giver about the closure first, before dropping // the mpsc::Receiver. - self.taker.cancel(); + self.as_mut().taker.cancel(); } } @@ -262,7 +267,7 @@ mod tests { impl Future for Receiver { type Output = Option<(T, Callback)>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll_next(cx) } } @@ -344,9 +349,8 @@ mod tests { fn giver_queue_throughput(b: &mut test::Bencher) { use crate::{Body, Request, Response}; - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .unwrap(); let (mut tx, mut rx) = channel::, Response>(); @@ -368,9 +372,8 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn giver_queue_not_ready(b: &mut test::Bencher) { - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .unwrap(); let (_tx, mut rx) = channel::(); diff --git a/src/client/pool.rs b/src/client/pool.rs index 8c1ee24c0d..52288a9fa7 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -706,12 +706,15 @@ impl Expiration { } #[cfg(feature = "runtime")] +#[pin_project::pin_project] struct IdleTask { + #[pin] interval: Interval, pool: WeakOpt>>, // This allows the IdleTask to be notified as soon as the entire // Pool is fully dropped, and shutdown. This channel is never sent on, // but Err(Canceled) will be received when the Pool is dropped. + #[pin] pool_drop_notifier: oneshot::Receiver, } @@ -719,9 +722,11 @@ struct IdleTask { impl Future for IdleTask { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + use tokio::stream::Stream; + let mut this = self.project(); loop { - match Pin::new(&mut self.pool_drop_notifier).poll(cx) { + match this.pool_drop_notifier.as_mut().poll(cx) { Poll::Ready(Ok(n)) => match n {}, Poll::Pending => (), Poll::Ready(Err(_canceled)) => { @@ -730,9 +735,9 @@ impl Future for IdleTask { } } - ready!(self.interval.poll_tick(cx)); + ready!(this.interval.as_mut().poll_next(cx)); - if let Some(inner) = self.pool.upgrade() { + if let Some(inner) = this.pool.upgrade() { if let Ok(mut inner) = inner.lock() { trace!("idle interval checking for expired"); inner.clear_expired(); @@ -850,7 +855,7 @@ mod tests { let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); - tokio::time::delay_for(pool.locked().timeout.unwrap()).await; + tokio::time::sleep(pool.locked().timeout.unwrap()).await; let mut checkout = pool.checkout(key); let poll_once = PollOnce(&mut checkout); let is_not_ready = poll_once.await.is_none(); @@ -871,7 +876,7 @@ mod tests { pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3) ); - tokio::time::delay_for(pool.locked().timeout.unwrap()).await; + tokio::time::sleep(pool.locked().timeout.unwrap()).await; let mut checkout = pool.checkout(key.clone()); let poll_once = PollOnce(&mut checkout); diff --git a/src/common/drain.rs b/src/common/drain.rs index 7abb9f9ded..e98f5b3453 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,20 +1,13 @@ use std::mem; use pin_project::pin_project; +use tokio::stream::Stream; use tokio::sync::{mpsc, watch}; use super::{task, Future, Never, Pin, Poll}; -// Sentinel value signaling that the watch is still open -#[derive(Clone, Copy)] -enum Action { - Open, - // Closed isn't sent via the `Action` type, but rather once - // the watch::Sender is dropped. -} - pub fn channel() -> (Signal, Watch) { - let (tx, rx) = watch::channel(Action::Open); + let (tx, rx) = watch::channel(()); let (drained_tx, drained_rx) = mpsc::channel(1); ( Signal { @@ -27,17 +20,19 @@ pub fn channel() -> (Signal, Watch) { pub struct Signal { drained_rx: mpsc::Receiver, - _tx: watch::Sender, + _tx: watch::Sender<()>, } +#[pin_project::pin_project] pub struct Draining { + #[pin] drained_rx: mpsc::Receiver, } #[derive(Clone)] pub struct Watch { drained_tx: mpsc::Sender, - rx: watch::Receiver, + rx: watch::Receiver<()>, } #[allow(missing_debug_implementations)] @@ -46,7 +41,8 @@ pub struct Watching { #[pin] future: F, state: State, - watch: Watch, + watch: Pin + Send + Sync>>, + _drained_tx: mpsc::Sender, } enum State { @@ -66,8 +62,8 @@ impl Signal { impl Future for Draining { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(self.drained_rx.poll_recv(cx)) { + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match ready!(self.project().drained_rx.poll_next(cx)) { Some(never) => match never {}, None => Poll::Ready(()), } @@ -80,10 +76,14 @@ impl Watch { F: Future, FN: FnOnce(Pin<&mut F>), { + let Self { drained_tx, mut rx } = self; Watching { future, state: State::Watch(on_drain), - watch: self, + watch: Box::pin(async move { + let _ = rx.changed().await; + }), + _drained_tx: drained_tx, } } } @@ -100,12 +100,12 @@ where loop { match mem::replace(me.state, State::Draining) { State::Watch(on_drain) => { - match me.watch.rx.poll_recv_ref(cx) { - Poll::Ready(None) => { + match Pin::new(&mut me.watch).poll(cx) { + Poll::Ready(()) => { // Drain has been triggered! on_drain(me.future.as_mut()); } - Poll::Ready(Some(_ /*State::Open*/)) | Poll::Pending => { + Poll::Pending => { *me.state = State::Watch(on_drain); return me.future.poll(cx); } diff --git a/src/common/io/rewind.rs b/src/common/io/rewind.rs index 14650697c3..b01662440f 100644 --- a/src/common/io/rewind.rs +++ b/src/common/io/rewind.rs @@ -2,7 +2,7 @@ use std::marker::Unpin; use std::{cmp, io}; use bytes::{Buf, Bytes}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::common::{task, Pin, Poll}; @@ -37,36 +37,33 @@ impl Rewind { (self.inner, self.pre.unwrap_or_else(Bytes::new)) } - pub(crate) fn get_mut(&mut self) -> &mut T { - &mut self.inner - } + // pub(crate) fn get_mut(&mut self) -> &mut T { + // &mut self.inner + // } } impl AsyncRead for Rewind where T: AsyncRead + Unpin, { - #[inline] - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - fn poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { if let Some(mut prefix) = self.pre.take() { // If there are no remaining bytes, let the bytes get dropped. if !prefix.is_empty() { - let copy_len = cmp::min(prefix.len(), buf.len()); - prefix.copy_to_slice(&mut buf[..copy_len]); + let copy_len = cmp::min(prefix.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(&prefix[..copy_len]); + prefix.advance(copy_len); // Put back whats left if !prefix.is_empty() { self.pre = Some(prefix); } - return Poll::Ready(Ok(copy_len)); + return Poll::Ready(Ok(())); } } Pin::new(&mut self.inner).poll_read(cx, buf) @@ -92,15 +89,6 @@ where fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_shutdown(cx) } - - #[inline] - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.inner).poll_write_buf(cx, buf) - } } #[cfg(test)] diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index f16d7d99e4..a0f2a55614 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -967,9 +967,8 @@ mod tests { *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); - let mut rt = tokio::runtime::Builder::new() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .unwrap(); diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index beaf9aff7a..15c934d1f0 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -382,7 +382,7 @@ mod tests { use super::*; use std::pin::Pin; use std::time::Duration; - use tokio::io::AsyncRead; + use tokio::io::{AsyncRead, ReadBuf}; impl<'a> MemRead for &'a [u8] { fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll> { @@ -401,8 +401,9 @@ mod tests { impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) { fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll> { let mut v = vec![0; len]; - let n = ready!(Pin::new(self).poll_read(cx, &mut v)?); - Poll::Ready(Ok(Bytes::copy_from_slice(&v[..n]))) + let mut buf = ReadBuf::new(&mut v); + ready!(Pin::new(self).poll_read(cx, &mut buf)?); + Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled()))) } } @@ -623,7 +624,7 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn bench_decode_chunked_1kb(b: &mut test::Bencher) { - let mut rt = new_runtime(); + let rt = new_runtime(); const LEN: usize = 1024; let mut vec = Vec::new(); @@ -647,7 +648,7 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn bench_decode_length_1kb(b: &mut test::Bencher) { - let mut rt = new_runtime(); + let rt = new_runtime(); const LEN: usize = 1024; let content = Bytes::from(&[0; LEN][..]); @@ -665,9 +666,8 @@ mod tests { #[cfg(feature = "nightly")] fn new_runtime() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new() + tokio::runtime::Builder::new_current_thread() .enable_all() - .basic_scheduler() .build() .expect("rt build") } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index c0e407bf59..9bef39c6de 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -27,7 +27,7 @@ pub(crate) trait Dispatch { type PollError; type RecvItem; fn poll_msg( - &mut self, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>; @@ -40,8 +40,10 @@ pub struct Server, B> { pub(crate) service: S, } +#[pin_project::pin_project] pub struct Client { callback: Option, Response>>, + #[pin] rx: ClientRx, rx_closed: bool, } @@ -281,7 +283,7 @@ where && self.conn.can_write_head() && self.dispatch.should_poll() { - if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) { + if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; // Check if the body knows its full data immediately. @@ -469,10 +471,11 @@ where type RecvItem = RequestHead; fn poll_msg( - &mut self, + mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>> { - let ret = if let Some(ref mut fut) = self.in_flight.as_mut().as_pin_mut() { + let mut this = self.as_mut(); + let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { let resp = ready!(fut.as_mut().poll(cx)?); let (parts, body) = resp.into_parts(); let head = MessageHead { @@ -486,7 +489,7 @@ where }; // Since in_flight finished, remove it - self.in_flight.set(None); + this.in_flight.set(None); ret } @@ -540,11 +543,12 @@ where type RecvItem = ResponseHead; fn poll_msg( - &mut self, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>> { - debug_assert!(!self.rx_closed); - match self.rx.poll_next(cx) { + let this = self.project(); + debug_assert!(!*this.rx_closed); + match this.rx.poll_next(cx) { Poll::Ready(Some((req, mut cb))) => { // check that future hasn't been canceled already match cb.poll_canceled(cx) { @@ -559,7 +563,7 @@ where subject: RequestLine(parts.method, parts.uri), headers: parts.headers, }; - self.callback = Some(cb); + *this.callback = Some(cb); Poll::Ready(Some(Ok((head, body)))) } } @@ -567,7 +571,7 @@ where Poll::Ready(None) => { // user has dropped sender handle trace!("client tx closed"); - self.rx_closed = true; + *this.rx_closed = true; Poll::Ready(None) } Poll::Pending => Poll::Pending, diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index ed1b731306..067ed67343 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -4,7 +4,7 @@ use std::fmt; use std::io::{self, IoSlice}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use super::{Http1Transaction, ParseContext, ParsedMessage}; use crate::common::buf::BufList; @@ -188,9 +188,16 @@ where if self.read_buf_remaining_mut() < next { self.read_buf.reserve(next); } - match Pin::new(&mut self.io).poll_read_buf(cx, &mut self.read_buf) { - Poll::Ready(Ok(n)) => { - debug!("read {} bytes", n); + let mut buf = ReadBuf::uninit(&mut self.read_buf.bytes_mut()[..]); + match Pin::new(&mut self.io).poll_read(cx, &mut buf) { + Poll::Ready(Ok(_)) => { + let n = buf.filled().len(); + unsafe { + // Safety: we just read that many bytes into the + // uninitialized part of the buffer, so this is okay. + // @tokio pls give me back `poll_read_buf` thanks + self.read_buf.advance_mut(n); + } self.read_buf_strategy.record(n); Poll::Ready(Ok(n)) } @@ -224,8 +231,16 @@ where return self.poll_flush_flattened(cx); } loop { - let n = - ready!(Pin::new(&mut self.io).poll_write_buf(cx, &mut self.write_buf.auto()))?; + // TODO(eliza): this basically ignores all of `WriteBuf`...put + // back vectored IO and `poll_write_buf` when the appropriate Tokio + // changes land... + let n = ready!(Pin::new(&mut self.io) + // .poll_write_buf(cx, &mut self.write_buf.auto()))?; + .poll_write(cx, self.write_buf.auto().bytes()))?; + // TODO(eliza): we have to do this manually because + // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when + // `poll_write_buf` comes back, the manual advance will need to leave! + self.write_buf.advance(n); debug!("flushed {} bytes", n); if self.write_buf.remaining() == 0 { break; @@ -452,6 +467,7 @@ where self.strategy = strategy; } + // TODO(eliza): put back writev! #[inline] fn auto(&mut self) -> WriteBufAuto<'_, B> { WriteBufAuto::new(self) @@ -628,28 +644,31 @@ mod tests { */ #[tokio::test] + #[ignore] async fn iobuf_write_empty_slice() { - // First, let's just check that the Mock would normally return an - // error on an unexpected write, even if the buffer is empty... - let mut mock = Mock::new().build(); - futures_util::future::poll_fn(|cx| { - Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[])) - }) - .await - .expect_err("should be a broken pipe"); - - // underlying io will return the logic error upon write, - // so we are testing that the io_buf does not trigger a write - // when there is nothing to flush - let mock = Mock::new().build(); - let mut io_buf = Buffered::<_, Cursor>>::new(mock); - io_buf.flush().await.expect("should short-circuit flush"); + // TODO(eliza): can i have writev back pls T_T + // // First, let's just check that the Mock would normally return an + // // error on an unexpected write, even if the buffer is empty... + // let mut mock = Mock::new().build(); + // futures_util::future::poll_fn(|cx| { + // Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[])) + // }) + // .await + // .expect_err("should be a broken pipe"); + + // // underlying io will return the logic error upon write, + // // so we are testing that the io_buf does not trigger a write + // // when there is nothing to flush + // let mock = Mock::new().build(); + // let mut io_buf = Buffered::<_, Cursor>>::new(mock); + // io_buf.flush().await.expect("should short-circuit flush"); } #[tokio::test] async fn parse_reads_until_blocked() { use crate::proto::h1::ClientTransaction; + let _ = pretty_env_logger::try_init(); let mock = Mock::new() // Split over multiple reads will read all of it .read(b"HTTP/1.1 200 OK\r\n") diff --git a/src/proto/h2/ping.rs b/src/proto/h2/ping.rs index 54dd6442a8..b3af976920 100644 --- a/src/proto/h2/ping.rs +++ b/src/proto/h2/ping.rs @@ -33,7 +33,7 @@ use std::time::Instant; use h2::{Ping, PingPong}; #[cfg(feature = "runtime")] -use tokio::time::{Delay, Instant}; +use tokio::time::{Instant, Sleep}; type WindowSize = u32; @@ -60,7 +60,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) interval, timeout: config.keep_alive_timeout, while_idle: config.keep_alive_while_idle, - timer: tokio::time::delay_for(interval), + timer: tokio::time::sleep(interval), state: KeepAliveState::Init, }); @@ -156,7 +156,7 @@ struct KeepAlive { while_idle: bool, state: KeepAliveState, - timer: Delay, + timer: Sleep, } #[cfg(feature = "runtime")] diff --git a/src/server/conn.rs b/src/server/conn.rs index 0fe7a22aba..c06e7d5972 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -809,9 +809,9 @@ where type Output = Result, FE>; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let me = self.project(); + let mut me = self.project(); let service = ready!(me.future.poll(cx))?; - let io = me.io.take().expect("polled after complete"); + let io = Option::take(&mut me.io).expect("polled after complete"); Poll::Ready(Ok(me.protocol.serve_connection(io, service))) } } diff --git a/src/server/tcp.rs b/src/server/tcp.rs index e526303429..bef38f2d4f 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -4,7 +4,7 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener}; use std::time::Duration; use tokio::net::TcpListener; -use tokio::time::Delay; +use tokio::time::Sleep; use crate::common::{task, Future, Pin, Poll}; @@ -19,7 +19,7 @@ pub struct AddrIncoming { sleep_on_errors: bool, tcp_keepalive_timeout: Option, tcp_nodelay: bool, - timeout: Option, + timeout: Option, } impl AddrIncoming { @@ -30,6 +30,10 @@ impl AddrIncoming { } pub(super) fn from_std(std_listener: StdTcpListener) -> crate::Result { + // TcpListener::from_std doesn't set O_NONBLOCK + std_listener + .set_nonblocking(true) + .map_err(crate::Error::new_listen)?; let listener = TcpListener::from_std(std_listener).map_err(crate::Error::new_listen)?; let addr = listener.local_addr().map_err(crate::Error::new_listen)?; Ok(AddrIncoming { @@ -98,9 +102,46 @@ impl AddrIncoming { match ready!(self.listener.poll_accept(cx)) { Ok((socket, addr)) => { if let Some(dur) = self.tcp_keepalive_timeout { + // Convert the Tokio `TcpStream` into a `socket2` socket + // so we can call `set_keepalive`. + // TODO(eliza): if Tokio's `TcpSocket` API grows a few + // more methods in the future, hopefully we shouldn't + // have to do the `from_raw_fd` dance any longer... + #[cfg(unix)] + let socket = unsafe { + // Safety: `socket2`'s socket will try to close the + // underlying fd when it's dropped. However, we + // can't take ownership of the fd from the tokio + // TcpStream, so instead we will call `into_raw_fd` + // on the socket2 socket before dropping it. This + // prevents it from trying to close the fd. + use std::os::unix::io::{AsRawFd, FromRawFd}; + socket2::Socket::from_raw_fd(socket.as_raw_fd()) + }; + #[cfg(windows)] + let socket = unsafe { + // Safety: `socket2`'s socket will try to close the + // underlying SOCKET when it's dropped. However, we + // can't take ownership of the SOCKET from the tokio + // TcpStream, so instead we will call `into_raw_socket` + // on the socket2 socket before dropping it. This + // prevents it from trying to close the SOCKET. + use std::os::windows::io::{AsRawSocket, FromRawSocket}; + socket2::Socket::from_raw_socket(socket.as_raw_socket()) + }; + + // Actually set the TCP keepalive timeout. if let Err(e) = socket.set_keepalive(Some(dur)) { trace!("error trying to set TCP keepalive: {}", e); } + + // Take ownershop of the fd/socket back from the socket2 + // `Socket`, so that socket2 doesn't try to close it + // when it's dropped. + #[cfg(unix)] + drop(std::os::unix::io::IntoRawFd::into_raw_fd(socket)); + #[cfg(windows)] + drop(std::os::windows::io::IntoRawSocket::into_raw_socket(socket)); } if let Err(e) = socket.set_nodelay(self.tcp_nodelay) { trace!("error trying to set TCP nodelay: {}", e); @@ -119,7 +160,7 @@ impl AddrIncoming { error!("accept error: {}", e); // Sleep 1s. - let mut timeout = tokio::time::delay_for(Duration::from_secs(1)); + let mut timeout = tokio::time::sleep(Duration::from_secs(1)); match Pin::new(&mut timeout).poll(cx) { Poll::Ready(()) => { @@ -181,19 +222,20 @@ impl fmt::Debug for AddrIncoming { } mod addr_stream { - use bytes::{Buf, BufMut}; use std::io; use std::net::SocketAddr; #[cfg(unix)] use std::os::unix::io::{AsRawFd, RawFd}; - use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; use crate::common::{task, Pin, Poll}; /// A transport returned yieled by `AddrIncoming`. + #[pin_project::pin_project] #[derive(Debug)] pub struct AddrStream { + #[pin] inner: TcpStream, pub(super) remote_addr: SocketAddr, } @@ -231,49 +273,24 @@ mod addr_stream { } impl AsyncRead for AddrStream { - unsafe fn prepare_uninitialized_buffer( - &self, - buf: &mut [std::mem::MaybeUninit], - ) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - #[inline] fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut self.inner).poll_read(cx, buf) - } - - #[inline] - fn poll_read_buf( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.inner).poll_read_buf(cx, buf) + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.project().inner.poll_read(cx, buf) } } impl AsyncWrite for AddrStream { #[inline] fn poll_write( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.inner).poll_write(cx, buf) - } - - #[inline] - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.inner).poll_write_buf(cx, buf) + self.project().inner.poll_write(cx, buf) } #[inline] @@ -283,11 +300,8 @@ mod addr_stream { } #[inline] - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll> { - Pin::new(&mut self.inner).poll_shutdown(cx) + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().inner.poll_shutdown(cx) } } diff --git a/src/upgrade.rs b/src/upgrade.rs index 55f390431f..f2bfe346cc 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -12,7 +12,7 @@ use std::io; use std::marker::Unpin; use bytes::{Buf, Bytes}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::sync::oneshot; use crate::common::io::Rewind; @@ -105,15 +105,11 @@ impl Upgraded { } impl AsyncRead for Upgraded { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { - self.io.prepare_uninitialized_buffer(buf) - } - fn poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.io).poll_read(cx, buf) } } @@ -127,14 +123,6 @@ impl AsyncWrite for Upgraded { Pin::new(&mut self.io).poll_write(cx, buf) } - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(self.io.get_mut()).poll_write_dyn_buf(cx, buf) - } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_flush(cx) } @@ -247,15 +235,11 @@ impl dyn Io + Send { } impl AsyncRead for ForwardsWriteBuf { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } - fn poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } @@ -269,14 +253,6 @@ impl AsyncWrite for ForwardsWriteBuf { Pin::new(&mut self.0).poll_write(cx, buf) } - fn poll_write_buf( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - Pin::new(&mut self.0).poll_write_buf(cx, buf) - } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_flush(cx) } @@ -290,9 +266,9 @@ impl Io for ForwardsWriteBuf { fn poll_write_dyn_buf( &mut self, cx: &mut task::Context<'_>, - mut buf: &mut dyn Buf, + buf: &mut dyn Buf, ) -> Poll> { - Pin::new(&mut self.0).poll_write_buf(cx, &mut buf) + Pin::new(&mut self.0).poll_write(cx, buf.bytes()) } } @@ -326,8 +302,8 @@ mod tests { fn poll_read( self: Pin<&mut Self>, _cx: &mut task::Context<'_>, - _buf: &mut [u8], - ) -> Poll> { + _buf: &mut ReadBuf<'_>, + ) -> Poll> { unreachable!("Mock::poll_read") } } @@ -335,21 +311,23 @@ mod tests { impl AsyncWrite for Mock { fn poll_write( self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - _buf: &[u8], + _: &mut task::Context<'_>, + buf: &[u8], ) -> Poll> { - panic!("poll_write shouldn't be called"); + // panic!("poll_write shouldn't be called"); + Poll::Ready(Ok(buf.len())) } - fn poll_write_buf( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - let n = buf.remaining(); - buf.advance(n); - Poll::Ready(Ok(n)) - } + // TODO(eliza): :( + // fn poll_write_buf( + // self: Pin<&mut Self>, + // _cx: &mut task::Context<'_>, + // buf: &mut B, + // ) -> Poll> { + // let n = buf.remaining(); + // buf.advance(n); + // Poll::Ready(Ok(n)) + // } fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { unreachable!("Mock::poll_flush") diff --git a/tests/client.rs b/tests/client.rs index 576423768f..d3a91aae0d 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -18,7 +18,7 @@ use futures_channel::oneshot; use futures_core::{Future, Stream, TryFuture}; use futures_util::future::{self, FutureExt, TryFutureExt}; use tokio::net::TcpStream; -use tokio::runtime::Runtime; +mod support; fn s(buf: &[u8]) -> &str { std::str::from_utf8(buf).expect("from_utf8") @@ -115,12 +115,12 @@ macro_rules! test { #[test] fn $name() { let _ = pretty_env_logger::try_init(); - let mut rt = Runtime::new().expect("runtime new"); + let rt = support::runtime(); let res = test! { INNER; name: $name, - runtime: &mut rt, + runtime: &rt, server: expected: $server_expected, reply: $server_reply, @@ -169,12 +169,12 @@ macro_rules! test { #[test] fn $name() { let _ = pretty_env_logger::try_init(); - let mut rt = Runtime::new().expect("runtime new"); + let rt = support::runtime(); let err: ::hyper::Error = test! { INNER; name: $name, - runtime: &mut rt, + runtime: &rt, server: expected: $server_expected, reply: $server_reply, @@ -963,10 +963,10 @@ mod dispatch_impl { use futures_util::future::{FutureExt, TryFutureExt}; use futures_util::stream::StreamExt; use http::Uri; - use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; - use tokio::runtime::Runtime; + use super::support; use hyper::body::HttpBody; use hyper::client::connect::{Connected, Connection, HttpConnector}; use hyper::Client; @@ -978,7 +978,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (closes_tx, closes) = mpsc::channel(10); let client = Client::builder().build(DebugConnector::with_http_and_closes( HttpConnector::new(), @@ -1016,7 +1016,7 @@ mod dispatch_impl { rt.block_on(async move { let (res, ()) = future::join(res, rx).await; res.unwrap(); - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; }); rt.block_on(closes.into_future()).0.expect("closes"); @@ -1029,7 +1029,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -1075,7 +1075,7 @@ mod dispatch_impl { rt.block_on(async move { let (res, ()) = future::join(res, rx).await; res.unwrap(); - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; }); rt.block_on(closes.into_future()).0.expect("closes"); @@ -1113,9 +1113,7 @@ mod dispatch_impl { // prevent this thread from closing until end of test, so the connection // stays open and idle until Client is dropped - Runtime::new() - .unwrap() - .block_on(client_drop_rx.into_future()) + support::runtime().block_on(client_drop_rx.into_future()) }); let client = Client::builder().build(DebugConnector::with_http_and_closes( @@ -1147,7 +1145,7 @@ mod dispatch_impl { drop(client); // and wait a few ticks for the connections to close - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1195,7 +1193,7 @@ mod dispatch_impl { future::select(res, rx1).await; // res now dropped - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1250,7 +1248,7 @@ mod dispatch_impl { res.unwrap(); // and wait a few ticks to see the connection drop - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1300,7 +1298,7 @@ mod dispatch_impl { let (res, ()) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1346,7 +1344,7 @@ mod dispatch_impl { let (res, ()) = future::join(res, rx).await; res.unwrap(); - let t = tokio::time::delay_for(Duration::from_millis(100)).map(|_| panic!("time out")); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); future::select(t, close).await; } @@ -1357,7 +1355,7 @@ mod dispatch_impl { // idle connections that the Checkout would have found let _ = pretty_env_logger::try_init(); - let _rt = Runtime::new().unwrap(); + let _rt = support::runtime(); let connector = DebugConnector::new(); let connects = connector.connects.clone(); @@ -1379,7 +1377,7 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let connector = DebugConnector::new(); let connects = connector.connects.clone(); @@ -1445,7 +1443,7 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let connector = DebugConnector::new(); let connects = connector.connects.clone(); @@ -1507,7 +1505,7 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let connector = DebugConnector::new(); let connects = connector.connects.clone(); @@ -1544,7 +1542,7 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 0); let delayed_body = rx1 - .then(|_| tokio::time::delay_for(Duration::from_millis(200))) + .then(|_| tokio::time::sleep(Duration::from_millis(200))) .map(|_| Ok::<_, ()>("hello a")) .map_err(|_| -> hyper::Error { panic!("rx1") }) .into_stream(); @@ -1559,7 +1557,7 @@ mod dispatch_impl { // req 1 let fut = future::join(client.request(req), rx) - .then(|_| tokio::time::delay_for(Duration::from_millis(200))) + .then(|_| tokio::time::sleep(Duration::from_millis(200))) // req 2 .then(move |()| { let rx = rx3.expect("thread panicked"); @@ -1646,7 +1644,7 @@ mod dispatch_impl { // sleep real quick to let the threadpool put connection in ready // state and back into client pool - tokio::time::delay_for(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_millis(50)).await; let rx = rx2.expect("thread panicked"); let req = Request::builder() @@ -1669,7 +1667,7 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let connector = DebugConnector::new().proxy(); let client = Client::builder().build(connector); @@ -1708,7 +1706,7 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let connector = DebugConnector::new().proxy(); let client = Client::builder().build(connector); @@ -1750,7 +1748,7 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let connector = DebugConnector::new(); @@ -1814,8 +1812,8 @@ mod dispatch_impl { use tokio::net::TcpListener; let _ = pretty_env_logger::try_init(); - let mut rt = Runtime::new().unwrap(); - let mut listener = rt + let rt = support::runtime(); + let listener = rt .block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))) .unwrap(); let addr = listener.local_addr().unwrap(); @@ -1963,8 +1961,8 @@ mod dispatch_impl { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.tcp).poll_read(cx, buf) } } @@ -1993,19 +1991,18 @@ mod conn { use futures_channel::oneshot; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; use futures_util::StreamExt; - use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; + use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; - use tokio::runtime::Runtime; use hyper::client::conn; use hyper::{self, Body, Method, Request}; - use super::{concat, s, tcp_connect, FutureHyperExt}; + use super::{concat, s, support, tcp_connect, FutureHyperExt}; #[tokio::test] async fn get() { let _ = ::pretty_env_logger::try_init(); - let mut listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) .await .unwrap(); let addr = listener.local_addr().unwrap(); @@ -2052,7 +2049,7 @@ mod conn { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (tx1, rx1) = oneshot::channel(); @@ -2090,7 +2087,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); let chunk = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(chunk.len(), 5); } @@ -2100,7 +2097,7 @@ mod conn { let _ = ::pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (tx, rx) = oneshot::channel(); let server = thread::spawn(move || { @@ -2127,7 +2124,7 @@ mod conn { let (mut sender, body) = Body::channel(); let sender = thread::spawn(move || { sender.try_send_data("hello".into()).expect("try_send_data"); - Runtime::new().unwrap().block_on(rx).unwrap(); + support::runtime().block_on(rx).unwrap(); sender.abort(); }); @@ -2147,7 +2144,7 @@ mod conn { fn uri_absolute_form() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (tx1, rx1) = oneshot::channel(); @@ -2185,7 +2182,7 @@ mod conn { concat(res) }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } @@ -2193,7 +2190,7 @@ mod conn { fn http1_conn_coerces_http2_request() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (tx1, rx1) = oneshot::channel(); @@ -2231,7 +2228,7 @@ mod conn { concat(res) }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } @@ -2239,7 +2236,7 @@ mod conn { fn pipeline() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (tx1, rx1) = oneshot::channel(); @@ -2283,20 +2280,18 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join3(res1, res2, rx).map(|r| r.0)) .unwrap(); } #[test] fn upgrade() { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - let _ = ::pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (tx1, rx1) = oneshot::channel(); @@ -2346,7 +2341,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join3(until_upgrade, res, rx).map(|r| r.0)) .unwrap(); @@ -2379,13 +2374,11 @@ mod conn { #[test] fn connect_method() { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - let _ = ::pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut rt = Runtime::new().unwrap(); + let rt = support::runtime(); let (tx1, rx1) = oneshot::channel(); @@ -2439,7 +2432,7 @@ mod conn { }); let rx = rx1.expect("thread panicked"); - let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); + let rx = rx.then(|_| tokio::time::sleep(Duration::from_millis(200))); rt.block_on(future::join3(until_tunneled, res, rx).map(|r| r.0)) .unwrap(); @@ -2529,7 +2522,7 @@ mod conn { let _ = shdn_tx.send(()); // Allow time for graceful shutdown roundtrips... - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; // After graceful shutdown roundtrips, the client should be closed... future::poll_fn(|ctx| client.poll_ready(ctx)) @@ -2541,7 +2534,7 @@ mod conn { async fn http2_keep_alive_detects_unresponsive_server() { let _ = pretty_env_logger::try_init(); - let mut listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) .await .unwrap(); let addr = listener.local_addr().unwrap(); @@ -2581,7 +2574,7 @@ mod conn { let _ = pretty_env_logger::try_init(); - let mut listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) .await .unwrap(); let addr = listener.local_addr().unwrap(); @@ -2606,7 +2599,7 @@ mod conn { }); // sleep longer than keepalive would trigger - tokio::time::delay_for(Duration::from_secs(4)).await; + tokio::time::sleep(Duration::from_secs(4)).await; future::poll_fn(|ctx| client.poll_ready(ctx)) .await @@ -2617,7 +2610,7 @@ mod conn { async fn http2_keep_alive_closes_open_streams() { let _ = pretty_env_logger::try_init(); - let mut listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) .await .unwrap(); let addr = listener.local_addr().unwrap(); @@ -2667,7 +2660,7 @@ mod conn { let _ = pretty_env_logger::try_init(); - let mut listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) .await .unwrap(); let addr = listener.local_addr().unwrap(); @@ -2711,7 +2704,7 @@ mod conn { let _resp = client.send_request(req1).await.expect("send_request"); // sleep longer than keepalive would trigger - tokio::time::delay_for(Duration::from_secs(4)).await; + tokio::time::sleep(Duration::from_secs(4)).await; future::poll_fn(|ctx| client.poll_ready(ctx)) .await @@ -2763,8 +2756,8 @@ mod conn { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.tcp).poll_read(cx, buf) } } diff --git a/tests/server.rs b/tests/server.rs index bb465126fa..0b7bd46a41 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -18,9 +18,8 @@ use futures_util::future::{self, Either, FutureExt, TryFutureExt}; #[cfg(feature = "stream")] use futures_util::stream::StreamExt as _; use http::header::{HeaderName, HeaderValue}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::{TcpListener, TcpStream as TkTcpStream}; -use tokio::runtime::Runtime; use hyper::body::HttpBody as _; use hyper::client::Client; @@ -29,6 +28,8 @@ use hyper::server::Server; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, StatusCode, Version}; +mod support; + #[test] fn get_should_ignore_body() { let server = serve(); @@ -788,7 +789,7 @@ fn expect_continue_but_no_body_is_ignored() { #[tokio::test] async fn expect_continue_waits_for_body_poll() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let child = thread::spawn(move || { @@ -821,7 +822,7 @@ async fn expect_continue_waits_for_body_poll() { service_fn(|req| { assert_eq!(req.headers()["expect"], "100-continue"); // But! We're never going to poll the body! - tokio::time::delay_for(Duration::from_millis(50)).map(move |_| { + tokio::time::sleep(Duration::from_millis(50)).map(move |_| { // Move and drop the req, so we don't auto-close drop(req); Response::builder() @@ -956,7 +957,7 @@ fn http_10_request_receives_http_10_response() { #[tokio::test] async fn disable_keep_alive_mid_request() { - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -994,7 +995,7 @@ async fn disable_keep_alive_mid_request() { #[tokio::test] async fn disable_keep_alive_post_request() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1046,7 +1047,7 @@ async fn disable_keep_alive_post_request() { #[tokio::test] async fn empty_parse_eof_does_not_return_error() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1062,7 +1063,7 @@ async fn empty_parse_eof_does_not_return_error() { #[tokio::test] async fn nonempty_parse_eof_returns_error() { - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1080,7 +1081,7 @@ async fn nonempty_parse_eof_returns_error() { #[tokio::test] async fn http1_allow_half_close() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let t1 = thread::spawn(move || { @@ -1100,7 +1101,7 @@ async fn http1_allow_half_close() { .serve_connection( socket, service_fn(|_| { - tokio::time::delay_for(Duration::from_millis(500)) + tokio::time::sleep(Duration::from_millis(500)) .map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty()))) }), ) @@ -1113,7 +1114,7 @@ async fn http1_allow_half_close() { #[tokio::test] async fn disconnect_after_reading_request_before_responding() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1127,7 +1128,7 @@ async fn disconnect_after_reading_request_before_responding() { .serve_connection( socket, service_fn(|_| { - tokio::time::delay_for(Duration::from_secs(2)).map( + tokio::time::sleep(Duration::from_secs(2)).map( |_| -> Result, hyper::Error> { panic!("response future should have been dropped"); }, @@ -1140,7 +1141,7 @@ async fn disconnect_after_reading_request_before_responding() { #[tokio::test] async fn returning_1xx_response_is_error() { - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1193,7 +1194,7 @@ async fn upgrades() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx, rx) = oneshot::channel(); @@ -1252,7 +1253,7 @@ async fn http_connect() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx, rx) = oneshot::channel(); @@ -1308,7 +1309,7 @@ async fn upgrades_new() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (read_101_tx, read_101_rx) = oneshot::channel(); @@ -1375,7 +1376,7 @@ async fn upgrades_new() { #[tokio::test] async fn upgrades_ignored() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { @@ -1417,7 +1418,7 @@ async fn http_connect_new() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (read_200_tx, read_200_rx) = oneshot::channel(); @@ -1480,7 +1481,7 @@ async fn http_connect_new() { #[tokio::test] async fn parse_errors_send_4xx_response() { - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1502,7 +1503,7 @@ async fn parse_errors_send_4xx_response() { #[tokio::test] async fn illegal_request_length_returns_400_response() { - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1538,7 +1539,7 @@ fn max_buf_size_no_panic() { #[tokio::test] async fn max_buf_size() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); const MAX: usize = 16_000; @@ -1592,7 +1593,7 @@ fn http1_response_with_http2_version() { let server = serve(); let addr_str = format!("http://{}", server.addr()); - let mut rt = Runtime::new().expect("runtime new"); + let rt = support::runtime(); server.reply().version(hyper::Version::HTTP_2); @@ -1609,7 +1610,7 @@ fn try_h2() { let server = serve(); let addr_str = format!("http://{}", server.addr()); - let mut rt = Runtime::new().expect("runtime new"); + let rt = support::runtime(); rt.block_on({ let client = Client::builder() @@ -1629,7 +1630,7 @@ fn http1_only() { let server = serve_opts().http1_only().serve(); let addr_str = format!("http://{}", server.addr()); - let mut rt = Runtime::new().expect("runtime new"); + let rt = support::runtime(); rt.block_on({ let client = Client::builder() @@ -1684,7 +1685,7 @@ fn http2_body_user_error_sends_reset_reason() { server.reply().body_stream(b); - let mut rt = Runtime::new().expect("runtime new"); + let rt = support::runtime(); let err: hyper::Error = rt .block_on(async move { @@ -1823,7 +1824,7 @@ fn skips_content_length_and_body_for_304_responses() { async fn http2_keep_alive_detects_unresponsive_client() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); // Spawn a "client" conn that only reads until EOF @@ -1871,7 +1872,7 @@ async fn http2_keep_alive_detects_unresponsive_client() { async fn http2_keep_alive_with_responsive_client() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { @@ -1897,7 +1898,7 @@ async fn http2_keep_alive_with_responsive_client() { conn.await.expect("client conn"); }); - tokio::time::delay_for(Duration::from_secs(4)).await; + tokio::time::sleep(Duration::from_secs(4)).await; let req = http::Request::new(hyper::Body::empty()); client.send_request(req).await.expect("client.send_request"); @@ -1938,7 +1939,7 @@ async fn write_pong_frame(conn: &mut TkTcpStream) { async fn http2_keep_alive_count_server_pings() { let _ = pretty_env_logger::try_init(); - let mut listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn(async move { @@ -2294,38 +2295,32 @@ impl ServeOptions { let thread = thread::Builder::new() .name(thread_name) .spawn(move || { - let mut rt = tokio::runtime::Builder::new() - .enable_io() - .enable_time() - .basic_scheduler() - .build() - .expect("rt new"); - - rt.block_on(async move { - let service = make_service_fn(|_| { - let msg_tx = msg_tx.clone(); - let reply_rx = reply_rx.clone(); - future::ok::<_, BoxError>(TestService { - tx: msg_tx, - reply: reply_rx, - }) - }); - - let server = Server::bind(&addr) - .http1_only(options.http1_only) - .http1_keepalive(options.keep_alive) - .http1_pipeline_flush(options.pipeline) - .serve(service); - - addr_tx.send(server.local_addr()).expect("server addr tx"); - - server - .with_graceful_shutdown(async { - let _ = shutdown_rx.await; - }) - .await - }) - .expect("serve()"); + support::runtime() + .block_on(async move { + let service = make_service_fn(|_| { + let msg_tx = msg_tx.clone(); + let reply_rx = reply_rx.clone(); + future::ok::<_, BoxError>(TestService { + tx: msg_tx, + reply: reply_rx, + }) + }); + + let server = Server::bind(&addr) + .http1_only(options.http1_only) + .http1_keepalive(options.keep_alive) + .http1_pipeline_flush(options.pipeline) + .serve(service); + + addr_tx.send(server.local_addr()).expect("server addr tx"); + + server + .with_graceful_shutdown(async { + let _ = shutdown_rx.await; + }) + .await + }) + .expect("serve()"); }) .expect("thread spawn"); @@ -2353,6 +2348,7 @@ fn has_header(msg: &str, name: &str) -> bool { fn tcp_bind(addr: &SocketAddr) -> ::tokio::io::Result { let std_listener = StdTcpListener::bind(addr).unwrap(); + std_listener.set_nonblocking(true).unwrap(); TcpListener::from_std(std_listener) } @@ -2429,8 +2425,8 @@ impl AsyncRead for DebugStream { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { Pin::new(&mut self.stream).poll_read(cx, buf) } } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index fd089bc561..56ed839106 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use std::future::Future; use std::pin::Pin; use std::sync::{ @@ -15,6 +16,7 @@ pub use futures_util::{ pub use hyper::{HeaderMap, StatusCode}; pub use std::net::SocketAddr; +#[allow(unused_macros)] macro_rules! t { ( $name:ident, @@ -303,15 +305,16 @@ pub struct __TestConfig { pub proxy: bool, } -pub fn __run_test(cfg: __TestConfig) { - let _ = pretty_env_logger::try_init(); - tokio::runtime::Builder::new() - .enable_io() - .enable_time() - .basic_scheduler() +pub fn runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() .build() .expect("new rt") - .block_on(async_test(cfg)); +} + +pub fn __run_test(cfg: __TestConfig) { + let _ = pretty_env_logger::try_init(); + runtime().block_on(async_test(cfg)); } async fn async_test(cfg: __TestConfig) {