Skip to content

Commit

Permalink
Merge branch 'master' into check-external-types
Browse files Browse the repository at this point in the history
  • Loading branch information
palango authored Aug 23, 2023
2 parents 0be71fb + fece9f7 commit 20a097a
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ authors = ["Sean McArthur <[email protected]>"]
keywords = ["http", "hyper", "hyperium"]
categories = ["network-programming", "web-programming::http-client", "web-programming::http-server"]
edition = "2018"
rust-version = "1.56" # keep in sync with MSRV.md dev doc
rust-version = "1.63" # keep in sync with MSRV.md dev doc

include = [
"Cargo.toml",
Expand Down
2 changes: 1 addition & 1 deletion docs/MSRV.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ hyper. It is possible that an older compiler can work, but that is not
guaranteed. We try to increase the MSRV responsibly, only when a significant
new feature is needed.

The current MSRV is: **1.56**.
The current MSRV is: **1.63**.
11 changes: 10 additions & 1 deletion src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,16 @@ impl Body for Incoming {
ping.record_data(bytes.len());
return Poll::Ready(Some(Ok(Frame::data(bytes))));
}
Some(Err(e)) => return Poll::Ready(Some(Err(crate::Error::new_body(e)))),
Some(Err(e)) => {
return match e.reason() {
// These reasons should cause the body reading to stop, but not fail it.
// The same logic as for `Read for H2Upgraded` is applied here.
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
Poll::Ready(None)
}
_ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
};
}
None => {
*data_done = true;
// fall through to trailers
Expand Down
2 changes: 1 addition & 1 deletion src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! `cargo`, staring with `1.64.0`, it can be compiled with the following command:
//!
//! ```notrust
//! RUSTFLAGS="--cfg hyper_unstable_ffi" cargo rustc --features client,http1,http2,ffi --crate-type cdylib
//! RUSTFLAGS="--cfg hyper_unstable_ffi" cargo rustc --crate-type cdylib --features client,http1,http2,ffi
//! ```
// We may eventually allow the FFI to be enabled without `client` or `http1`,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub mod service;
pub mod upgrade;

#[cfg(feature = "ffi")]
#[cfg_attr(docsrs, doc(cfg(all(feature = "ffi", hyper_unstable_ffi))))]
pub mod ffi;

cfg_proto! {
Expand Down
77 changes: 49 additions & 28 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::rt::{Read, Write};
use bytes::Bytes;
use futures_channel::mpsc::{Receiver, Sender};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, Either, FutureExt as _, Select};
use futures_util::future::{Either, FusedFuture, FutureExt as _};
use futures_util::stream::{StreamExt as _, StreamFuture};
use h2::client::{Builder, Connection, SendRequest};
use h2::SendStream;
Expand Down Expand Up @@ -143,7 +143,10 @@ where
} else {
(Either::Right(conn), ping::disabled())
};
let conn: ConnMapErr<T, B> = ConnMapErr { conn };
let conn: ConnMapErr<T, B> = ConnMapErr {
conn,
is_terminated: false,
};

exec.execute_h2_future(H2ClientFuture::Task {
task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
Expand Down Expand Up @@ -218,6 +221,8 @@ pin_project! {
{
#[pin]
conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
#[pin]
is_terminated: bool,
}
}

Expand All @@ -229,10 +234,26 @@ where
type Output = Result<(), ()>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.project()
.conn
.poll(cx)
.map_err(|e| debug!("connection error: {}", e))
let mut this = self.project();

if *this.is_terminated {
return Poll::Pending;
}
let polled = this.conn.poll(cx);
if polled.is_ready() {
*this.is_terminated = true;
}
polled.map_err(|e| debug!("connection error: {}", e))
}
}

impl<T, B> FusedFuture for ConnMapErr<T, B>
where
B: Body,
T: Read + Write + Unpin,
{
fn is_terminated(&self) -> bool {
self.is_terminated
}
}

Expand All @@ -245,10 +266,11 @@ pin_project! {
T: Unpin,
{
#[pin]
select: Select<ConnMapErr<T, B>, StreamFuture<Receiver<Never>>>,
drop_rx: StreamFuture<Receiver<Never>>,
#[pin]
cancel_tx: Option<oneshot::Sender<Never>>,
conn: Option<ConnMapErr<T, B>>,
#[pin]
conn: ConnMapErr<T, B>,
}
}

Expand All @@ -263,9 +285,9 @@ where
cancel_tx: oneshot::Sender<Never>,
) -> Self {
Self {
select: future::select(conn, drop_rx),
drop_rx,
cancel_tx: Some(cancel_tx),
conn: None,
conn,
}
}
}
Expand All @@ -280,25 +302,24 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if let Some(conn) = this.conn {
conn.poll_unpin(cx).map(|_| ())
} else {
match ready!(this.select.poll_unpin(cx)) {
Either::Left((_, _)) => {
// ok or err, the `conn` has finished
return Poll::Ready(());
}
Either::Right((_, b)) => {
// mpsc has been dropped, hopefully polling
// the connection some more should start shutdown
// and then close
trace!("send_request dropped, starting conn shutdown");
drop(this.cancel_tx.take().expect("Future polled twice"));
this.conn = &mut Some(b);
return Poll::Pending;
}
}
if !this.conn.is_terminated() {
if let Poll::Ready(_) = this.conn.poll_unpin(cx) {
// ok or err, the `conn` has finished.
return Poll::Ready(());
};
}

if !this.drop_rx.is_terminated() {
if let Poll::Ready(_) = this.drop_rx.poll_unpin(cx) {
// mpsc has been dropped, hopefully polling
// the connection some more should start shutdown
// and then close.
trace!("send_request dropped, starting conn shutdown");
drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
}
};

Poll::Pending
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) enum BodyLength {
Unknown,
}

/// Status of when a Disaptcher future completes.
/// Status of when a Dispatcher future completes.
pub(crate) enum Dispatched {
/// Dispatcher completely shutdown connection.
Shutdown,
Expand Down
58 changes: 57 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ mod conn {
use bytes::{Buf, Bytes};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use http_body_util::{BodyExt, Empty, StreamBody};
use http_body_util::{BodyExt, Empty, Full, StreamBody};
use hyper::rt::Timer;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio::net::{TcpListener as TkTcpListener, TcpStream};
Expand Down Expand Up @@ -2126,6 +2126,62 @@ mod conn {
.expect("client should be open");
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();

let (listener, addr) = setup_tk_test_server().await;

// Spawn an HTTP2 server that responds before reading the whole request body.
// It's normal case to decline the request due to headers or size of the body.
tokio::spawn(async move {
let sock = TokioIo::new(listener.accept().await.unwrap().0);
hyper::server::conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.serve_connection(
sock,
service_fn(|_req| async move {
Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from(
"No bread for you!",
))))
}),
)
.await
.expect("serve_connection");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.handshake(io)
.await
.expect("http handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

// Use a channel to keep request stream open
let (_tx, recv) = mpsc::channel::<Result<Frame<Bytes>, Box<dyn Error + Send + Sync>>>(0);
let req = Request::post("/a").body(StreamBody::new(recv)).unwrap();
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let mut body = String::new();
concat(resp.into_body())
.await
.unwrap()
.reader()
.read_to_string(&mut body)
.unwrap();

assert_eq!(&body, "No bread for you!");
}

#[tokio::test]
async fn h2_connect() {
let (listener, addr) = setup_tk_test_server().await;
Expand Down

0 comments on commit 20a097a

Please sign in to comment.