From 1c1ad0c3baac04d7545c792b506baeacae72c2ec Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Sun, 7 Dec 2025 17:27:48 +0000
Subject: [PATCH 1/9] refactor: compose to utils

---
 README.md              |  3 ++-
 src/danom/__init__.py  |  4 ++--
 src/danom/_stream.py   | 26 ++-------------------
 src/danom/_utils.py    | 30 +++++++++++++++++++++----
 tests/conftest.py      | 49 ++++++++++++++++++++++++++++++++++++++++
 tests/test_new_type.py |  5 +----
 tests/test_safe.py     | 51 +++++++++---------------------------------
 tests/test_stream.py   | 26 ++------------------
 tests/test_utils.py    | 24 +++++++++++++-------
 9 files changed, 110 insertions(+), 108 deletions(-)
 create mode 100644 tests/conftest.py

diff --git a/README.md b/README.md
index 330513a..6a5aa26 100644
--- a/README.md
+++ b/README.md
@@ -330,7 +330,7 @@ The same as `safe` except it forwards on the `self` of the class instance to the
 ### `compose`
 ```python
-compose(*fns: 'Callable[[T], U]') -> 'Callable[[T], U]'
+compose(*fns: collections.abc.Callable[[T], U]) -> collections.abc.Callable[[T], U]
 ```
 Compose multiple functions into one.
 
@@ -432,6 +432,7 @@ Alternatively the map method can be used to return a new type instance with the
 │   └── _utils.py
 ├── tests
 │   ├── __init__.py
+│   ├── conftest.py
 │   ├── test_api.py
 │   ├── test_err.py
 │   ├── test_new_type.py
diff --git a/src/danom/__init__.py b/src/danom/__init__.py
index 3ec2f4d..f52c20b 100644
--- a/src/danom/__init__.py
+++ b/src/danom/__init__.py
@@ -3,8 +3,8 @@
 from danom._ok import Ok
 from danom._result import Result
 from danom._safe import safe, safe_method
-from danom._stream import ParStream, Stream, compose
-from danom._utils import identity, invert
+from danom._stream import ParStream, Stream
+from danom._utils import compose, identity, invert
 
 __all__ = [
     "Err",
diff --git a/src/danom/_stream.py b/src/danom/_stream.py
index d94b96e..b73c92e 100644
--- a/src/danom/_stream.py
+++ b/src/danom/_stream.py
@@ -9,6 +9,8 @@
 
 import attrs
 
+from danom._utils import compose
+
 
 @attrs.define(frozen=True)
 class _BaseStream(ABC):
@@ -215,30 +217,6 @@ def collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple:
         return tuple(elem for elem in res if elem != _Nothing.NOTHING)
 
 
-def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]:
-    """Compose multiple functions into one.
-
-    The functions will be called in sequence with the result of one being used as the input for the next.
-
-    ```python
-    >>> add_two = compose(add_one, add_one)
-    >>> add_two(0) == 2
-    ```
-
-    ```python
-    >>> add_two = compose(add_one, add_one, is_even)
-    >>> add_two(0) == True
-    ```
-    """
-
-    def wrapper(value: T) -> U:
-        for fn in fns:
-            value = fn(value)
-        return value
-
-    return wrapper
-
-
 @unique
 class _OpType(Enum):
     MAP = auto()
diff --git a/src/danom/_utils.py b/src/danom/_utils.py
index 835c88b..9c60fa3 100644
--- a/src/danom/_utils.py
+++ b/src/danom/_utils.py
@@ -1,8 +1,33 @@
 from collections.abc import Callable
+from operator import not_
 
 from danom._result import P
 
 
+def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]:
+    """Compose multiple functions into one.
+
+    The functions will be called in sequence with the result of one being used as the input for the next.
+
+    ```python
+    >>> add_two = compose(add_one, add_one)
+    >>> add_two(0) == 2
+    ```
+
+    ```python
+    >>> add_two = compose(add_one, add_one, is_even)
+    >>> add_two(0) == True
+    ```
+    """
+
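+    # Feed the value through each function in turn, left to right.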
+    def wrapper(value: T) -> U:
+        for fn in fns:
+            value = fn(value)
+        return value
+
+    return wrapper
+
+
 def identity[T](x: T) -> T:
     """Basic identity function.
@@ -24,7 +49,4 @@ def invert(func: Callable[[P], bool]) -> Callable[[P], bool]:
     ```
     """
 
-    def wrapper(*args: P.args, **kwargs: P.kwargs) -> bool:
-        return not func(*args, **kwargs)
-
-    return wrapper
+    return compose(func, not_)
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..46086f8
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,49 @@
+from typing import Any, Self
+
+from src.danom import safe, safe_method
+from src.danom._result import Result
+
+
+def has_len(value: str) -> bool:
+    return len(value) > 0
+
+
+def add_one(x: int) -> int:
+    return x + 1
+
+
+def divisible_by_3(x: int) -> bool:
+    return x % 3 == 0
+
+
+def divisible_by_5(x: int) -> bool:
+    return x % 5 == 0
+
+
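+# `safe`-decorated helpers return a Result: Ok with the value on success, Err wrapping the raised exception.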
+@safe
+def safe_add(a: int, b: int) -> Result[int, Exception]:
+    return a + b
+
+
+@safe
+def safe_raise_type_error(_a: Any) -> Result[None, Exception]:  # noqa: ANN401
+    raise TypeError
+
+
+@safe
+def safe_get_error_type(exception: Exception) -> str:
+    return exception.__class__.__name__
+
+
+class Adder:
+    def __init__(self) -> None:
+        self.result = 0
+
+    @safe_method
+    def add(self, a: int, b: int) -> Self:
+        self.result += a + b
+        return self
+
+    @safe_method
+    def cls_raises(self, *_args: tuple, **_kwargs: dict) -> None:
+        raise ValueError
diff --git a/tests/test_new_type.py b/tests/test_new_type.py
index ff590bb..21bf000 100644
--- a/tests/test_new_type.py
+++ b/tests/test_new_type.py
@@ -3,10 +3,7 @@
 import pytest
 
 from src.danom import new_type
-
-
-def has_len(value: str) -> bool:
-    return len(value) > 0
+from tests.conftest import has_len
 
 
 @pytest.mark.parametrize(
diff --git a/tests/test_safe.py b/tests/test_safe.py
index 2147446..913fbf2 100644
--- a/tests/test_safe.py
+++ b/tests/test_safe.py
@@ -1,40 +1,23 @@
 from functools import partial
-from typing import Any, Self
 
 import pytest
 
-from src.danom import safe, safe_method
-from src.danom._result import Result
-
-
-@safe
-def mock_func(a: int, b: int) -> Result[int, Exception]:
-    return a + b
-
-
-@safe
-def mock_func_raises(_a: Any) -> Result[None, Exception]:
-    raise TypeError
-
-
-@safe
-def mock_get_error_type(exception: Exception) -> str:
-    return exception.__class__.__name__
+from tests.conftest import Adder, safe_add, safe_get_error_type, safe_raise_type_error
 
 
 def test_valid_safe_pipeline():
     pipeline = (
-        mock_func(1, 2)
-        .and_then(mock_func, b=1)
-        .and_then(mock_func, b=1)
-        .match(partial(mock_func, b=1), mock_get_error_type)
+        safe_add(1, 2)
+        .and_then(safe_add, b=1)
+        .and_then(safe_add, b=1)
+        .match(partial(safe_add, b=1), safe_get_error_type)
     )
     assert pipeline.is_ok()
     assert pipeline.unwrap() == 6
 
 
 def test_invalid_safe_pipeline():
-    pipeline = mock_func(1, 2).and_then(mock_func_raises).and_then(mock_func, b=1)
+    pipeline = safe_add(1, 2).and_then(safe_raise_type_error).and_then(safe_add, b=1)
 
     assert not pipeline.is_ok()
     with pytest.raises(TypeError):
@@ -43,29 +26,15 @@ def test_invalid_safe_pipeline():
 
 def test_invalid_safe_pipeline_with_match():
     pipeline = (
-        mock_func(1, 2)
-        .and_then(mock_func_raises)
-        .and_then(mock_func, b=1)
-        .match(partial(mock_func, b=1), mock_get_error_type)
+        safe_add(1, 2)
+        .and_then(safe_raise_type_error)
+        .and_then(safe_add, b=1)
+        .match(partial(safe_add, b=1), safe_get_error_type)
     )
     assert pipeline.is_ok()
     assert pipeline.unwrap() == "TypeError"
 
 
-class Adder:
-    def __init__(self):
-        self.result = 0
-
-    @safe_method
-    def add(self, a: int, b: int) -> Self:
-        self.result += a + b
-        return self
-
-    @safe_method
-    def cls_raises(self, *args, **kwargs) -> None:
-        raise ValueError
-
-
 def test_valid_safe_method_pipeline():
     cls = Adder()
     res = cls.add(2, 2).add(2, 2).add(2, 2)
diff --git a/tests/test_stream.py b/tests/test_stream.py
index 3ff7207..312f383 100644
--- a/tests/test_stream.py
+++ b/tests/test_stream.py
@@ -1,6 +1,7 @@
 import pytest
 
-from src.danom import ParStream, Stream, compose
+from src.danom import ParStream, Stream
+from tests.conftest import add_one, divisible_by_3, divisible_by_5
 
 
 @pytest.mark.parametrize(
@@ -26,18 +27,6 @@ def test_stream_with_multiple_fns():
     ) == (5,)
 
 
-def add_one(x: int) -> int:
-    return x + 1
-
-
-def divisible_by_3(x: int) -> bool:
-    return x % 3 == 0
-
-
-def divisible_by_5(x: int) -> bool:
-    return x % 5 == 0
-
-
 @pytest.mark.parametrize(
     ("it", "n_workers", "expected_result"),
     [
@@ -71,14 +60,3 @@ def test_stream_to_par_stream():
 def test_par_stream_partition():
     with pytest.raises(NotImplementedError):
         ParStream.from_iterable(range(10)).partition(divisible_by_3)
-
-
-@pytest.mark.parametrize(
-    ("inp_args", "fns", "expected_result"),
-    [
-        pytest.param(0, (add_one, add_one), 2),
-        pytest.param(0, (add_one, divisible_by_3), False),
-    ],
-)
-def test_compose(inp_args, fns, expected_result):
-    assert compose(*fns)(inp_args) == expected_result
diff --git a/tests/test_utils.py b/tests/test_utils.py
index ec3e0ba..c23062d 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,14 +1,26 @@
 import pytest
 
-from src.danom import Ok, identity, invert
+from src.danom import Ok, compose, identity, invert
+from tests.conftest import add_one, divisible_by_3, has_len
+
+
+@pytest.mark.parametrize(
+    ("inp_args", "fns", "expected_result"),
+    [
+        pytest.param(0, (add_one, add_one), 2),
+        pytest.param(0, (add_one, divisible_by_3), False),
+    ],
+)
+def test_compose(inp_args, fns, expected_result):
+    assert compose(*fns)(inp_args) == expected_result
 
 
 @pytest.mark.parametrize(
     "x",
     [
-        pytest.param(1, id="identity on a primative datatype (int)"),
-        pytest.param("abc", id="identity on a primative datatype (str)"),
-        pytest.param([0, 1, 2], id="identity on a primative datatype (list)"),
+        pytest.param(1, id="identity on a primitive datatype (int)"),
+        pytest.param("abc", id="identity on a primitive datatype (str)"),
+        pytest.param([0, 1, 2], id="identity on a primitive datatype (list)"),
         pytest.param(Ok(1), id="identity on a more complex datatype"),
     ],
 )
@@ -16,10 +28,6 @@ def test_identity(x):
     assert identity(x) == x
 
 
-def has_len(inp_str: str) -> bool:
-    return len(inp_str) > 0
-
-
 @pytest.mark.parametrize(
     ("input_args", "fn", "expected_result"),
     [
From c0a0b0f47f67d6c439fd828faca3ad3c99041aa5 Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Sun, 7 Dec 2025 20:51:25 +0000
Subject: [PATCH 2/9] fix: use more efficient stream

---
 src/danom/_stream.py | 30 +++++++++++++-----------------
 src/danom/_utils.py  | 44 +++++++++++++++++++++++++++++++++++++-------
 tests/test_utils.py  | 28 +++++++++++++++++++++++++++-
 3 files changed, 77 insertions(+), 25 deletions(-)

diff --git a/src/danom/_stream.py b/src/danom/_stream.py
index b73c92e..1fb8c3f 100644
--- a/src/danom/_stream.py
+++ b/src/danom/_stream.py
@@ -2,15 +2,13 @@
 
 import os
 from abc import ABC, abstractmethod
-from collections.abc import Callable, Generator, Iterable
+from collections.abc import Callable, Iterable
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from enum import Enum, auto, unique
 from typing import Self
 
 import attrs
 
-from danom._utils import compose
-
 
 @attrs.define(frozen=True)
 class _BaseStream(ABC):
@@ -28,9 +26,8 @@ def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]: ...
 class Stream(_BaseStream):
     """A lazy iterator with functional operations."""
 
-    seq: Callable[[], Iterable] = attrs.field(
-        validator=attrs.validators.instance_of(Callable), repr=False
-    )
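+    # `seq` is the source iterable; `ops` records (op type, fn) pairs that only run at collect time.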
+    seq: Iterable = attrs.field(validator=attrs.validators.instance_of(Iterable), repr=False)
+    ops: tuple = attrs.field(default=(), validator=attrs.validators.instance_of(tuple), repr=False)
 
     @classmethod
     def from_iterable(cls, it: Iterable) -> Self:
@@ -42,7 +39,7 @@ def from_iterable(cls, it: Iterable) -> Self:
         """
         if not isinstance(it, Iterable):
             it = [it]
-        return cls(lambda: iter(it))
+        return cls(seq=iter(it))
 
     def to_par_stream(self) -> ParStream:
         """Convert `Stream` to `ParStream`. This will incur a `collect`.
@@ -77,12 +74,8 @@ def map[T, U](self, *fns: Callable[[T], U]) -> Self:
         >>> Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9)
         ```
         """
-
-        def generator() -> Generator[U, None, None]:
-            for elem in self.seq():
-                yield compose(*fns)(elem)
-
-        return Stream(generator)
+        plan = (*self.ops, *tuple((_OpType.MAP, fn) for fn in fns))
+        return Stream(seq=self.seq, ops=plan)
 
     def filter[T](self, *fns: Callable[[T], bool]) -> Self:
         """Filter the stream based on a predicate. Will return a new `Stream` with the modified sequence.
@@ -96,7 +89,8 @@ def filter[T](self, *fns: Callable[[T], bool]) -> Self:
         >>> Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
         ```
         """
-        return Stream(lambda: (x for x in self.seq() if all(fn(x) for fn in fns)))
+        plan = (*self.ops, *tuple((_OpType.FILTER, fn) for fn in fns))
+        return Stream(seq=self.seq, ops=plan)
 
     def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]:
         """Similar to `filter` except splits the True and False values. Will return two new `Stream`s with the partitioned sequences.
@@ -111,8 +105,8 @@ def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]:
         # have to materialise to be able to replay each side independently
         seq_tuple = self.collect()
         return (
-            Stream(lambda: (x for x in seq_tuple if fn(x))),
-            Stream(lambda: (x for x in seq_tuple if not fn(x))),
+            Stream(seq=(x for x in seq_tuple if fn(x))),
+            Stream(seq=(x for x in seq_tuple if not fn(x))),
         )
 
     def collect(self) -> tuple:
@@ -123,7 +117,9 @@ def collect(self) -> tuple:
         >>> stream.collect() == (1, 2, 3, 4)
         ```
         """
-        return tuple(self.seq())
+        return tuple(
+            elem for x in self.seq if (elem := _apply_fns(x, self.ops)) != _Nothing.NOTHING
+        )
 
 
 @attrs.define(frozen=True)
diff --git a/src/danom/_utils.py b/src/danom/_utils.py
index 9c60fa3..60ee73a 100644
--- a/src/danom/_utils.py
+++ b/src/danom/_utils.py
@@ -1,9 +1,22 @@
-from collections.abc import Callable
+from collections.abc import Callable, Sequence
 from operator import not_
 
+import attrs
+
 from danom._result import P
 
 
+@attrs.define(frozen=True, hash=True, eq=True)
+class _Composer[T, U]:
+    fns: Sequence[Callable[[T], U]]
+
+    def __call__(self, initial: T) -> U:
+        value = initial
+        for fn in self.fns:
+            value = fn(value)
+        return value
+
+
 def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]:
     """Compose multiple functions into one.
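+        # all() short-circuits: evaluation stops at the first predicate that returns False.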
@@ -19,13 +32,31 @@ def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]:
     >>> add_two = compose(add_one, add_one)
     >>> add_two(0) == 2
     ```
 
     ```python
     >>> add_two = compose(add_one, add_one, is_even)
     >>> add_two(0) == True
     ```
     """
-
-    def wrapper(value: T) -> U:
-        for fn in fns:
-            value = fn(value)
-        return value
-
-    return wrapper
+    return _Composer(fns)
+
+
+@attrs.define(frozen=True, hash=True, eq=True)
+class _AllOf[T, U]:
+    fns: Sequence[Callable[[T], U]]
+
+    def __call__(self, initial: T) -> U:
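+        # any() short-circuits: evaluation stops at the first predicate that returns True.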
+        return all(fn(initial) for fn in self.fns)
+
+
+def all_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]:
+    return _AllOf(fns)
+
+
+@attrs.define(frozen=True, hash=True, eq=True)
+class _AnyOf[T, U]:
+    fns: Sequence[Callable[[T], U]]
+
+    def __call__(self, initial: T) -> U:
+        return any(fn(initial) for fn in self.fns)
+
+
+def any_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]:
+    return _AnyOf(fns)
 
 
 def identity[T](x: T) -> T:
     """Basic identity function.
@@ -48,5 +79,4 @@ def invert(func: Callable[[P], bool]) -> Callable[[P], bool]:
     ```
     """
-
     return compose(func, not_)
diff --git a/tests/test_utils.py b/tests/test_utils.py
index c23062d..38baeee 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,7 +1,8 @@
 import pytest
 
 from src.danom import Ok, compose, identity, invert
-from tests.conftest import add_one, divisible_by_3, has_len
+from src.danom._utils import all_of, any_of
+from tests.conftest import add_one, divisible_by_3, divisible_by_5, has_len
 
 
 @pytest.mark.parametrize(
@@ -15,6 +16,31 @@ def test_compose(inp_args, fns, expected_result):
     assert compose(*fns)(inp_args) == expected_result
 
 
+@pytest.mark.parametrize(
+    ("inp_args", "fns", "expected_result"),
+    [
+        pytest.param(3, (divisible_by_3, divisible_by_5), False),
+        pytest.param(5, (divisible_by_3, divisible_by_5), False),
+        pytest.param(15, (divisible_by_3, divisible_by_5), True),
+    ],
+)
+def test_all_of(inp_args, fns, expected_result):
+    assert all_of(*fns)(inp_args) == expected_result
+
+
+@pytest.mark.parametrize(
+    ("inp_args", "fns", "expected_result"),
+    [
+        pytest.param(3, (divisible_by_3, divisible_by_5), True),
+        pytest.param(5, (divisible_by_3, divisible_by_5), True),
+        pytest.param(15, (divisible_by_3, divisible_by_5), True),
+        pytest.param(7, (divisible_by_3, divisible_by_5), False),
+    ],
+)
+def test_any_of(inp_args, fns, expected_result):
+    assert any_of(*fns)(inp_args) == expected_result
+
+
 @pytest.mark.parametrize(
     "x",
     [
From bef9066783a3701f6e13c6b3a6e1bb721be308df Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Sun, 7 Dec 2025 21:43:26 +0000
Subject: [PATCH 3/9] refactor: move common attrs to base class

---
 src/danom/_stream.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/danom/_stream.py b/src/danom/_stream.py
index 1fb8c3f..03f065d 100644
--- a/src/danom/_stream.py
+++ b/src/danom/_stream.py
@@ -12,6 +12,9 @@
 
 @attrs.define(frozen=True)
 class _BaseStream(ABC):
+    seq: Iterable = attrs.field(validator=attrs.validators.instance_of(Iterable), repr=False)
+    ops: tuple = attrs.field(default=(), validator=attrs.validators.instance_of(tuple), repr=False)
+
     @abstractmethod
     def map[T, U](self, *fns: Callable[[T], U]) -> Self: ...
 
@@ -26,9 +29,6 @@ def
 class Stream(_BaseStream):
     """A lazy iterator with functional operations."""
 
-    seq: Iterable = attrs.field(validator=attrs.validators.instance_of(Iterable), repr=False)
-    ops: tuple = attrs.field(default=(), validator=attrs.validators.instance_of(tuple), repr=False)
-
     @classmethod
     def from_iterable(cls, it: Iterable) -> Self:
@@ -126,9 +126,6 @@
 class ParStream(_BaseStream):
     """A parallel iterator with functional operations."""
 
-    seq: Iterable = attrs.field(validator=attrs.validators.instance_of(Iterable), repr=False)
-    ops: tuple = attrs.field(default=(), validator=attrs.validators.instance_of(tuple), repr=False)
-
     @classmethod
     def from_iterable(cls, it: Iterable) -> Self:
         """This is the recommended way of creating a `ParStream` object.
From 66affde7ab60e6398460dd685dca023996372bd4 Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Sun, 7 Dec 2025 21:58:33 +0000
Subject: [PATCH 4/9] fix: rename compose class

---
 src/danom/_utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/danom/_utils.py b/src/danom/_utils.py
index 60ee73a..a79f74d 100644
--- a/src/danom/_utils.py
+++ b/src/danom/_utils.py
@@ -7,7 +7,7 @@
 
 
 @attrs.define(frozen=True, hash=True, eq=True)
-class _Composer[T, U]:
+class _Compose[T, U]:
     fns: Sequence[Callable[[T], U]]
 
     def __call__(self, initial: T) -> U:
@@ -32,7 +32,7 @@ def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]:
     >>> add_two(0) == True
     ```
     """
-    return _Composer(fns)
+    return _Compose(fns)
From b58aaa37561132565141aaa062cfe9961a1d9ba1 Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Sun, 7 Dec 2025 22:07:04 +0000
Subject: [PATCH 5/9] docs: add docs for any_of and all_of

---
 README.md                  | 28 ++++++++++++++++++++++++++++
 dev_tools/update_readme.py | 17 +++++++++++++++--
 src/danom/__init__.py      |  4 +++-
 src/danom/_utils.py        | 14 ++++++++++++++
 4 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 6a5aa26..af84945 100644
--- a/README.md
+++ b/README.md
@@ -347,6 +347,34 @@ The functions will be called in sequence with the result of one being used as th
 ```
 
 
+## all_of
+
+### `all_of`
+```python
+all_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
+```
+True if all of the given functions return True.
+
+```python
+>>> is_valid_user = all_of(is_subscribed, is_active, has_2fa)
+>>> is_valid_user(user) == True
+```
+
+
+## any_of
+
+### `any_of`
+```python
+any_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
+```
+True if any of the given functions return True.
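+Evaluation short-circuits at the first function that returns True.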
+
+```python
+>>> is_eligible = any_of(has_coupon, is_vip, is_staff)
+>>> is_eligible(user) == True
+```
+
+
 ## identity
 
 ### `identity`
diff --git a/dev_tools/update_readme.py b/dev_tools/update_readme.py
index 72373e6..492449d 100644
--- a/dev_tools/update_readme.py
+++ b/dev_tools/update_readme.py
@@ -5,7 +5,20 @@
 
 import attrs
 
-from danom import Err, Ok, ParStream, Stream, compose, identity, invert, new_type, safe, safe_method
+from danom import (
+    Err,
+    Ok,
+    ParStream,
+    Stream,
+    all_of,
+    any_of,
+    compose,
+    identity,
+    invert,
+    new_type,
+    safe,
+    safe_method,
+)
 
 
 @attrs.define(frozen=True)
@@ -32,7 +45,7 @@ def create_readme_lines() -> str:
     ]
     readme_lines.extend([entry.to_readme() for entry in readme_docs])
 
-    for fn in [safe, safe_method, compose, identity, invert, new_type]:
+    for fn in [safe, safe_method, compose, all_of, any_of, identity, invert, new_type]:
         readme_lines.append(f"## {fn.__name__}")
         readme_docs = [ReadmeDoc(f"{fn.__name__}", inspect.signature(fn), fn.__doc__)]
         readme_lines.extend([entry.to_readme() for entry in readme_docs])
diff --git a/src/danom/__init__.py b/src/danom/__init__.py
index f52c20b..9ee081a 100644
--- a/src/danom/__init__.py
+++ b/src/danom/__init__.py
@@ -4,7 +4,7 @@
 from danom._result import Result
 from danom._safe import safe, safe_method
 from danom._stream import ParStream, Stream
-from danom._utils import compose, identity, invert
+from danom._utils import all_of, any_of, compose, identity, invert
 
 __all__ = [
     "Err",
@@ -12,6 +12,8 @@
     "ParStream",
     "Result",
     "Stream",
+    "all_of",
+    "any_of",
     "compose",
     "identity",
     "invert",
diff --git a/src/danom/_utils.py b/src/danom/_utils.py
index a79f74d..a6f99a3 100644
--- a/src/danom/_utils.py
+++ b/src/danom/_utils.py
@@ -44,6 +44,13 @@ def __call__(self, initial: T) -> U:
 
 
 def all_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]:
+    """True if all of the given functions return True.
+
+    ```python
+    >>> is_valid_user = all_of(is_subscribed, is_active, has_2fa)
+    >>> is_valid_user(user) == True
+    ```
+    """
     return _AllOf(fns)
 
 
@@ -56,6 +63,13 @@ def __call__(self, initial: T) -> U:
 
 
 def any_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]:
+    """True if any of the given functions return True.
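+    Evaluation short-circuits at the first function that returns True.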
+
+    ```python
+    >>> is_eligible = any_of(has_coupon, is_vip, is_staff)
+    >>> is_eligible(user) == True
+    ```
+    """
     return _AnyOf(fns)
From c7f6b3a2b9f20fb36a429b75b4a3ad0b7a45a73 Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Mon, 8 Dec 2025 19:35:26 +0000
Subject: [PATCH 6/9] feat: update coverage badge

---
 .pre-commit-config.yaml | 10 ++++++-
 README.md               |  3 ++-
 coverage.svg            | 16 ++++++++++++
 dev_tools/update_cov.py | 58 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 85 insertions(+), 2 deletions(-)
 create mode 100644 coverage.svg
 create mode 100644 dev_tools/update_cov.py

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index afcc130..89d0742 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,3 +1,5 @@
+exclude: |
+  (^coverage\.json$)
 repos:
   - repo: https://github.com/astral-sh/uv-pre-commit
     rev: 0.5.26
@@ -18,7 +20,7 @@ repos:
       types_or: [ python, pyi, jupyter ]
     - id: pytest
       name: pytest
-      entry: uv run pytest -vv --cov=src --cov-report term-missing:skip-covered
+      entry: uv run pytest -vv --cov=src --cov-report term-missing:skip-covered --cov-report=json:coverage.json
      language: python
       pass_filenames: false
       always_run: true
@@ -44,4 +46,10 @@ repos:
      entry: uv run -m dev_tools.update_readme
      language: python
      pass_filenames: false
+      always_run: true
+    - id: update-cov
+      name: update_cov
+      entry: uv run -m dev_tools.update_cov
+      language: python
+      pass_filenames: false
       always_run: true
\ No newline at end of file
diff --git a/README.md b/README.md
index af84945..eb02020 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # danom
 
-[![PyPI Downloads](https://static.pepy.tech/personalized-badge/danom?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=BLUE&left_text=downloads)](https://pepy.tech/projects/danom)
+[![PyPI Downloads](https://static.pepy.tech/personalized-badge/danom?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=BLUE&left_text=downloads)](https://pepy.tech/projects/danom) ![coverage](./coverage.svg)
 
 # API Reference
@@ -447,6 +447,7 @@ Alternatively the map method can be used to return a new type instance with the
 │   └── publish.yaml
 ├── dev_tools
 │   ├── __init__.py
+│   ├── update_cov.py
 │   └── update_readme.py
 ├── src
 │   └── danom
diff --git a/coverage.svg b/coverage.svg
new file mode 100644
index 0000000..16c3628
--- /dev/null
+++ b/coverage.svg
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+  coverage
+  100.0%
+
+
\ No newline at end of file
diff --git a/dev_tools/update_cov.py b/dev_tools/update_cov.py
new file mode 100644
index 0000000..f6563e7
--- /dev/null
+++ b/dev_tools/update_cov.py
@@ -0,0 +1,58 @@
+import json
+import re
+import sys
+from pathlib import Path
+
+REPO_ROOT = Path(__file__).parents[1]
+
+BADGE_STR = """
+
+
+
+
+
+
+
+
+  coverage
+  {pct}%
+
+"""
+
+
+def update_cov_badge(root: Path) -> int:
+    cov_report = json.loads(Path(f"{root}/coverage.json").read_text())
+    new_pct = cov_report["totals"]["percent_covered"]
+
+    curr_badge = Path(f"{root}/coverage.svg").read_text()
+    curr_pct = float(
+        re.findall(r'([0-9]+(?:\.[0-9]+)?)%', curr_badge)[0]
+    )
+
+    if new_pct == curr_pct:
+        return 0
+
+    Path(f"{root}/coverage.svg").write_text(make_badge(BADGE_STR, new_pct))
+    return 1
+
+
+def make_badge(badge_str: str, pct: float) -> str:
+    colour = (
+        "red"
+        if pct < 50  # noqa: PLR2004
+        else "orange"
+        if pct < 70  # noqa: PLR2004
+        else "yellow"
+        if pct < 80  # noqa: PLR2004
+        else "yellowgreen"
+        if pct < 90  # noqa: PLR2004
+        else "green"
+    )
+    return badge_str.format(colour=colour, pct=pct)
+
+
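+# A non-zero exit code fails the pre-commit hook whenever the badge file has been rewritten.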
+if __name__ == "__main__":
+    sys.exit(update_cov_badge(REPO_ROOT))
From 4ecbf5d8692b77e512a77ae20fb4c44a62a55ff9 Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Mon, 8 Dec 2025 19:36:16 +0000
Subject: [PATCH 7/9] fix: remove exclude from pre-commit

---
 .pre-commit-config.yaml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 89d0742..a5a1ad2 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,5 +1,3 @@
-exclude: |
-  (^coverage\.json$)
 repos:
   - repo: https://github.com/astral-sh/uv-pre-commit
     rev: 0.5.26
From 4381e6fef9903c22fd2872a57d6d5c3216ee3973 Mon Sep 17 00:00:00 2001
From: ed cuss
Date: Mon, 8 Dec 2025 21:00:04 +0000
Subject: [PATCH 8/9] refactor: combine Stream and ParStream

---
 README.md                  | 107 ++++++++-----------------------------
 dev_tools/update_readme.py |   3 +-
 src/danom/__init__.py      |   3 +-
 src/danom/_stream.py       | 105 ++++++++++--------------------------
 tests/test_stream.py       |  22 +++-----
 5 files changed, 57 insertions(+), 183 deletions(-)

diff --git a/README.md b/README.md
index eb02020..a09ffbc 100644
--- a/README.md
+++ b/README.md
@@ -178,113 +178,50 @@ Simple functions can be passed in sequence to compose more complex transformatio
 ```
 
 
-### `Stream.partition`
+### `Stream.par_collect`
 ```python
-Stream.partition(self, fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'
+Stream.par_collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'
 ```
-Similar to `filter` except splits the True and False values. Will return a two new `Stream` with the partitioned sequences.
+Materialise the sequence from the `Stream` in parallel.
 
-Each partition is independently replayable.
-```python
->>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
->>> part1.collect() == (0, 2)
->>> part2.collect() == (1, 3)
-```
-
-
-### `Stream.to_par_stream`
-```python
-Stream.to_par_stream(self) -> 'ParStream'
-```
-Convert `Stream` to `ParStream`. This will incur a `collect`.
-
-```python
->>> Stream.from_iterable([0, 1, 2, 3]).to_par_stream().map(some_expensive_cpu_task).collect() == (1, 2, 3, 4)
-
-```
-
-
-## ParStream
-
-A parallel iterator with functional operations.
-
-### `ParStream.collect`
-```python
-ParStream.collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'
-```
-Materialise the sequence from the `ParStream`.
-
 ```python
->>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
->>> stream.collect() == (1, 2, 3, 4)
+>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+>>> stream.par_collect() == (1, 2, 3, 4)
 ```
 
 Use the `workers` arg to select the number of workers to use.
 Use `-1` to use all available processors (except 1). Defaults to `4`.
 ```python
->>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
->>> stream.collect(workers=-1) == (1, 2, 3, 4)
+>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+>>> stream.par_collect(workers=-1) == (1, 2, 3, 4)
 ```
 
-For smaller I/O bound tasks use the `use_threads` flag as True
+For smaller I/O-bound tasks, set the `use_threads` flag to True.
+If False, processing uses `ProcessPoolExecutor`; if True, `ThreadPoolExecutor`.
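+Processes side-step the GIL for CPU-bound work, while threads are cheaper to start for I/O-bound work.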
+```python
+>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+>>> stream.par_collect(use_threads=True) == (1, 2, 3, 4)
+```
-```python
->>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
->>> stream.collect(use_threads=True) == (1, 2, 3, 4)
-```
-
-
-### `ParStream.filter`
-```python
-ParStream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self'
-```
-Filter the par stream based on a predicate. Will return a new `ParStream` with the modified sequence.
-
-```python
->>> ParStream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)
-```
-
-Simple functions can be passed in sequence to compose more complex filters
-```python
->>> ParStream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
-```
-
-
-### `ParStream.from_iterable`
-```python
-ParStream.from_iterable(it: 'Iterable') -> 'Self'
-```
-This is the recommended way of creating a `ParStream` object.
-
-```python
->>> ParStream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)
-```
-
-
-### `ParStream.map`
-```python
-ParStream.map(self, *fns: 'Callable[[T], U]') -> 'Self'
-```
-Map functions to the elements in the `ParStream` in parallel. Will return a new `ParStream` with the modified sequence.
+### `Stream.partition`
 ```python
-ParStream.from_iterable([0, 1, 2, 3]).map(add_one, add_one).collect() == (2, 3, 4, 5)
+Stream.partition(self, fn: 'Callable[[T], bool]', *, workers: 'int' = 1, use_threads: 'bool' = False) -> 'tuple[Self, Self]'
 ```
-### `ParStream.partition`
-```python
-ParStream.partition(self, _fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'
-```
-Partition isn't implemented for `ParStream`. Convert to `Stream` with the `to_stream()` method and then call partition.
+Similar to `filter` except splits the True and False values. Will return two new `Stream`s with the partitioned sequences.
 
-### `ParStream.to_stream`
+Each partition is independently replayable.
 ```python
-ParStream.to_stream(self) -> 'Stream'
+>>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
+>>> part1.collect() == (0, 2)
+>>> part2.collect() == (1, 3)
 ```
-Convert `ParStream` to `Stream`. This will incur a `collect`.
+As `partition` triggers an action, the `workers` and `use_threads` parameters are forwarded on to `par_collect` when `workers` is greater than 1.
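+With the default `workers=1` the partition is materialised serially via `collect`.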
```python ->>> ParStream.from_iterable([0, 1, 2, 3]).to_stream().map(some_memory_hungry_task).collect() == (1, 2, 3, 4) +>>> Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4) +>>> part1.map(add_one).par_collect() == (4, 7, 10) +>>> part2.collect() == (2, 4, 5, 7, 8, 10, 11) ``` diff --git a/dev_tools/update_readme.py b/dev_tools/update_readme.py index 492449d..c442e9b 100644 --- a/dev_tools/update_readme.py +++ b/dev_tools/update_readme.py @@ -8,7 +8,6 @@ from danom import ( Err, Ok, - ParStream, Stream, all_of, any_of, @@ -35,7 +34,7 @@ def to_readme(self) -> str: def create_readme_lines() -> str: readme_lines = [] - for ent in [Ok, Err, Stream, ParStream]: + for ent in [Ok, Err, Stream]: readme_lines.append(f"## {ent.__name__}") readme_lines.append(ent.__doc__) readme_docs = [ diff --git a/src/danom/__init__.py b/src/danom/__init__.py index 9ee081a..35819e3 100644 --- a/src/danom/__init__.py +++ b/src/danom/__init__.py @@ -3,13 +3,12 @@ from danom._ok import Ok from danom._result import Result from danom._safe import safe, safe_method -from danom._stream import ParStream, Stream +from danom._stream import Stream from danom._utils import all_of, any_of, compose, identity, invert __all__ = [ "Err", "Ok", - "ParStream", "Result", "Stream", "all_of", diff --git a/src/danom/_stream.py b/src/danom/_stream.py index 03f065d..12ab713 100644 --- a/src/danom/_stream.py +++ b/src/danom/_stream.py @@ -24,6 +24,9 @@ def filter[T](self, *fns: Callable[[T], bool]) -> Self: ... @abstractmethod def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]: ... + @abstractmethod + def collect(self) -> tuple: ... + @attrs.define(frozen=True) class Stream(_BaseStream): @@ -41,16 +44,6 @@ def from_iterable(cls, it: Iterable) -> Self: it = [it] return cls(seq=iter(it)) - def to_par_stream(self) -> ParStream: - """Convert `Stream` to `ParStream`. This will incur a `collect`. - - ```python - >>> Stream.from_iterable([0, 1, 2, 3]).to_par_stream().map(some_expensive_cpu_task).collect() == (1, 2, 3, 4) - - ``` - """ - return ParStream.from_iterable(self.collect()) - def map[T, U](self, *fns: Callable[[T], U]) -> Self: """Map a function to the elements in the `Stream`. Will return a new `Stream` with the modified sequence. @@ -92,7 +85,9 @@ def filter[T](self, *fns: Callable[[T], bool]) -> Self: plan = (*self.ops, *tuple((_OpType.FILTER, fn) for fn in fns)) return Stream(seq=self.seq, ops=plan) - def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]: + def partition[T]( + self, fn: Callable[[T], bool], *, workers: int = 1, use_threads: bool = False + ) -> tuple[Self, Self]: """Similar to `filter` except splits the True and False values. Will return a two new `Stream` with the partitioned sequences. Each partition is independently replayable. @@ -101,9 +96,19 @@ def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]: >>> part1.collect() == (0, 2) >>> part2.collect() == (1, 3) ``` + + As `partition` triggers an action, the parameters will be forwarded to the `collect` call if the `workers` are greater than 1. 
+        ```python
+        >>> part1, part2 = Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
+        >>> part1.map(add_one).par_collect() == (4, 7, 10)
+        >>> part2.collect() == (2, 4, 5, 7, 8, 10, 11)
+        ```
         """
         # have to materialise to be able to replay each side independently
-        seq_tuple = self.collect()
+        if workers > 1:
+            seq_tuple = self.par_collect(workers=workers, use_threads=use_threads)
+        else:
+            seq_tuple = self.collect()
         return (
             Stream(seq=(x for x in seq_tuple if fn(x))),
             Stream(seq=(x for x in seq_tuple if not fn(x))),
         )
@@ -117,82 +121,30 @@ def collect(self) -> tuple:
         return tuple(
             elem for x in self.seq if (elem := _apply_fns(x, self.ops)) != _Nothing.NOTHING
         )
 
-
-@attrs.define(frozen=True)
-class ParStream(_BaseStream):
-    """A parallel iterator with functional operations."""
-
-    @classmethod
-    def from_iterable(cls, it: Iterable) -> Self:
-        """This is the recommended way of creating a `ParStream` object.
-
-        ```python
-        >>> ParStream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)
-        ```
-        """
-        if not isinstance(it, Iterable):
-            it = [it]
-        return cls(it)
-
-    def to_stream(self) -> Stream:
-        """Convert `ParStream` to `Stream`. This will incur a `collect`.
-
-        ```python
-        >>> ParStream.from_iterable([0, 1, 2, 3]).to_stream().map(some_memory_hungry_task).collect() == (1, 2, 3, 4)
-        ```
-        """
-        return Stream.from_iterable(self.collect())
-
-    def map[T, U](self, *fns: Callable[[T], U]) -> Self:
-        """Map functions to the elements in the `ParStream` in parallel. Will return a new `ParStream` with the modified sequence.
-
-        ```python
-        >>> ParStream.from_iterable([0, 1, 2, 3]).map(add_one, add_one).collect() == (2, 3, 4, 5)
-        ```
-        """
-        plan = (*self.ops, *tuple((_OpType.MAP, fn) for fn in fns))
-        return ParStream(self.seq, ops=plan)
-
-    def filter[T](self, *fns: Callable[[T], bool]) -> Self:
-        """Filter the par stream based on a predicate. Will return a new `ParStream` with the modified sequence.
-
-        ```python
-        >>> ParStream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)
-        ```
-
-        Simple functions can be passed in sequence to compose more complex filters
-        ```python
-        >>> ParStream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
-        ```
-        """
-        plan = (*self.ops, *tuple((_OpType.FILTER, fn) for fn in fns))
-        return ParStream(self.seq, ops=plan)
-
-    def partition[T](self, _fn: Callable[[T], bool]) -> tuple[Self, Self]:
-        """Partition isn't implemented for `ParStream`. Convert to `Stream` with the `to_stream()` method and then call partition."""
-        raise NotImplementedError(
-            "`partition` is not implemented for `ParStream`. Convert to `Stream` with the `to_stream()` method."
-        )
-
-    def collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple:
-        """Materialise the sequence from the `ParStream`.
+    def par_collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple:
+        """Materialise the sequence from the `Stream` in parallel.
 
         ```python
-        >>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
-        >>> stream.collect() == (1, 2, 3, 4)
+        >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+        >>> stream.par_collect() == (1, 2, 3, 4)
         ```
 
         Use the `workers` arg to select the number of workers to use.
         Use `-1` to use all available processors (except 1). Defaults to `4`.
         ```python
-        >>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
-        >>> stream.collect(workers=-1) == (1, 2, 3, 4)
+        >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+        >>> stream.par_collect(workers=-1) == (1, 2, 3, 4)
         ```
 
-        For smaller I/O bound tasks use the `use_threads` flag as True
+        For smaller I/O-bound tasks, set the `use_threads` flag to True.
+        If False, processing uses `ProcessPoolExecutor`; if True, `ThreadPoolExecutor`.
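+        Processes side-step the GIL for CPU-bound work, while threads are cheaper to start for I/O-bound work.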
```python - >>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one) - >>> stream.collect(workers=-1) == (1, 2, 3, 4) + >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) + >>> stream.par_collect(workers=-1) == (1, 2, 3, 4) ``` - For smaller I/O bound tasks use the `use_threads` flag as True + For smaller I/O bound tasks use the `use_threads` flag as True. + If False the processing will use `ProcessPoolExecutor` else it will use `ThreadPoolExecutor`. ```python - >>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one) - >>> stream.collect(use_threads=True) == (1, 2, 3, 4) + >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one) + >>> stream.par_collect(use_threads=True) == (1, 2, 3, 4) ``` """ if workers == -1: diff --git a/tests/test_stream.py b/tests/test_stream.py index 312f383..8a5f82a 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -1,6 +1,6 @@ import pytest -from src.danom import ParStream, Stream +from src.danom import Stream from tests.conftest import add_one, divisible_by_3, divisible_by_5 @@ -34,29 +34,19 @@ def test_stream_with_multiple_fns(): pytest.param(13, -1, (15,)), ], ) -def test_par_stream(it, n_workers, expected_result): +def test_par_collect(it, n_workers, expected_result): assert ( - ParStream.from_iterable(it) + Stream.from_iterable(it) .map(add_one, add_one) .filter(divisible_by_3, divisible_by_5) - .collect(workers=n_workers) + .par_collect(workers=n_workers) == expected_result ) def test_stream_to_par_stream(): part1, part2 = ( - Stream.from_iterable(range(10)) - .map(add_one) - .to_par_stream() - .map(add_one) - .to_stream() - .partition(divisible_by_3) + Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4) ) - assert part1.to_par_stream().map(add_one).collect() == (4, 7, 10) + assert part1.map(add_one).collect() == (4, 7, 10) assert part2.collect() == (2, 4, 5, 7, 8, 10, 11) - - -def test_par_stream_partition(): - with pytest.raises(NotImplementedError): - ParStream.from_iterable(range(10)).partition(divisible_by_3) From 1f55c458139e127ac2b61c22fb9ac11059766b0d Mon Sep 17 00:00:00 2001 From: ed cuss Date: Mon, 8 Dec 2025 21:05:20 +0000 Subject: [PATCH 9/9] bump: => 0.5.0 --- pyproject.toml | 2 +- uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3cd93d2..4445401 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "danom" -version = "0.4.0" +version = "0.5.0" description = "Functional streams and monads" readme = "README.md" license = "MIT" diff --git a/uv.lock b/uv.lock index a4136c5..e49466f 100644 --- a/uv.lock +++ b/uv.lock @@ -189,7 +189,7 @@ wheels = [ [[package]] name = "danom" -version = "0.4.0" +version = "0.5.0" source = { editable = "." } dependencies = [ { name = "attrs" },