Skip to content

Commit

Permalink
Implemented Filter::map_async
Browse files Browse the repository at this point in the history
  • Loading branch information
gtsiam committed Jul 5, 2021
1 parent 5a5dba0 commit 7ef4b6d
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 4 deletions.
6 changes: 3 additions & 3 deletions src/filter/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where

#[allow(missing_debug_implementations)]
#[pin_project]
pub struct AndThenFuture<T: Filter, F>
pub struct AndThenFuture<T, F>
where
T: Filter,
F: Func<T::Extract>,
Expand Down Expand Up @@ -96,12 +96,12 @@ where
self.set(State::Second(fut2));
}
StateProj::Second(second) => {
let ex3 = match ready!(second.try_poll(cx)) {
let ex2 = match ready!(second.try_poll(cx)) {
Ok(item) => Ok((item,)),
Err(err) => Err(From::from(err)),
};
self.set(State::Done);
return Poll::Ready(ex3);
return Poll::Ready(ex2);
}
StateProj::Done => panic!("polled after complete"),
}
Expand Down
95 changes: 95 additions & 0 deletions src/filter/map_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{ready, TryFuture};
use pin_project::pin_project;

use super::{Filter, FilterBase, Func, Internal};

#[derive(Clone, Copy, Debug)]
pub struct MapAsync<T, F> {
pub(super) filter: T,
pub(super) callback: F,
}

impl<T, F> FilterBase for MapAsync<T, F>
where
T: Filter,
F: Func<T::Extract> + Clone + Send,
F::Output: Future + Send,
{
type Extract = (<F::Output as Future>::Output,);
type Error = T::Error;
type Future = MapAsyncFuture<T, F>;
#[inline]
fn filter(&self, _: Internal) -> Self::Future {
MapAsyncFuture {
state: State::First(self.filter.filter(Internal), self.callback.clone()),
}
}
}

#[allow(missing_debug_implementations)]
#[pin_project]
pub struct MapAsyncFuture<T, F>
where
T: Filter,
F: Func<T::Extract>,
F::Output: Future + Send,
{
#[pin]
state: State<T::Future, F>,
}

#[pin_project(project = StateProj)]
enum State<T, F>
where
T: TryFuture,
F: Func<T::Ok>,
F::Output: Future + Send,
{
First(#[pin] T, F),
Second(#[pin] F::Output),
Done,
}

impl<T, F> Future for MapAsyncFuture<T, F>
where
T: Filter,
F: Func<T::Extract>,
F::Output: Future + Send,
{
type Output = Result<(<F::Output as Future>::Output,), T::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.project().state.poll(cx)
}
}

impl<T, F> Future for State<T, F>
where
T: TryFuture,
F: Func<T::Ok>,
F::Output: Future + Send,
{
type Output = Result<(<F::Output as Future>::Output,), T::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.as_mut().project() {
StateProj::First(first, second) => {
let ex1 = ready!(first.try_poll(cx))?;
let fut2 = second.call(ex1);
self.set(State::Second(fut2));
}
StateProj::Second(second) => {
let ex2 = (ready!(second.poll(cx)),);
self.set(State::Done);
return Poll::Ready(Ok(ex2));
}
StateProj::Done => panic!("polled after complete"),
}
}
}
}
36 changes: 35 additions & 1 deletion src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod and;
mod and_then;
mod boxed;
mod map;
mod map_async;
mod map_err;
mod or;
mod or_else;
Expand All @@ -23,6 +24,7 @@ pub(crate) use self::and::And;
use self::and_then::AndThen;
pub use self::boxed::BoxedFilter;
pub(crate) use self::map::Map;
use self::map_async::MapAsync;
pub(crate) use self::map_err::MapErr;
pub(crate) use self::or::Or;
use self::or_else::OrElse;
Expand Down Expand Up @@ -197,13 +199,45 @@ pub trait Filter: FilterBase {
}
}

/// Composes this `Filter` with a function receiving the extracted value.
/// Composes this `Filter` with an async function receiving
/// the extracted value.
///
/// The function should return some `Future` type.
///
/// # Example
///
/// ```
/// use warp::Filter;
///
/// // Map `/:id`
/// warp::path::param().map_async(|id: u64| async move {
/// format!("Hello #{}", id)
/// });
/// ```
fn map_async<F>(self, fun: F) -> MapAsync<Self, F>
where
Self: Sized,
F: Func<Self::Extract> + Clone,
F::Output: Future + Send,
{
MapAsync {
filter: self,
callback: fun,
}
}

/// Composes this `Filter` with an async function receiving
/// the extracted value.
///
/// The function should return some `TryFuture` type.
///
/// The `Error` type of the return `Future` needs be a `Rejection`, which
/// means most futures will need to have their error mapped into one.
///
/// Rejections are meant to say "this filter didn't accept the request,
/// maybe another can". So for application-level errors, consider using
/// [`Filter::map_async`] instead.
///
/// # Example
///
/// ```
Expand Down

0 comments on commit 7ef4b6d

Please sign in to comment.