8 changes: 7 additions & 1 deletion .pre-commit-config.yaml
@@ -18,7 +18,7 @@ repos:
types_or: [ python, pyi, jupyter ]
- id: pytest
name: pytest
entry: uv run pytest -vv --cov=src --cov-report term-missing:skip-covered
entry: uv run pytest -vv --cov=src --cov-report term-missing:skip-covered --cov-report=json:coverage.json
language: python
pass_filenames: false
always_run: true
@@ -44,4 +44,10 @@ repos:
entry: uv run -m dev_tools.update_readme
language: python
pass_filenames: false
always_run: true
- id: update-cov
name: update_cov
entry: uv run -m dev_tools.update_cov
language: python
pass_filenames: false
always_run: true
141 changes: 54 additions & 87 deletions README.md
@@ -1,6 +1,6 @@
# danom

[![PyPI Downloads](https://static.pepy.tech/personalized-badge/danom?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=BLUE&left_text=downloads)](https://pepy.tech/projects/danom)
[![PyPI Downloads](https://static.pepy.tech/personalized-badge/danom?period=total&units=INTERNATIONAL_SYSTEM&left_color=BLACK&right_color=BLUE&left_text=downloads)](https://pepy.tech/projects/danom) ![coverage](./coverage.svg)

# API Reference

@@ -178,113 +178,50 @@ Simple functions can be passed in sequence to compose more complex transformations
```


### `Stream.partition`
```python
Stream.partition(self, fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'
```
Similar to `filter` except it splits the True and False values. Will return two new `Stream`s with the partitioned sequences.

Each partition is independently replayable.
### `Stream.par_collect`
```python
>>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
>>> part1.collect() == (0, 2)
>>> part2.collect() == (1, 3)
Stream.par_collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'
```
Materialise the sequence from the `Stream` in parallel.


### `Stream.to_par_stream`
```python
Stream.to_par_stream(self) -> 'ParStream'
```
Convert `Stream` to `ParStream`. This will incur a `collect`.

```python
>>> Stream.from_iterable([0, 1, 2, 3]).to_par_stream().map(some_expensive_cpu_task).collect() == (1, 2, 3, 4)

```


## ParStream

A parallel iterator with functional operations.

### `ParStream.collect`
```python
ParStream.collect(self, workers: 'int' = 4, *, use_threads: 'bool' = False) -> 'tuple'
```
Materialise the sequence from the `ParStream`.

```python
>>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect() == (1, 2, 3, 4)
>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect() == (1, 2, 3, 4)
```

Use the `workers` arg to select the number of workers. Pass `-1` to use all available processors except one.
Defaults to `4`.
```python
>>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect(workers=-1) == (1, 2, 3, 4)
```

For smaller, I/O-bound tasks, set the `use_threads` flag to True.
```python
>>> stream = ParStream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.collect(use_threads=True) == (1, 2, 3, 4)
```


### `ParStream.filter`
```python
ParStream.filter(self, *fns: 'Callable[[T], bool]') -> 'Self'
```
Filter the par stream based on a predicate. Will return a new `ParStream` with the modified sequence.

```python
>>> ParStream.from_iterable([0, 1, 2, 3]).filter(lambda x: x % 2 == 0).collect() == (0, 2)
```

Simple functions can be passed in sequence to compose more complex filters
```python
>>> ParStream.from_iterable(range(20)).filter(divisible_by_3, divisible_by_5).collect() == (0, 15)
```


### `ParStream.from_iterable`
```python
ParStream.from_iterable(it: 'Iterable') -> 'Self'
```
This is the recommended way of creating a `ParStream` object.

```python
>>> ParStream.from_iterable([0, 1, 2, 3]).collect() == (0, 1, 2, 3)
```


### `ParStream.map`
```python
ParStream.map(self, *fns: 'Callable[[T], U]') -> 'Self'
>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect(workers=-1) == (1, 2, 3, 4)
```
Map functions to the elements in the `ParStream` in parallel. Will return a new `ParStream` with the modified sequence.

For smaller, I/O-bound tasks, set the `use_threads` flag to True.
If `use_threads` is False, processing uses a `ProcessPoolExecutor`; otherwise it uses a `ThreadPoolExecutor`.
```python
>>> ParStream.from_iterable([0, 1, 2, 3]).map(add_one, add_one).collect() == (2, 3, 4, 5)
>>> stream = Stream.from_iterable([0, 1, 2, 3]).map(add_one)
>>> stream.par_collect(use_threads=True) == (1, 2, 3, 4)
```
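As a rule of thumb: the default process pool suits CPU-bound work (it sidesteps the GIL), while threads skip process start-up and pickling overhead for blocking I/O. A minimal sketch of that choice using the `par_collect` API documented above; `cpu_heavy` and `io_heavy` are hypothetical stand-ins, not part of danom:
```python
import time

from danom import Stream

def cpu_heavy(x: int) -> int:
    # CPU-bound work: the default process pool sidesteps the GIL.
    return sum(i * i for i in range(x * 10_000))

def io_heavy(x: int) -> int:
    # Stands in for a blocking I/O call (network, disk, ...).
    time.sleep(0.01)
    return x

Stream.from_iterable(range(8)).map(cpu_heavy).par_collect(workers=4)
Stream.from_iterable(range(8)).map(io_heavy).par_collect(use_threads=True)
```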


### `ParStream.partition`
### `Stream.partition`
```python
ParStream.partition(self, _fn: 'Callable[[T], bool]') -> 'tuple[Self, Self]'
Stream.partition(self, fn: 'Callable[[T], bool]', *, workers: 'int' = 1, use_threads: 'bool' = False) -> 'tuple[Self, Self]'
```
Partition isn't implemented for `ParStream`. Convert to `Stream` with the `to_stream()` method and then call partition.
Similar to `filter` except it splits the True and False values. Will return two new `Stream`s with the partitioned sequences.

### `ParStream.to_stream`
Each partition is independently replayable.
```python
ParStream.to_stream(self) -> 'Stream'
>>> part1, part2 = Stream.from_iterable([0, 1, 2, 3]).partition(lambda x: x % 2 == 0)
>>> part1.collect() == (0, 2)
>>> part2.collect() == (1, 3)
```
Convert `ParStream` to `Stream`. This will incur a `collect`.

As `partition` triggers an action, the parameters are forwarded to the `collect` call when `workers` is greater than 1.
```python
>>> ParStream.from_iterable([0, 1, 2, 3]).to_stream().map(some_memory_hungry_task).collect() == (1, 2, 3, 4)
>>> part1, part2 = Stream.from_iterable(range(10)).map(add_one, add_one).partition(divisible_by_3, workers=4)
>>> part1.map(add_one).par_collect() == (4, 7, 10)
>>> part2.collect() == (2, 4, 5, 7, 8, 10, 11)
```


@@ -330,7 +267,7 @@ The same as `safe` except it forwards on the `self` of the class instance to the function.
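A hedged usage sketch of `safe_method` (its full docs are collapsed in this view): this assumes, by analogy with `safe`, that the decorated method returns `Ok` on success and `Err` when an exception is raised; `Account` is hypothetical:
```python
import attrs

from danom import safe_method

@attrs.define
class Account:
    balance: int

    @safe_method
    def withdraw(self, amount: int) -> int:
        if amount > self.balance:
            raise ValueError("insufficient funds")
        return self.balance - amount

Account(balance=100).withdraw(30)   # expected: Ok(70)
Account(balance=100).withdraw(300)  # expected: Err(...), not a raised ValueError
```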

### `compose`
```python
compose(*fns: 'Callable[[T], U]') -> 'Callable[[T], U]'
compose(*fns: collections.abc.Callable[[T], U]) -> collections.abc.Callable[[T], U]
```
Compose multiple functions into one.

@@ -347,6 +284,34 @@ The functions will be called in sequence with the result of one being used as the input to the next
```
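The worked example above is collapsed in this view; a minimal sketch of the sequencing just described, with a simple `add_one` helper defined for self-containment:
```python
from danom import compose

def add_one(x: int) -> int:
    return x + 1

add_two = compose(add_one, add_one)
assert add_two(1) == 3  # add_one(add_one(1))
```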


## all_of

### `all_of`
```python
all_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
```
True if all of the given functions return True.

```python
>>> is_valid_user = all_of(is_subscribed, is_active, has_2fa)
>>> is_valid_user(user) == True
```


## any_of

### `any_of`
```python
any_of(*fns: collections.abc.Callable[[T], bool]) -> collections.abc.Callable[[T], bool]
```
True if any of the given functions return True.

```python
>>> is_eligible = any_of(has_coupon, is_vip, is_staff)
>>> is_eligible(user) == True
```
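The bodies of `all_of` and `any_of` are not shown in this diff; a plausible implementation consistent with the signatures above — an assumption, not necessarily the library's actual code — is:
```python
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")

def all_of(*fns: Callable[[T], bool]) -> Callable[[T], bool]:
    # Predicate that is True only if every fn returns True.
    return lambda x: all(fn(x) for fn in fns)

def any_of(*fns: Callable[[T], bool]) -> Callable[[T], bool]:
    # Predicate that is True if at least one fn returns True.
    return lambda x: any(fn(x) for fn in fns)
```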


## identity

### `identity`
@@ -419,6 +384,7 @@ Alternatively the map method can be used to return a new type instance with the
│ └── publish.yaml
├── dev_tools
│ ├── __init__.py
│ ├── update_cov.py
│ └── update_readme.py
├── src
│ └── danom
@@ -432,6 +398,7 @@ Alternatively the map method can be used to return a new type instance with the
│ └── _utils.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_api.py
│ ├── test_err.py
│ ├── test_new_type.py
16 changes: 16 additions & 0 deletions coverage.svg
(coverage.svg is a generated SVG badge; its contents are not rendered in the diff view)
58 changes: 58 additions & 0 deletions dev_tools/update_cov.py
@@ -0,0 +1,58 @@
import json
import re
import sys
from pathlib import Path

REPO_ROOT = Path(__file__).parents[1]

BADGE_STR = """<svg xmlns="http://www.w3.org/2000/svg" width="120" height="20">
<linearGradient id="smooth" x2="0" y2="100%">
<stop offset="0" stop-color="#bbb" stop-opacity=".1"/>
<stop offset="1" stop-opacity=".1"/>
</linearGradient>
<rect rx="3" width="120" height="20" fill="#000"/>
<rect rx="3" x="60" width="60" height="20" fill="{colour}"/>
<path fill="{colour}" d="M60 0h4v20h-4z"/>
<rect rx="3" width="120" height="20" fill="url(#smooth)"/>
<g fill="#fff" text-anchor="middle"
font-family="DejaVu Sans,Verdana,Geneva,sans-serif"
font-size="11">
<text x="30" y="14">coverage</text>
<text x="90" y="14">{pct}%</text>
</g>
</svg>"""


def update_cov_badge(root: Path) -> int:
cov_report = json.loads(Path(f"{root}/coverage.json").read_text())
new_pct = cov_report["totals"]["percent_covered"]

curr_badge = Path(f"{root}/coverage.svg").read_text()
curr_pct = float(
re.findall(r'<text x="90" y="14">([0-9]+(?:\.[0-9]+)?)%</text>', curr_badge)[0]
)

if new_pct == curr_pct:
return 0

Path(f"{root}/coverage.svg").write_text(make_badge(BADGE_STR, new_pct))
return 1


def make_badge(badge_str: str, pct: float) -> str:
colour = (
"red"
if pct < 50 # noqa: PLR2004
else "orange"
if pct < 70 # noqa: PLR2004
else "yellow"
if pct < 80 # noqa: PLR2004
else "yellowgreen"
if pct < 90 # noqa: PLR2004
else "green"
)
return badge_str.format(colour=colour, pct=pct)


if __name__ == "__main__":
sys.exit(update_cov_badge(REPO_ROOT))
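A quick spot-check of the colour thresholds in `make_badge` (a usage sketch against the code above, not part of the PR):
```python
from dev_tools.update_cov import BADGE_STR, make_badge

assert 'fill="red"' in make_badge(BADGE_STR, 49.9)          # < 50
assert 'fill="orange"' in make_badge(BADGE_STR, 65.0)       # 50-69.9
assert 'fill="yellow"' in make_badge(BADGE_STR, 75.0)       # 70-79.9
assert 'fill="yellowgreen"' in make_badge(BADGE_STR, 85.0)  # 80-89.9
assert 'fill="green"' in make_badge(BADGE_STR, 97.5)        # >= 90
```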
18 changes: 15 additions & 3 deletions dev_tools/update_readme.py
@@ -5,7 +5,19 @@

import attrs

from danom import Err, Ok, ParStream, Stream, compose, identity, invert, new_type, safe, safe_method
from danom import (
Err,
Ok,
Stream,
all_of,
any_of,
compose,
identity,
invert,
new_type,
safe,
safe_method,
)


@attrs.define(frozen=True)
@@ -22,7 +34,7 @@ def to_readme(self) -> str:
def create_readme_lines() -> str:
readme_lines = []

for ent in [Ok, Err, Stream, ParStream]:
for ent in [Ok, Err, Stream]:
readme_lines.append(f"## {ent.__name__}")
readme_lines.append(ent.__doc__)
readme_docs = [
@@ -32,7 +44,7 @@ ]
]
readme_lines.extend([entry.to_readme() for entry in readme_docs])

for fn in [safe, safe_method, compose, identity, invert, new_type]:
for fn in [safe, safe_method, compose, all_of, any_of, identity, invert, new_type]:
readme_lines.append(f"## {fn.__name__}")
readme_docs = [ReadmeDoc(f"{fn.__name__}", inspect.signature(fn), fn.__doc__)]
readme_lines.extend([entry.to_readme() for entry in readme_docs])
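For orientation, a sketch of what `ReadmeDoc.to_readme` plausibly emits, inferred from the README sections in this diff (field names and the exact body are assumptions; the real implementation is collapsed above):
```python
import inspect

import attrs

@attrs.define(frozen=True)
class ReadmeDoc:
    # Field names assumed from the positional construction in create_readme_lines.
    name: str
    signature: inspect.Signature
    doc: str

    def to_readme(self) -> str:
        # One README entry: heading, fenced signature, then the docstring.
        fence = "```"
        return (
            f"### `{self.name}`\n"
            f"{fence}python\n{self.name}{self.signature}\n{fence}\n"
            f"{self.doc}\n"
        )
```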
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "danom"
version = "0.4.0"
version = "0.5.0"
description = "Functional streams and monads"
readme = "README.md"
license = "MIT"
7 changes: 4 additions & 3 deletions src/danom/__init__.py
@@ -3,15 +3,16 @@
from danom._ok import Ok
from danom._result import Result
from danom._safe import safe, safe_method
from danom._stream import ParStream, Stream, compose
from danom._utils import identity, invert
from danom._stream import Stream
from danom._utils import all_of, any_of, compose, identity, invert

__all__ = [
"Err",
"Ok",
"ParStream",
"Result",
"Stream",
"all_of",
"any_of",
"compose",
"identity",
"invert",