Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): allow !Send IO with HTTP/1 client #3371

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn http1_server() -> Result<(), Box<dyn std::error::Error>> {
loop {
let (stream, _) = listener.accept().await?;

let io = TokioIo::new(stream);
let io = IOTypeNotSend::new(TokioIo::new(stream));

let cnt = counter.clone();

Expand Down Expand Up @@ -166,7 +166,7 @@ async fn http1_client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>>
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?;

let io = TokioIo::new(stream);
let io = IOTypeNotSend::new(TokioIo::new(stream));

let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;

Expand Down
3 changes: 2 additions & 1 deletion examples/upgrades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> {
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;

tokio::task::spawn(async move {
if let Err(err) = conn.await {
// Don't forget to enable upgrades on the connection.
if let Err(err) = conn.with_upgrades().await {
println!("Connection failed: {:?}", err);
}
});
Expand Down
108 changes: 79 additions & 29 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use httparse::ParserConfig;
use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::proto;
use crate::upgrade::Upgraded;

type Dispatcher<T, B> =
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
Expand Down Expand Up @@ -51,23 +50,23 @@ pub struct Parts<T> {
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
T: Read + Write + Send + 'static,
T: Read + Write + 'static,
B: Body + 'static,
{
inner: Option<Dispatcher<T, B>>,
inner: Dispatcher<T, B>,
}

impl<T, B> Connection<T, B>
where
T: Read + Write + Send + Unpin + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Return the inner IO object, and additional information.
///
/// Only works for HTTP/1 connections. HTTP/2 connections will panic.
pub fn into_parts(self) -> Parts<T> {
let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner();
let (io, read_buf, _) = self.inner.into_inner();
Parts {
io,
read_buf,
Expand All @@ -87,10 +86,7 @@ where
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
self.inner
.as_mut()
.expect("already upgraded")
.poll_without_shutdown(cx)
self.inner.poll_without_shutdown(cx)
}
}

Expand Down Expand Up @@ -119,7 +115,7 @@ pub struct Builder {
/// See [`client::conn`](crate::client::conn) for more.
pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
where
T: Read + Write + Unpin + Send + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
Expand Down Expand Up @@ -240,9 +236,23 @@ impl<B> fmt::Debug for SendRequest<B> {

// ===== impl Connection

impl<T, B> Connection<T, B>
where
T: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Enable this connection to support higher-level HTTP upgrades.
///
/// See [the `upgrade` module](crate::upgrade) for more.
pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
upgrades::UpgradeableConnection { inner: Some(self) }
}
}

impl<T, B> fmt::Debug for Connection<T, B>
where
T: Read + Write + fmt::Debug + Send + 'static,
T: Read + Write + fmt::Debug + 'static,
B: Body + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -252,27 +262,24 @@ where

impl<T, B> Future for Connection<T, B>
where
T: Read + Write + Unpin + Send + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
match ready!(Pin::new(&mut self.inner).poll(cx))? {
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
Some(h1) => {
let (io, buf, _) = h1.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
}
_ => {
drop(pending);
unreachable!("Upgraded twice");
}
},
proto::Dispatched::Upgrade(pending) => {
// With no `Send` bound on `I`, we can't try to do
// upgrades here. In case a user was trying to use
// `upgrade` with this API, send a special
// error letting them know about that.
pending.manual();
Poll::Ready(Ok(()))
}
}
}
}
Expand Down Expand Up @@ -474,7 +481,7 @@ impl Builder {
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
where
T: Read + Write + Unpin + Send + 'static,
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
Expand Down Expand Up @@ -518,10 +525,53 @@ impl Builder {
let cd = proto::h1::dispatch::Client::new(rx);
let proto = proto::h1::Dispatcher::new(cd, conn);

Ok((
SendRequest { dispatch: tx },
Connection { inner: Some(proto) },
))
Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
}
}
}

mod upgrades {
use crate::upgrade::Upgraded;

use super::*;

// A future binding a connection with a Service with Upgrade support.
//
// This type is unnameable outside the crate.
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, B>
where
T: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
pub(super) inner: Option<Connection<T, B>>,
}

impl<I, B> Future for UpgradeableConnection<I, B>
where
I: Read + Write + Unpin + Send + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
Ok(proto::Dispatched::Upgrade(pending)) => {
let Parts {
io,
read_buf,
_inner,
} = self.inner.take().unwrap().into_parts();
pending.fulfill(Upgraded::new(io, read_buf));
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
}