diff --git a/src/common/io/compat.rs b/src/common/io/compat.rs index 3320e4ff44..d026b6d38b 100644 --- a/src/common/io/compat.rs +++ b/src/common/io/compat.rs @@ -7,11 +7,11 @@ use std::task::{Context, Poll}; #[derive(Debug)] pub(crate) struct Compat(pub(crate) T); -pub(crate) fn compat(io: T) -> Compat { - Compat(io) -} - impl Compat { + pub(crate) fn new(io: T) -> Self { + Compat(io) + } + fn p(self: Pin<&mut Self>) -> Pin<&mut T> { // SAFETY: The simplest of projections. This is just // a wrapper, we don't do anything that would undo the projection. diff --git a/src/common/io/mod.rs b/src/common/io/mod.rs index 92f71c4ee6..98c297ca14 100644 --- a/src/common/io/mod.rs +++ b/src/common/io/mod.rs @@ -3,5 +3,5 @@ mod compat; mod rewind; #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] -pub(crate) use self::compat::{compat, Compat}; +pub(crate) use self::compat::Compat; pub(crate) use self::rewind::Rewind; diff --git a/src/common/io/rewind.rs b/src/common/io/rewind.rs index 1a5567ae80..2307b25958 100644 --- a/src/common/io/rewind.rs +++ b/src/common/io/rewind.rs @@ -108,7 +108,7 @@ where #[cfg(test)] mod tests { - use super::super::compat; + use super::super::Compat; use super::Rewind; use bytes::Bytes; use tokio::io::AsyncReadExt; @@ -120,7 +120,7 @@ mod tests { let mock = tokio_test::io::Builder::new().read(&underlying).build(); - let mut stream = compat(Rewind::new(compat(mock))); + let mut stream = Compat::new(Rewind::new(Compat::new(mock))); // Read off some bytes, ensure we filled o1 let mut buf = [0; 2]; @@ -143,7 +143,7 @@ mod tests { let mock = tokio_test::io::Builder::new().read(&underlying).build(); - let mut stream = compat(Rewind::new(compat(mock))); + let mut stream = Compat::new(Rewind::new(Compat::new(mock))); let mut buf = [0; 5]; stream.read_exact(&mut buf).await.expect("read1"); diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index 9c7b4b39d1..4e316a7f78 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -627,7 +627,7 @@ mod tests { async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String { let mut outs = Vec::new(); - let mut ins = crate::common::io::compat(if block_at == 0 { + let mut ins = crate::common::io::Compat::new(if block_at == 0 { tokio_test::io::Builder::new() .wait(Duration::from_millis(10)) .read(content) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 7190959cd7..c29c15dcae 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -668,7 +668,7 @@ cfg_client! { #[cfg(test)] mod tests { use super::*; - use crate::common::io::compat; + use crate::common::io::Compat; use crate::proto::h1::ClientTransaction; use std::time::Duration; @@ -682,7 +682,7 @@ mod tests { // Block at 0 for now, but we will release this response before // the request is ready to write later... let (mut tx, rx) = crate::client::dispatch::channel(); - let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(compat(io)); + let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io)); let mut dispatcher = Dispatcher::new(Client::new(rx), conn); // First poll is needed to allow tx to send... @@ -719,7 +719,7 @@ mod tests { .build_with_handle(); let (mut tx, rx) = crate::client::dispatch::channel(); - let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(compat(io)); + let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io)); conn.set_write_strategy_queue(); let dispatcher = Dispatcher::new(Client::new(rx), conn); @@ -750,7 +750,7 @@ mod tests { .build(); let (mut tx, rx) = crate::client::dispatch::channel(); - let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(compat(io)); + let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(Compat::new(io)); let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn)); // First poll is needed to allow tx to send... diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 3827e4ed15..d0c0cba3bf 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -659,7 +659,7 @@ enum WriteStrategy { #[cfg(test)] mod tests { - use crate::common::io::compat; + use crate::common::io::Compat; use crate::common::time::Time; use super::*; @@ -715,7 +715,7 @@ mod tests { .wait(Duration::from_secs(1)) .build(); - let mut buffered = Buffered::<_, Cursor>>::new(compat(mock)); + let mut buffered = Buffered::<_, Cursor>>::new(Compat::new(mock)); // We expect a `parse` to be not ready, and so can't await it directly. // Rather, this `poll_fn` will wrap the `Poll` result. @@ -860,7 +860,7 @@ mod tests { #[cfg(debug_assertions)] // needs to trigger a debug_assert fn write_buf_requires_non_empty_bufs() { let mock = Mock::new().build(); - let mut buffered = Buffered::<_, Cursor>>::new(compat(mock)); + let mut buffered = Buffered::<_, Cursor>>::new(Compat::new(mock)); buffered.buffer(Cursor::new(Vec::new())); } @@ -895,7 +895,7 @@ mod tests { let mock = Mock::new().write(b"hello world, it's hyper!").build(); - let mut buffered = Buffered::<_, Cursor>>::new(compat(mock)); + let mut buffered = Buffered::<_, Cursor>>::new(Compat::new(mock)); buffered.write_buf.set_strategy(WriteStrategy::Flatten); buffered.headers_buf().extend(b"hello "); @@ -954,7 +954,7 @@ mod tests { .write(b"hyper!") .build(); - let mut buffered = Buffered::<_, Cursor>>::new(compat(mock)); + let mut buffered = Buffered::<_, Cursor>>::new(Compat::new(mock)); buffered.write_buf.set_strategy(WriteStrategy::Queue); // we have 4 buffers, and vec IO disabled, but explicitly said diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index f25ce1a4e0..f5104f1f66 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -122,7 +122,7 @@ where B::Error: Into>, { let (h2_tx, mut conn) = new_builder(config) - .handshake::<_, SendBuf>(crate::common::io::compat(io)) + .handshake::<_, SendBuf>(Compat::new(io)) .await .map_err(crate::Error::new_h2)?; diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 12103c3a58..0c6c1472a2 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -14,6 +14,7 @@ use pin_project_lite::pin_project; use super::{ping, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; use crate::common::date; +use crate::common::io::Compat; use crate::common::time::Time; use crate::ext::Protocol; use crate::headers; @@ -90,7 +91,7 @@ where { Handshaking { ping_config: ping::Config, - hs: Handshake, SendBuf>, + hs: Handshake, SendBuf>, }, Serving(Serving), Closed, @@ -101,7 +102,7 @@ where B: Body, { ping: Option<(ping::Recorder, ping::Ponger)>, - conn: Connection, SendBuf>, + conn: Connection, SendBuf>, closing: Option, } @@ -133,7 +134,7 @@ where if config.enable_connect_protocol { builder.enable_connect_protocol(); } - let handshake = builder.handshake(crate::common::io::compat(io)); + let handshake = builder.handshake(Compat::new(io)); let bdp = if config.adaptive_window { Some(config.initial_stream_window_size)