Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add a channel body #100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions http-body-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@ Combinators and adapters for HTTP request or response bodies.
keywords = ["http"]
categories = ["web-programming"]

[features]
default = []
channel = ["dep:tokio"]
full = ["channel"]

[dependencies]
bytes = "1"
futures-util = { version = "0.3.14", default-features = false, features = ["alloc"] }
http = "1"
http-body = { version = "1", path = "../http-body" }
pin-project-lite = "0.2"

# optional dependencies
tokio = { version = "1", features = ["sync"], optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }
159 changes: 159 additions & 0 deletions http-body-util/src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
//! A body backed by a channel.

use std::{
fmt::Display,
pin::Pin,
task::{Context, Poll},
};

use bytes::Buf;
use http::HeaderMap;
use http_body::{Body, Frame};
use tokio::sync::mpsc;

/// A body backed by a channel.
pub struct Channel<D, E = std::convert::Infallible> {
rx_frame: mpsc::Receiver<Frame<D>>,
rx_error: mpsc::Receiver<E>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a oneshot channel work? It should be a smaller struct, and use less memory wen sending the error (the mpsc channel reserves blocks).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I thought it wouldn't be possible because one shot::Receiver doesn't have a poll_recv method but I suppose I can call Future::poll instead 🤔 I'll try that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i ran into this issue myself, and noticed this thread here. i've opened tokio-rs/tokio#7059 to introduce a poll_recv(..) method for oneshot::Receiver<T>.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio-rs/tokio#7059 is now closed. to poll the receiver, it should be pinned and polled via Future::poll, rather than adding a poll_recv method matching mpsc::Receiver<T>.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is addressed in 08aba95

}

impl<D, E> Channel<D, E> {
/// Create a new channel body.
///
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
/// attempts to send new messages will wait until a message is received from the channel. The
/// provided buffer capacity must be at least 1.
pub fn new(buffer: usize) -> (Sender<D, E>, Self) {
let (tx_frame, rx_frame) = mpsc::channel(buffer);
let (tx_error, rx_error) = mpsc::channel(1);
(Sender { tx_frame, tx_error }, Self { rx_frame, rx_error })
}
}

impl<D, E> Body for Channel<D, E>
where
D: Buf,
{
type Data = D;
type Error = E;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.rx_frame.poll_recv(cx) {
Poll::Ready(frame) => return Poll::Ready(frame.map(Ok)),
Poll::Pending => {}
}

match self.rx_error.poll_recv(cx) {
Poll::Ready(err) => return Poll::Ready(err.map(Err)),
Poll::Pending => {}
}

Poll::Pending
}
}

impl<D, E> std::fmt::Debug for Channel<D, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Channel")
.field("rx_frame", &self.rx_frame)
.field("rx_error", &self.rx_error)
.finish()
}
}

/// A sender half created through [`Channel::new`].
pub struct Sender<D, E = std::convert::Infallible> {
tx_frame: mpsc::Sender<Frame<D>>,
tx_error: mpsc::Sender<E>,
}

impl<D, E> Sender<D, E> {
/// Send a frame on the channel.
pub async fn send(&self, frame: Frame<D>) -> Result<(), SendError> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though it's not required internally, should we make these all &mut self?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is addressed in 4fed62f

self.tx_frame.send(frame).await.map_err(|_| SendError)
}

/// Send data on data channel.
pub async fn send_data(&self, buf: D) -> Result<(), SendError> {
self.send(Frame::data(buf)).await
}

/// Send trailers on trailers channel.
pub async fn send_trailers(&self, trailers: HeaderMap) -> Result<(), SendError> {
self.send(Frame::trailers(trailers)).await
}

/// Aborts the body in an abnormal fashion.
pub fn abort(self, error: E) {
match self.tx_error.try_send(error) {
Ok(_) => {}
Err(err) => {
match err {
mpsc::error::TrySendError::Full(_) => {
// Channel::new creates the error channel with space for 1 message and we
// only send once because this method consumes `self`. So the receiver
// can't be full.
unreachable!("error receiver should never be full")
}
mpsc::error::TrySendError::Closed(_) => {}
}
}
}
}
}

impl<D, E> std::fmt::Debug for Sender<D, E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Sender")
.field("tx_frame", &self.tx_frame)
.field("tx_error", &self.tx_error)
.finish()
}
}

/// The error returned if [`Sender`] fails to send because the receiver is closed.
#[derive(Debug)]
#[non_exhaustive]
pub struct SendError;

impl Display for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "failed to send frame")
}
}

impl std::error::Error for SendError {}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use http::{HeaderName, HeaderValue};

use crate::BodyExt;

use super::*;

#[tokio::test]
async fn works() {
let (tx, body) = Channel::<Bytes>::new(1024);

tokio::spawn(async move {
tx.send_data(Bytes::from("Hel")).await.unwrap();
tx.send_data(Bytes::from("lo!")).await.unwrap();

let mut trailers = HeaderMap::new();
trailers.insert(
HeaderName::from_static("foo"),
HeaderValue::from_static("bar"),
);
tx.send_trailers(trailers).await.unwrap();
});

let collected = body.collect().await.unwrap();
assert_eq!(collected.trailers().unwrap()["foo"], "bar");
assert_eq!(collected.to_bytes(), "Hello!");
}
}
6 changes: 6 additions & 0 deletions http-body-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ mod full;
mod limited;
mod stream;

#[cfg(feature = "channel")]
pub mod channel;

mod util;

use self::combinators::{BoxBody, MapErr, MapFrame, UnsyncBoxBody};
Expand All @@ -31,6 +34,9 @@ pub use self::full::Full;
pub use self::limited::{LengthLimitError, Limited};
pub use self::stream::{BodyStream, StreamBody};

#[cfg(feature = "channel")]
pub use self::channel::Channel;

/// An extension trait for [`http_body::Body`] adding various combinators and adapters
pub trait BodyExt: http_body::Body {
/// Returns a future that resolves to the next [`Frame`], if any.
Expand Down