diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index afcc130..a5a1ad2 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -18,7 +18,7 @@ repos:
types_or: [ python, pyi, jupyter ]
- id: pytest
name: pytest
- entry: uv run pytest -vv --cov=src --cov-report term-missing:skip-covered
+ entry: uv run pytest -vv --cov=src --cov-report term-missing:skip-covered --cov-report=json:coverage.json
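+          # --cov-report=json writes coverage.json, which the update-cov hook below reads to refresh the badge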
language: python
pass_filenames: false
always_run: true
@@ -44,4 +44,10 @@ repos:
entry: uv run -m dev_tools.update_readme
language: python
pass_filenames: false
+ always_run: true
+ - id: update-cov
+ name: update_cov
+ entry: uv run -m dev_tools.update_cov
+ language: python
+ pass_filenames: false
always_run: true
\ No newline at end of file
diff --git a/README.md b/README.md
index 330513a..a09ffbc 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
# danom
-[](https://pepy.tech/projects/danom)
+[](https://pepy.tech/projects/danom) 
# API Reference
@@ -178,113 +178,50 @@ Simple functions can be passed in sequence to compose more complex transformatio
```
-### `Stream.partition`
-```python
-Stream.partition(self, fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'
-```
-Similar to `filter` except splits the True and False values. Will return a two new `Stream` with the partitioned sequences.
-
-Each partition is independently replayable.
+### `Stream.par_collect`
```python
->>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
->>> part1.collect() == (0, 2)
->>> part2.collect() == (1, 3)
+Stream.par_collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'
```
+Materialise the sequence from the `Stream` in parallel.
-
-### `Stream.to_par_stream`
```python
-Stream.to_par_stream(self) -> 'ParStream'
-```
-Convert `Stream` to `ParStream`. This will incur a `collect`.
-
-```python
->>> Stream.from_iterable([0, 1, 2, 3]).to_par_stream().map(some_expensive_cpu_task).collect() == (1, 2, 3, 4)
-
-```
-
-
-## ParStream
-
-A parallel iterator with functional operations.
-
-### `ParStream.collect`
-```python
-ParStream.collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'
-```
-Materialise the sequence from the `ParStream`.
-
-```python
->>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
->>> stream.collect() == (1, 2, 3, 4)
+>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+>>> stream.par_collect() == (1, 2, 3, 4)
```
Use the `workers` arg to select the number of workers. Use `-1` to use all available processors except one.
Defaults to `4`.
```python
->>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
->>> stream.collect(workers=-1) == (1, 2, 3, 4)
-```
-
-For smaller I/O bound tasks use the `use_threads` flag as True
-```python
->>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
->>> stream.collect(use_threads=True) == (1, 2, 3, 4)
-```
-
-
-### `ParStream.filter`
-```python
-ParStream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self'
-```
-Filter the par stream based on a predicate. Will return a new `ParStream` with the modified sequence.
-
-```python
->>> ParStream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)
-```
-
-Simple functions can be passed in sequence to compose more complex filters
-```python
->>> ParStream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
-```
-
-
-### `ParStream.from_iterable`
-```python
-ParStream.from_iterable(it: 'Iterable') -> 'Self'
-```
-This is the recommended way of creating a `ParStream` object.
-
-```python
->>> ParStream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)
-```
-
-
-### `ParStream.map`
-```python
-ParStream.map(self, *fns: 'Callable[[T], U]') -> 'Self'
+>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+>>> stream.par_collect(workers=-1) == (1, 2, 3, 4)
```
-Map functions to the elements in the `ParStream` in parallel. Will return a new `ParStream` with the modified sequence.
+For smaller I/O-bound tasks, set the `use_threads` flag to True.
+If False, processing uses a `ProcessPoolExecutor`; otherwise it uses a `ThreadPoolExecutor`.
```python
->>> ParStream.from_iterable([0, 1, 2, 3]).map(add_one, add_one).collect() == (2, 3, 4, 5)
+>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+>>> stream.par_collect(use_threads=True) == (1, 2, 3, 4)
```
-### `ParStream.partition`
+### `Stream.partition`
```python
-ParStream.partition(self, _fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'
+Stream.partition(self, fn: 'Callable[[T], bool]', *, workers: 'int' = 1, use_threads: 'bool' = False) -> 'tuple[Self, Self]'
```
-Partition isn't implemented for `ParStream`. Convert to `Stream` with the `to_stream()` method and then call partition.
+Similar to `filter` except it splits the True and False values. Will return two new `Stream` objects with the partitioned sequences.
-### `ParStream.to_stream`
+Each partition is independently replayable.
```python
-ParStream.to_stream(self) -> 'Stream'
+>>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
+>>> part1.collect() == (0, 2)
+>>> part2.collect() == (1, 3)
```
-Convert `ParStream` to `Stream`. This will incur a `collect`.
+As `partition` triggers an action, `workers` and `use_threads` are forwarded to a `par_collect` call when `workers` is greater than 1.
```python
->>> ParStream.from_iterable([0, 1, 2, 3]).to_stream().map(some_memory_hungry_task).collect() == (1, 2, 3, 4)
+>>> part1, part2 = Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
+>>> part1.map(add_one).par_collect() == (4, 7, 10)
+>>> part2.collect() == (2, 4, 5, 7, 8, 10, 11)
```
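+Note that `partition` materialises the stream so that each side can be replayed independently.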
@@ -330,7 +267,7 @@ The same as `safe` except it forwards on the `self` of the class instance to the
### `compose`
```python
-compose(*fns: 'Callable[[T], U]') -> 'Callable[[T], U]'
+compose(*fns: collections.abc.Callable[[T], U]) -> collections.abc.Callable[[T], U]
```
Compose multiple functions into one.
@@ -347,6 +284,34 @@ The functions will be called in sequence with the result of one being used as th
```
+## all_of
+
+### `all_of`
+```python
+all_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
+```
+True if all of the given functions return True.
+
+```python
+>>> is_valid_user = all_of(is_subscribed, is_active, has_2fa)
+>>> is_valid_user(user) == True
+```
+
+
+## any_of
+
+### `any_of`
+```python
+any_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
+```
+True if any of the given functions return True.
+
+```python
+>>> is_eligible = any_of(has_coupon, is_vip, is_staff)
+>>> is_eligible(user) == True
+```
+
+
## identity
### `identity`
@@ -419,6 +384,7 @@ Alternatively the map method can be used to return a new type instance with the
│ └── publish.yaml
├── dev_tools
│ ├── __init__.py
+│ ├── update_cov.py
│ └── update_readme.py
├── src
│ └── danom
@@ -432,6 +398,7 @@ Alternatively the map method can be used to return a new type instance with the
│ └── _utils.py
├── tests
│ ├── __init__.py
+│ ├── conftest.py
│ ├── test_api.py
│ ├── test_err.py
│ ├── test_new_type.py
diff --git a/coverage.svg b/coverage.svg
new file mode 100644
index 0000000..16c3628
--- /dev/null
+++ b/coverage.svg
@@ -0,0 +1,16 @@
+
\ No newline at end of file
diff --git a/dev_tools/update_cov.py b/dev_tools/update_cov.py
new file mode 100644
index 0000000..f6563e7
--- /dev/null
+++ b/dev_tools/update_cov.py
@@ -0,0 +1,58 @@
+import json
+import re
+import sys
+from pathlib import Path
+
+REPO_ROOT = Path(__file__).parents[1]
+
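+# SVG badge template; make_badge fills its {colour} and {pct} placeholders via str.format.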
+BADGE_STR = """"""
+
+
+def update_cov_badge(root: Path) -> int:
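+    # Compare the percentage from pytest-cov's JSON report against the one currently
+    # rendered in the SVG badge; rewrite the badge only when the value has changed.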
+ cov_report = json.loads(Path(f"{root}/coverage.json").read_text())
+ new_pct = cov_report["totals"]["percent_covered"]
+
+ curr_badge = Path(f"{root}/coverage.svg").read_text()
+ curr_pct = float(
+ re.findall(r'([0-9]+(?:\.[0-9]+)?)%', curr_badge)[0]
+ )
+
+ if new_pct == curr_pct:
+ return 0
+
+ Path(f"{root}/coverage.svg").write_text(make_badge(BADGE_STR, new_pct))
+ return 1
+
+
+def make_badge(badge_str: str, pct: float) -> str:
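+    # Colour bands: <50 red, <70 orange, <80 yellow, <90 yellowgreen, else green.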
+ colour = (
+ "red"
+ if pct < 50 # noqa: PLR2004
+ else "orange"
+ if pct < 70 # noqa: PLR2004
+ else "yellow"
+ if pct < 80 # noqa: PLR2004
+ else "yellowgreen"
+ if pct < 90 # noqa: PLR2004
+ else "green"
+ )
+ return badge_str.format(colour=colour, pct=pct)
+
+
+if __name__ == "__main__":
+ sys.exit(update_cov_badge(REPO_ROOT))
diff --git a/dev_tools/update_readme.py b/dev_tools/update_readme.py
index 72373e6..c442e9b 100644
--- a/dev_tools/update_readme.py
+++ b/dev_tools/update_readme.py
@@ -5,7 +5,19 @@
import attrs
-from danom import Err, Ok, ParStream, Stream, compose, identity, invert, new_type, safe, safe_method
+from danom import (
+ Err,
+ Ok,
+ Stream,
+ all_of,
+ any_of,
+ compose,
+ identity,
+ invert,
+ new_type,
+ safe,
+ safe_method,
+)
@attrs.define(frozen=True)
@@ -22,7 +34,7 @@ def to_readme(self) -> str:
def create_readme_lines() -> str:
readme_lines = []
- for ent in [Ok, Err, Stream, ParStream]:
+ for ent in [Ok, Err, Stream]:
readme_lines.append(f"## {ent.__name__}")
readme_lines.append(ent.__doc__)
readme_docs = [
@@ -32,7 +44,7 @@ def create_readme_lines() -> str:
]
readme_lines.extend([entry.to_readme() for entry in readme_docs])
- for fn in [safe, safe_method, compose, identity, invert, new_type]:
+ for fn in [safe, safe_method, compose, all_of, any_of, identity, invert, new_type]:
readme_lines.append(f"## {fn.__name__}")
readme_docs = [ReadmeDoc(f"{fn.__name__}", inspect.signature(fn), fn.__doc__)]
readme_lines.extend([entry.to_readme() for entry in readme_docs])
diff --git a/pyproject.toml b/pyproject.toml
index 3cd93d2..4445401 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "danom"
-version = "0.4.0"
+version = "0.5.0"
description = "Functional streams and monads"
readme = "README.md"
license = "MIT"
diff --git a/src/danom/__init__.py b/src/danom/__init__.py
index 3ec2f4d..35819e3 100644
--- a/src/danom/__init__.py
+++ b/src/danom/__init__.py
@@ -3,15 +3,16 @@
from danom._ok import Ok
from danom._result import Result
from danom._safe import safe, safe_method
-from danom._stream import ParStream, Stream, compose
-from danom._utils import identity, invert
+from danom._stream import Stream
+from danom._utils import all_of, any_of, compose, identity, invert
__all__ = [
"Err",
"Ok",
- "ParStream",
"Result",
"Stream",
+ "all_of",
+ "any_of",
"compose",
"identity",
"invert",
diff --git a/src/danom/_stream.py b/src/danom/_stream.py
index d94b96e..12ab713 100644
--- a/src/danom/_stream.py
+++ b/src/danom/_stream.py
@@ -2,7 +2,7 @@
import os
from abc import ABC, abstractmethod
-from collections.abc import Callable, Generator, Iterable
+from collections.abc import Callable, Iterable
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from enum import Enum, auto, unique
from typing import Self
@@ -12,6 +12,9 @@
@attrs.define(frozen=True)
class _BaseStream(ABC):
+ seq: Iterable = attrs.field(validator=attrs.validators.instance_of(Iterable), repr=False)
+ ops: tuple = attrs.field(default=(), validator=attrs.validators.instance_of(tuple), repr=False)
+
@abstractmethod
def map[T, U](self, *fns: Callable[[T], U]) -> Self: ...
@@ -21,15 +24,14 @@ def filter[T](self, *fns: Callable[[T], bool]) -> Self: ...
@abstractmethod
def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]: ...
+ @abstractmethod
+ def collect(self) -> tuple: ...
+
@attrs.define(frozen=True)
class Stream(_BaseStream):
"""A lazy iterator with functional operations."""
- seq: Callable[[], Iterable] = attrs.field(
- validator=attrs.validators.instance_of(Callable), repr=False
- )
-
@classmethod
def from_iterable(cls, it: Iterable) -> Self:
"""This is the recommended way of creating a `Stream` object.
@@ -40,17 +42,7 @@ def from_iterable(cls, it: Iterable) -> Self:
"""
if not isinstance(it, Iterable):
it = [it]
- return cls(lambda: iter(it))
-
- def to_par_stream(self) -> ParStream:
- """Convert `Stream` to `ParStream`. This will incur a `collect`.
-
- ```python
- >>> Stream.from_iterable([0, 1, 2, 3]).to_par_stream().map(some_expensive_cpu_task).collect() == (1, 2, 3, 4)
-
- ```
- """
- return ParStream.from_iterable(self.collect())
+ return cls(seq=iter(it))
def map[T, U](self, *fns: Callable[[T], U]) -> Self:
"""Map a function to the elements in the `Stream`. Will return a new `Stream` with the modified sequence.
@@ -75,12 +67,8 @@ def map[T, U](self, *fns: Callable[[T], U]) -> Self:
>>> Stream.from_iterable(range(5)).map(mul_two, add_one).collect() == (1, 3, 5, 7, 9)
```
"""
-
- def generator() -> Generator[U, None, None]:
- for elem in self.seq():
- yield compose(*fns)(elem)
-
- return Stream(generator)
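+        # Append the maps to the deferred op plan; nothing runs until collect()/par_collect().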
+ plan = (*self.ops, *tuple((_OpType.MAP, fn) for fn in fns))
+ return Stream(seq=self.seq, ops=plan)
def filter[T](self, *fns: Callable[[T], bool]) -> Self:
"""Filter the stream based on a predicate. Will return a new `Stream` with the modified sequence.
@@ -94,9 +82,12 @@ def filter[T](self, *fns: Callable[[T], bool]) -> Self:
>>> Stream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
```
"""
- return Stream(lambda: (x for x in self.seq() if all(fn(x) for fn in fns)))
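+        # Filters join the same deferred op plan and are applied per element at collection time.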
+ plan = (*self.ops, *tuple((_OpType.FILTER, fn) for fn in fns))
+ return Stream(seq=self.seq, ops=plan)
- def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]:
+ def partition[T](
+ self, fn: Callable[[T], bool], *, workers: int = 1, use_threads: bool = False
+ ) -> tuple[Self, Self]:
"""Similar to `filter` except splits the True and False values. Will return a two new `Stream` with the partitioned sequences.
Each partition is independently replayable.
@@ -105,12 +96,22 @@ def partition[T](self, fn: Callable[[T], bool]) -> tuple[Self, Self]:
>>> part1.collect() == (0, 2)
>>> part2.collect() == (1, 3)
```
+
+        As `partition` triggers an action, `workers` and `use_threads` are forwarded to a `par_collect` call when `workers` is greater than 1.
+ ```python
+        >>> part1, part2 = Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
+ >>> part1.map(add_one).par_collect() == (4, 7, 10)
+ >>> part2.collect() == (2, 4, 5, 7, 8, 10, 11)
+ ```
"""
# have to materialise to be able to replay each side independently
- seq_tuple = self.collect()
+ if workers > 1:
+ seq_tuple = self.par_collect(workers=workers, use_threads=use_threads)
+ else:
+ seq_tuple = self.collect()
return (
- Stream(lambda: (x for x in seq_tuple if fn(x))),
- Stream(lambda: (x for x in seq_tuple if not fn(x))),
+ Stream(seq=(x for x in seq_tuple if fn(x))),
+ Stream(seq=(x for x in seq_tuple if not fn(x))),
)
def collect(self) -> tuple:
@@ -121,87 +122,30 @@ def collect(self) -> tuple:
>>> stream.collect() == (1, 2, 3, 4)
```
"""
- return tuple(self.seq())
-
-
-@attrs.define(frozen=True)
-class ParStream(_BaseStream):
- """A parallel iterator with functional operations."""
-
- seq: Iterable = attrs.field(validator=attrs.validators.instance_of(Iterable), repr=False)
- ops: tuple = attrs.field(default=(), validator=attrs.validators.instance_of(tuple), repr=False)
-
- @classmethod
- def from_iterable(cls, it: Iterable) -> Self:
- """This is the recommended way of creating a `ParStream` object.
-
- ```python
- >>> ParStream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)
- ```
- """
- if not isinstance(it, Iterable):
- it = [it]
- return cls(it)
-
- def to_stream(self) -> Stream:
- """Convert `ParStream` to `Stream`. This will incur a `collect`.
-
- ```python
- >>> ParStream.from_iterable([0, 1, 2, 3]).to_stream().map(some_memory_hungry_task).collect() == (1, 2, 3, 4)
- ```
- """
- return Stream.from_iterable(self.collect())
-
- def map[T, U](self, *fns: Callable[[T], U]) -> Self:
- """Map functions to the elements in the `ParStream` in parallel. Will return a new `ParStream` with the modified sequence.
-
- ```python
- >>> ParStream.from_iterable([0, 1, 2, 3]).map(add_one, add_one).collect() == (2, 3, 4, 5)
- ```
- """
- plan = (*self.ops, *tuple((_OpType.MAP, fn) for fn in fns))
- return ParStream(self.seq, ops=plan)
-
- def filter[T](self, *fns: Callable[[T], bool]) -> Self:
- """Filter the par stream based on a predicate. Will return a new `ParStream` with the modified sequence.
-
- ```python
- >>> ParStream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)
- ```
-
- Simple functions can be passed in sequence to compose more complex filters
- ```python
- >>> ParStream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
- ```
- """
- plan = (*self.ops, *tuple((_OpType.FILTER, fn) for fn in fns))
- return ParStream(self.seq, ops=plan)
-
- def partition[T](self, _fn: Callable[[T], bool]) -> tuple[Self, Self]:
- """Partition isn't implemented for `ParStream`. Convert to `Stream` with the `to_stream()` method and then call partition."""
- raise NotImplementedError(
- "`partition` is not implemented for `ParStream`. Convert to `Stream` with the `to_stream()` method."
+ return tuple(
+ elem for x in self.seq if (elem := _apply_fns(x, self.ops)) != _Nothing.NOTHING
)
- def collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple:
- """Materialise the sequence from the `ParStream`.
+ def par_collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple:
+ """Materialise the sequence from the `Stream` in parallel.
```python
- >>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
- >>> stream.collect() == (1, 2, 3, 4)
+ >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+ >>> stream.par_collect() == (1, 2, 3, 4)
```
        Use the `workers` arg to select the number of workers. Use `-1` to use all available processors except one.
Defaults to `4`.
```python
- >>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
- >>> stream.collect(workers=-1) == (1, 2, 3, 4)
+ >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+ >>> stream.par_collect(workers=-1) == (1, 2, 3, 4)
```
- For smaller I/O bound tasks use the `use_threads` flag as True
+        For smaller I/O-bound tasks, set the `use_threads` flag to True.
+        If False, processing uses a `ProcessPoolExecutor`; otherwise it uses a `ThreadPoolExecutor`.
```python
- >>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
- >>> stream.collect(use_threads=True) == (1, 2, 3, 4)
+ >>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
+ >>> stream.par_collect(use_threads=True) == (1, 2, 3, 4)
```
"""
if workers == -1:
@@ -215,30 +159,6 @@ def collect(self, workers: int = 4, *, use_threads: bool = False) -> tuple:
return tuple(elem for elem in res if elem != _Nothing.NOTHING)
-def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]:
- """Compose multiple functions into one.
-
- The functions will be called in sequence with the result of one being used as the input for the next.
-
- ```python
- >>> add_two = compose(add_one, add_one)
- >>> add_two(0) == 2
- ```
-
- ```python
- >>> add_two = compose(add_one, add_one, is_even)
- >>> add_two(0) == True
- ```
- """
-
- def wrapper(value: T) -> U:
- for fn in fns:
- value = fn(value)
- return value
-
- return wrapper
-
-
@unique
class _OpType(Enum):
MAP = auto()
diff --git a/src/danom/_utils.py b/src/danom/_utils.py
index 835c88b..a6f99a3 100644
--- a/src/danom/_utils.py
+++ b/src/danom/_utils.py
@@ -1,8 +1,78 @@
-from collections.abc import Callable
+from collections.abc import Callable, Sequence
+from operator import not_
+
+import attrs
from danom._result import P
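+
+# These frozen attrs classes replace nested closures; unlike closures, their
+# instances are picklable, so composed callables can be shipped to
+# ProcessPoolExecutor workers (e.g. by `Stream.par_collect`).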
+@attrs.define(frozen=True, hash=True, eq=True)
+class _Compose[T, U]:
+ fns: Sequence[Callable[[T], U]]
+
+ def __call__(self, initial: T) -> U:
+ value = initial
+ for fn in self.fns:
+ value = fn(value)
+ return value
+
+
+def compose[T, U](*fns: Callable[[T], U]) -> Callable[[T], U]:
+ """Compose multiple functions into one.
+
+ The functions will be called in sequence with the result of one being used as the input for the next.
+
+ ```python
+ >>> add_two = compose(add_one, add_one)
+ >>> add_two(0) == 2
+ ```
+
+ ```python
+ >>> add_two = compose(add_one, add_one, is_even)
+ >>> add_two(0) == True
+ ```
+ """
+ return _Compose(fns)
+
+
+@attrs.define(frozen=True, hash=True, eq=True)
+class _AllOf[T]:
+    fns: Sequence[Callable[[T], bool]]
+
+    def __call__(self, initial: T) -> bool:
+        return all(fn(initial) for fn in self.fns)
+
+
+def all_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]:
+ """True if all of the given functions return True.
+
+ ```python
+ >>> is_valid_user = all_of(is_subscribed, is_active, has_2fa)
+ >>> is_valid_user(user) == True
+ ```
+ """
+ return _AllOf(fns)
+
+
+@attrs.define(frozen=True, hash=True, eq=True)
+class _AnyOf[T]:
+    fns: Sequence[Callable[[T], bool]]
+
+    def __call__(self, initial: T) -> bool:
+        return any(fn(initial) for fn in self.fns)
+
+
+def any_of[T](*fns: Callable[[T], bool]) -> Callable[[T], bool]:
+ """True if any of the given functions return True.
+
+ ```python
+ >>> is_eligible = any_of(has_coupon, is_vip, is_staff)
+ >>> is_eligible(user) == True
+ ```
+ """
+ return _AnyOf(fns)
+
+
def identity[T](x: T) -> T:
"""Basic identity function.
@@ -23,8 +93,4 @@ def invert(func: Callable[[P], bool]) -> Callable[[P], bool]:
>>> invert(has_len)("") == True
```
"""
-
- def wrapper(*args: P.args, **kwargs: P.kwargs) -> bool:
- return not func(*args, **kwargs)
-
- return wrapper
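+    # Compose func with operator.not_ so its boolean result is negated.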
+ return compose(func, not_)
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..46086f8
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,49 @@
+from typing import Any, Self
+
+from src.danom import safe, safe_method
+from src.danom._result import Result
+
+
+def has_len(value: str) -> bool:
+ return len(value) > 0
+
+
+def add_one(x: int) -> int:
+ return x + 1
+
+
+def divisible_by_3(x: int) -> bool:
+ return x % 3 == 0
+
+
+def divisible_by_5(x: int) -> bool:
+ return x % 5 == 0
+
+
+@safe
+def safe_add(a: int, b: int) -> Result[int, Exception]:
+ return a + b
+
+
+@safe
+def safe_raise_type_error(_a: Any) -> Result[None, Exception]: # noqa: ANN401
+ raise TypeError
+
+
+@safe
+def safe_get_error_type(exception: Exception) -> str:
+ return exception.__class__.__name__
+
+
+class Adder:
+ def __init__(self) -> None:
+ self.result = 0
+
+ @safe_method
+ def add(self, a: int, b: int) -> Self:
+ self.result += a + b
+ return self
+
+ @safe_method
+ def cls_raises(self, *_args: tuple, **_kwargs: dict) -> None:
+ raise ValueError
diff --git a/tests/test_new_type.py b/tests/test_new_type.py
index ff590bb..21bf000 100644
--- a/tests/test_new_type.py
+++ b/tests/test_new_type.py
@@ -3,10 +3,7 @@
import pytest
from src.danom import new_type
-
-
-def has_len(value: str) -> bool:
- return len(value) > 0
+from tests.conftest import has_len
@pytest.mark.parametrize(
diff --git a/tests/test_safe.py b/tests/test_safe.py
index 2147446..913fbf2 100644
--- a/tests/test_safe.py
+++ b/tests/test_safe.py
@@ -1,40 +1,23 @@
from functools import partial
-from typing import Any, Self
import pytest
-from src.danom import safe, safe_method
-from src.danom._result import Result
-
-
-@safe
-def mock_func(a: int, b: int) -> Result[int, Exception]:
- return a + b
-
-
-@safe
-def mock_func_raises(_a: Any) -> Result[None, Exception]:
- raise TypeError
-
-
-@safe
-def mock_get_error_type(exception: Exception) -> str:
- return exception.__class__.__name__
+from tests.conftest import Adder, safe_add, safe_get_error_type, safe_raise_type_error
def test_valid_safe_pipeline():
pipeline = (
- mock_func(1, 2)
- .and_then(mock_func, b=1)
- .and_then(mock_func, b=1)
- .match(partial(mock_func, b=1), mock_get_error_type)
+ safe_add(1, 2)
+ .and_then(safe_add, b=1)
+ .and_then(safe_add, b=1)
+ .match(partial(safe_add, b=1), safe_get_error_type)
)
assert pipeline.is_ok()
assert pipeline.unwrap() == 6
def test_invalid_safe_pipeline():
- pipeline = mock_func(1, 2).and_then(mock_func_raises).and_then(mock_func, b=1)
+ pipeline = safe_add(1, 2).and_then(safe_raise_type_error).and_then(safe_add, b=1)
assert not pipeline.is_ok()
with pytest.raises(TypeError):
@@ -43,29 +26,15 @@ def test_invalid_safe_pipeline():
def test_invalid_safe_pipeline_with_match():
pipeline = (
- mock_func(1, 2)
- .and_then(mock_func_raises)
- .and_then(mock_func, b=1)
- .match(partial(mock_func, b=1), mock_get_error_type)
+ safe_add(1, 2)
+ .and_then(safe_raise_type_error)
+ .and_then(safe_add, b=1)
+ .match(partial(safe_add, b=1), safe_get_error_type)
)
assert pipeline.is_ok()
assert pipeline.unwrap() == "TypeError"
-class Adder:
- def __init__(self):
- self.result = 0
-
- @safe_method
- def add(self, a: int, b: int) -> Self:
- self.result += a + b
- return self
-
- @safe_method
- def cls_raises(self, *args, **kwargs) -> None:
- raise ValueError
-
-
def test_valid_safe_method_pipeline():
cls = Adder()
res = cls.add(2, 2).add(2, 2).add(2, 2)
diff --git a/tests/test_stream.py b/tests/test_stream.py
index 3ff7207..8a5f82a 100644
--- a/tests/test_stream.py
+++ b/tests/test_stream.py
@@ -1,6 +1,7 @@
import pytest
-from src.danom import ParStream, Stream, compose
+from src.danom import Stream
+from tests.conftest import add_one, divisible_by_3, divisible_by_5
@pytest.mark.parametrize(
@@ -26,18 +27,6 @@ def test_stream_with_multiple_fns():
) == (5,)
-def add_one(x: int) -> int:
- return x + 1
-
-
-def divisible_by_3(x: int) -> bool:
- return x % 3 == 0
-
-
-def divisible_by_5(x: int) -> bool:
- return x % 5 == 0
-
-
@pytest.mark.parametrize(
("it", "n_workers", "expected_result"),
[
@@ -45,40 +34,19 @@ def divisible_by_5(x: int) -> bool:
pytest.param(13, -1, (15,)),
],
)
-def test_par_stream(it, n_workers, expected_result):
+def test_par_collect(it, n_workers, expected_result):
assert (
- ParStream.from_iterable(it)
+ Stream.from_iterable(it)
.map(add_one, add_one)
.filter(divisible_by_3, divisible_by_5)
- .collect(workers=n_workers)
+ .par_collect(workers=n_workers)
== expected_result
)
def test_stream_to_par_stream():
part1, part2 = (
- Stream.from_iterable(range(10))
- .map(add_one)
- .to_par_stream()
- .map(add_one)
- .to_stream()
- .partition(divisible_by_3)
+ Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
)
- assert part1.to_par_stream().map(add_one).collect() == (4, 7, 10)
+ assert part1.map(add_one).collect() == (4, 7, 10)
assert part2.collect() == (2, 4, 5, 7, 8, 10, 11)
-
-
-def test_par_stream_partition():
- with pytest.raises(NotImplementedError):
- ParStream.from_iterable(range(10)).partition(divisible_by_3)
-
-
-@pytest.mark.parametrize(
- ("inp_args", "fns", "expected_result"),
- [
- pytest.param(0, (add_one, add_one), 2),
- pytest.param(0, (add_one, divisible_by_3), False),
- ],
-)
-def test_compose(inp_args, fns, expected_result):
- assert compose(*fns)(inp_args) == expected_result
diff --git a/tests/test_utils.py b/tests/test_utils.py
index ec3e0ba..38baeee 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,14 +1,52 @@
import pytest
-from src.danom import Ok, identity, invert
+from src.danom import Ok, compose, identity, invert
+from src.danom._utils import all_of, any_of
+from tests.conftest import add_one, divisible_by_3, divisible_by_5, has_len
+
+
+@pytest.mark.parametrize(
+ ("inp_args", "fns", "expected_result"),
+ [
+ pytest.param(0, (add_one, add_one), 2),
+ pytest.param(0, (add_one, divisible_by_3), False),
+ ],
+)
+def test_compose(inp_args, fns, expected_result):
+ assert compose(*fns)(inp_args) == expected_result
+
+
+@pytest.mark.parametrize(
+ ("inp_args", "fns", "expected_result"),
+ [
+ pytest.param(3, (divisible_by_3, divisible_by_5), False),
+ pytest.param(5, (divisible_by_3, divisible_by_5), False),
+ pytest.param(15, (divisible_by_3, divisible_by_5), True),
+ ],
+)
+def test_all_of(inp_args, fns, expected_result):
+ assert all_of(*fns)(inp_args) == expected_result
+
+
+@pytest.mark.parametrize(
+ ("inp_args", "fns", "expected_result"),
+ [
+ pytest.param(3, (divisible_by_3, divisible_by_5), True),
+ pytest.param(5, (divisible_by_3, divisible_by_5), True),
+ pytest.param(15, (divisible_by_3, divisible_by_5), True),
+ pytest.param(7, (divisible_by_3, divisible_by_5), False),
+ ],
+)
+def test_any_of(inp_args, fns, expected_result):
+ assert any_of(*fns)(inp_args) == expected_result
@pytest.mark.parametrize(
"x",
[
- pytest.param(1, id="identity on a primative datatype (int)"),
- pytest.param("abc", id="identity on a primative datatype (str)"),
- pytest.param([0, 1, 2], id="identity on a primative datatype (list)"),
+ pytest.param(1, id="identity on a primitive datatype (int)"),
+ pytest.param("abc", id="identity on a primitive datatype (str)"),
+ pytest.param([0, 1, 2], id="identity on a primitive datatype (list)"),
pytest.param(Ok(1), id="identity on a more complex datatype"),
],
)
@@ -16,10 +54,6 @@ def test_identity(x):
assert identity(x) == x
-def has_len(inp_str: str) -> bool:
- return len(inp_str) > 0
-
-
@pytest.mark.parametrize(
("input_args", "fn", "expected_result"),
[
diff --git a/uv.lock b/uv.lock
index a4136c5..e49466f 100644
--- a/uv.lock
+++ b/uv.lock
@@ -189,7 +189,7 @@ wheels = [
[[package]]
name = "danom"
-version = "0.4.0"
+version = "0.5.0"
source = { editable = "." }
dependencies = [
{ name = "attrs" },