diff --git a/src/body/body.rs b/src/body/body.rs index 9164320a0a..fddf4f07b8 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -2,6 +2,9 @@ use std::borrow::Cow; #[cfg(feature = "stream")] use std::error::Error as StdError; use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use bytes::Bytes; use futures_channel::mpsc; @@ -15,10 +18,9 @@ use http_body::{Body as HttpBody, SizeHint}; use super::DecodedLength; #[cfg(feature = "stream")] use crate::common::sync_wrapper::SyncWrapper; -use crate::common::Future; +use crate::common::watch; #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] use crate::common::Never; -use crate::common::{task, watch, Pin, Poll}; #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use crate::proto::h2::ping; @@ -239,7 +241,7 @@ impl Body { .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) } - fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll>> { + fn poll_eof(&mut self, cx: &mut Context<'_>) -> Poll>> { match self.take_delayed_eof() { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "client")] @@ -292,7 +294,7 @@ impl Body { } } - fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll>> { + fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { match self.kind { Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), Kind::Chan { @@ -367,14 +369,14 @@ impl HttpBody for Body { fn poll_data( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { self.poll_eof(cx) } fn poll_trailers( #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, - #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, + #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>, ) -> Poll, Self::Error>> { match self.kind { #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] @@ -470,7 +472,7 @@ impl fmt::Debug for Body { impl Stream for Body { type Item = crate::Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { HttpBody::poll_data(self, cx) } } @@ -550,7 +552,7 @@ impl From> for Body { impl Sender { /// Check to see if this `Sender` can send more data. - pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Check if the receiver end has tried polling for the body yet ready!(self.poll_want(cx)?); self.data_tx @@ -558,7 +560,7 @@ impl Sender { .map_err(|_| crate::Error::new_closed()) } - fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll> { match self.want_rx.load(cx) { WANT_READY => Poll::Ready(Ok(())), WANT_PENDING => Poll::Pending, diff --git a/src/client/client.rs b/src/client/client.rs index 10a0ed4b77..8195554bd7 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -1,6 +1,10 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; +use std::marker::Unpin; use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use futures_channel::oneshot; @@ -12,10 +16,7 @@ use tracing::{debug, trace, warn}; use crate::body::{Body, HttpBody}; use crate::client::connect::CaptureConnectionExtension; -use crate::common::{ - exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin, - Poll, -}; +use crate::common::{exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, Lazy}; #[cfg(feature = "http2")] use crate::ext::Protocol; use crate::rt::Executor; @@ -553,7 +554,7 @@ where type Error = crate::Error; type Future = ResponseFuture; - fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -573,7 +574,7 @@ where type Error = crate::Error; type Future = ResponseFuture; - fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -628,7 +629,7 @@ impl fmt::Debug for ResponseFuture { impl Future for ResponseFuture { type Output = crate::Result>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.inner.get_mut().as_mut().poll(cx) } } @@ -650,7 +651,7 @@ enum PoolTx { } impl PoolClient { - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.tx { PoolTx::Http1(ref mut tx) => tx.poll_ready(cx), #[cfg(feature = "http2")] diff --git a/src/client/conn.rs b/src/client/conn.rs index 7410ec6d09..35770d4e6b 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -61,9 +61,13 @@ pub mod http2; use std::error::Error as StdError; use std::fmt; +use std::future::Future; #[cfg(not(all(feature = "http1", feature = "http2")))] use std::marker::PhantomData; +use std::marker::Unpin; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; #[cfg(all(feature = "runtime", feature = "http2"))] use std::time::Duration; @@ -77,12 +81,9 @@ use tracing::{debug, trace}; use super::dispatch; use crate::body::HttpBody; +use crate::common::exec::{BoxSendFuture, Exec}; #[cfg(not(all(feature = "http1", feature = "http2")))] use crate::common::Never; -use crate::common::{ - exec::{BoxSendFuture, Exec}, - task, Future, Pin, Poll, -}; use crate::proto; use crate::rt::Executor; #[cfg(feature = "http1")] @@ -257,7 +258,7 @@ impl SendRequest { /// Polls to determine whether this sender can be used yet for a request. /// /// If the associated connection is closed, this returns an Error. - pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.dispatch.poll_ready(cx) } @@ -381,7 +382,7 @@ where type Error = crate::Error; type Future = ResponseFuture; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.poll_ready(cx) } @@ -502,7 +503,7 @@ where /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) /// to work with this function; or use the `without_shutdown` wrapper. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { match *self.inner.as_mut().expect("already upgraded") { #[cfg(feature = "http1")] ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx), @@ -554,7 +555,7 @@ where { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? { proto::Dispatched::Shutdown => Poll::Ready(Ok(())), #[cfg(feature = "http1")] @@ -1067,7 +1068,7 @@ impl Builder { impl Future for ResponseFuture { type Output = crate::Result>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.inner { ResponseFutureState::Waiting(ref mut rx) => { Pin::new(rx).poll(cx).map(|res| match res { @@ -1101,7 +1102,7 @@ where { type Output = crate::Result; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] ProtoClientProj::H1 { h1 } => h1.poll(cx), diff --git a/src/client/connect/dns.rs b/src/client/connect/dns.rs index b0d9c4741b..50245de68d 100644 --- a/src/client/connect/dns.rs +++ b/src/client/connect/dns.rs @@ -26,7 +26,7 @@ use std::future::Future; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; use std::pin::Pin; use std::str::FromStr; -use std::task::{self, Poll}; +use std::task::{Context, Poll}; use std::{fmt, io, vec}; use tokio::task::JoinHandle; @@ -113,7 +113,7 @@ impl Service for GaiResolver { type Error = io::Error; type Future = GaiFuture; - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -138,7 +138,7 @@ impl fmt::Debug for GaiResolver { impl Future for GaiFuture { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Pin::new(&mut self.inner).poll(cx).map(|res| match res { Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }), Ok(Err(err)) => Err(err), @@ -286,7 +286,7 @@ impl Service for TokioThreadpoolGaiResolver { type Error = io::Error; type Future = TokioThreadpoolGaiFuture; - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -299,7 +299,7 @@ impl Service for TokioThreadpoolGaiResolver { impl Future for TokioThreadpoolGaiFuture { type Output = Result; - fn poll(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { match ready!(tokio_executor::threadpool::blocking(|| ( self.name.as_str(), 0 @@ -318,8 +318,10 @@ impl Future for TokioThreadpoolGaiFuture { */ mod sealed { + use std::future::Future; + use std::task::{Context, Poll}; + use super::{Name, SocketAddr}; - use crate::common::{task, Future, Poll}; use tower_service::Service; // "Trait alias" for `Service` @@ -328,7 +330,7 @@ mod sealed { type Error: Into>; type Future: Future>; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; fn resolve(&mut self, name: Name) -> Self::Future; } @@ -342,7 +344,7 @@ mod sealed { type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { Service::poll_ready(self, cx) } diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 1f24de1f7e..4c29dd3a3e 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -427,12 +427,13 @@ where #[cfg(any(feature = "http1", feature = "http2"))] pub(super) mod sealed { use std::error::Error as StdError; + use std::future::Future; + use std::marker::Unpin; use ::http::Uri; use tokio::io::{AsyncRead, AsyncWrite}; use super::Connection; - use crate::common::{Future, Unpin}; /// Connect to a destination, returning an IO transport. /// diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 771c40da30..a1a93ea964 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,13 +1,13 @@ #[cfg(feature = "http2")] use std::future::Future; +use std::marker::Unpin; +#[cfg(feature = "http2")] +use std::pin::Pin; +use std::task::{Context, Poll}; use futures_util::FutureExt; use tokio::sync::{mpsc, oneshot}; -#[cfg(feature = "http2")] -use crate::common::Pin; -use crate::common::{task, Poll}; - pub(crate) type RetryPromise = oneshot::Receiver)>>; pub(crate) type Promise = oneshot::Receiver>; @@ -53,7 +53,7 @@ pub(crate) struct UnboundedSender { } impl Sender { - pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.giver .poll_want(cx) .map_err(|_| crate::Error::new_closed()) @@ -155,10 +155,7 @@ pub(crate) struct Receiver { } impl Receiver { - pub(crate) fn poll_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll)>> { + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll)>> { match self.inner.poll_recv(cx) { Poll::Ready(item) => { Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped"))) @@ -245,7 +242,7 @@ impl Callback { } } - pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> { + pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { match *self { Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), diff --git a/src/client/pool.rs b/src/client/pool.rs index 91b7fcb21f..5391ab2894 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -1,8 +1,12 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::error::Error as StdError; use std::fmt; +use std::future::Future; +use std::marker::Unpin; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::sync::{Arc, Mutex, Weak}; +use std::task::{Context, Poll}; #[cfg(not(feature = "runtime"))] use std::time::{Duration, Instant}; @@ -13,7 +17,7 @@ use tokio::time::{Duration, Instant, Interval}; use tracing::{debug, trace}; use super::client::Ver; -use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; +use crate::common::exec::Exec; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] @@ -576,10 +580,7 @@ impl fmt::Display for CheckoutIsClosedError { } impl Checkout { - fn poll_waiter( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll>>> { + fn poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll>>> { if let Some(mut rx) = self.waiter.take() { match Pin::new(&mut rx).poll(cx) { Poll::Ready(Ok(value)) => { @@ -604,7 +605,7 @@ impl Checkout { } } - fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option> { + fn checkout(&mut self, cx: &mut Context<'_>) -> Option> { let entry = { let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); let expiration = Expiration::new(inner.timeout); @@ -657,7 +658,7 @@ impl Checkout { impl Future for Checkout { type Output = crate::Result>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if let Some(pooled) = ready!(self.poll_waiter(cx)?) { return Poll::Ready(Ok(pooled)); } @@ -748,7 +749,7 @@ pin_project_lite::pin_project! { impl Future for IdleTask { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); loop { match this.pool_drop_notifier.as_mut().poll(cx) { @@ -864,7 +865,7 @@ mod tests { { type Output = Option<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Pin::new(&mut self.0).poll(cx) { Poll::Ready(Ok(_)) => Poll::Ready(Some(())), Poll::Ready(Err(_)) => Poll::Ready(Some(())), diff --git a/src/client/service.rs b/src/client/service.rs index f3560ea088..047dd98766 100644 --- a/src/client/service.rs +++ b/src/client/service.rs @@ -5,6 +5,8 @@ use std::error::Error as StdError; use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use tracing::debug; @@ -12,7 +14,6 @@ use tracing::debug; use super::conn::{Builder, SendRequest}; use crate::{ body::HttpBody, - common::{task, Pin, Poll}, service::{MakeConnection, Service}, }; @@ -58,7 +59,7 @@ where type Future = Pin> + Send + 'static>>; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner .poll_ready(cx) .map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into())) diff --git a/src/common/drain.rs b/src/common/drain.rs index 174da876df..c8562d3c98 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,10 +1,11 @@ +use std::future::Future; use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; use pin_project_lite::pin_project; use tokio::sync::watch; -use super::{task, Future, Pin, Poll}; - pub(crate) fn channel() -> (Signal, Watch) { let (tx, rx) = watch::channel(()); (Signal { tx }, Watch { rx }) @@ -47,7 +48,7 @@ impl Signal { impl Future for Draining { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Pin::new(&mut self.as_mut().0).poll(cx) } } @@ -80,7 +81,7 @@ where { type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut me = self.project(); loop { match mem::replace(me.state, State::Draining) { @@ -115,7 +116,7 @@ mod tests { impl Future for TestMe { type Output = (); - fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { self.poll_cnt += 1; if self.finished { Poll::Ready(()) diff --git a/src/common/io/rewind.rs b/src/common/io/rewind.rs index 0afef5f7ea..9ed7c42fea 100644 --- a/src/common/io/rewind.rs +++ b/src/common/io/rewind.rs @@ -1,11 +1,11 @@ use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{cmp, io}; use bytes::{Buf, Bytes}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use crate::common::{task, Pin, Poll}; - /// Combine a buffer with an IO, rewinding reads to use the buffer. #[derive(Debug)] pub(crate) struct Rewind { @@ -50,7 +50,7 @@ where { fn poll_read( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { if let Some(mut prefix) = self.pre.take() { @@ -78,7 +78,7 @@ where { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.inner).poll_write(cx, buf) @@ -86,17 +86,17 @@ where fn poll_write_vectored( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_shutdown(cx) } diff --git a/src/common/lazy.rs b/src/common/lazy.rs index 2722077303..df2c07d596 100644 --- a/src/common/lazy.rs +++ b/src/common/lazy.rs @@ -1,6 +1,9 @@ -use pin_project_lite::pin_project; +use std::future::Future; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; -use super::{task, Future, Pin, Poll}; +use pin_project_lite::pin_project; pub(crate) trait Started: Future { fn started(&self) -> bool; @@ -55,7 +58,7 @@ where { type Output = R::Output; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); if let InnerProj::Fut { fut } = this.inner.as_mut().project() { diff --git a/src/common/mod.rs b/src/common/mod.rs index e38c6f5c7a..5327f21b5b 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -23,6 +23,7 @@ mod never; all(feature = "client", any(feature = "http1", feature = "http2")) ))] pub(crate) mod sync_wrapper; +#[cfg(feature = "http1")] pub(crate) mod task; pub(crate) mod watch; @@ -30,10 +31,3 @@ pub(crate) mod watch; pub(crate) use self::lazy::{lazy, Started as Lazy}; #[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))] pub(crate) use self::never::Never; -pub(crate) use self::task::Poll; - -// group up types normally needed for `Future` -cfg_proto! { - pub(crate) use std::marker::Unpin; -} -pub(crate) use std::{future::Future, pin::Pin}; diff --git a/src/common/task.rs b/src/common/task.rs index ec70c957d6..1d9f2a9982 100644 --- a/src/common/task.rs +++ b/src/common/task.rs @@ -1,11 +1,10 @@ -#[cfg(feature = "http1")] +use std::task::{Context, Poll}; + use super::Never; -pub(crate) use std::task::{Context, Poll}; /// A function to help "yield" a future, such that it is re-scheduled immediately. /// /// Useful for spin counts, so a future doesn't hog too much time. -#[cfg(feature = "http1")] pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll { cx.waker().wake_by_ref(); Poll::Pending diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 5ebff2803e..5ab72f264e 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -1,6 +1,9 @@ use std::fmt; use std::io; use std::marker::PhantomData; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; #[cfg(all(feature = "server", feature = "runtime"))] use std::time::Duration; @@ -16,7 +19,6 @@ use tracing::{debug, error, trace}; use super::io::Buffered; use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; use crate::body::DecodedLength; -use crate::common::{task, Pin, Poll, Unpin}; use crate::headers::connection_keep_alive; use crate::proto::{BodyLength, MessageHead}; @@ -185,7 +187,7 @@ where pub(super) fn poll_read_head( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll, DecodedLength, Wants)>>> { debug_assert!(self.can_read_head()); trace!("Conn::read_head"); @@ -286,7 +288,7 @@ where pub(crate) fn poll_read_body( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { debug_assert!(self.can_read_body()); @@ -347,10 +349,7 @@ where ret } - pub(crate) fn poll_read_keep_alive( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { + pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.can_read_head() && !self.can_read_body()); if self.is_read_closed() { @@ -373,7 +372,7 @@ where // // This should only be called for Clients wanting to enter the idle // state. - fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); debug_assert!(!self.is_mid_message()); debug_assert!(T::is_client()); @@ -406,7 +405,7 @@ where Poll::Ready(Err(crate::Error::new_unexpected_message())) } - fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); debug_assert!(self.is_mid_message()); @@ -425,7 +424,7 @@ where } } - fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.state.is_read_closed()); let result = ready!(self.io.poll_read_from_io(cx)); @@ -436,7 +435,7 @@ where })) } - fn maybe_notify(&mut self, cx: &mut task::Context<'_>) { + fn maybe_notify(&mut self, cx: &mut Context<'_>) { // its possible that we returned NotReady from poll() without having // exhausted the underlying Io. We would have done this when we // determined we couldn't keep reading until we knew how writing @@ -483,7 +482,7 @@ where } } - fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) { + fn try_keep_alive(&mut self, cx: &mut Context<'_>) { self.state.try_keep_alive::(); self.maybe_notify(cx); } @@ -726,14 +725,14 @@ where Err(err) } - pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll> { ready!(Pin::new(&mut self.io).poll_flush(cx))?; self.try_keep_alive(cx); trace!("flushed({}): {:?}", T::LOG, self.state); Poll::Ready(Ok(())) } - pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { Ok(()) => { trace!("shut down IO complete"); @@ -747,7 +746,7 @@ where } /// If the read side can be cheaply drained, do so. Otherwise, close. - pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { + pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) { if let Reading::Continue(ref decoder) = self.state.reading { // skip sending the 100-continue // just move forward to a read, in case a tiny body was included diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index a04dca84c1..b1a8e8861e 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -1,13 +1,12 @@ use std::error::Error as StdError; use std::fmt; use std::io; +use std::task::{Context, Poll}; use std::usize; use bytes::Bytes; use tracing::{debug, trace}; -use crate::common::{task, Poll}; - use super::io::MemRead; use super::DecodedLength; @@ -103,7 +102,7 @@ impl Decoder { pub(crate) fn decode( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, body: &mut R, ) -> Poll> { trace!("decode; state={:?}", self.kind); @@ -185,7 +184,7 @@ macro_rules! byte ( impl ChunkedState { fn step( &self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, body: &mut R, size: &mut u64, buf: &mut Option, @@ -207,7 +206,7 @@ impl ChunkedState { } } fn read_size( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, size: &mut u64, ) -> Poll> { @@ -252,7 +251,7 @@ impl ChunkedState { Poll::Ready(Ok(ChunkedState::Size)) } fn read_size_lws( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { trace!("read_size_lws"); @@ -268,7 +267,7 @@ impl ChunkedState { } } fn read_extension( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { trace!("read_extension"); @@ -288,7 +287,7 @@ impl ChunkedState { } } fn read_size_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, size: u64, ) -> Poll> { @@ -310,7 +309,7 @@ impl ChunkedState { } fn read_body( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, rem: &mut u64, buf: &mut Option, @@ -344,7 +343,7 @@ impl ChunkedState { } } fn read_body_cr( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -356,7 +355,7 @@ impl ChunkedState { } } fn read_body_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -369,7 +368,7 @@ impl ChunkedState { } fn read_trailer( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { trace!("read_trailer"); @@ -379,7 +378,7 @@ impl ChunkedState { } } fn read_trailer_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -392,7 +391,7 @@ impl ChunkedState { } fn read_end_cr( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -401,7 +400,7 @@ impl ChunkedState { } } fn read_end_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -433,7 +432,7 @@ mod tests { use tokio::io::{AsyncRead, ReadBuf}; impl<'a> MemRead for &'a [u8] { - fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll> { let n = std::cmp::min(len, self.len()); if n > 0 { let (a, b) = self.split_at(n); @@ -447,7 +446,7 @@ mod tests { } impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) { - fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll> { let mut v = vec![0; len]; let mut buf = ReadBuf::new(&mut v); ready!(Pin::new(self).poll_read(cx, &mut buf)?); @@ -457,7 +456,7 @@ mod tests { #[cfg(feature = "nightly")] impl MemRead for Bytes { - fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll> { let n = std::cmp::min(len, self.len()); let ret = self.split_to(n); Poll::Ready(Ok(ret)) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index f663210546..a0c4daec1e 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,4 +1,8 @@ use std::error::Error as StdError; +use std::future::Future; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; use http::Request; @@ -7,7 +11,7 @@ use tracing::{debug, trace}; use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, HttpBody}; -use crate::common::{task, Future, Pin, Poll, Unpin}; +use crate::common; use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; @@ -26,10 +30,10 @@ pub(crate) trait Dispatch { type RecvItem; fn poll_msg( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; fn should_poll(&self) -> bool; } @@ -96,10 +100,7 @@ where /// /// This is useful for old-style HTTP upgrades, but ignores /// newer-style upgrade API. - pub(crate) fn poll_without_shutdown( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> + pub(crate) fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> where Self: Unpin, { @@ -112,7 +113,7 @@ where fn poll_catch( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, should_shutdown: bool, ) -> Poll> { Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| { @@ -131,7 +132,7 @@ where fn poll_inner( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, should_shutdown: bool, ) -> Poll> { T::update_date(); @@ -152,7 +153,7 @@ where } } - fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll> { // Limit the looping on this connection, in case it is ready far too // often, so that other futures don't starve. // @@ -179,10 +180,10 @@ where trace!("poll_loop yielding (self = {:p})", self); - task::yield_now(cx).map(|never| match never {}) + common::task::yield_now(cx).map(|never| match never {}) } - fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll> { loop { if self.is_closing { return Poll::Ready(Ok(())); @@ -236,7 +237,7 @@ where } } - fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll> { // can dispatch receive, or does it still care about, an incoming message? match ready!(self.dispatch.poll_ready(cx)) { Ok(()) => (), @@ -291,7 +292,7 @@ where } } - fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll> { loop { if self.is_closing { return Poll::Ready(Ok(())); @@ -383,7 +384,7 @@ where } } - fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll> { self.conn.poll_flush(cx).map_err(|err| { debug!("error writing: {}", err); crate::Error::new_body_write(err) @@ -430,7 +431,7 @@ where type Output = crate::Result; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll_catch(cx, true) } } @@ -494,7 +495,7 @@ cfg_server! { fn poll_msg( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { let mut this = self.as_mut(); let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { @@ -529,7 +530,7 @@ cfg_server! { Ok(()) } - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.in_flight.is_some() { Poll::Pending } else { @@ -570,7 +571,7 @@ cfg_client! { fn poll_msg( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { let mut this = self.as_mut(); debug_assert!(!this.rx_closed); @@ -641,7 +642,7 @@ cfg_client! { } } - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.callback { Some(ref mut cb) => match cb.poll_canceled(cx) { Poll::Ready(()) => { diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 1d251e2c84..02d8a4a9ec 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -5,6 +5,8 @@ use std::future::Future; use std::io::{self, IoSlice}; use std::marker::Unpin; use std::mem::MaybeUninit; +use std::pin::Pin; +use std::task::{Context, Poll}; #[cfg(all(feature = "server", feature = "runtime"))] use std::time::Duration; @@ -16,7 +18,6 @@ use tracing::{debug, trace}; use super::{Http1Transaction, ParseContext, ParsedMessage}; use crate::common::buf::BufList; -use crate::common::{task, Pin, Poll}; /// The initial buffer size allocated before trying to read from IO. pub(crate) const INIT_BUFFER_SIZE: usize = 8192; @@ -174,7 +175,7 @@ where pub(super) fn parse( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, parse_ctx: ParseContext<'_>, ) -> Poll>> where @@ -250,10 +251,7 @@ where } } - pub(crate) fn poll_read_from_io( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { + pub(crate) fn poll_read_from_io(&mut self, cx: &mut Context<'_>) -> Poll> { self.read_blocked = false; let next = self.read_buf_strategy.next(); if self.read_buf_remaining_mut() < next { @@ -296,7 +294,7 @@ where self.read_blocked } - pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll> { if self.flush_pipeline && !self.read_buf.is_empty() { Poll::Ready(Ok(())) } else if self.write_buf.remaining() == 0 { @@ -336,7 +334,7 @@ where /// /// Since all buffered bytes are flattened into the single headers buffer, /// that skips some bookkeeping around using multiple buffers. - fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush_flattened(&mut self, cx: &mut Context<'_>) -> Poll> { loop { let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?; debug!("flushed {} bytes", n); @@ -366,7 +364,7 @@ impl Unpin for Buffered {} // TODO: This trait is old... at least rename to PollBytes or something... pub(crate) trait MemRead { - fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll>; + fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll>; } impl MemRead for Buffered @@ -374,7 +372,7 @@ where T: AsyncRead + AsyncWrite + Unpin, B: Buf, { - fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll> { if !self.read_buf.is_empty() { let n = std::cmp::min(len, self.read_buf.len()); Poll::Ready(Ok(self.read_buf.split_to(n).freeze())) diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index bac8eceb3a..f43aaa0c01 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,4 +1,8 @@ use std::error::Error as StdError; +use std::future::Future; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; #[cfg(feature = "runtime")] use std::time::Duration; @@ -15,7 +19,7 @@ use tracing::{debug, trace, warn}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::HttpBody; use crate::client::dispatch::Callback; -use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; +use crate::common::{exec::Exec, Never}; use crate::ext::Protocol; use crate::headers; use crate::proto::h2::UpgradedSendStream; @@ -239,7 +243,7 @@ where B::Data: Send, B::Error: Into>, { - fn poll_pipe(&mut self, f: FutCtx, cx: &mut task::Context<'_>) { + fn poll_pipe(&mut self, f: FutCtx, cx: &mut Context<'_>) { let ping = self.ping.clone(); let send_stream = if !f.is_connect { if !f.eos { @@ -334,7 +338,7 @@ where { type Output = crate::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match ready!(self.h2_tx.poll_ready(cx)) { Ok(()) => (), diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 216aa2dac4..d50850d0a0 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -4,14 +4,15 @@ use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRA use http::HeaderMap; use pin_project_lite::pin_project; use std::error::Error as StdError; +use std::future::Future; use std::io::{self, Cursor, IoSlice}; use std::mem; -use std::task::Context; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tracing::{debug, trace, warn}; use crate::body::HttpBody; -use crate::common::{task, Future, Pin, Poll}; use crate::proto::h2::ping::Recorder; pub(crate) mod ping; @@ -116,7 +117,7 @@ where { type Output = crate::Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut me = self.project(); loop { if !*me.data_done { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index f4c0a848be..b4e8d68a15 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -1,5 +1,8 @@ use std::error::Error as StdError; +use std::future::Future; use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; #[cfg(feature = "runtime")] use std::time::Duration; @@ -13,8 +16,8 @@ use tracing::{debug, trace, warn}; use super::{ping, PipeToSendStream, SendBuf}; use crate::body::HttpBody; +use crate::common::date; use crate::common::exec::ConnStreamExec; -use crate::common::{date, task, Future, Pin, Poll}; use crate::ext::Protocol; use crate::headers; use crate::proto::h2::ping::Recorder; @@ -192,7 +195,7 @@ where { type Output = crate::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = &mut *self; loop { let next = match me.state { @@ -235,7 +238,7 @@ where { fn poll_server( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, service: &mut S, exec: &mut E, ) -> Poll> @@ -355,7 +358,7 @@ where Poll::Ready(Err(self.closing.take().expect("polled after error"))) } - fn poll_ping(&mut self, cx: &mut task::Context<'_>) { + fn poll_ping(&mut self, cx: &mut Context<'_>) { if let Some((_, ref mut estimator)) = self.ping { match estimator.poll(cx) { Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { @@ -446,7 +449,7 @@ where B::Error: Into>, E: Into>, { - fn poll2(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); loop { let next = match me.state.as_mut().project() { @@ -541,7 +544,7 @@ where { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll2(cx).map(|res| { if let Err(e) = res { debug!("stream error: {}", e); diff --git a/src/server/accept.rs b/src/server/accept.rs index 4b7a1487dd..07dcd62524 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -6,16 +6,14 @@ //! connections. //! - Utilities like `poll_fn` to ease creating a custom `Accept`. +use std::pin::Pin; +use std::task::{Context, Poll}; + #[cfg(feature = "stream")] use futures_core::Stream; #[cfg(feature = "stream")] use pin_project_lite::pin_project; -use crate::common::{ - task::{self, Poll}, - Pin, -}; - /// Asynchronously accept incoming connections. pub trait Accept { /// The connection type that can be accepted. @@ -26,7 +24,7 @@ pub trait Accept { /// Poll to accept the next connection. fn poll_accept( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>>; } @@ -51,7 +49,7 @@ pub trait Accept { /// ``` pub fn poll_fn(func: F) -> impl Accept where - F: FnMut(&mut task::Context<'_>) -> Poll>>, + F: FnMut(&mut Context<'_>) -> Poll>>, { struct PollFn(F); @@ -60,13 +58,13 @@ where impl Accept for PollFn where - F: FnMut(&mut task::Context<'_>) -> Poll>>, + F: FnMut(&mut Context<'_>) -> Poll>>, { type Conn = IO; type Error = E; fn poll_accept( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { (self.get_mut().0)(cx) } @@ -101,7 +99,7 @@ where type Error = E; fn poll_accept( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { self.project().stream.poll_next(cx) } diff --git a/src/server/conn.rs b/src/server/conn.rs index dfe2172457..d8b881a778 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -68,6 +68,10 @@ cfg_feature! { use std::error::Error as StdError; use std::fmt; + use std::task::{Context, Poll}; + use std::pin::Pin; + use std::future::Future; + use std::marker::Unpin; use bytes::Bytes; use pin_project_lite::pin_project; @@ -76,7 +80,6 @@ cfg_feature! { pub use super::server::Connecting; use crate::body::{Body, HttpBody}; - use crate::common::{task, Future, Pin, Poll, Unpin}; #[cfg(not(all(feature = "http1", feature = "http2")))] use crate::common::Never; use crate::common::exec::{ConnStreamExec, Exec}; @@ -805,7 +808,7 @@ where /// upgrade. Once the upgrade is completed, the connection would be "done", /// but it is not desired to actually shutdown the IO object. Instead you /// would take it back using `into_parts`. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match *self.conn.as_mut().unwrap() { #[cfg(feature = "http1")] @@ -901,7 +904,7 @@ where { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) { Ok(done) => { @@ -980,7 +983,7 @@ where { type Output = crate::Result; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] ProtoServerProj::H1 { h1, .. } => h1.poll(cx), @@ -1041,7 +1044,7 @@ mod upgrades { { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) { Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())), diff --git a/src/server/server.rs b/src/server/server.rs index cd34bb7b4a..6d7d69e51b 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -1,8 +1,11 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; +use std::marker::Unpin; #[cfg(feature = "tcp")] use std::net::{SocketAddr, TcpListener as StdTcpListener}; - +use std::pin::Pin; +use std::task::{Context, Poll}; #[cfg(feature = "tcp")] use std::time::Duration; @@ -17,7 +20,6 @@ use super::tcp::AddrIncoming; use crate::body::{Body, HttpBody}; use crate::common::exec::Exec; use crate::common::exec::{ConnStreamExec, NewSvcExec}; -use crate::common::{task, Future, Pin, Poll, Unpin}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... use super::conn::{Connection, Http as Http_, UpgradeableConnection}; @@ -162,7 +164,7 @@ where fn poll_next_( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>>> { let me = self.project(); match ready!(me.make_service.poll_ready_ref(cx)) { @@ -188,7 +190,7 @@ where pub(super) fn poll_watch( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, watcher: &W, ) -> Poll> where @@ -221,7 +223,7 @@ where { type Output = crate::Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll_watch(cx, &NoopWatcher) } } @@ -675,13 +677,17 @@ where // used by exec.rs pub(crate) mod new_svc { use std::error::Error as StdError; + use std::future::Future; + use std::marker::Unpin; + use std::pin::Pin; + use std::task::{Context, Poll}; + use tokio::io::{AsyncRead, AsyncWrite}; use tracing::debug; use super::{Connecting, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::exec::ConnStreamExec; - use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::HttpService; use pin_project_lite::pin_project; @@ -742,7 +748,7 @@ pub(crate) mod new_svc { { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // If it weren't for needing to name this type so the `Send` bounds // could be projected to the `Serve` executor, this could just be // an `async fn`, and much safer. Woe is me. @@ -810,7 +816,7 @@ where { type Output = Result, FE>; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut me = self.project(); let service = ready!(me.future.poll(cx))?; let io = Option::take(&mut me.io).expect("polled after complete"); diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 96937d0827..be858481c5 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -1,4 +1,8 @@ use std::error::Error as StdError; +use std::future::Future; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; @@ -10,7 +14,6 @@ use super::server::{Server, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; -use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::{HttpService, MakeServiceRef}; pin_project! { @@ -63,7 +66,7 @@ where { type Output = crate::Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut me = self.project(); loop { let next = { diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 31fcdfa343..edbddcd94b 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -1,15 +1,16 @@ use socket2::TcpKeepalive; use std::fmt; +use std::future::Future; use std::io; use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use tokio::net::TcpListener; use tokio::time::Sleep; use tracing::{debug, error, trace}; -use crate::common::{task, Future, Pin, Poll}; - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::addr_stream::AddrStream; use super::accept::Accept; @@ -200,7 +201,7 @@ impl AddrIncoming { self.sleep_on_errors = val; } - fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_next_(&mut self, cx: &mut Context<'_>) -> Poll> { // Check if a previous timeout is active that was set by IO errors. if let Some(ref mut to) = self.timeout { ready!(Pin::new(to).poll(cx)); @@ -261,7 +262,7 @@ impl Accept for AddrIncoming { fn poll_accept( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { let result = ready!(self.poll_next_(cx)); Poll::Ready(Some(result)) @@ -300,11 +301,11 @@ mod addr_stream { use std::net::SocketAddr; #[cfg(unix)] use std::os::unix::io::{AsRawFd, RawFd}; + use std::pin::Pin; + use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; - use crate::common::{task, Pin, Poll}; - pin_project_lite::pin_project! { /// A transport returned yieled by `AddrIncoming`. #[derive(Debug)] @@ -352,7 +353,7 @@ mod addr_stream { /// not yet available. pub fn poll_peek( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { self.inner.poll_peek(cx, buf) @@ -363,7 +364,7 @@ mod addr_stream { #[inline] fn poll_read( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { self.project().inner.poll_read(cx, buf) @@ -374,7 +375,7 @@ mod addr_stream { #[inline] fn poll_write( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.project().inner.poll_write(cx, buf) @@ -383,20 +384,20 @@ mod addr_stream { #[inline] fn poll_write_vectored( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { self.project().inner.poll_write_vectored(cx, bufs) } #[inline] - fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { // TCP flush is a noop Poll::Ready(Ok(())) } #[inline] - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().inner.poll_shutdown(cx) } diff --git a/src/service/http.rs b/src/service/http.rs index 81a20c80b5..d0586d8bd2 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -1,7 +1,8 @@ use std::error::Error as StdError; +use std::future::Future; +use std::task::{Context, Poll}; use crate::body::HttpBody; -use crate::common::{task, Future, Poll}; use crate::{Request, Response}; /// An asynchronous function from `Request` to `Response`. @@ -20,7 +21,7 @@ pub trait HttpService: sealed::Sealed { type Future: Future, Self::Error>>; #[doc(hidden)] - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; #[doc(hidden)] fn call(&mut self, req: Request) -> Self::Future; @@ -37,7 +38,7 @@ where type Error = T::Error; type Future = T::Future; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { tower_service::Service::poll_ready(self, cx) } diff --git a/src/service/make.rs b/src/service/make.rs index 63e6f298f1..188e4f4c32 100644 --- a/src/service/make.rs +++ b/src/service/make.rs @@ -1,11 +1,12 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; +use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; use super::{HttpService, Service}; use crate::body::HttpBody; -use crate::common::{task, Future, Poll}; // The same "trait alias" as tower::MakeConnection, but inlined to reduce // dependencies. @@ -14,7 +15,7 @@ pub trait MakeConnection: self::sealed::Sealed<(Target,)> { type Error; type Future: Future>; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; fn make_connection(&mut self, target: Target) -> Self::Future; } @@ -29,7 +30,7 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { Service::poll_ready(self, cx) } @@ -58,7 +59,7 @@ pub trait MakeServiceRef: self::sealed::Sealed<(Target, ReqBody // if necessary. type __DontNameMe: self::sealed::CantImpl; - fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll>; + fn poll_ready_ref(&mut self, cx: &mut Context<'_>) -> Poll>; fn make_service_ref(&mut self, target: &Target) -> Self::Future; } @@ -81,7 +82,7 @@ where type __DontNameMe = self::sealed::CantName; - fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready_ref(&mut self, cx: &mut Context<'_>) -> Poll> { self.poll_ready(cx) } @@ -159,7 +160,7 @@ where type Response = Svc; type Future = Ret; - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs index 2697af8f4c..5e2ca47630 100644 --- a/src/service/oneshot.rs +++ b/src/service/oneshot.rs @@ -1,10 +1,12 @@ // TODO: Eventually to be replaced with tower_util::Oneshot. +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + use pin_project_lite::pin_project; use tower_service::Service; -use crate::common::{task, Future, Pin, Poll}; - pub(crate) fn oneshot(svc: S, req: Req) -> Oneshot where S: Service, @@ -47,7 +49,7 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut me = self.project(); loop { diff --git a/src/service/util.rs b/src/service/util.rs index 7cba1206f1..59760a6858 100644 --- a/src/service/util.rs +++ b/src/service/util.rs @@ -1,9 +1,10 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::task::{Context, Poll}; use crate::body::HttpBody; -use crate::common::{task, Future, Poll}; use crate::{Request, Response}; /// Create a `Service` from a function. @@ -54,7 +55,7 @@ where type Error = E; type Future = Ret; - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } diff --git a/src/upgrade.rs b/src/upgrade.rs index 1c7b5b01cd..a46a8d224d 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -42,8 +42,11 @@ use std::any::TypeId; use std::error::Error as StdError; use std::fmt; +use std::future::Future; use std::io; use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; use bytes::Bytes; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -52,7 +55,6 @@ use tokio::sync::oneshot; use tracing::trace; use crate::common::io::Rewind; -use crate::common::{task, Future, Pin, Poll}; /// An upgraded HTTP connection. /// @@ -151,7 +153,7 @@ impl Upgraded { impl AsyncRead for Upgraded { fn poll_read( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { Pin::new(&mut self.io).poll_read(cx, buf) @@ -161,7 +163,7 @@ impl AsyncRead for Upgraded { impl AsyncWrite for Upgraded { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.io).poll_write(cx, buf) @@ -169,17 +171,17 @@ impl AsyncWrite for Upgraded { fn poll_write_vectored( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { Pin::new(&mut self.io).poll_write_vectored(cx, bufs) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_shutdown(cx) } @@ -210,7 +212,7 @@ impl OnUpgrade { impl Future for OnUpgrade { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.rx { Some(ref mut rx) => Pin::new(rx).poll(cx).map(|res| match res { Ok(Ok(upgraded)) => Ok(upgraded), @@ -351,7 +353,7 @@ mod tests { impl AsyncRead for Mock { fn poll_read( self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, + _cx: &mut Context<'_>, _buf: &mut ReadBuf<'_>, ) -> Poll> { unreachable!("Mock::poll_read") @@ -361,21 +363,18 @@ mod tests { impl AsyncWrite for Mock { fn poll_write( self: Pin<&mut Self>, - _: &mut task::Context<'_>, + _: &mut Context<'_>, buf: &[u8], ) -> Poll> { // panic!("poll_write shouldn't be called"); Poll::Ready(Ok(buf.len())) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { unreachable!("Mock::poll_flush") } - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { unreachable!("Mock::poll_shutdown") } }