diff --git a/src/filters/decompression.rs b/src/filters/decompression.rs new file mode 100644 index 000000000..b8a1d8ec5 --- /dev/null +++ b/src/filters/decompression.rs @@ -0,0 +1,286 @@ +//! Decompression Filters +//! +//! Filters that decompress the body of a request. + +#[cfg(feature = "compression-brotli")] +use async_compression::tokio::bufread::BrotliDecoder; + +#[cfg(feature = "compression-gzip")] +use async_compression::tokio::bufread::{DeflateDecoder, GzipDecoder}; + +use http::header::HeaderValue; +use hyper::{ + header::{CONTENT_ENCODING, CONTENT_LENGTH}, + Body, +}; +use tokio_util::io::{ReaderStream, StreamReader}; + +use crate::filter::{Filter, WrapSealed}; +use crate::reject::IsReject; +use crate::reply::{Reply, Response}; + +use self::internal::{CompressionProps, WithDecompression}; + +enum DecompressionAlgo { + #[cfg(feature = "compression-brotli")] + BR, + #[cfg(feature = "compression-gzip")] + DEFLATE, + #[cfg(feature = "compression-gzip")] + GZIP, +} + +impl From for HeaderValue { + #[inline] + fn from(algo: DecompressionAlgo) -> Self { + HeaderValue::from_static(match algo { + #[cfg(feature = "compression-brotli")] + DecompressionAlgo::BR => "br", + #[cfg(feature = "compression-gzip")] + DecompressionAlgo::DEFLATE => "deflate", + #[cfg(feature = "compression-gzip")] + DecompressionAlgo::GZIP => "gzip", + }) + } +} + +/// Decompression +#[derive(Clone, Copy, Debug)] +pub struct Decompression { + func: F, +} + +/// Create a wrapping filter that decompresses the Body of a [`Response`](crate::reply::Response) +/// using deflate, removing `content-encoding: deflate` from the Response's [`HeaderMap`](hyper::HeaderMap) +/// +/// # Example +/// +/// ``` +/// use warp::Filter; +/// +/// let route = warp::get() +/// .and(warp::path::end()) +/// .and(warp::fs::file("./README.md")) +/// .with(warp::decompression::deflate()); +/// ``` +#[cfg(feature = "compression-gzip")] +pub fn deflate() -> Decompression Response + Copy> { + let func = move |mut props: CompressionProps| { + let body = Body::wrap_stream(ReaderStream::new(DeflateDecoder::new(StreamReader::new( + props.body, + )))); + props + .head + .headers + .append(CONTENT_ENCODING, DecompressionAlgo::DEFLATE.into()); + props.head.headers.remove(CONTENT_LENGTH); + Response::from_parts(props.head, body) + }; + Decompression { func } +} + +/// Create a wrapping filter that decompresses the Body of a [`Response`](crate::reply::Response) +/// using brotli, removing `content-encoding: br` from the Response's [`HeaderMap`](hyper::HeaderMap) +/// +/// # Example +/// +/// ``` +/// use warp::Filter; +/// +/// let route = warp::get() +/// .and(warp::path::end()) +/// .and(warp::fs::file("./README.md")) +/// .with(warp::decompression::brotli()); +/// ``` +#[cfg(feature = "compression-brotli")] +pub fn brotli() -> Decompression Response + Copy> { + let func = move |mut props: CompressionProps| { + let body = Body::wrap_stream(ReaderStream::new(BrotliDecoder::new(StreamReader::new( + props.body, + )))); + props + .head + .headers + .append(CONTENT_ENCODING, DecompressionAlgo::BR.into()); + props.head.headers.remove(CONTENT_LENGTH); + Response::from_parts(props.head, body) + }; + Decompression { func } +} + +/// Create a wrapping filter that decompresses the Body of a [`Response`](crate::reply::Response) +/// using gzip, removing `content-encoding: gzip` from the Response's [`HeaderMap`](hyper::HeaderMap) +/// +/// # Example +/// +/// ``` +/// use warp::Filter; +/// +/// let route = warp::get() +/// .and(warp::path::end()) +/// .and(warp::fs::file("./README.md")) +/// .with(warp::decompression::gzip()); +/// ``` +#[cfg(feature = "compression-gzip")] +pub fn gzip() -> Decompression Response + Copy> { + let func = move |mut props: CompressionProps| { + let body = Body::wrap_stream(ReaderStream::new(GzipDecoder::new(StreamReader::new( + props.body, + )))); + props + .head + .headers + .append(CONTENT_ENCODING, DecompressionAlgo::GZIP.into()); + props.head.headers.remove(CONTENT_LENGTH); + Response::from_parts(props.head, body) + }; + Decompression { func } +} + +impl WrapSealed for Decompression +where + FN: Fn(CompressionProps) -> Response + Clone + Send, + F: Filter + Clone + Send, + F::Extract: Reply, + F::Error: IsReject, +{ + type Wrapped = WithDecompression; + + fn wrap(&self, filter: F) -> Self::Wrapped { + WithDecompression { + filter, + decompress: self.clone(), + } + } +} + +mod internal { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use bytes::Bytes; + use futures_util::{ready, Stream, TryFuture}; + use hyper::Body; + use pin_project::pin_project; + + use crate::filter::{Filter, FilterBase, Internal}; + use crate::reject::IsReject; + use crate::reply::{Reply, Response}; + + use super::Decompression; + + #[pin_project] + #[derive(Debug)] + pub struct DecompressableBody + where + E: std::error::Error, + S: Stream>, + { + #[pin] + body: S, + } + + impl Stream for DecompressableBody + where + E: std::error::Error, + S: Stream>, + { + type Item = std::io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use std::io::{Error, ErrorKind}; + + let pin = self.project(); + S::poll_next(pin.body, cx).map_err(|_| Error::from(ErrorKind::InvalidData)) + } + } + + impl From for DecompressableBody { + fn from(body: Body) -> Self { + DecompressableBody { body } + } + } + + #[derive(Debug)] + pub struct DecompressionProps { + pub(super) body: DecompressableBody, + pub(super) head: http::response::Parts, + } + + impl From> for DecompressionProps { + fn from(resp: http::Response) -> Self { + let (head, body) = resp.into_parts(); + DecompressionProps { + body: body.into(), + head, + } + } + } + + #[allow(missing_debug_implementations)] + pub struct Decompressed(pub(super) Response); + + impl Reply for Decompressed { + #[inline] + fn into_response(self) -> Response { + self.0 + } + } + + #[allow(missing_debug_implementations)] + #[derive(Clone, Copy)] + pub struct WithDecompression { + pub(super) decompress: Decompression, + pub(super) filter: F, + } + + impl FilterBase for WithDecompression + where + FN: Fn(DecompressionProps) -> Response + Clone + Send, + F: Filter + Clone + Send, + F::Extract: Reply, + F::Error: IsReject, + { + type Extract = (Decompressed,); + type Error = F::Error; + type Future = WithDecompressionFuture; + + fn filter(&self, _: Internal) -> Self::Future { + WithDecompressionFuture { + decompress: self.decompress.clone(), + future: self.filter.filter(Internal), + } + } + } + + #[allow(missing_debug_implementations)] + #[pin_project] + pub struct WithDecompressionFuture { + decompress: Decompression, + #[pin] + future: F, + } + + impl Future for WithDecompressionFuture + where + FN: Fn(DecompressionProps) -> Response, + F: TryFuture, + F::Ok: Reply, + F::Error: IsReject, + { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pin = self.as_mut().project(); + let result = ready!(pin.future.try_poll(cx)); + match result { + Ok(reply) => { + let resp = (self.decompress.func)(reply.into_response().into()); + Poll::Ready(Ok((Decompressed(resp),))) + } + Err(reject) => Poll::Ready(Err(reject)), + } + } + } +}