Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(common): remove common re-export #3346

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/body/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_channel::mpsc;
Expand All @@ -8,8 +11,7 @@ use http::HeaderMap;
use http_body::{Body, Frame, SizeHint};

use super::DecodedLength;
use crate::common::Future;
use crate::common::{task, watch, Pin, Poll};
use crate::common::watch;
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
use crate::proto::h2::ping;

Expand Down Expand Up @@ -157,7 +159,7 @@ impl Body for Incoming {

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.kind {
Kind::Empty => Poll::Ready(None),
Expand Down Expand Up @@ -287,15 +289,15 @@ impl fmt::Debug for Incoming {

impl Sender {
/// Check to see if this `Sender` can send more data.
pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.data_tx
.poll_ready(cx)
.map_err(|_| crate::Error::new_closed())
}

fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
match self.want_rx.load(cx) {
WANT_READY => Poll::Ready(Ok(())),
WANT_PENDING => Poll::Pending,
Expand Down
10 changes: 6 additions & 4 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::rt::{Read, Write};
use bytes::Bytes;
Expand All @@ -10,7 +13,6 @@ use httparse::ParserConfig;

use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::{task, Future, Pin, Poll};
use crate::proto;
use crate::upgrade::Upgraded;

Expand Down Expand Up @@ -84,7 +86,7 @@ where
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.inner
.as_mut()
.expect("already upgraded")
Expand Down Expand Up @@ -128,7 +130,7 @@ impl<B> SendRequest<B> {
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.dispatch.poll_ready(cx)
}

Expand Down Expand Up @@ -254,7 +256,7 @@ where
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
Expand Down
8 changes: 5 additions & 3 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::rt::{Read, Write};
Expand All @@ -12,7 +15,6 @@ use http::{Request, Response};
use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::time::Time;
use crate::common::{task, Future, Pin, Poll};
use crate::proto;
use crate::rt::bounds::ExecutorClient;
use crate::rt::Timer;
Expand Down Expand Up @@ -79,7 +81,7 @@ impl<B> SendRequest<B> {
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
if self.is_closed() {
Poll::Ready(Err(crate::Error::new_closed()))
} else {
Expand Down Expand Up @@ -236,7 +238,7 @@ where
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
#[cfg(feature = "http1")]
Expand Down
19 changes: 8 additions & 11 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::task::{Context, Poll};
#[cfg(feature = "http2")]
use std::future::Future;
use std::{future::Future, pin::Pin};

#[cfg(feature = "http2")]
use http::{Request, Response};
Expand All @@ -9,9 +10,8 @@ use http_body::Body;
use pin_project_lite::pin_project;
use tokio::sync::{mpsc, oneshot};

use crate::common::{task, Poll};
#[cfg(feature = "http2")]
use crate::{body::Incoming, common::Pin, proto::h2::client::ResponseFutMap};
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};

#[cfg(test)]
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub(crate) struct UnboundedSender<T, U> {

impl<T, U> Sender<T, U> {
#[cfg(feature = "http1")]
pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.giver
.poll_want(cx)
.map_err(|_| crate::Error::new_closed())
Expand Down Expand Up @@ -169,10 +169,7 @@ pub(crate) struct Receiver<T, U> {
}

impl<T, U> Receiver<T, U> {
pub(crate) fn poll_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<(T, Callback<T, U>)>> {
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
match self.inner.poll_recv(cx) {
Poll::Ready(item) => {
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
Expand Down Expand Up @@ -261,7 +258,7 @@ impl<T, U> Callback<T, U> {
}
}

pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match *self {
Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
Expand Down Expand Up @@ -302,7 +299,7 @@ where
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

let mut call_back = this.call_back.take().expect("polled after complete");
Expand All @@ -319,7 +316,7 @@ where
Poll::Pending => {
// Move call_back back to struct before return
this.call_back.set(Some(call_back));
return std::task::Poll::Pending;
return Poll::Pending;
}
};
trace!("send_when canceled");
Expand Down
13 changes: 7 additions & 6 deletions src/common/io/rewind.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, io};

use bytes::{Buf, Bytes};

use crate::common::{task, Pin, Poll};
use crate::rt::{Read, ReadBufCursor, Write};

/// Combine a buffer with an IO, rewinding reads to use the buffer.
Expand Down Expand Up @@ -50,7 +51,7 @@ where
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
mut buf: ReadBufCursor<'_>,
) -> Poll<io::Result<()>> {
if let Some(mut prefix) = self.pre.take() {
Expand Down Expand Up @@ -78,25 +79,25 @@ where
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}

Expand Down
5 changes: 0 additions & 5 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,3 @@ pub(crate) mod task;
))]
pub(crate) mod time;
pub(crate) mod watch;

pub(crate) use self::task::Poll;

// group up types normally needed for `Future`
pub(crate) use std::{future::Future, pin::Pin};
3 changes: 2 additions & 1 deletion src/common/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) use std::task::{Context, Poll};
#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
use std::task::{Context, Poll};

/// A function to help "yield" a future, such that it is re-scheduled immediately.
///
Expand Down
28 changes: 13 additions & 15 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::fmt;
use std::io;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::task::{Context, Poll};
#[cfg(feature = "server")]
use std::time::Duration;

Expand All @@ -15,7 +17,6 @@ use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext
use crate::body::DecodedLength;
#[cfg(feature = "server")]
use crate::common::time::Time;
use crate::common::{task, Pin, Poll};
use crate::headers::connection_keep_alive;
use crate::proto::{BodyLength, MessageHead};
#[cfg(feature = "server")]
Expand Down Expand Up @@ -193,7 +194,7 @@ where

pub(super) fn poll_read_head(
&mut self,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");
Expand Down Expand Up @@ -294,7 +295,7 @@ where

pub(crate) fn poll_read_body(
&mut self,
cx: &mut task::Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Bytes>>> {
debug_assert!(self.can_read_body());

Expand Down Expand Up @@ -355,10 +356,7 @@ where
ret
}

pub(crate) fn poll_read_keep_alive(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<crate::Result<()>> {
pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
debug_assert!(!self.can_read_head() && !self.can_read_body());

if self.is_read_closed() {
Expand All @@ -381,7 +379,7 @@ where
//
// This should only be called for Clients wanting to enter the idle
// state.
fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
debug_assert!(!self.is_mid_message());
debug_assert!(T::is_client());
Expand Down Expand Up @@ -414,7 +412,7 @@ where
Poll::Ready(Err(crate::Error::new_unexpected_message()))
}

fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
debug_assert!(self.is_mid_message());

Expand All @@ -433,7 +431,7 @@ where
}
}

fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
debug_assert!(!self.state.is_read_closed());

let result = ready!(self.io.poll_read_from_io(cx));
Expand All @@ -444,7 +442,7 @@ where
}))
}

fn maybe_notify(&mut self, cx: &mut task::Context<'_>) {
fn maybe_notify(&mut self, cx: &mut Context<'_>) {
// its possible that we returned NotReady from poll() without having
// exhausted the underlying Io. We would have done this when we
// determined we couldn't keep reading until we knew how writing
Expand Down Expand Up @@ -491,7 +489,7 @@ where
}
}

fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) {
fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
self.state.try_keep_alive::<T>();
self.maybe_notify(cx);
}
Expand Down Expand Up @@ -716,14 +714,14 @@ where
Err(err)
}

pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(Pin::new(&mut self.io).poll_flush(cx))?;
self.try_keep_alive(cx);
trace!("flushed({}): {:?}", T::LOG, self.state);
Poll::Ready(Ok(()))
}

pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
Ok(()) => {
trace!("shut down IO complete");
Expand All @@ -737,7 +735,7 @@ where
}

/// If the read side can be cheaply drained, do so. Otherwise, close.
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
if let Reading::Continue(ref decoder) = self.state.reading {
// skip sending the 100-continue
// just move forward to a read, in case a tiny body was included
Expand Down
Loading