From d9216c352a0d21603152ecf3d1929bc0b89cdc4b Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 18 Oct 2023 19:45:53 +0900 Subject: [PATCH] refactor(common): remove common re-export (#3346) --- src/body/incoming.rs | 12 ++++++----- src/client/conn/http1.rs | 10 ++++++---- src/client/conn/http2.rs | 8 +++++--- src/client/dispatch.rs | 19 ++++++++---------- src/common/io/rewind.rs | 13 ++++++------ src/common/mod.rs | 5 ----- src/common/task.rs | 3 ++- src/proto/h1/conn.rs | 28 ++++++++++++-------------- src/proto/h1/decode.rs | 35 ++++++++++++++++---------------- src/proto/h1/dispatch.rs | 43 +++++++++++++++++++++------------------- src/proto/h1/io.rs | 18 ++++++++--------- src/proto/h2/client.rs | 32 +++++++++++++++--------------- src/proto/h2/mod.rs | 16 ++++++++------- src/proto/h2/server.rs | 18 +++++++++-------- src/server/conn/http1.rs | 10 ++++++---- src/server/conn/http2.rs | 6 ++++-- src/service/http.rs | 2 +- src/service/util.rs | 2 +- src/upgrade.rs | 27 ++++++++++++------------- 19 files changed, 156 insertions(+), 151 deletions(-) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index cdebd3db58..ad5bd16116 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -1,4 +1,7 @@ use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use bytes::Bytes; use futures_channel::mpsc; @@ -8,8 +11,7 @@ use http::HeaderMap; use http_body::{Body, Frame, SizeHint}; use super::DecodedLength; -use crate::common::Future; -use crate::common::{task, watch, Pin, Poll}; +use crate::common::watch; #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use crate::proto::h2::ping; @@ -157,7 +159,7 @@ impl Body for Incoming { fn poll_frame( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { match self.kind { Kind::Empty => Poll::Ready(None), @@ -287,7 +289,7 @@ impl fmt::Debug for Incoming { impl Sender { /// Check to see if this `Sender` can send more data. - pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Check if the receiver end has tried polling for the body yet ready!(self.poll_want(cx)?); self.data_tx @@ -295,7 +297,7 @@ impl Sender { .map_err(|_| crate::Error::new_closed()) } - fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll> { match self.want_rx.load(cx) { WANT_READY => Poll::Ready(Ok(())), WANT_PENDING => Poll::Pending, diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index 4887f86663..c487b1927a 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -2,6 +2,9 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use crate::rt::{Read, Write}; use bytes::Bytes; @@ -10,7 +13,6 @@ use httparse::ParserConfig; use super::super::dispatch; use crate::body::{Body, Incoming as IncomingBody}; -use crate::common::{task, Future, Pin, Poll}; use crate::proto; use crate::upgrade::Upgraded; @@ -84,7 +86,7 @@ where /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) /// to work with this function; or use the `without_shutdown` wrapper. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner .as_mut() .expect("already upgraded") @@ -128,7 +130,7 @@ impl SendRequest { /// Polls to determine whether this sender can be used yet for a request. /// /// If the associated connection is closed, this returns an Error. - pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.dispatch.poll_ready(cx) } @@ -254,7 +256,7 @@ where { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? { proto::Dispatched::Shutdown => Poll::Ready(Ok(())), proto::Dispatched::Upgrade(pending) => match self.inner.take() { diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index edb99cfeff..ec8ecf3e08 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -2,8 +2,11 @@ use std::error::Error; use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; @@ -12,7 +15,6 @@ use http::{Request, Response}; use super::super::dispatch; use crate::body::{Body, Incoming as IncomingBody}; use crate::common::time::Time; -use crate::common::{task, Future, Pin, Poll}; use crate::proto; use crate::rt::bounds::ExecutorClient; use crate::rt::Timer; @@ -79,7 +81,7 @@ impl SendRequest { /// Polls to determine whether this sender can be used yet for a request. /// /// If the associated connection is closed, this returns an Error. - pub fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { if self.is_closed() { Poll::Ready(Err(crate::Error::new_closed())) } else { @@ -236,7 +238,7 @@ where { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(&mut self.inner.1).poll(cx))? { proto::Dispatched::Shutdown => Poll::Ready(Ok(())), #[cfg(feature = "http1")] diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index c4347d0614..88b3107877 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,5 +1,6 @@ +use std::task::{Context, Poll}; #[cfg(feature = "http2")] -use std::future::Future; +use std::{future::Future, pin::Pin}; #[cfg(feature = "http2")] use http::{Request, Response}; @@ -9,9 +10,8 @@ use http_body::Body; use pin_project_lite::pin_project; use tokio::sync::{mpsc, oneshot}; -use crate::common::{task, Poll}; #[cfg(feature = "http2")] -use crate::{body::Incoming, common::Pin, proto::h2::client::ResponseFutMap}; +use crate::{body::Incoming, proto::h2::client::ResponseFutMap}; #[cfg(test)] pub(crate) type RetryPromise = oneshot::Receiver)>>; @@ -62,7 +62,7 @@ pub(crate) struct UnboundedSender { impl Sender { #[cfg(feature = "http1")] - pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.giver .poll_want(cx) .map_err(|_| crate::Error::new_closed()) @@ -169,10 +169,7 @@ pub(crate) struct Receiver { } impl Receiver { - pub(crate) fn poll_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll)>> { + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll)>> { match self.inner.poll_recv(cx) { Poll::Ready(item) => { Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped"))) @@ -261,7 +258,7 @@ impl Callback { } } - pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> { + pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { match *self { Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), @@ -302,7 +299,7 @@ where { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); let mut call_back = this.call_back.take().expect("polled after complete"); @@ -319,7 +316,7 @@ where Poll::Pending => { // Move call_back back to struct before return this.call_back.set(Some(call_back)); - return std::task::Poll::Pending; + return Poll::Pending; } }; trace!("send_when canceled"); diff --git a/src/common/io/rewind.rs b/src/common/io/rewind.rs index f6b6bab3c7..c84ce8f25e 100644 --- a/src/common/io/rewind.rs +++ b/src/common/io/rewind.rs @@ -1,9 +1,10 @@ use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{cmp, io}; use bytes::{Buf, Bytes}; -use crate::common::{task, Pin, Poll}; use crate::rt::{Read, ReadBufCursor, Write}; /// Combine a buffer with an IO, rewinding reads to use the buffer. @@ -50,7 +51,7 @@ where { fn poll_read( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, mut buf: ReadBufCursor<'_>, ) -> Poll> { if let Some(mut prefix) = self.pre.take() { @@ -78,7 +79,7 @@ where { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.inner).poll_write(cx, buf) @@ -86,17 +87,17 @@ where fn poll_write_vectored( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_shutdown(cx) } diff --git a/src/common/mod.rs b/src/common/mod.rs index d57d059a1c..655e9189b6 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -21,8 +21,3 @@ pub(crate) mod task; ))] pub(crate) mod time; pub(crate) mod watch; - -pub(crate) use self::task::Poll; - -// group up types normally needed for `Future` -pub(crate) use std::{future::Future, pin::Pin}; diff --git a/src/common/task.rs b/src/common/task.rs index 9a296c4cfc..63b5540464 100644 --- a/src/common/task.rs +++ b/src/common/task.rs @@ -1,4 +1,5 @@ -pub(crate) use std::task::{Context, Poll}; +#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] +use std::task::{Context, Poll}; /// A function to help "yield" a future, such that it is re-scheduled immediately. /// diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 4885f9c0dc..43fba3b793 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -1,6 +1,8 @@ use std::fmt; use std::io; use std::marker::{PhantomData, Unpin}; +use std::pin::Pin; +use std::task::{Context, Poll}; #[cfg(feature = "server")] use std::time::Duration; @@ -15,7 +17,6 @@ use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext use crate::body::DecodedLength; #[cfg(feature = "server")] use crate::common::time::Time; -use crate::common::{task, Pin, Poll}; use crate::headers::connection_keep_alive; use crate::proto::{BodyLength, MessageHead}; #[cfg(feature = "server")] @@ -193,7 +194,7 @@ where pub(super) fn poll_read_head( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll, DecodedLength, Wants)>>> { debug_assert!(self.can_read_head()); trace!("Conn::read_head"); @@ -294,7 +295,7 @@ where pub(crate) fn poll_read_body( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { debug_assert!(self.can_read_body()); @@ -355,10 +356,7 @@ where ret } - pub(crate) fn poll_read_keep_alive( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { + pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.can_read_head() && !self.can_read_body()); if self.is_read_closed() { @@ -381,7 +379,7 @@ where // // This should only be called for Clients wanting to enter the idle // state. - fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); debug_assert!(!self.is_mid_message()); debug_assert!(T::is_client()); @@ -414,7 +412,7 @@ where Poll::Ready(Err(crate::Error::new_unexpected_message())) } - fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); debug_assert!(self.is_mid_message()); @@ -433,7 +431,7 @@ where } } - fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll> { debug_assert!(!self.state.is_read_closed()); let result = ready!(self.io.poll_read_from_io(cx)); @@ -444,7 +442,7 @@ where })) } - fn maybe_notify(&mut self, cx: &mut task::Context<'_>) { + fn maybe_notify(&mut self, cx: &mut Context<'_>) { // its possible that we returned NotReady from poll() without having // exhausted the underlying Io. We would have done this when we // determined we couldn't keep reading until we knew how writing @@ -491,7 +489,7 @@ where } } - fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) { + fn try_keep_alive(&mut self, cx: &mut Context<'_>) { self.state.try_keep_alive::(); self.maybe_notify(cx); } @@ -716,14 +714,14 @@ where Err(err) } - pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll> { ready!(Pin::new(&mut self.io).poll_flush(cx))?; self.try_keep_alive(cx); trace!("flushed({}): {:?}", T::LOG, self.state); Poll::Ready(Ok(())) } - pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { Ok(()) => { trace!("shut down IO complete"); @@ -737,7 +735,7 @@ where } /// If the read side can be cheaply drained, do so. Otherwise, close. - pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { + pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) { if let Reading::Continue(ref decoder) = self.state.reading { // skip sending the 100-continue // just move forward to a read, in case a tiny body was included diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index 81ac3a95c3..9c7b4b39d1 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -1,12 +1,11 @@ use std::error::Error as StdError; use std::fmt; use std::io; +use std::task::{Context, Poll}; use std::usize; use bytes::Bytes; -use crate::common::{task, Poll}; - use super::io::MemRead; use super::DecodedLength; @@ -102,7 +101,7 @@ impl Decoder { pub(crate) fn decode( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, body: &mut R, ) -> Poll> { trace!("decode; state={:?}", self.kind); @@ -184,7 +183,7 @@ macro_rules! byte ( impl ChunkedState { fn step( &self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, body: &mut R, size: &mut u64, buf: &mut Option, @@ -206,7 +205,7 @@ impl ChunkedState { } } fn read_size( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, size: &mut u64, ) -> Poll> { @@ -251,7 +250,7 @@ impl ChunkedState { Poll::Ready(Ok(ChunkedState::Size)) } fn read_size_lws( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { trace!("read_size_lws"); @@ -267,7 +266,7 @@ impl ChunkedState { } } fn read_extension( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { trace!("read_extension"); @@ -287,7 +286,7 @@ impl ChunkedState { } } fn read_size_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, size: u64, ) -> Poll> { @@ -309,7 +308,7 @@ impl ChunkedState { } fn read_body( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, rem: &mut u64, buf: &mut Option, @@ -343,7 +342,7 @@ impl ChunkedState { } } fn read_body_cr( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -355,7 +354,7 @@ impl ChunkedState { } } fn read_body_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -368,7 +367,7 @@ impl ChunkedState { } fn read_trailer( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { trace!("read_trailer"); @@ -378,7 +377,7 @@ impl ChunkedState { } } fn read_trailer_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -391,7 +390,7 @@ impl ChunkedState { } fn read_end_cr( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -400,7 +399,7 @@ impl ChunkedState { } } fn read_end_lf( - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, rdr: &mut R, ) -> Poll> { match byte!(rdr, cx) { @@ -432,7 +431,7 @@ mod tests { use std::time::Duration; impl<'a> MemRead for &'a [u8] { - fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll> { let n = std::cmp::min(len, self.len()); if n > 0 { let (a, b) = self.split_at(n); @@ -446,7 +445,7 @@ mod tests { } impl<'a> MemRead for &'a mut (dyn Read + Unpin) { - fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll> { let mut v = vec![0; len]; let mut buf = ReadBuf::new(&mut v); ready!(Pin::new(self).poll_read(cx, buf.unfilled())?); @@ -456,7 +455,7 @@ mod tests { #[cfg(feature = "nightly")] impl MemRead for Bytes { - fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll> { let n = std::cmp::min(len, self.len()); let ret = self.split_to(n); Poll::Ready(Ok(ret)) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 7083c46217..7190959cd7 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,4 +1,10 @@ -use std::{error::Error as StdError, marker::Unpin}; +use std::{ + error::Error as StdError, + future::Future, + marker::Unpin, + pin::Pin, + task::{Context, Poll}, +}; use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; @@ -6,7 +12,7 @@ use http::Request; use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; -use crate::common::{task, Future, Pin, Poll}; +use crate::common::task; use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; @@ -25,11 +31,11 @@ pub(crate) trait Dispatch { type RecvItem; fn poll_msg( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>; - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; fn should_poll(&self) -> bool; } @@ -100,10 +106,7 @@ where /// /// This is useful for old-style HTTP upgrades, but ignores /// newer-style upgrade API. - pub(crate) fn poll_without_shutdown( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> + pub(crate) fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> where Self: Unpin, { @@ -116,7 +119,7 @@ where fn poll_catch( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, should_shutdown: bool, ) -> Poll> { Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| { @@ -135,7 +138,7 @@ where fn poll_inner( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, should_shutdown: bool, ) -> Poll> { T::update_date(); @@ -156,7 +159,7 @@ where } } - fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll> { // Limit the looping on this connection, in case it is ready far too // often, so that other futures don't starve. // @@ -186,7 +189,7 @@ where task::yield_now(cx).map(|never| match never {}) } - fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll> { loop { if self.is_closing { return Poll::Ready(Ok(())); @@ -240,7 +243,7 @@ where } } - fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll> { // can dispatch receive, or does it still care about other incoming message? match ready!(self.dispatch.poll_ready(cx)) { Ok(()) => (), @@ -297,7 +300,7 @@ where } } - fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll> { loop { if self.is_closing { return Poll::Ready(Ok(())); @@ -386,7 +389,7 @@ where } } - fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll> { self.conn.poll_flush(cx).map_err(|err| { debug!("error writing: {}", err); crate::Error::new_body_write(err) @@ -433,7 +436,7 @@ where type Output = crate::Result; #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll_catch(cx, true) } } @@ -497,7 +500,7 @@ cfg_server! { fn poll_msg( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { let mut this = self.as_mut(); let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { @@ -532,7 +535,7 @@ cfg_server! { Ok(()) } - fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { if self.in_flight.is_some() { Poll::Pending } else { @@ -572,7 +575,7 @@ cfg_client! { fn poll_msg( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { let mut this = self.as_mut(); debug_assert!(!this.rx_closed); @@ -643,7 +646,7 @@ cfg_client! { } } - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.callback { Some(ref mut cb) => match cb.poll_canceled(cx) { Poll::Ready(()) => { diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 785f6c0221..3827e4ed15 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -5,13 +5,14 @@ use std::future::Future; use std::io::{self, IoSlice}; use std::marker::Unpin; use std::mem::MaybeUninit; +use std::pin::Pin; +use std::task::{Context, Poll}; use crate::rt::{Read, ReadBuf, Write}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use super::{Http1Transaction, ParseContext, ParsedMessage}; use crate::common::buf::BufList; -use crate::common::{task, Pin, Poll}; /// The initial buffer size allocated before trying to read from IO. pub(crate) const INIT_BUFFER_SIZE: usize = 8192; @@ -169,7 +170,7 @@ where pub(super) fn parse( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, parse_ctx: ParseContext<'_>, ) -> Poll>> where @@ -237,10 +238,7 @@ where } } - pub(crate) fn poll_read_from_io( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { + pub(crate) fn poll_read_from_io(&mut self, cx: &mut Context<'_>) -> Poll> { self.read_blocked = false; let next = self.read_buf_strategy.next(); if self.read_buf_remaining_mut() < next { @@ -283,7 +281,7 @@ where self.read_blocked } - pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll> { + pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll> { if self.flush_pipeline && !self.read_buf.is_empty() { Poll::Ready(Ok(())) } else if self.write_buf.remaining() == 0 { @@ -323,7 +321,7 @@ where /// /// Since all buffered bytes are flattened into the single headers buffer, /// that skips some bookkeeping around using multiple buffers. - fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush_flattened(&mut self, cx: &mut Context<'_>) -> Poll> { loop { let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?; debug!("flushed {} bytes", n); @@ -353,7 +351,7 @@ impl Unpin for Buffered {} // TODO: This trait is old... at least rename to PollBytes or something... pub(crate) trait MemRead { - fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll>; + fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll>; } impl MemRead for Buffered @@ -361,7 +359,7 @@ where T: Read + Write + Unpin, B: Buf, { - fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll> { + fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll> { if !self.read_buf.is_empty() { let n = std::cmp::min(len, self.read_buf.len()); Poll::Ready(Ok(self.read_buf.split_to(n).freeze())) diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index c6b24212e8..f25ce1a4e0 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,4 +1,11 @@ -use std::{convert::Infallible, marker::PhantomData, time::Duration}; +use std::{ + convert::Infallible, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; use crate::rt::{Read, Write}; use bytes::Bytes; @@ -17,7 +24,6 @@ use crate::body::{Body, Incoming as IncomingBody}; use crate::client::dispatch::{Callback, SendWhen}; use crate::common::io::Compat; use crate::common::time::Time; -use crate::common::{task, Future, Pin, Poll}; use crate::ext::Protocol; use crate::headers; use crate::proto::h2::UpgradedSendStream; @@ -190,7 +196,7 @@ where { type Output = Result<(), h2::Error>; - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); match this.ponger.poll(cx) { Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { @@ -230,7 +236,7 @@ where { type Output = Result<(), ()>; - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); if *this.is_terminated { @@ -298,7 +304,7 @@ where { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); if !this.conn.is_terminated() { @@ -356,10 +362,7 @@ where { type Output = (); - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { let this = self.project(); match this { @@ -432,10 +435,7 @@ where { type Output = (); - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { let mut this = self.project(); match this.pipe.poll_unpin(cx) { @@ -461,7 +461,7 @@ where B::Error: Into>, T: Read + Write + Unpin, { - fn poll_pipe(&mut self, f: FutCtx, cx: &mut task::Context<'_>) { + fn poll_pipe(&mut self, f: FutCtx, cx: &mut Context<'_>) { let ping = self.ping.clone(); let send_stream = if !f.is_connect { @@ -530,7 +530,7 @@ where { type Output = Result, (crate::Error, Option>)>; - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); let result = ready!(this.fut.poll(cx)); @@ -598,7 +598,7 @@ where { type Output = crate::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match ready!(self.h2_tx.poll_ready(cx)) { Ok(()) => (), diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index defc2512d6..15afe48d23 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -1,17 +1,19 @@ -use crate::rt::{Read, ReadBufCursor, Write}; +use std::error::Error as StdError; +use std::future::Future; +use std::io::{Cursor, IoSlice}; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + use bytes::{Buf, Bytes}; use h2::{Reason, RecvStream, SendStream}; use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE}; use http::HeaderMap; use pin_project_lite::pin_project; -use std::error::Error as StdError; -use std::io::{Cursor, IoSlice}; -use std::mem; -use std::task::Context; use crate::body::Body; -use crate::common::{task, Future, Pin, Poll}; use crate::proto::h2::ping::Recorder; +use crate::rt::{Read, ReadBufCursor, Write}; pub(crate) mod ping; @@ -115,7 +117,7 @@ where { type Output = crate::Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut me = self.project(); loop { // we don't have the next chunk of data yet, so just reserve 1 byte to make diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 0d830ad017..0337e22ea8 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -1,9 +1,10 @@ use std::error::Error as StdError; +use std::future::Future; use std::marker::Unpin; - +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; -use crate::rt::{Read, Write}; use bytes::Bytes; use h2::server::{Connection, Handshake, SendResponse}; use h2::{Reason, RecvStream}; @@ -12,14 +13,15 @@ use pin_project_lite::pin_project; use super::{ping, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; +use crate::common::date; use crate::common::time::Time; -use crate::common::{date, task, Future, Pin, Poll}; use crate::ext::Protocol; use crate::headers; use crate::proto::h2::ping::Recorder; use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; use crate::proto::Dispatched; use crate::rt::bounds::Http2ConnExec; +use crate::rt::{Read, Write}; use crate::service::HttpService; use crate::upgrade::{OnUpgrade, Pending, Upgraded}; @@ -189,7 +191,7 @@ where { type Output = crate::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = &mut *self; loop { let next = match me.state { @@ -232,7 +234,7 @@ where { fn poll_server( &mut self, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, service: &mut S, exec: &mut E, ) -> Poll> @@ -320,7 +322,7 @@ where Poll::Ready(Err(self.closing.take().expect("polled after error"))) } - fn poll_ping(&mut self, cx: &mut task::Context<'_>) { + fn poll_ping(&mut self, cx: &mut Context<'_>) { if let Some((_, ref mut estimator)) = self.ping { match estimator.poll(cx) { Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { @@ -410,7 +412,7 @@ where B::Error: Into>, E: Into>, { - fn poll2(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); loop { let next = match me.state.as_mut().project() { @@ -505,7 +507,7 @@ where { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll2(cx).map(|res| { if let Err(_e) = res { debug!("stream error: {}", _e); diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index ca7d919e81..d4b8924e9f 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -2,15 +2,17 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; use std::marker::Unpin; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; use bytes::Bytes; use crate::body::{Body, Incoming as IncomingBody}; -use crate::common::{task, Future, Pin, Poll}; use crate::proto; use crate::service::HttpService; use crate::{common::time::Time, rt::Timer}; @@ -133,7 +135,7 @@ where /// upgrade. Once the upgrade is completed, the connection would be "done", /// but it is not desired to actually shutdown the IO object. Instead you /// would take it back using `into_parts`. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> + pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> where S: Unpin, S::Future: Unpin, @@ -182,7 +184,7 @@ where { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(&mut self.conn).poll(cx)) { Ok(done) => { match done { @@ -440,7 +442,7 @@ mod upgrades { { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) { Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())), Ok(proto::Dispatched::Upgrade(pending)) => { diff --git a/src/server/conn/http2.rs b/src/server/conn/http2.rs index acb7247eb4..e412f0001f 100644 --- a/src/server/conn/http2.rs +++ b/src/server/conn/http2.rs @@ -2,15 +2,17 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; use std::marker::Unpin; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Duration; use crate::rt::{Read, Write}; use pin_project_lite::pin_project; use crate::body::{Body, Incoming as IncomingBody}; -use crate::common::{task, Future, Pin, Poll}; use crate::proto; use crate::rt::bounds::Http2ConnExec; use crate::service::HttpService; @@ -86,7 +88,7 @@ where { type Output = crate::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match ready!(Pin::new(&mut self.conn).poll(cx)) { Ok(_done) => { //TODO: the proto::h2::Server no longer needs to return diff --git a/src/service/http.rs b/src/service/http.rs index dbbdaa107b..dd1743168c 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -1,7 +1,7 @@ use std::error::Error as StdError; +use std::future::Future; use crate::body::Body; -use crate::common::Future; use crate::service::service::Service; use crate::{Request, Response}; diff --git a/src/service/util.rs b/src/service/util.rs index 710ba53543..3e017a782c 100644 --- a/src/service/util.rs +++ b/src/service/util.rs @@ -1,9 +1,9 @@ use std::error::Error as StdError; use std::fmt; +use std::future::Future; use std::marker::PhantomData; use crate::body::Body; -use crate::common::Future; use crate::service::service::Service; use crate::{Request, Response}; diff --git a/src/upgrade.rs b/src/upgrade.rs index 2e5d25959f..fb172b8b54 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -42,15 +42,17 @@ use std::any::TypeId; use std::error::Error as StdError; use std::fmt; +use std::future::Future; use std::io; use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; use crate::rt::{Read, ReadBufCursor, Write}; use bytes::Bytes; use tokio::sync::oneshot; use crate::common::io::Rewind; -use crate::common::{task, Future, Pin, Poll}; /// An upgraded HTTP connection. /// @@ -158,7 +160,7 @@ impl Upgraded { impl Read for Upgraded { fn poll_read( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: ReadBufCursor<'_>, ) -> Poll> { Pin::new(&mut self.io).poll_read(cx, buf) @@ -168,7 +170,7 @@ impl Read for Upgraded { impl Write for Upgraded { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.io).poll_write(cx, buf) @@ -176,17 +178,17 @@ impl Write for Upgraded { fn poll_write_vectored( mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, + cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { Pin::new(&mut self.io).poll_write_vectored(cx, bufs) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_flush(cx) } - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_shutdown(cx) } @@ -217,7 +219,7 @@ impl OnUpgrade { impl Future for OnUpgrade { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.rx { Some(ref mut rx) => Pin::new(rx).poll(cx).map(|res| match res { Ok(Ok(upgraded)) => Ok(upgraded), @@ -364,7 +366,7 @@ mod tests { impl Read for Mock { fn poll_read( self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, + _cx: &mut Context<'_>, _buf: ReadBufCursor<'_>, ) -> Poll> { unreachable!("Mock::poll_read") @@ -374,21 +376,18 @@ mod tests { impl Write for Mock { fn poll_write( self: Pin<&mut Self>, - _: &mut task::Context<'_>, + _: &mut Context<'_>, buf: &[u8], ) -> Poll> { // panic!("poll_write shouldn't be called"); Poll::Ready(Ok(buf.len())) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { unreachable!("Mock::poll_flush") } - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { unreachable!("Mock::poll_shutdown") } }