Skip to content

Commit

Permalink
Added documentation to the reduce operator. Also added reduce calls t…
Browse files Browse the repository at this point in the history
…o the chained observable class.
  • Loading branch information
francipvb committed Nov 13, 2022
1 parent cb79598 commit 92de0d8
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,18 @@ def merge(self, other: AsyncObservable[_TSource]) -> AsyncRx[_TSource]:
AsyncRx.create,
)

def reduce(
self, accumulator: Callable[[_TResult, _TSource], _TResult], initial: _TResult
) -> "AsyncRx[_TResult]":
return pipe(self, reduce(accumulator, initial), AsyncRx[_TResult])

def reduce_async(
self,
accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]],
initial: _TResult,
) -> "AsyncRx[_TResult]":
return pipe(self, reduce_async(accumulator, initial), AsyncRx[_TResult])

def skip(self, count: int) -> AsyncObservable[_TSource]:
"""Skip items from start of the stream.
Expand Down Expand Up @@ -867,6 +879,17 @@ def reduce(
accumulator: Callable[[_TResult, _TSource], _TResult],
initial: _TResult,
) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]:
"""The reduce operator.
If an error occurs either in the accumulator or the source the subscription is disposed and the error is thrown.
Args:
accumulator: An accumulator function
initial: The initial value
Returns:
The reduce operator function
"""
from .transform import reduce as _reduce

return _reduce(accumulator, initial)
Expand All @@ -876,6 +899,17 @@ def reduce_async(
accumulator: Callable[[_TResult, _TSource], Awaitable[_TResult]],
initial: _TResult,
) -> Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]:
"""The async reduce operator.
See the `reduce` operator.
Args:
accumulator (Callable[[_TResult, _TSource], Awaitable[_TResult]]): An async accumulator function
initial (_TResult): The initial value
Returns:
Callable[[AsyncObservable[_TSource]], AsyncObservable[_TResult]]: The operator function
"""
from .transform import reduce_async as _reduce

return _reduce(accumulator, initial)
Expand Down

0 comments on commit 92de0d8

Please sign in to comment.