diff --git a/README.md b/README.md index 1dce846..e0e34df 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Readability counts, abstracting common operations helps reduce cognitive complex Take this imperative pipeline of operations, it iterates once over the data, skipping the value if it fails one of the filter checks: ```python +>>> from danom import Stream >>> res = [] ... >>> for x in range(1_000_000): @@ -44,6 +45,7 @@ keyword breakdown: `{'for': 1, 'in': 1, 'if': 3, 'not': 3, 'continue': 3}` After a bit of experience with python you might use list comprehensions, however this is arguably _less_ clear and iterates multiple times over the same data ```python +>>> from danom import Stream >>> mul_three = [triple(x) for x in range(1_000_000)] >>> gt_ten = [x for x in mul_three if is_gt_ten(x)] >>> sub_two = [min_two(x) for x in gt_ten] @@ -62,6 +64,7 @@ This still has a lot of tokens that the developer has to read to understand the Using a `Stream` results in this: ```python +>>> from danom import Stream >>> ( ... Stream.from_iterable(range(1_000_000)) ... .map(triple) @@ -90,12 +93,14 @@ Stream.async_collect(self) -> 'tuple' Async version of collect. Note that all functions in the stream should be `Awaitable`. ```python +>>> from danom import Stream >>> Stream.from_iterable(file_paths).map(async_read_files).async_collect() ``` If there are no operations in the `Stream` then this will act as a normal collect. ```python +>>> from danom import Stream >>> Stream.from_iterable(file_paths).async_collect() ``` @@ -107,6 +112,7 @@ Stream.collect(self) -> 'tuple' Materialise the sequence from the `Stream`. ```python +>>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.collect() == (1, 2, 3, 4) ``` @@ -119,11 +125,13 @@ Stream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self' Filter the stream based on a predicate. Will return a new `Stream` with the modified sequence. 
```python +>>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2) ``` Simple functions can be passed in sequence to compose more complex filters ```python +>>> from danom import Stream >>> Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15) ``` @@ -135,6 +143,7 @@ Stream.fold(self, initial: 'T', fn: 'Callable[[T], U]', *, workers: 'int' = 1, u Fold the results into a single value. `fold` triggers an action so will incur a `collect`. ```python +>>> from danom import Stream >>> Stream.from_iterable([1, 2, 3, 4]).fold(0, lambda a, b: a + b) == 10 >>> Stream.from_iterable([[1], [2], [3], [4]]).fold([0], lambda a, b: a + b) == [0, 1, 2, 3, 4] >>> Stream.from_iterable([1, 2, 3, 4]).fold(1, lambda a, b: a * b) == 24 @@ -143,6 +152,7 @@ Fold the results into a single value. `fold` triggers an action so will incur a As `fold` triggers an action, the parameters will be forwarded to the `par_collect` call if the `workers` are greater than 1. This will only effect the `collect` that is used to create the iterable to reduce, not the `fold` operation itself. ```python +>>> from danom import Stream >>> Stream.from_iterable([1, 2, 3, 4]).map(some_expensive_fn).fold(0, add, workers=4, use_threads=False) ``` @@ -154,6 +164,7 @@ Stream.from_iterable(it: 'Iterable') -> 'Self' This is the recommended way of creating a `Stream` object. ```python +>>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3) ``` @@ -165,11 +176,13 @@ Stream.map(self, *fns: 'Callable[[T], U]') -> 'Self' Map a function to the elements in the `Stream`. Will return a new `Stream` with the modified sequence. 
```python +>>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (1, 2, 3, 4) ``` This can also be mixed with `safe` functions: ```python +>>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (Ok(inner=1), Ok(inner=2), Ok(inner=3), Ok(inner=4)) >>> @safe @@ -181,6 +194,7 @@ This can also be mixed with `safe` functions: Simple functions can be passed in sequence to compose more complex transformations ```python +>>> from danom import Stream >>> Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9) ``` @@ -192,6 +206,7 @@ Stream.par_collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> Materialise the sequence from the `Stream` in parallel. ```python +>>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.par_collect() == (1, 2, 3, 4) ``` @@ -199,6 +214,7 @@ Materialise the sequence from the `Stream` in parallel. Use the `workers` arg to select the number of workers to use. Use `-1` to use all available processors (except 1). Defaults to `4`. ```python +>>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.par_collect(workers=-1) == (1, 2, 3, 4) ``` @@ -206,6 +222,7 @@ Defaults to `4`. For smaller I/O bound tasks use the `use_threads` flag as True. If False the processing will use `ProcessPoolExecutor` else it will use `ThreadPoolExecutor`. ```python +>>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.par_collect(use_threads=True) == (1, 2, 3, 4) ``` @@ -221,6 +238,7 @@ Similar to `filter` except splits the True and False values. Will return a two n Each partition is independently replayable. 
```python +>>> from danom import Stream >>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0) >>> part1.collect() == (0, 2) >>> part2.collect() == (1, 3) @@ -228,112 +246,138 @@ Each partition is independently replayable. As `partition` triggers an action, the parameters will be forwarded to the `par_collect` call if the `workers` are greater than 1. ```python +>>> from danom import Stream >>> Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4) >>> part1.map(add_one).par_collect() == (4, 7, 10) >>> part2.collect() == (2, 4, 5, 7, 8, 10, 11) ``` -## Ok +### `Stream.tap` +```python +Stream.tap(self, *fns: 'Callable[[T], None]') -> 'Self' +``` +Tap the values to another process that returns None. Will return a new `Stream` with the modified sequence. -Frozen instance of an Ok monad used to wrap successful operations. +The value passed to the tap function will be deep-copied to avoid any modification to the `Stream` item for downstream consumers. -### `Ok.and_then` ```python -Ok.and_then(self, func: collections.abc.Callable[[~T], danom._result.Result], **kwargs: dict) -> danom._result.Result +>>> from danom import Stream +>>> Stream.from_iterable([0, 1, 2, 3]).tap(log_value).collect() == (0, 1, 2, 3) ``` -Pipe another function that returns a monad. +Simple functions can be passed in sequence for multiple `tap` operations ```python ->>> Ok(1).and_then(add_one) == Ok(2) ->>> Ok(1).and_then(raise_err) == Err(error=TypeError()) +>>> from danom import Stream +>>> Stream.from_iterable([0, 1, 2, 3]).tap(log_value, print_value).collect() == (0, 1, 2, 3) ``` +`tap` is useful for logging and similar actions without affecting the individual items, in this example eligible and dormant users are logged using `tap`: -### `Ok.is_ok` ```python -Ok.is_ok(self) -> Literal[True] +>>> from danom import Stream +>>> active_users, inactive_users = ( +... 
Stream.from_iterable(users).map(parse_user_objects).partition(inactive_users) +... ) +... +>>> active_users.filter(eligible_for_promotion).tap(log_eligible_users).map( +... construct_promo_email, send_with_confirmation +... ).collect() +... +>>> inactive_users.tap(log_inactive_users).map( +... create_dormant_user_entry, add_to_dormant_table +... ).collect() ``` -Returns True if the result type is Ok. -```python ->>> Ok().is_ok() == True -``` +## Result -### `Ok.match` +`Result` monad. Consists of `Ok` and `Err` for successful and failed operations respectively. +Each monad is a frozen instance to prevent further mutation. + + +### `Result.and_then` ```python -Ok.match(self, if_ok_func: collections.abc.Callable[[~T], danom._result.Result], _if_err_func: collections.abc.Callable[[~T], danom._result.Result]) -> danom._result.Result +Result.and_then(self, func: 'Callable[[T], Result[U]]', **kwargs: 'dict') -> 'Result[U]' ``` -Map Ok func to Ok and Err func to Err +Pipe another function that returns a monad. For `Err` will return original error. ```python ->>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2) ->>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok') ->>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError') +>>> from danom import Err, Ok +>>> Ok(1).and_then(add_one) == Ok(2) +>>> Ok(1).and_then(raise_err) == Err(error=TypeError()) +>>> Err(error=TypeError()).and_then(add_one) == Err(error=TypeError()) +>>> Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError()) ``` -### `Ok.unwrap` +### `Result.is_ok` ```python -Ok.unwrap(self) -> ~T +Result.is_ok(self) -> 'bool' ``` -Unwrap the Ok monad and get the inner value. +Returns `True` if the result type is `Ok`. +Returns `False` if the result type is `Err`. 
```python ->>> Ok().unwrap() == None ->>> Ok(1).unwrap() == 1 ->>> Ok("ok").unwrap() == 'ok' +>>> from danom import Err, Ok +>>> Ok().is_ok() == True +>>> Err().is_ok() == False ``` -## Err - -Frozen instance of an Err monad used to wrap failed operations. - -### `Err.and_then` +### `Result.map` ```python -Err.and_then(self, _: 'Callable[[T], Result]', **_kwargs: 'dict') -> 'Self' +Result.map(self, func: 'Callable[[T], U]', **kwargs: 'dict') -> 'Result[U]' ``` -Pipe another function that returns a monad. For Err will return original error. +Pipe a pure function and wrap the return value with `Ok`. +Given an `Err` will return self. ```python ->>> Err(error=TypeError()).and_then(add_one) == Err(error=TypeError()) ->>> Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError()) +>>> from danom import Err, Ok +>>> Ok(1).map(add_one) == Ok(2) +>>> Err(error=TypeError()).map(add_one) == Err(error=TypeError()) ``` -### `Err.is_ok` +### `Result.match` ```python -Err.is_ok(self) -> 'Literal[False]' +Result.match(self, if_ok_func: 'Callable[[T], Result]', if_err_func: 'Callable[[T], Result]') -> 'Result' ``` -Returns False if the result type is Err. +Map `ok_func` to `Ok` and `err_func` to `Err` ```python -Err().is_ok() == False +>>> from danom import Err, Ok +>>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2) +>>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok') +>>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError') ``` -### `Err.match` +### `Result.unit` ```python -Err.match(self, _if_ok_func: 'Callable[[T], Result]', if_err_func: 'Callable[[T], Result]') -> 'Result' +Result.unit(inner: 'T') -> 'Ok[T]' ``` -Map Ok func to Ok and Err func to Err +Unit method. 
Given an item of type `T` return `Ok(T)` ```python ->>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2) ->>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok') ->>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError') +>>> from danom import Err, Ok, Result +>>> Result.unit(0) == Ok(inner=0) +>>> Ok.unit(0) == Ok(inner=0) +>>> Err.unit(0) == Ok(inner=0) ``` -### `Err.unwrap` +### `Result.unwrap` ```python -Err.unwrap(self) -> 'None' +Result.unwrap(self) -> 'T' ``` -Unwrap the Err monad will raise the inner error. - +Unwrap the `Ok` monad and get the inner value. +Unwrap the `Err` monad will raise the inner error. ```python +>>> from danom import Err, Ok +>>> Ok().unwrap() == None +>>> Ok(1).unwrap() == 1 +>>> Ok("ok").unwrap() == 'ok' >>> Err(error=TypeError()).unwrap() raise TypeError(...) ``` @@ -347,6 +391,7 @@ safe(func: collections.abc.Callable[[T], U]) -> collections.abc.Callable[[T], da Decorator for functions that wraps the function in a try except returns `Ok` on success else `Err`. ```python +>>> from danom import safe >>> @safe ... def add_one(a: int) -> int: ... return a + 1 @@ -364,6 +409,7 @@ safe_method(func: collections.abc.Callable[[T], U]) -> collections.abc.Callable[ The same as `safe` except it forwards on the `self` of the class instance to the wrapped function. ```python +>>> from danom import safe_method >>> class Adder: ... def __init__(self, result: int = 0) -> None: ... self.result = result @@ -387,13 +433,11 @@ Compose multiple functions into one. The functions will be called in sequence with the result of one being used as the input for the next. 
```python +>>> from danom import compose >>> add_two = compose(add_one, add_one) >>> add_two(0) == 2 -``` - -```python ->>> add_two = compose(add_one, add_one, is_even) ->>> add_two(0) == True +>>> add_two_is_even = compose(add_one, add_one, is_even) +>>> add_two_is_even(0) == True ``` @@ -406,6 +450,7 @@ all_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T True if all of the given functions return True. ```python +>>> from danom import all_of >>> is_valid_user = all_of(is_subscribed, is_active, has_2fa) >>> is_valid_user(user) == True ``` @@ -420,6 +465,7 @@ any_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T True if any of the given functions return True. ```python +>>> from danom import any_of >>> is_eligible = any_of(has_coupon, is_vip, is_staff) >>> is_eligible(user) == True ``` @@ -434,6 +480,7 @@ identity(x: T) -> T Basic identity function. ```python +>>> from danom import identity >>> identity("abc") == "abc" >>> identity(1) == 1 >>> identity(ComplexDataType(a=1, b=2, c=3)) == ComplexDataType(a=1, b=2, c=3) @@ -444,11 +491,12 @@ Basic identity function. ### `invert` ```python -invert(func: collections.abc.Callable[~P, bool]) -> collections.abc.Callable[~P, bool] +invert(func: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool] ``` Invert a boolean function so it returns False where it would've returned True. ```python +>>> from danom import invert >>> invert(has_len)("abc") == False >>> invert(has_len)("") == True ``` @@ -463,6 +511,7 @@ new_type(name: 'str', base_type: 'type', validators: 'Callable | Sequence[Callab Create a NewType based on another type. ```python +>>> from danom import new_type >>> def is_positive(value): ... return value >= 0 @@ -479,6 +528,7 @@ Unlike an inherited class, the type will not return `True` for an isinstance che The methods of the given `base_type` will be forwarded to the specialised type. 
Alternatively the map method can be used to return a new type instance with the transformation. ```python +>>> from danom import new_type >>> def has_len(email: str) -> bool: ... return len(email) > 0 @@ -502,9 +552,7 @@ Alternatively the map method can be used to return a new type instance with the ├── src │ └── danom │ ├── __init__.py -│ ├── _err.py │ ├── _new_type.py -│ ├── _ok.py │ ├── _result.py │ ├── _safe.py │ ├── _stream.py @@ -513,9 +561,7 @@ Alternatively the map method can be used to return a new type instance with the │ ├── __init__.py │ ├── conftest.py │ ├── test_api.py -│ ├── test_err.py │ ├── test_new_type.py -│ ├── test_ok.py │ ├── test_result.py │ ├── test_safe.py │ ├── test_stream.py diff --git a/dev_tools/update_readme.py b/dev_tools/update_readme.py index ee0b3cb..a26741c 100644 --- a/dev_tools/update_readme.py +++ b/dev_tools/update_readme.py @@ -6,8 +6,7 @@ import attrs from danom import ( - Err, - Ok, + Result, Stream, all_of, any_of, @@ -34,7 +33,7 @@ def to_readme(self) -> str: def create_readme_lines() -> str: readme_lines = [] - for ent in [Stream, Ok, Err]: + for ent in [Stream, Result]: readme_lines.append(f"## {ent.__name__}") readme_lines.append(strip_doc(ent.__doc__)) readme_docs = [ diff --git a/pyproject.toml b/pyproject.toml index d35b3e0..2270419 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "danom" -version = "0.7.0" +version = "0.8.0" description = "Functional streams and monads" readme = "README.md" license = "MIT" diff --git a/src/danom/__init__.py b/src/danom/__init__.py index 3166a1f..0dbc765 100644 --- a/src/danom/__init__.py +++ b/src/danom/__init__.py @@ -1,7 +1,5 @@ -from danom._err import Err from danom._new_type import new_type -from danom._ok import Ok -from danom._result import Result +from danom._result import Err, Ok, Result from danom._safe import safe, safe_method from danom._stream import Stream from danom._utils import all_of, any_of, compose, identity, invert, none_of diff 
--git a/src/danom/_err.py b/src/danom/_err.py deleted file mode 100644 index 017500a..0000000 --- a/src/danom/_err.py +++ /dev/null @@ -1,86 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from types import TracebackType -from typing import ( - Any, - Literal, - Self, -) - -import attrs - -from danom._result import Result, T - - -@attrs.define(frozen=True) -class Err(Result): - """Frozen instance of an Err monad used to wrap failed operations.""" - - inner: Any = attrs.field(default=None) - input_args: tuple[T] = attrs.field(default=None, repr=False) - error: Exception | None = attrs.field(default=None) - details: list[dict[str, Any]] = attrs.field(factory=list, init=False, repr=False) - - def __attrs_post_init__(self) -> None: - if isinstance(self.error, Exception): - # little hack explained here: https://www.attrs.org/en/stable/init.html#post-init - object.__setattr__(self, "details", self._extract_details(self.error.__traceback__)) - - def _extract_details(self, tb: TracebackType | None) -> list[dict[str, Any]]: - trace_info = [] - while tb: - frame = tb.tb_frame - trace_info.append( - { - "file": frame.f_code.co_filename, - "func": frame.f_code.co_name, - "line_no": tb.tb_lineno, - "locals": frame.f_locals, - }, - ) - tb = tb.tb_next - return trace_info - - def is_ok(self) -> Literal[False]: - """Returns False if the result type is Err. - - ```python - Err().is_ok() == False - ``` - """ - return False - - def and_then(self, _: Callable[[T], Result], **_kwargs: dict) -> Self: - """Pipe another function that returns a monad. For Err will return original error. - - ```python - >>> Err(error=TypeError()).and_then(add_one) == Err(error=TypeError()) - >>> Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError()) - ``` - """ - return self - - def unwrap(self) -> None: - """Unwrap the Err monad will raise the inner error. - - ```python - >>> Err(error=TypeError()).unwrap() raise TypeError(...) 
- ``` - """ - if self.error is not None: - raise self.error - raise ValueError(f"Err does not have a caught error to raise: {self.inner = }") - - def match( - self, _if_ok_func: Callable[[T], Result], if_err_func: Callable[[T], Result] - ) -> Result: - """Map Ok func to Ok and Err func to Err - - ```python - >>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2) - >>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok') - >>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError') - ``` - """ - return if_err_func(self.error) diff --git a/src/danom/_new_type.py b/src/danom/_new_type.py index f78ade4..736e0ef 100644 --- a/src/danom/_new_type.py +++ b/src/danom/_new_type.py @@ -19,6 +19,7 @@ def new_type( # noqa: ANN202 """Create a NewType based on another type. ```python + >>> from danom import new_type >>> def is_positive(value): ... return value >= 0 @@ -35,6 +36,7 @@ def new_type( # noqa: ANN202 The methods of the given `base_type` will be forwarded to the specialised type. Alternatively the map method can be used to return a new type instance with the transformation. ```python + >>> from danom import new_type >>> def has_len(email: str) -> bool: ... return len(email) > 0 diff --git a/src/danom/_ok.py b/src/danom/_ok.py deleted file mode 100644 index 7f97229..0000000 --- a/src/danom/_ok.py +++ /dev/null @@ -1,59 +0,0 @@ -from collections.abc import Callable -from typing import ( - Any, - Literal, -) - -import attrs - -from danom._result import Result, T - - -@attrs.define(frozen=True) -class Ok(Result): - """Frozen instance of an Ok monad used to wrap successful operations.""" - - inner: Any = attrs.field(default=None) - - def is_ok(self) -> Literal[True]: - """Returns True if the result type is Ok. - - ```python - >>> Ok().is_ok() == True - ``` - """ - return True - - def and_then(self, func: Callable[[T], Result], **kwargs: dict) -> Result: - """Pipe another function that returns a monad. 
- - ```python - >>> Ok(1).and_then(add_one) == Ok(2) - >>> Ok(1).and_then(raise_err) == Err(error=TypeError()) - ``` - """ - return func(self.inner, **kwargs) - - def unwrap(self) -> T: - """Unwrap the Ok monad and get the inner value. - - ```python - >>> Ok().unwrap() == None - >>> Ok(1).unwrap() == 1 - >>> Ok("ok").unwrap() == 'ok' - ``` - """ - return self.inner - - def match( - self, if_ok_func: Callable[[T], Result], _if_err_func: Callable[[T], Result] - ) -> Result: - """Map Ok func to Ok and Err func to Err - - ```python - >>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2) - >>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok') - >>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError') - ``` - """ - return if_ok_func(self.inner) diff --git a/src/danom/_result.py b/src/danom/_result.py index 5b02be3..805ce24 100644 --- a/src/danom/_result.py +++ b/src/danom/_result.py @@ -2,33 +2,171 @@ from abc import ABC, abstractmethod from collections.abc import Callable +from types import TracebackType from typing import ( - ParamSpec, + Any, + Literal, Self, - TypeVar, ) import attrs -T = TypeVar("T") -P = ParamSpec("P") - @attrs.define -class Result(ABC): +class Result[T, U](ABC): + """`Result` monad. Consists of `Ok` and `Err` for successful and failed operations respectively. + Each monad is a frozen instance to prevent further mutation. + """ + + @classmethod + def unit(cls, inner: T) -> Ok[T]: + """Unit method. Given an item of type `T` return `Ok(T)` + + ```python + >>> from danom import Err, Ok, Result + >>> Result.unit(0) == Ok(inner=0) + >>> Ok.unit(0) == Ok(inner=0) + >>> Err.unit(0) == Ok(inner=0) + ``` + """ + return Ok(inner) + @abstractmethod - def is_ok(self) -> bool: ... + def is_ok(self) -> bool: + """Returns `True` if the result type is `Ok`. + Returns `False` if the result type is `Err`. 
+ + ```python + >>> from danom import Err, Ok + >>> Ok().is_ok() == True + >>> Err().is_ok() == False + ``` + """ + ... @abstractmethod - def and_then(self, func: Callable[[T], Result], **kwargs: dict) -> Result: ... + def map(self, func: Callable[[T], U], **kwargs: dict) -> Result[U]: + """Pipe a pure function and wrap the return value with `Ok`. + Given an `Err` will return self. + + ```python + >>> from danom import Err, Ok + >>> Ok(1).map(add_one) == Ok(2) + >>> Err(error=TypeError()).map(add_one) == Err(error=TypeError()) + ``` + """ + ... @abstractmethod - def unwrap(self) -> T: ... + def and_then(self, func: Callable[[T], Result[U]], **kwargs: dict) -> Result[U]: + """Pipe another function that returns a monad. For `Err` will return original error. + + ```python + >>> from danom import Err, Ok + >>> Ok(1).and_then(add_one) == Ok(2) + >>> Ok(1).and_then(raise_err) == Err(error=TypeError()) + >>> Err(error=TypeError()).and_then(add_one) == Err(error=TypeError()) + >>> Err(error=TypeError()).and_then(raise_value_err) == Err(error=TypeError()) + ``` + """ + ... + + @abstractmethod + def unwrap(self) -> T: + """Unwrap the `Ok` monad and get the inner value. + Unwrap the `Err` monad will raise the inner error. + ```python + >>> from danom import Err, Ok + >>> Ok().unwrap() == None + >>> Ok(1).unwrap() == 1 + >>> Ok("ok").unwrap() == 'ok' + >>> Err(error=TypeError()).unwrap() raise TypeError(...) + ``` + """ + ... @abstractmethod def match( - self, if_ok_func: Callable[[T], Result], _if_err_func: Callable[[T], Result] - ) -> Result: ... + self, if_ok_func: Callable[[T], Result], if_err_func: Callable[[T], Result] + ) -> Result: + """Map `ok_func` to `Ok` and `err_func` to `Err` + + ```python + >>> from danom import Err, Ok + >>> Ok(1).match(add_one, mock_get_error_type) == Ok(inner=2) + >>> Ok("ok").match(double, mock_get_error_type) == Ok(inner='okok') + >>> Err(error=TypeError()).match(double, mock_get_error_type) == Ok(inner='TypeError') + ``` + """ + ... 
def __class_getitem__(cls, _params: tuple) -> Self: return cls + + +@attrs.define(frozen=True) +class Ok[T, U](Result): + inner: Any = attrs.field(default=None) + + def is_ok(self) -> Literal[True]: + return True + + def map(self, func: Callable[[T], U], **kwargs: dict) -> Result[U]: + return Ok(func(self.inner, **kwargs)) + + def and_then(self, func: Callable[[T], Result[U]], **kwargs: dict) -> Result[U]: + return func(self.inner, **kwargs) + + def unwrap(self) -> T: + return self.inner + + def match( + self, if_ok_func: Callable[[T], Result], _if_err_func: Callable[[T], Result] + ) -> Result: + return if_ok_func(self.inner) + + +@attrs.define(frozen=True) +class Err[T, U, E](Result): + error: E | Exception | None = attrs.field(default=None) + input_args: tuple[T] = attrs.field(default=None, repr=False) + details: list[dict[str, Any]] = attrs.field(factory=list, init=False, repr=False) + + def __attrs_post_init__(self) -> None: + if isinstance(self.error, Exception): + # little hack explained here: https://www.attrs.org/en/stable/init.html#post-init + object.__setattr__(self, "details", self._extract_details(self.error.__traceback__)) + + def _extract_details(self, tb: TracebackType | None) -> list[dict[str, Any]]: + trace_info = [] + while tb: + frame = tb.tb_frame + trace_info.append( + { + "file": frame.f_code.co_filename, + "func": frame.f_code.co_name, + "line_no": tb.tb_lineno, + "locals": frame.f_locals, + }, + ) + tb = tb.tb_next + return trace_info + + def is_ok(self) -> Literal[False]: + return False + + def map(self, _: Callable[[T], U], **_kwargs: dict) -> Result[U]: + return self + + def and_then(self, _: Callable[[T], Result[U]], **_kwargs: dict) -> Self: + return self + + def unwrap(self) -> None: + if isinstance(self.error, Exception): + raise self.error + raise ValueError(f"Err does not have a caught error to raise: {self.error = }") + + def match( + self, _if_ok_func: Callable[[T], Result], if_err_func: Callable[[T], Result] + ) -> Result: + 
return if_err_func(self.error) diff --git a/src/danom/_safe.py b/src/danom/_safe.py index 92cbe02..e5ced41 100644 --- a/src/danom/_safe.py +++ b/src/danom/_safe.py @@ -1,18 +1,20 @@ import functools from collections.abc import Callable from typing import ( + ParamSpec, Self, ) -from danom._err import Err -from danom._ok import Ok -from danom._result import P, Result +from danom._result import Err, Ok, Result + +P = ParamSpec("P") def safe[T, U](func: Callable[[T], U]) -> Callable[[T], Result]: """Decorator for functions that wraps the function in a try except returns `Ok` on success else `Err`. ```python + >>> from danom import safe >>> @safe ... def add_one(a: int) -> int: ... return a + 1 @@ -35,6 +37,7 @@ def safe_method[T, U, E](func: Callable[[T], U]) -> Callable[[T], Result[U, E]]: """The same as `safe` except it forwards on the `self` of the class instance to the wrapped function. ```python + >>> from danom import safe_method >>> class Adder: ... def __init__(self, result: int = 0) -> None: ... self.result = result diff --git a/src/danom/_stream.py b/src/danom/_stream.py index 89963b4..7b3fcf5 100644 --- a/src/danom/_stream.py +++ b/src/danom/_stream.py @@ -5,6 +5,7 @@ from abc import ABC, abstractmethod from collections.abc import Callable, Iterable from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from copy import deepcopy from enum import Enum, auto, unique from functools import reduce from typing import Self @@ -27,6 +28,9 @@ def map[T, U](self, *fns: Callable[[T], U]) -> Self: ... @abstractmethod def filter[T](self, *fns: Callable[[T], bool]) -> Self: ... + @abstractmethod + def tap[T](self, *fns: Callable[[T], None]) -> Self: ... + @abstractmethod def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]: ... 
@@ -56,6 +60,7 @@ class Stream(_BaseStream): Take this imperative pipeline of operations, it iterates once over the data, skipping the value if it fails one of the filter checks: ```python + >>> from danom import Stream >>> res = [] ... >>> for x in range(1_000_000): @@ -85,6 +90,7 @@ class Stream(_BaseStream): After a bit of experience with python you might use list comprehensions, however this is arguably _less_ clear and iterates multiple times over the same data ```python + >>> from danom import Stream >>> mul_three = [triple(x) for x in range(1_000_000)] >>> gt_ten = [x for x in mul_three if is_gt_ten(x)] >>> sub_two = [min_two(x) for x in gt_ten] @@ -103,6 +109,7 @@ class Stream(_BaseStream): Using a `Stream` results in this: ```python + >>> from danom import Stream >>> ( ... Stream.from_iterable(range(1_000_000)) ... .map(triple) @@ -129,6 +136,7 @@ def from_iterable(cls, it: Iterable) -> Self: """This is the recommended way of creating a `Stream` object. ```python + >>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3) ``` """ @@ -140,11 +148,13 @@ def map[T, U](self, *fns: Callable[[T], U]) -> Self: """Map a function to the elements in the `Stream`. Will return a new `Stream` with the modified sequence. 
```python + >>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (1, 2, 3, 4) ``` This can also be mixed with `safe` functions: ```python + >>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).map(add_one).collect() == (Ok(inner=1), Ok(inner=2), Ok(inner=3), Ok(inner=4)) >>> @safe @@ -156,6 +166,7 @@ def map[T, U](self, *fns: Callable[[T], U]) -> Self: Simple functions can be passed in sequence to compose more complex transformations ```python + >>> from danom import Stream >>> Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9) ``` """ @@ -166,17 +177,55 @@ def filter[T](self, *fns: Callable[[T], bool]) -> Self: """Filter the stream based on a predicate. Will return a new `Stream` with the modified sequence. ```python + >>> from danom import Stream >>> Stream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2) ``` Simple functions can be passed in sequence to compose more complex filters ```python + >>> from danom import Stream >>> Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15) ``` """ plan = (*self.ops, *tuple((_OpType.FILTER, fn) for fn in fns)) return Stream(seq=self.seq, ops=plan) + def tap[T](self, *fns: Callable[[T], None]) -> Self: + """Tap the values to another process that returns None. Will return a new `Stream` with the modified sequence. + + The value passed to the tap function will be deep-copied to avoid any modification to the `Stream` item for downstream consumers. 
+ + ```python + >>> from danom import Stream + >>> Stream.from_iterable([0, 1, 2, 3]).tap(log_value).collect() == (0, 1, 2, 3) + ``` + + Simple functions can be passed in sequence for multiple `tap` operations + ```python + >>> from danom import Stream + >>> Stream.from_iterable([0, 1, 2, 3]).tap(log_value, print_value).collect() == (0, 1, 2, 3) + ``` + + `tap` is useful for logging and similar actions without effecting the individual items, in this example eligible and dormant users are logged using `tap`: + + ```python + >>> from danom import Stream + >>> active_users, inactive_users = ( + ... Stream.from_iterable(users).map(parse_user_objects).partition(inactive_users) + ... ) + ... + >>> active_users.filter(eligible_for_promotion).tap(log_eligible_users).map( + ... construct_promo_email, send_with_confirmation + ... ).collect() + ... + >>> inactive_users.tap(log_inactive_users).map( + ... create_dormant_user_entry, add_to_dormant_table + ... ).collect() + ``` + """ + plan = (*self.ops, *tuple((_OpType.TAP, fn) for fn in fns)) + return Stream(seq=self.seq, ops=plan) + def partition[T]( self, fn: Callable[[T], bool], *, workers: int = 1, use_threads: bool = False ) -> tuple[Self, Self]: @@ -184,6 +233,7 @@ def partition[T]( Each partition is independently replayable. ```python + >>> from danom import Stream >>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0) >>> part1.collect() == (0, 2) >>> part2.collect() == (1, 3) @@ -191,6 +241,7 @@ def partition[T]( As `partition` triggers an action, the parameters will be forwarded to the `par_collect` call if the `workers` are greater than 1. ```python + >>> from danom import Stream >>> Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4) >>> part1.map(add_one).par_collect() == (4, 7, 10) >>> part2.collect() == (2, 4, 5, 7, 8, 10, 11) @@ -212,6 +263,7 @@ def fold[T, U]( """Fold the results into a single value. 
`fold` triggers an action so will incur a `collect`. ```python + >>> from danom import Stream >>> Stream.from_iterable([1, 2, 3, 4]).fold(0, lambda a, b: a + b) == 10 >>> Stream.from_iterable([[1], [2], [3], [4]]).fold([0], lambda a, b: a + b) == [0, 1, 2, 3, 4] >>> Stream.from_iterable([1, 2, 3, 4]).fold(1, lambda a, b: a * b) == 24 @@ -220,6 +272,7 @@ def fold[T, U]( As `fold` triggers an action, the parameters will be forwarded to the `par_collect` call if the `workers` are greater than 1. This will only effect the `collect` that is used to create the iterable to reduce, not the `fold` operation itself. ```python + >>> from danom import Stream >>> Stream.from_iterable([1, 2, 3, 4]).map(some_expensive_fn).fold(0, add, workers=4, use_threads=False) ``` """ @@ -231,6 +284,7 @@ def collect(self) -> tuple: """Materialise the sequence from the `Stream`. ```python + >>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.collect() == (1, 2, 3, 4) ``` @@ -243,6 +297,7 @@ def par_collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple: """Materialise the sequence from the `Stream` in parallel. ```python + >>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.par_collect() == (1, 2, 3, 4) ``` @@ -250,6 +305,7 @@ def par_collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple: Use the `workers` arg to select the number of workers to use. Use `-1` to use all available processors (except 1). Defaults to `4`. ```python + >>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.par_collect(workers=-1) == (1, 2, 3, 4) ``` @@ -257,6 +313,7 @@ def par_collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple: For smaller I/O bound tasks use the `use_threads` flag as True. If False the processing will use `ProcessPoolExecutor` else it will use `ThreadPoolExecutor`. 
```python + >>> from danom import Stream >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) >>> stream.par_collect(use_threads=True) == (1, 2, 3, 4) ``` @@ -277,12 +334,14 @@ async def async_collect(self) -> tuple: """Async version of collect. Note that all functions in the stream should be `Awaitable`. ```python + >>> from danom import Stream >>> Stream.from_iterable(file_paths).map(async_read_files).async_collect() ``` If there are no operations in the `Stream` then this will act as a normal collect. ```python + >>> from danom import Stream >>> Stream.from_iterable(file_paths).async_collect() ``` """ @@ -297,6 +356,7 @@ async def async_collect(self) -> tuple: class _OpType(Enum): MAP = auto() FILTER = auto() + TAP = auto() class _Nothing(Enum): @@ -315,6 +375,8 @@ def _apply_fns[T, U](elem: T, ops: tuple[tuple[_OpType, Callable], ...]) -> U | res = op_fn(res) elif op == _OpType.FILTER and not op_fn(res): return _Nothing.NOTHING + elif op == _OpType.TAP: + op_fn(deepcopy(res)) return res @@ -325,4 +387,6 @@ async def _async_apply_fns[T, U](elem: T, ops: tuple[tuple[_OpType, Callable], . res = await op_fn(res) elif op == _OpType.FILTER and not await op_fn(res): return _Nothing.NOTHING + elif op == _OpType.TAP: + await op_fn(deepcopy(res)) return res diff --git a/src/danom/_utils.py b/src/danom/_utils.py index bc6fd65..83258f7 100644 --- a/src/danom/_utils.py +++ b/src/danom/_utils.py @@ -3,8 +3,6 @@ import attrs -from danom._result import P - @attrs.define(frozen=True, hash=True, eq=True) class _Compose[T, U]: @@ -23,13 +21,11 @@ def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]: The functions will be called in sequence with the result of one being used as the input for the next. 
```python + >>> from danom import compose >>> add_two = compose(add_one, add_one) >>> add_two(0) == 2 - ``` - - ```python - >>> add_two = compose(add_one, add_one, is_even) - >>> add_two(0) == True + >>> add_two_is_even = compose(add_one, add_one, is_even) + >>> add_two_is_even(0) == True ``` """ return _Compose(fns) @@ -47,6 +43,7 @@ def all_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]: """True if all of the given functions return True. ```python + >>> from danom import all_of >>> is_valid_user = all_of(is_subscribed, is_active, has_2fa) >>> is_valid_user(user) == True ``` @@ -66,6 +63,7 @@ def any_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]: """True if any of the given functions return True. ```python + >>> from danom import any_of >>> is_eligible = any_of(has_coupon, is_vip, is_staff) >>> is_eligible(user) == True ``` @@ -77,6 +75,7 @@ def none_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]: """True if none of the given functions return True. ```python + >>> from danom import none_of >>> is_valid = none_of(is_empty, exceeds_size_limit, contains_unsupported_format) >>> is_valid(submission) == True ``` @@ -88,6 +87,7 @@ def identity[T](x: T) -> T: """Basic identity function. ```python + >>> from danom import identity >>> identity("abc") == "abc" >>> identity(1) == 1 >>> identity(ComplexDataType(a=1, b=2, c=3)) == ComplexDataType(a=1, b=2, c=3) @@ -96,10 +96,11 @@ def identity[T](x: T) -> T: return x -def invert(func: Callable[[P], bool]) -> Callable[[P], bool]: +def invert[T](func: Callable[[T], bool]) -> Callable[[T], bool]: """Invert a boolean function so it returns False where it would've returned True. 
```python + >>> from danom import invert >>> invert(has_len)("abc") == False >>> invert(has_len)("") == True ``` diff --git a/tests/conftest.py b/tests/conftest.py index 1daf68e..3b972d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,6 +50,11 @@ def safe_add(a: int, b: int) -> Result[int, Exception]: return a + b +@safe +def safe_add_one[T](x: T) -> T: + return x + 1 + + @safe def safe_raise_type_error(_a: Any) -> Result[None, Exception]: # noqa: ANN401 raise TypeError @@ -72,3 +77,19 @@ def add(self, a: int, b: int) -> Self: @safe_method def cls_raises(self, *_args: tuple, **_kwargs: dict) -> None: raise ValueError + + +class ValueLogger: + def __init__(self) -> None: + self.values = [] + + def __call__[T](self, value: T) -> None: + self.values.append(value) + + +class AsyncValueLogger: + def __init__(self) -> None: + self.values = set() + + async def __call__[T](self, value: T) -> None: + self.values.add(value) diff --git a/tests/test_api.py b/tests/test_api.py index b8cc674..77873e4 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,6 +1,7 @@ import pytest from src.danom import Err, Ok, Result +from tests.conftest import safe_add, safe_add_one @pytest.mark.parametrize( @@ -12,3 +13,23 @@ ) def test_subclass(sub_cls, base_cls): assert isinstance(sub_cls, base_cls) + + +def test_monadic_left_identity(): + assert Result.unit(0).and_then(safe_add, b=1) == safe_add(0, 1) + + +@pytest.mark.parametrize(("monad"), [pytest.param(Ok(1)), pytest.param(Err())]) +def test_monadic_right_identity(monad): + assert monad.and_then(Result.unit) == monad + + +@pytest.mark.parametrize( + ("monad", "f", "g"), + [ + pytest.param(Ok(0), safe_add_one, safe_add_one), + pytest.param(Err(), safe_add_one, safe_add_one), + ], +) +def test_monadic_associativity(monad, f, g): + assert monad.and_then(f).and_then(g) == monad.and_then(lambda x: f(x).and_then(g)) diff --git a/tests/test_err.py b/tests/test_err.py deleted file mode 100644 index 2369178..0000000 --- 
a/tests/test_err.py +++ /dev/null @@ -1,50 +0,0 @@ -import pytest - -from src.danom import Err - - -@pytest.mark.parametrize( - ("monad", "expected_result", "expected_context"), - [ - pytest.param( - Err(input_args=(), error=TypeError("should raise this")), - None, - pytest.raises(TypeError), - ), - pytest.param( - Err(input_args=(), error=ValueError("should raise this")), - None, - pytest.raises(ValueError), - ), - pytest.param( - Err("some other err representation"), - None, - pytest.raises(ValueError), - ), - ], -) -def test_err_unwrap(monad, expected_result, expected_context): - with expected_context: - assert monad.unwrap() == expected_result - - -@pytest.mark.parametrize( - ("monad", "expected_result"), - [ - pytest.param(Err(input_args=(), error=TypeError("should raise this")), False), - ], -) -def test_err_is_ok(monad, expected_result): - assert monad.is_ok() == expected_result - - -@pytest.mark.parametrize( - ("err", "expected_details"), - [ - pytest.param(TypeError("an invalid type"), []), - pytest.param("A primative err", []), - ], -) -def test_err_details(err, expected_details): - monad = Err(error=err) - assert monad.details == expected_details diff --git a/tests/test_ok.py b/tests/test_ok.py deleted file mode 100644 index fad71d2..0000000 --- a/tests/test_ok.py +++ /dev/null @@ -1,28 +0,0 @@ -from contextlib import nullcontext - -import pytest - -from src.danom import Ok - - -@pytest.mark.parametrize( - ("monad", "expected_result", "expected_context"), - [ - pytest.param(Ok(None), None, nullcontext()), - pytest.param(Ok(0), 0, nullcontext()), - pytest.param(Ok("ok"), "ok", nullcontext()), - ], -) -def test_ok_unwrap(monad, expected_result, expected_context): - with expected_context: - assert monad.unwrap() == expected_result - - -@pytest.mark.parametrize( - ("monad", "expected_result"), - [ - pytest.param(Ok(None), True), - ], -) -def test_ok_is_ok(monad, expected_result): - assert monad.is_ok() == expected_result diff --git a/tests/test_result.py 
b/tests/test_result.py index 53d7d1c..f1ce39f 100644 --- a/tests/test_result.py +++ b/tests/test_result.py @@ -1,6 +1,22 @@ +from contextlib import nullcontext + import pytest -from src.danom._result import Result, T +from src.danom import Err, Ok, Result +from tests.conftest import add_one + + +@pytest.mark.parametrize( + ("monad", "inner"), + [ + pytest.param(Ok, 0), + pytest.param(Ok, "ok"), + pytest.param(Err, 0), + pytest.param(Result, 0), + ], +) +def test_unit(monad, inner): + assert monad.unit(inner) == Ok(inner) @pytest.mark.parametrize( @@ -14,13 +30,63 @@ def test_result_unwrap(monad, expected_result, expected_context): assert monad().unwrap() == expected_result +@pytest.mark.parametrize( + ("monad", "expected_result", "expected_context"), + [ + pytest.param(Ok(None), None, nullcontext()), + pytest.param(Ok(0), 0, nullcontext()), + pytest.param(Ok("ok"), "ok", nullcontext()), + pytest.param( + Err(error=TypeError("should raise this"), input_args=()), + None, + pytest.raises(TypeError), + ), + pytest.param( + Err(error=ValueError("should raise this"), input_args=()), + None, + pytest.raises(ValueError), + ), + pytest.param( + Err("some other err representation"), + None, + pytest.raises(ValueError), + ), + ], +) +def test_unwrap(monad, expected_result, expected_context): + with expected_context: + assert monad.unwrap() == expected_result + + +@pytest.mark.parametrize( + ("monad", "expected_result"), + [ + pytest.param(Ok(None), True), + pytest.param(Err(), False), + ], +) +def test_is_ok(monad, expected_result): + assert monad.is_ok() == expected_result + + +@pytest.mark.parametrize( + ("monad", "func", "expected_result"), + [ + pytest.param(Ok(0), add_one, Ok(1)), + pytest.param(Err(), add_one, Err()), + ], +) +def test_map(monad, func, expected_result): + assert monad.map(func) == expected_result + + class OnlyIsOk(Result): def is_ok(self) -> bool: return False class OnlyUnwrap(Result): - def unwrap(self) -> T: + def unwrap(self) -> None: return None @@ 
-34,3 +100,15 @@ def unwrap(self) -> T: def test_raises_not_implemented(cls): with pytest.raises(TypeError): cls() + + +@pytest.mark.parametrize( + ("err", "expected_details"), + [ + pytest.param(TypeError("an invalid type"), []), + pytest.param("A primative err", []), + ], +) +def test_err_details(err, expected_details): + monad = Err(error=err) + assert monad.details == expected_details diff --git a/tests/test_stream.py b/tests/test_stream.py index cb93532..40a1d49 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -5,6 +5,8 @@ from src.danom import Stream from tests.conftest import ( REPO_ROOT, + AsyncValueLogger, + ValueLogger, add, add_one, async_is_file, @@ -97,3 +99,27 @@ async def test_async_collect_no_fns(): Path(f"{REPO_ROOT}/tests/mock_data/file_b.py"), Path(f"{REPO_ROOT}/tests/mock_data/file_c.py"), ) + + +def test_tap(): + val_logger = ValueLogger() + val_logger_2 = ValueLogger() + + assert Stream.from_iterable(range(4)).tap(val_logger, val_logger_2).collect() == (0, 1, 2, 3) + assert sorted(val_logger.values) == [0, 1, 2, 3] + assert sorted(val_logger_2.values) == [0, 1, 2, 3] + + +@pytest.mark.asyncio +async def test_async_tap(): + val_logger = AsyncValueLogger() + val_logger_2 = AsyncValueLogger() + + assert await Stream.from_iterable(range(4)).tap(val_logger, val_logger_2).async_collect() == ( + 0, + 1, + 2, + 3, + ) + assert sorted(val_logger.values) == [0, 1, 2, 3] + assert sorted(val_logger_2.values) == [0, 1, 2, 3] diff --git a/uv.lock b/uv.lock index f2ee067..a5de28e 100644 --- a/uv.lock +++ b/uv.lock @@ -189,7 +189,7 @@ wheels = [ [[package]] name = "danom" -version = "0.7.0" +version = "0.8.0" source = { editable = "." } dependencies = [ { name = "attrs" },