From cb795987602712b7ca4216fea12048941cec621d Mon Sep 17 00:00:00 2001 From: Francisco Del Roio Date: Sun, 13 Nov 2022 15:40:48 +0000 Subject: [PATCH 1/2] Implemented the `reduce` and `reduce_async` operators. --- aioreactive/__init__.py | 18 ++++++++++++ aioreactive/transform.py | 25 +++++++++++++++++ tests/test_reduce.py | 59 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+) create mode 100644 tests/test_reduce.py diff --git a/aioreactive/__init__.py b/aioreactive/__init__.py index cdca1e8..978b819 100644 --- a/aioreactive/__init__.py +++ b/aioreactive/__init__.py @@ -863,6 +863,24 @@ def of_async(workflow: Awaitable[_TSource]) -> AsyncObservable[_TSource]: return of_async(workflow) +def reduce( + accumulator: Callable[[_TResult, _TSource], _TResult], + initial: _TResult, +) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: + from .transform import reduce as _reduce + + return _reduce(accumulator, initial) + + +def reduce_async( + accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]], + initial: _TResult, +) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: + from .transform import reduce_async as _reduce + + return _reduce(accumulator, initial) + + def retry( retry_count: int, ) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TSource]]: diff --git a/aioreactive/transform.py b/aioreactive/transform.py index c563277..4173c36 100644 --- a/aioreactive/transform.py +++ b/aioreactive/transform.py @@ -528,3 +528,28 @@ def scan_async( The scan operator. """ return _scan(accumulator, initial) + + +def reduce( + accumulator: Callable[[_TResult, _TSource], _TResult], + initial: _TResult, +) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: + async def _reduce(current: _TResult, value: _TSource) -> _TResult: + return accumulator(current, value) + + def _operator(Observable: AsyncObservable[_TSource]) -> AsyncObservable[_TResult]: + return pipe(Observable, reduce_async(_reduce, initial)) + + return _operator + + +def reduce_async( + accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]], + initial: _TResult, +) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: + def _operator(source: AsyncObservable[_TSource]) -> AsyncObservable[_TResult]: + from .filtering import take_last + + return pipe(source, scan_async(accumulator, initial), take_last(1)) + + return _operator diff --git a/tests/test_reduce.py b/tests/test_reduce.py new file mode 100644 index 0000000..068eb73 --- /dev/null +++ b/tests/test_reduce.py @@ -0,0 +1,59 @@ +import asyncio + +import pytest +from expression import pipe + +import aioreactive as rx +from aioreactive.notification import OnCompleted, OnNext +from aioreactive.testing import AsyncTestObserver, VirtualTimeEventLoop + + +class MyException(Exception): + pass + + +@pytest.fixture() +def event_loop(): + loop = VirtualTimeEventLoop() + try: + yield loop + finally: + loop.close() + + +def sync_sum(a: int, b: int) -> int: + return a + b + + +async def async_sum(a: int, b: int) -> int: + await asyncio.sleep(0.2) + return a + b + + +@pytest.mark.asyncio +async def test_reduce(): + xs = rx.from_iterable([1, 2, 3, 4]) + observer = AsyncTestObserver() + ys = pipe(xs, rx.reduce(sync_sum, 0)) + await rx.run(ys, observer) + + values = list(map(lambda t: t[1], observer.values)) + assert values == [ + OnNext(10), + OnCompleted, + ] + + +@pytest.mark.asyncio +async def test_reduce_async(): + xs = rx.from_iterable([1, 2, 3, 4]) + observer = AsyncTestObserver() + ys = pipe(xs, rx.reduce_async(async_sum, 0)) + + await rx.run(ys, observer) + + values = list(map(lambda t: t[1], observer.values)) + assert values == [ + OnNext(10), + OnCompleted, + ] From 92de0d8eb7495e633432f132af5b1533d02fce00 Mon Sep 17 00:00:00 2001 From: Francisco Del Roio Date: Sun, 13 Nov 2022 13:00:19 -0300 Subject: [PATCH 2/2] Added documentation to the reduce operator. Also added reduce calls to the chained observable class. --- aioreactive/__init__.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/aioreactive/__init__.py b/aioreactive/__init__.py index 978b819..27ef5fd 100644 --- a/aioreactive/__init__.py +++ b/aioreactive/__init__.py @@ -341,6 +341,18 @@ def merge(self, other: AsyncObservable[_TSource]) -> AsyncRx[_TSource]: AsyncRx.create, ) + def reduce( + self, accumulator: Callable[[_TResult, _TSource], _TResult], initial: _TResult + ) -> "AsyncRx[_TResult]": + return pipe(self, reduce(accumulator, initial), AsyncRx[_TResult]) + + def reduce_async( + self, + accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]], + initial: _TResult, + ) -> "AsyncRx[_TResult]": + return pipe(self, reduce_async(accumulator, initial), AsyncRx[_TResult]) + def skip(self, count: int) -> AsyncObservable[_TSource]: """Skip items from start of the stream. @@ -867,6 +879,17 @@ def reduce( accumulator: Callable[[_TResult, _TSource], _TResult], initial: _TResult, ) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: + """The reduce operator. + + If an error occurs either in the accumulator or the source the subscription is disposed and the error is thrown. + + Args: + accumulator: An accumulator function + initial: The initial value + + Returns: + The reduce operator function + """ from .transform import reduce as _reduce return _reduce(accumulator, initial) @@ -876,6 +899,17 @@ def reduce_async( accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]], initial: _TResult, ) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: + """The async reduce operator. + + See the `reduce` operator. + + Args: + accumulator (Callable[[_TResult, _TSource], Awaitable[_TResult]]): An async accumulator function + initial (_TResult): The initial value + + Returns: + Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: The operator function + """ from .transform import reduce_async as _reduce return _reduce(accumulator, initial)