diff --git a/README.md b/README.md index c3bbead..ede672c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@

tinyio

-

A tiny (~200 lines) event loop for Python

+

A tiny (~300 lines) event loop for Python

_Ever used `asyncio` and wished you hadn't?_ @@ -93,7 +93,7 @@ This gives every coroutine a chance to shut down gracefully. Debuggers like [`pa ### Batteries-included -We ship batteries-included with a collection of standard operations, all built on top of just the functionality you've already seen. +We ship batteries-included with the usual collection of standard operations.
Click to expand @@ -104,7 +104,6 @@ tinyio.Barrier tinyio.timeout tinyio.Event tinyio.TimeoutError tinyio.Lock ``` -None of these require special support from the event loop, they are all just simple implementations that you could have written yourself :) --- @@ -138,7 +137,13 @@ None of these require special support from the event loop, they are all just sim - `tinyio.Event()` - This has a method `.wait()`, which is a coroutine you can `yield` on. This will unblock once its `.set()` method is called (typically from another coroutine). It also has a `is_set()` method for checking whether it has been set. + This is a wrapper around a boolean flag, initialised with `False`. + This has the following methods: + + - `.is_set()`: check the value of the flag. + - `.set()`: set the flag to `True`. + - `.clear()`: set the flag to `False`. + - `.wait()`, which is a coroutine you can `yield` on. This will unblock if the internal flag is `True`. (Typically this is accomplished by calling `.set()` from another coroutine or from a thread.) --- @@ -166,7 +171,7 @@ None of these require special support from the event loop, they are all just sim - `tinyio.timeout(coro, timeout_in_seconds)` - This is a coroutine you can `yield` on, used as `output, success = yield tinyio.timeout(coro, timeout_in_seconds)`. + This is a coroutine you can `yield` on, used as `output, success = yield tinyio.timeout(coro, timeout_in_seconds)`. This runs `coro` for at most `timeout_in_seconds`. If it succeeds in that time then the pair `(output, True)` is returned . Else this will return `(None, False)`, and `coro` will be halted by raising `tinyio.TimeoutError` inside it. @@ -201,18 +206,21 @@ The reason is that `await` does not offer a suspension point to an event loop (i You can distinguish it from a normal Python function by putting `if False: yield` somewhere inside its body. Another common trick is to put a `yield` statement after the final `return` statement. Bit ugly but oh well.
-
-Any funny business to know around loops? -
- -The output of each coroutine is stored on the `Loop()` class. If you attempt to run a previously-ran coroutine in a new `Loop()` then they will be treated as just returning `None`, which is probably not what you want. -
-
vs asyncio or trio?.
I wasted a *lot* of time trying to get correct error propagation with `asyncio`, trying to reason whether my tasks would be cleaned up correctly or not (edge-triggered vs level-triggered etc etc). `trio` is excellent but still has a one-loop-per-thread rule, and doesn't propagate cancellations to/from threads. These points inspired me to try writing my own. -Nonetheless you'll definitely still want one of the above if you need anything fancy. If you don't, and you really really want simple error semantics, then maybe `tinyio` is for you instead. (In particular `trio` will be a better choice if you still need the event loop when cleaning up from errors; in contrast `tinyio` does not allow scheduling work back on the event loop at that time.) +`tinyio` has the following unique features, and as such may be the right choice if any of the following are must-haves for you: + +- the propagation of errors to/from threads; +- no one-loop-per-thread rule; +- simple+robust error semantics (crash the whole loop if anything goes wrong); +- tiny, hackable, codebase. + +However conversely, `tinyio` does not offer the ability to schedule work on the event loop whilst cleaning up from errors. + +If none of the bullet points are must-haves for you, or if needing the event loop during cleanup is a dealbreaker, then either `trio` or `asyncio` are likely to be better choices. :) +
diff --git a/pyproject.toml b/pyproject.toml index 9e10961..6618187 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ name = "tinyio" readme = "README.md" requires-python = ">=3.11" urls = {repository = "https://github.com/patrick-kidger/tinyio"} -version = "0.1.3" +version = "0.1.4" [project.optional-dependencies] dev = ["pre-commit", "pytest"] @@ -38,7 +38,7 @@ src = [] [tool.ruff.lint] fixable = ["I001", "F401", "UP"] -ignore = ["E402", "E721", "E731", "E741", "F722"] +ignore = ["E402", "E721", "E731", "E741", "F722", "UP038"] select = ["E", "F", "I001", "UP"] [tool.ruff.lint.flake8-import-conventions.extend-aliases] diff --git a/tests/test_background.py b/tests/test_background.py index 6fcac70..8d0ce9d 100644 --- a/tests/test_background.py +++ b/tests/test_background.py @@ -1,6 +1,43 @@ +import pytest import tinyio +def _sleep(x): + yield tinyio.sleep(x) + return x + + +def test_as_completed(): + def _run(): + iterator = tinyio.AsCompleted({_sleep(0.3), _sleep(0.1), _sleep(0.2)}) + outs = [] + while not iterator.done(): + x = yield iterator.get() + outs.append(x) + return outs + + loop = tinyio.Loop() + assert loop.run(_run()) == [0.1, 0.2, 0.3] + + +def test_as_completed_out_of_order(): + def _run(): + iterator = tinyio.AsCompleted({_sleep(0.3), _sleep(0.1), _sleep(0.2)}) + get1 = iterator.get() + get2 = iterator.get() + get3 = iterator.get() + with pytest.raises(RuntimeError, match="which is greater than the number of coroutines"): + iterator.get() + assert iterator.done() + out3 = yield get3 + out2 = yield get2 + out1 = yield get1 + return [out1, out2, out3] + + loop = tinyio.Loop() + assert loop.run(_run()) == [0.1, 0.2, 0.3] + + def _block(event1: tinyio.Event, event2: tinyio.Event, out): yield event1.wait() event2.set() @@ -19,6 +56,8 @@ def _test_done_callback(): assert len(out) == 0 event1.set() yield event3.wait() + for _ in range(20): + yield assert out == [2, 1] @@ -27,6 +66,31 @@ def test_done_callback(): loop.run(_test_done_callback()) +def test_yield_on_wrapped_coroutine(): + callbacked = False + event = tinyio.Event() + + def _callback(_): + nonlocal callbacked + callbacked = True + event.set() + + def _foo(): + yield + return 3 + + foo = _foo() + + def _bar(): + yield {tinyio.add_done_callback(foo, _callback)} + yield event.wait() + out = yield foo + return out + + loop = tinyio.Loop() + assert loop.run(_bar()) == 3 + + # To reinstate if we ever reintroduce error callbacks. diff --git a/tests/test_basic.py b/tests/test_basic.py deleted file mode 100644 index cb8a816..0000000 --- a/tests/test_basic.py +++ /dev/null @@ -1,208 +0,0 @@ -import time - -import pytest -import tinyio - - -def _add_one(x: int) -> tinyio.Coro[int]: - yield - return x + 1 - - -def _add_two(x: int) -> tinyio.Coro[int]: - y = yield _add_one(x) - z = yield _add_one(y) - return z - - -def test_basic(): - loop = tinyio.Loop() - assert loop.run(_add_two(4)) == 6 - - -def test_gather(): - def _gather(x: int): - return (yield [_add_one(x), _add_two(x)]) - - loop = tinyio.Loop() - assert loop.run(_gather(3)) == [4, 5] - - -def _multi_yield(): - foo = _add_one(x=3) - x = yield foo - y = yield foo - return x, y - - -def test_multi_yield(): - loop = tinyio.Loop() - assert loop.run(_multi_yield()) == (4, 4) - - -def test_diamond(): - def _diamond1(x: int) -> tinyio.Coro[int]: - y = _add_one(x) - a, b = yield [_diamond2(y, 1), _diamond2(y, 2)] - return a + b - - def _diamond2(y: tinyio.Coro[int], factor: int): - z = yield y - return z * factor - - loop = tinyio.Loop() - assert loop.run(_diamond1(2)) == 9 - - -def test_sleep(): - def _slow_add_one(x: int): - yield tinyio.sleep(0.1) - return x + 1 - - def _big_gather(x: int): - out = yield [_slow_add_one(x) for _ in range(100)] - return out - - loop = tinyio.Loop() - start = time.time() - out = loop.run(_big_gather(1)) - end = time.time() - assert out == [2 for _ in range(100)] - assert end - start < 0.5 - - -def test_multi_run(): - foo = _add_one(x=4) - - def _mul(): - out = yield foo - return out * 5 - - loop = tinyio.Loop() - assert loop.run(_mul()) == 25 - assert loop.run(_mul()) == 25 - assert loop.run(_mul()) == 25 - - -def test_waiting_on_already_finished(): - def f(): - yield - return 3 - - def g(coro): - yield h(coro) - yield [f(), coro] - - def h(coro): - yield coro - - foo = f() - loop = tinyio.Loop() - loop.run(g(foo)) - - -def test_cycle(): - def f(): - yield gg - - def g(): - yield ff - - ff = f() - gg = g() - - def h(): - yield [ff, gg] - - loop = tinyio.Loop() - with pytest.raises(RuntimeError, match="Cycle detected in `tinyio` loop"): - loop.run(h(), exception_group=False) - - -@pytest.mark.parametrize("wait_on_f", (False, True)) -def test_background(wait_on_f: bool): - val = False - done = False - - def f(): - while val is False: - yield - return 3 - - def g(): - nonlocal val - val = True - yield - - def h(): - nonlocal done - ff = f() - out = yield {ff} - assert out is None - yield g() - if wait_on_f: - out = yield ff - assert out == 3 - done = True - - loop = tinyio.Loop() - loop.run(h()) - assert done - - -@pytest.mark.parametrize("wait_on_f", (False, True)) -def test_background_already_waiting(wait_on_f: bool): - val = False - done = False - - def f(): - while val is False: - yield - return 3 - - ff = f() - - def g(): - nonlocal val - val = True - yield - - def h(): - nonlocal done - out = yield {ff} - assert out is None - yield g() - if wait_on_f: - out = yield ff - assert out == 3 - done = True - - def i(): - yield [ff, h()] - - loop = tinyio.Loop() - loop.run(i()) - assert done - - -def test_background_multiple_yields(): - done = False - - def f(): - yield - return 3 - - def g(): - nonlocal done - ff = f() - yield {ff} - yield {ff} - x = yield ff - y = yield ff - assert x == 3 - assert y == 3 - done = True - - loop = tinyio.Loop() - loop.run(g()) - assert done diff --git a/tests/test_core.py b/tests/test_core.py new file mode 100644 index 0000000..ba5af15 --- /dev/null +++ b/tests/test_core.py @@ -0,0 +1,505 @@ +import contextlib +import gc +import threading +import time +import weakref +from typing import Any + +import pytest +import tinyio + + +def _add_one(x: int) -> tinyio.Coro[int]: + yield + return x + 1 + + +def _add_two(x: int) -> tinyio.Coro[int]: + y = yield _add_one(x) + z = yield _add_one(y) + return z + + +def test_basic(): + loop = tinyio.Loop() + assert loop.run(_add_two(4)) == 6 + + +def test_gather(): + def _gather(x: int): + return (yield [_add_one(x), _add_two(x)]) + + loop = tinyio.Loop() + assert loop.run(_gather(3)) == [4, 5] + + +def test_empty_gather(): + def _gather(): + out = yield [] + return out + + loop = tinyio.Loop() + assert loop.run(_gather()) == [] + + +def test_multi_yield(): + def _multi_yield(): + foo = _add_one(x=3) + x = yield foo + y = yield foo + return x, y + + loop = tinyio.Loop() + assert loop.run(_multi_yield()) == (4, 4) + + +def test_simultaneous_yield(): + def _simultaneous_yield(): + foo = _add_one(x=3) + x, y = yield [foo, foo] + return x, y + + loop = tinyio.Loop() + assert loop.run(_simultaneous_yield()) == (4, 4) + + +def test_diamond(): + def _diamond1(x: int) -> tinyio.Coro[int]: + y = _add_one(x) + a, b = yield [_diamond2(y, 1), _diamond2(y, 2)] + return a + b + + def _diamond2(y: tinyio.Coro[int], factor: int): + z = yield y + return z * factor + + loop = tinyio.Loop() + assert loop.run(_diamond1(2)) == 9 + + +def test_sleep(): + def _slow_add_one(x: int): + yield tinyio.sleep(0.1) + return x + 1 + + def _big_gather(x: int): + out = yield [_slow_add_one(x) for _ in range(100)] + return out + + loop = tinyio.Loop() + start = time.time() + out = loop.run(_big_gather(1)) + end = time.time() + assert out == [2 for _ in range(100)] + assert end - start < 0.5 + + +def test_multi_run(): + foo = _add_one(x=4) + + def _mul(): + out = yield foo + return out * 5 + + loop = tinyio.Loop() + assert loop.run(_mul()) == 25 + assert loop.run(_mul()) == 25 + assert loop.run(_mul()) == 25 + + +def test_waiting_on_already_finished(): + def f(): + yield + return 3 + + def g(coro): + yield h(coro) + yield [f(), coro] + + def h(coro): + yield coro + + foo = f() + loop = tinyio.Loop() + loop.run(g(foo)) + + +def test_cycle(): + def f(): + yield gg + + def g(): + yield ff + + ff = f() + gg = g() + + def h(): + yield [ff, gg] + + loop = tinyio.Loop() + with pytest.raises(RuntimeError, match="Cycle detected in `tinyio` loop"): + loop.run(h(), exception_group=False) + + +@pytest.mark.parametrize("wait_on_f", (False, True)) +def test_background(wait_on_f: bool): + val = False + done = False + + def f(): + while val is False: + yield + return 3 + + def g(): + nonlocal val + val = True + yield + + def h(): + nonlocal done + ff = f() + out = yield {ff} + assert out is None + yield g() + if wait_on_f: + out = yield ff + assert out == 3 + done = True + + loop = tinyio.Loop() + loop.run(h()) + assert done + + +@pytest.mark.parametrize("wait_on_f", (False, True)) +def test_background_already_waiting(wait_on_f: bool): + val = False + done = False + + def f(): + while val is False: + yield + return 3 + + ff = f() + + def g(): + nonlocal val + val = True + yield + + def h(): + nonlocal done + out = yield {ff} + assert out is None + yield g() + if wait_on_f: + out = yield ff + assert out == 3 + done = True + + def i(): + yield [ff, h()] + + loop = tinyio.Loop() + loop.run(i()) + assert done + + +def test_empty_background(): + def _background(): + yield set() + return 3 + + loop = tinyio.Loop() + assert loop.run(_background()) == 3 + + +def test_background_multiple_yields(): + done = False + + def f(): + yield + return 3 + + def g(): + nonlocal done + ff = f() + yield {ff} + yield {ff} + x = yield ff + y = yield ff + assert x == 3 + assert y == 3 + done = True + + loop = tinyio.Loop() + loop.run(g()) + assert done + + +def test_no_yield_direct(): + def f(): + return 3 + yield + + loop = tinyio.Loop() + assert loop.run(f()) == 3 + + +def test_no_yield_indirect(): + def f(): + return 3 + yield + + def g(): + out = yield f() + return out + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_gc_simple(): + def _block_add_one(x): + return x + 1 + + def _foo(x): + return (yield tinyio.run_in_thread(_block_add_one, x)) + + def _gc(x: int) -> tinyio.Coro[tuple[int, int]]: + iterator = tinyio.AsCompleted({_foo(x), _add_one(x)}) + y = yield iterator.get() + z = yield iterator.get() + return y, z + + loop = tinyio.Loop() + coro = _gc(4) + assert loop.run(coro) == (5, 5) + gc.collect() + assert set(loop._results.keys()) == {coro} + + +@pytest.mark.parametrize("yield_from", (False, True)) +@pytest.mark.parametrize("timeout", (None, 10)) +def test_gc_after_event(yield_from, timeout): + """The interesting case here is `yield_from=True`, `timeout=10`. + (The others are just for completeness.) + + In this case we have that: + - `f1` has `timeout=10` but triggers immediately. + - `f2` has `timeout=2` but triggers at the end of our main coroutine. + And so we have that `f2` is before of `f1` in the internal heap of timeouts, but that `f1` will trigger first. + Thus when `f1` triggers it will remain in that heap even after it has triggered (until `f2` has triggered as well + and they can both be popped). + In this scenario, we don't want the generator object to remain in memory just because it's still sitting in that + heap! + This test checks that the generator can be cleaned up even whilst we wait for the `_Wait` object to get collected + later. + """ + + def wait(event, wait_time): + if yield_from: + yield from event.wait(wait_time) + else: + yield event.wait(wait_time) + + def set_event(event): + for _ in range(20): + yield + event.set() + + def baz(): + event1 = tinyio.Event() + event2 = tinyio.Event() + f1 = wait(event1, timeout) + f2 = wait(event2, 2) + ref = weakref.ref(f1) + yield {f2} + yield [f1, set_event(event1)] + del f1 + gc.collect() + assert ref() is None + event2.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(baz()) == 3 + + +def test_event_fairness(): + """This checks that once one event unblocks, that we don't just keep chasing all the stuff downstream of that event, + i.e. that we do schedule work from any other event that has finished. + """ + outs = [] + + def f(): + yield tinyio.Event().wait(0) + outs.append(1) + for _ in range(20): + yield + outs.append(2) + + def g(): + yield [f(), f()] + + loop = tinyio.Loop() + loop.run(g()) + assert outs == [1, 1, 2, 2] + + +def test_event_fairness2(): + event1 = tinyio.Event() + outs = [] + + def f(): + yield event1.wait(0) + outs.append(1) + + def g(): + yield {f()} + for _ in range(20): + yield + outs.append(2) + + loop = tinyio.Loop() + loop.run(g()) + assert outs == [1, 2] + + +def test_simultaneous_set(): + event = tinyio.Event() + + def f(): + for _ in range(20): + yield + yield [tinyio.run_in_thread(event.set) for _ in range(100)] + + def g(): + yield event.wait() + + def h(): + yield [g(), f()] + + loop = tinyio.Loop() + loop.run(h()) + + +def test_timeout_then_set(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event1.set() + for _ in range(20): + yield + event2.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_set_then_timeout(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + event1.set() + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event2.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_set_then_timeout_then_clear(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + event1.set() + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event1.clear() + event2.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_set_then_timeout_then_clear_then_set(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + event1.set() + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event1.clear() + event2.set() + event1.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_timeout_as_part_of_group_and_only_coroutine(): + event1 = tinyio.Event() + event2 = tinyio.Event() + wait: Any = event1.wait(0) + wait2 = event2.wait() + + def f(): + yield [wait, wait2] + return 3 + + def set2(): + time.sleep(0.1) + event2.set() + + t = threading.Thread(target=set2) + t.start() + loop = tinyio.Loop() + assert loop.run(f()) == 3 + + +def test_yield_finished_coroutine(): + def f(): + yield + + ff = f() + next(ff) + with contextlib.suppress(StopIteration): + next(ff) + + def g(): + yield ff + + loop = tinyio.Loop() + with pytest.raises(RuntimeError, match="has already finished"): + loop.run(g()) diff --git a/tests/test_errors.py b/tests/test_errors.py index 921f701..1c90023 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -192,8 +192,8 @@ def two(): assert type(catcher.value) is BaseExceptionGroup [raising, cancelled] = catcher.value.exceptions assert type(raising) is RuntimeError and str(raising) == "Raising!" - assert type(cancelled) is tinyio.CancelledError assert _flat_tb(raising) == [workflow.__name__, "cancel", "cancel2"] + assert type(cancelled) is tinyio.CancelledError assert _flat_tb(cancelled) == ["target", "blocking_operation"] else: raising = catcher.value @@ -243,10 +243,10 @@ def baz(): assert type(value) is ValueError assert str(value) == "Responding improperly to cancellation" assert _flat_tb(value) == ["target", "blocking_operation"] - cancelled = value.__context__ - assert cancelled is value.__cause__ - assert type(cancelled) is tinyio.CancelledError - assert _flat_tb(cancelled) == ["blocking_operation", "sub_blocking_operation"] + cancelled_context = value.__context__ + assert cancelled_context is value.__cause__ + assert type(cancelled_context) is tinyio.CancelledError + assert _flat_tb(cancelled_context) == ["blocking_operation", "sub_blocking_operation"] @pytest.mark.parametrize("exception_group", (None, False, True)) @@ -264,7 +264,8 @@ def bar(): yield [baz(), tinyio.run_in_thread(blocking_operation)] def baz(): - yield + while True: + yield loop = tinyio.Loop() with pytest.raises(BaseException) as catcher: @@ -309,7 +310,8 @@ def bar(): yield [baz(), tinyio.run_in_thread(blocking_operation)] def baz(): - yield + while True: + yield loop = tinyio.Loop() with pytest.raises(BaseException) as catcher: diff --git a/tests/test_sync.py b/tests/test_sync.py index dc9d596..2f010c4 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -1,3 +1,5 @@ +import contextlib +import threading import time import pytest @@ -28,6 +30,26 @@ def _run(value): loop.run(_run(3)) +def test_semaphore_reuse(): + counter = 0 + + def _foo(coro): + nonlocal counter + context = yield coro + counter += 1 + with pytest.raises(RuntimeError, match="do not") if counter == 2 else contextlib.nullcontext(): + with context: + yield + + def _bar(): + semaphore = tinyio.Semaphore(2) + coro = semaphore() + yield [_foo(coro), _foo(coro)] + + loop = tinyio.Loop() + loop.run(_bar()) + + def test_lock(): counter = 0 @@ -98,3 +120,130 @@ def _bar(): assert done assert done2 assert event.is_set() + + +@pytest.mark.parametrize("is_set", (False, True)) +def test_event_only(is_set: bool): + event = tinyio.Event() + if is_set: + event.set() + + def foo(): + if not is_set: + t = threading.Timer(0.1, lambda: event.set()) + t.start() + yield event.wait() + + loop = tinyio.Loop() + loop.run(foo()) + + +@pytest.mark.parametrize("is_set", (False, True)) +def test_event_run(is_set: bool): + event = tinyio.Event() + if is_set: + event.set() + loop = tinyio.Loop() + if not is_set: + t = threading.Timer(0.1, lambda: event.set()) + t.start() + loop.run(event.wait()) + + +@pytest.mark.parametrize("is_set", (False, True)) +def test_event_simultaneous_wait(is_set: bool): + event = tinyio.Event() + if is_set: + event.set() + + def _foo(): + if not is_set: + t = threading.Timer(0.1, lambda: event.set()) + t.start() + yield [event.wait(), event.wait()] + + loop = tinyio.Loop() + loop.run(_foo()) + + +def test_event_clear_not_strict(): + """Test that even though we `clear()` the event after setting it, that both `foo()` still unblock.""" + event = tinyio.Event() + event.set() + event.clear() + assert not event.is_set() + + out = [] + + def foo(): + yield event.wait() + out.append(2) + + def bar(): + yield + out.append(1) + event.set() + for _ in range(20): + yield + event.clear() + out.append(3) + + def baz(): + yield [foo(), foo(), bar()] + + loop = tinyio.Loop() + loop.run(baz()) + assert out == [1, 2, 2, 3] + + +class _Semaphore: + def __init__(self, value): + self.value = value + self.event = tinyio.Event() + self.event.set() + + def __call__(self): + while True: + yield self.event.wait() + if self.event.is_set(): + break + assert self.value > 0 + self.value -= 1 + if self.value == 0: + self.event.clear() + return _closing(self) + + +@contextlib.contextmanager +def _closing(semaphore): + try: + yield + finally: + semaphore.value += 1 + semaphore.event.set() + + +def test_alternate_semaphore(): + """This test is useful as it makes use of `Event.clear()`.""" + + counter = 0 + + def _count(semaphore, i): + nonlocal counter + with (yield semaphore()): + counter += 1 + if counter > 2: + raise RuntimeError + yield + counter -= 1 + return i + + def _run(value): + semaphore = _Semaphore(value) + out = yield [_count(semaphore, i) for i in range(50)] + return out + + loop = tinyio.Loop() + assert loop.run(_run(2)) == list(range(50)) + with pytest.raises(RuntimeError): + loop.run(_run(3)) diff --git a/tests/test_thread.py b/tests/test_thread.py index 9fd1e61..36d6c22 100644 --- a/tests/test_thread.py +++ b/tests/test_thread.py @@ -26,14 +26,15 @@ def _big_gather(x: int): @pytest.mark.parametrize("with_map", (False, True)) def test_thread_pool(with_map: bool): counter = 0 + invalid_counter = False lock = threading.Lock() def _count(x, y): - nonlocal counter + nonlocal counter, invalid_counter with lock: counter += 1 time.sleep(0.01) - assert counter <= 2 + invalid_counter = invalid_counter | (counter > 2) with lock: counter -= 1 return x, y @@ -48,7 +49,24 @@ def _run(max_threads): loop = tinyio.Loop() assert loop.run(_run(2)) == [(i, i) for i in range(50)] + assert not invalid_counter + loop.run(_run(3)) + assert invalid_counter + + +def test_simultaneous_errors(): + def _raises(): + raise RuntimeError + + def _run(): + out = yield [tinyio.run_in_thread(_raises) for _ in range(10)] + return out + + loop = tinyio.Loop() with warnings.catch_warnings(): - warnings.simplefilter("ignore", RuntimeWarning) - with pytest.raises(BaseExceptionGroup): - loop.run(_run(3)) + warnings.simplefilter("error") + with pytest.raises(BaseExceptionGroup) as catcher: + loop.run(_run()) + assert len(catcher.value.exceptions) > 1 + for e in catcher.value.exceptions: + assert type(e) is RuntimeError diff --git a/tests/test_time.py b/tests/test_time.py index c01cfa8..9a5a11d 100644 --- a/tests/test_time.py +++ b/tests/test_time.py @@ -1,6 +1,27 @@ +import time + import tinyio +def test_sleep(): + outs = [] + + def f(): + start = time.monotonic() + yield [tinyio.sleep(0.05), tinyio.sleep(0.1)] + actual_duration = time.monotonic() - start + # Note that these are pretty inaccurate tolerances! This is about what we get with `asyncio` too. + # The reason for this seems to be the accuracy in the `threading.Event.wait()` that we bottom out in. If we need + # greater resolution than this then we could do that by using a busy-loop for the last 1e-2 seconds. + success = 0.09 < actual_duration < 0.11 + outs.append(success) + + loop = tinyio.Loop() + for _ in range(5): + loop.run(f()) + assert sum(outs) >= 4 # We allow one failure, to decrease flakiness. + + def _sleep(x): yield tinyio.sleep(x) return 3 diff --git a/tinyio/__init__.py b/tinyio/__init__.py index cc9b6f3..730b557 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -2,8 +2,9 @@ from ._core import ( CancelledError as CancelledError, Coro as Coro, + Event as Event, Loop as Loop, ) -from ._sync import Barrier as Barrier, Event as Event, Lock as Lock, Semaphore as Semaphore +from ._sync import Barrier as Barrier, Lock as Lock, Semaphore as Semaphore from ._thread import ThreadPool as ThreadPool, run_in_thread as run_in_thread from ._time import TimeoutError as TimeoutError, sleep as sleep, timeout as timeout diff --git a/tinyio/_background.py b/tinyio/_background.py index 85c0f5a..c5769f7 100644 --- a/tinyio/_background.py +++ b/tinyio/_background.py @@ -1,8 +1,7 @@ -import collections as co -from collections.abc import Callable +from collections.abc import Callable, Generator from typing import Generic, TypeVar -from ._core import Coro +from ._core import Coro, Event _T = TypeVar("_T") @@ -88,23 +87,42 @@ def as_completed_demo(): """ def __init__(self, coros: set[Coro[_T]]): + if not isinstance(coros, set) or any(not isinstance(coro, Generator) for coro in coros): + raise ValueError("`AsCompleted(coros=...)` must be a set of coroutines.") self._coros = set(coros) - self._count = 0 - self._queue = co.deque() + self._put_count = 0 + self._get_count = 0 + self._outs = {} + self._events = [Event() for _ in self._coros] self._started = False def done(self) -> bool: - """Whether all coroutines have completed.""" - - return self._count == len(self._coros) + """Whether all coroutines are being waited on. This does not imply that all coroutines have necessarily + finished executing; it just implies that you should not call `.get()` any more times. + """ + return self._get_count == len(self._events) def get(self) -> Coro[_T]: - """Yields the output of the most next coroutine to complete.""" - + """Yields the output of the next coroutine to complete.""" + get_count = self._get_count + if self._get_count >= len(self._events): + raise RuntimeError( + f"Called `AsCompleted.get` {self._get_count + 1} times, which is greater than the number of coroutines " + f"which are being waited on ({len(self._events)})." + ) + self._get_count += 1 + return self._get(get_count) + + def _get(self, get_count: int): if not self._started: self._started = True - yield {add_done_callback(coro, self._queue.append) for coro in self._coros} - while len(self._queue) == 0: - yield - self._count += 1 - return self._queue.popleft() + + def callback(out): + self._outs[self._put_count] = out + self._events[self._put_count].set() + self._put_count += 1 + + yield {add_done_callback(coro, callback) for coro in self._coros} + self._coros.clear() # Enable them to be GC'd as they complete. + yield from self._events[get_count].wait() + return self._outs.pop(get_count) diff --git a/tinyio/_core.py b/tinyio/_core.py index 9c7ad23..dbb77c4 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -1,5 +1,10 @@ import collections as co import dataclasses +import enum +import graphlib +import heapq +import threading +import time import traceback import types import warnings @@ -9,7 +14,13 @@ # -# Loop implementation +# Public API: loop implementation +# +# The main logic is that each time coroutine yields, we create a `_WaitingFor` object which holds a counter for how many +# things it is waiting on before it can wake up. Once this counter hits zero, the `_WaitingFor` object schedules the +# coroutine back on the loop. +# Counters can be decremented in three ways: another coroutine finishes, an `Event.set()` is triggered, or a timeout in +# `Event.wait(timeout=...)` is triggered. # @@ -49,125 +60,334 @@ def run(self, coro: Coro[_Return], exception_group: None | bool = None) -> _Retu The final `return` from `coro`. """ + if not isinstance(coro, Generator): + raise ValueError("Invalid input `coro`, which is not a coroutine (a function using `yield` statements).") queue: co.deque[_Todo] = co.deque() - waiting_on: dict[Coro, list[Coro]] = {} - waiting_for: dict[Coro, _WaitingFor] = {} queue.appendleft(_Todo(coro, None)) + waiting_on = dict[Coro, list[_WaitingFor]]() waiting_on[coro] = [] - # Loop invariant: always holds a single element. It's not really load-bearing, it's just used for making a nice + wait_heap: list[_Wait] = [] + # Loop invariant: `{x.coro for x in queue}.issubset(set(waiting_on.keys()))` + wake_loop = threading.Event() + wake_loop.set() + # Loop invariant: `len(current_coro_ref) == 1`. It's not really load-bearing, it's just used for making a nice # traceback when we get an error. current_coro_ref = [coro] try: - while len(queue) > 0: + while True: + if len(queue) == 0: + if len(waiting_on) == 0: + # We're done. + break + else: + # We might have a cycle bug... + self._check_cycle(waiting_on, coro) + # ...but hopefully we're just waiting on a thread or exogeneous event to unblock one of our + # coroutines. + while len(queue) == 0: + self._wait(wait_heap, wake_loop) + self._clear(wait_heap, wake_loop) + # This whole block needs to be wrapped in a `len(queue)` check, as just because we've + # unblocked doesn't necessarily mean that we're ready to schedule a coroutine: we could have + # something like `yield [event1.wait(...), event2.wait(...)]`, and only one of the two has + # unblocked. + else: + self._clear(wait_heap, wake_loop) todo = queue.pop() current_coro_ref[0] = todo.coro - self._step(todo, queue, waiting_on, waiting_for) + self._step(todo, queue, waiting_on, wait_heap, wake_loop) current_coro_ref[0] = coro - self._check_cycle(waiting_on, coro) except BaseException as e: _cleanup(e, waiting_on, current_coro_ref, exception_group) raise # if not raising an `exception_group` return self._results[coro] - def _check_cycle(self, waiting_on, coro): - del self - if len(waiting_on) != 0: - import graphlib + @staticmethod + def _check_cycle(waiting_on, coro): + sorter = graphlib.TopologicalSorter() + for k, v in waiting_on.items(): + for vi in v: + sorter.add(k, vi.coro) + try: + sorter.prepare() + except graphlib.CycleError: + coro.throw(RuntimeError("Cycle detected in `tinyio` loop. Cancelling all coroutines.")) + + @staticmethod + def _wait(wait_heap: list["_Wait"], wake_loop: threading.Event): + timeout = None + while len(wait_heap) > 0: + soonest = wait_heap[0] + assert soonest.timeout_in_seconds is not None + if soonest.state == _WaitState.DONE: + heapq.heappop(wait_heap) + else: + timeout = soonest.timeout_in_seconds - time.monotonic() + break + wake_loop.wait(timeout=timeout) - try: - graphlib.TopologicalSorter(waiting_on).prepare() - except graphlib.CycleError: - coro.throw(RuntimeError("Cycle detected in `tinyio` loop. Cancelling all coroutines.")) + @staticmethod + def _clear(wait_heap: list["_Wait"], wake_loop: threading.Event): + wake_loop.clear() + while len(wait_heap) > 0: + soonest = wait_heap[0] + assert soonest.timeout_in_seconds is not None + if soonest.state == _WaitState.DONE: + heapq.heappop(wait_heap) + elif soonest.timeout_in_seconds <= time.monotonic(): + heapq.heappop(wait_heap) + soonest.notify_from_timeout() else: - assert False, "Something has gone wrong inside the `tinyio` loop." + break def _step( self, todo: "_Todo", queue: co.deque["_Todo"], - waiting_on: dict[Coro, list[Coro]], - waiting_for: dict[Coro, "_WaitingFor"], + waiting_on: dict[Coro, list["_WaitingFor"]], + wait_heap: list["_Wait"], + wake_loop: threading.Event, ) -> None: try: out = todo.coro.send(todo.value) except StopIteration as e: self._results[todo.coro] = e.value - for coro in waiting_on.pop(todo.coro): - coro_waiting_for = waiting_for[coro] - coro_waiting_for.count -= 1 - if coro_waiting_for.count == 0: - del waiting_for[coro] - if isinstance(coro_waiting_for.coros, list): - value = [self._results[g] for g in coro_waiting_for.coros] - else: - value = self._results[coro_waiting_for.coros] - queue.appendleft(_Todo(coro, value)) + for waiting_for in waiting_on.pop(todo.coro): + waiting_for.decrement() else: original_out = out - if single_coroutine := isinstance(out, Generator): + if type(out) is list and len(out) == 0: + out = None + if isinstance(out, (_Wait, Generator)): out = [out] match out: case None: - queue.appendleft(_Todo(todo.coro, None)) + # original_out will either be `None` or `[]`. + queue.appendleft(_Todo(todo.coro, original_out)) case set(): - queue.appendleft(_Todo(todo.coro, None)) for out_i in out: - if not isinstance(out_i, Generator): - todo.coro.throw(_invalid(out)) - if out_i not in waiting_on.keys(): - queue.appendleft(_Todo(out_i, None)) - waiting_on[out_i] = [] + if isinstance(out_i, Generator): + if out_i not in self._results.keys() and out_i not in waiting_on.keys(): + queue.appendleft(_Todo(out_i, None)) + waiting_on[out_i] = [] + else: + assert not isinstance(out_i, _Wait) + if out_i.gi_frame is None: + todo.coro.throw(_already_finished(out_i)) + todo.coro.throw(_invalid(original_out)) + queue.appendleft(_Todo(todo.coro, None)) case list(): - num_done = 0 + waiting_for = _WaitingFor(len(out), todo.coro, original_out, wake_loop, self._results, queue) for out_i in out: - if not isinstance(out_i, Generator): - # Not just a direct `raise` as we need to shut down this coroutine too + this gives a - # nicer stack trace. - todo.coro.throw(_invalid(out)) - if out_i in self._results.keys(): - # Already finished. - num_done += 1 - elif out_i in waiting_on.keys(): - # Already in queue; someone else is waiting on this coroutine too. - waiting_on[out_i].append(todo.coro) - else: - # New coroutine - waiting_on[out_i] = [todo.coro] - queue.appendleft(_Todo(out_i, None)) - if num_done == len(out): - # All requested coroutines already finished; immediately queue up original coroutine. - # again. - if single_coroutine: - queue.appendleft(_Todo(todo.coro, self._results[original_out])) + if isinstance(out_i, Generator): + if out_i in self._results.keys(): + waiting_for.decrement() + elif out_i in waiting_on.keys(): + waiting_on[out_i].append(waiting_for) + else: + if out_i.gi_frame is None: + todo.coro.throw(_already_finished(out_i)) + queue.appendleft(_Todo(out_i, None)) + waiting_on[out_i] = [waiting_for] + elif isinstance(out_i, _Wait): + out_i.register(waiting_for) + if out_i.timeout_in_seconds is not None: + heapq.heappush(wait_heap, out_i) else: - queue.appendleft(_Todo(todo.coro, [self._results[out_i] for out_i in out])) - else: - assert todo.coro not in waiting_for.keys() - waiting_for[todo.coro] = _WaitingFor( - len(out) - num_done, original_out if single_coroutine else out - ) + todo.coro.throw(_invalid(original_out)) case _: - todo.coro.throw(_invalid(out)) + todo.coro.throw(_invalid(original_out)) class CancelledError(BaseException): """Raised when a `tinyio` coroutine is cancelled due an error in another coroutine.""" -Loop.__module__ = "tinyio" CancelledError.__module__ = "tinyio" +# +# Loop internals, in particular events and waiting +# + + @dataclasses.dataclass(frozen=True) class _Todo: coro: Coro value: Any +# We need at least some use of locks, as `Event`s are public objects that may interact with user threads. If the +# internals of our event/wait/waitingfor mechanisms are modified concurrently then it would be very easy for things to +# go wrong. +# In particular note that our event loop is one actor that is making modifications, in addition to user threads. +# For this reason it doesn't suffice to just have a lock around `Event.{set, clear}`. +# For simplicity, we simply guard all entries into the event/wait/waitingfor mechanism with a single lock. We could try +# to use some other locking strategy but that seems error-prone. +_global_event_lock = threading.RLock() + + @dataclasses.dataclass(frozen=False) class _WaitingFor: - count: int - coros: Coro | list[Coro] + counter: int + coro: Coro + out: "_Wait | Coro | list[_Wait | Coro]" + wake_loop: threading.Event + results: weakref.WeakKeyDictionary[Coro, Any] + queue: co.deque[_Todo] + + def __post_init__(self): + assert self.counter > 0 + + def increment(self): + with _global_event_lock: + # This assert is valid as our only caller is `_Wait.unnotify_from_event`, which will only have a reference + # to us if we haven't completed yet -- otherwise we'd have already called its `_Wait.cleanup` method. + assert self.counter != 0 + self.counter += 1 + + def decrement(self): + # We need a lock here as this may be called simultaneously between our event loop and via `Event.set`. + # (Though `Event.set` has its only internal lock, that doesn't cover the event loop as well.) + with _global_event_lock: + assert self.counter > 0 + self.counter -= 1 + if self.counter == 0: + match self.out: + case None: + result = None + waits = [] + case _Wait(): + result = None + waits = [self.out] + case Generator(): + result = self.results[self.out] + waits = [] + case list(): + result = [None if isinstance(out_i, _Wait) else self.results[out_i] for out_i in self.out] + waits = [out_i for out_i in self.out if isinstance(out_i, _Wait)] + case _: + assert False + for wait in waits: + wait.cleanup() + self.queue.appendleft(_Todo(self.coro, result)) + # If we're callling this function from a thread, and the main event loop is blocked, then use this to + # notify the main event loop that it can wake up. + self.wake_loop.set() + + +class _WaitState(enum.Enum): + INITIALISED = "initialised" + REGISTERED = "registered" + NOTIFIED_EVENT = "notified_event" + NOTIFIED_TIMEOUT = "notified_timeout" + DONE = "done" + + +class _Wait: + def __init__(self, event: "Event", timeout_in_seconds: None | int | float): + self._event = event + self._timeout_in_seconds = timeout_in_seconds + self._waiting_for = None + self.state = _WaitState.INITIALISED + + # This is basically just a second `__init__` method. We're not really initialised until this has been called + # precisely once as well. The reason we have two is that an end-user creates us during `Event.wait()`, and then we + # need to register on the event loop. + def register(self, waiting_for: "_WaitingFor") -> None: + with _global_event_lock: + if self.state is not _WaitState.INITIALISED: + waiting_for.coro.throw( + RuntimeError( + "Do not yield the same `event.wait()` multiple times. Make a new `.wait()` call instead." + ) + ) + assert self._waiting_for is None + assert self._event is not None + self.state = _WaitState.REGISTERED + if self._timeout_in_seconds is None: + self.timeout_in_seconds = None + else: + self.timeout_in_seconds = time.monotonic() + self._timeout_in_seconds + self._waiting_for = waiting_for + self._event._waits[self] = None + if self._event.is_set(): + self.notify_from_event() + + def notify_from_event(self): + with _global_event_lock: + # We cannot have `NOTIFIED_EVENT` as our event will have toggled its internal state to `True` as part of + # calling us, and so future `Event.set()` calls will not call `.notify_from_event`. + # We cannot have `DONE` as this is only set during `.cleanup()`, and at that point we deregister from + # `self._event._waits`. + assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + if self.state == _WaitState.REGISTERED: + self.state = _WaitState.NOTIFIED_EVENT + self._waiting_for.decrement() + + def notify_from_timeout(self): + with _global_event_lock: + assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_EVENT} + assert self._waiting_for is not None + is_registered = self.state == _WaitState.REGISTERED + self.state = _WaitState.NOTIFIED_TIMEOUT # Override `NOTIFIED_EVENT` in case we `unnotify_from_event` later + if is_registered: + self._waiting_for.decrement() + + def unnotify_from_event(self): + with _global_event_lock: + assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + # But ignore un-notifies if we've already triggered our timeout. + if self.state is _WaitState.NOTIFIED_EVENT: + self.state = _WaitState.REGISTERED + self._waiting_for.increment() + + def cleanup(self): + with _global_event_lock: + assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + assert self._event is not None + self.state = _WaitState.DONE + self._waiting_for = None # For GC purposes. + del self._event._waits[self] + self._event = None # For GC purposes. + + # For `heapq` to work. + def __lt__(self, other): + return self.timeout_in_seconds < other.timeout_in_seconds + + +class Event: + """A marker that something has happened.""" + + def __init__(self): + self._value = False + self._waits = dict[_Wait, None]() + + def is_set(self): + return self._value + + def set(self): + with _global_event_lock: + if not self._value: + for wait in self._waits.copy().keys(): + wait.notify_from_event() + self._value = True + + def clear(self): + with _global_event_lock: + if self._value: + for wait in self._waits.keys(): + wait.unnotify_from_event() + self._value = False + + def wait(self, timeout_in_seconds: None | int | float = None) -> Coro[None]: + yield _Wait(self, timeout_in_seconds) + + def __bool__(self): + raise TypeError("Cannot convert `tinyio.Event` to boolean. Did you mean `event.is_set()`?") # @@ -185,7 +405,7 @@ def _strip_frames(e: BaseException, n: int): def _cleanup( base_e: BaseException, - waiting_on: dict[Coro, list[Coro]], + waiting_on: dict[Coro, list[_WaitingFor]], current_coro_ref: list[Coro], exception_group: None | bool, ): @@ -213,6 +433,8 @@ def _cleanup( except BaseException as e: # Skipped frame is the `coro.throw` above. other_errors[coro] = _strip_frames(e, 1) + if getattr(e, "__tinyio_no_warn__", False): + continue details = "".join(traceback.format_exception_only(e)).strip() what_did = f"raised the exception `{details}`." else: @@ -259,7 +481,8 @@ def _cleanup( # Either no-one is waiting on us and we're at the root, or multiple are waiting and we can't uniquely append # tracebacks any more. break - [coro] = waiting_on[coro] + [waiting_for] = waiting_on[coro] + coro = waiting_for.coro base_e.with_traceback(tb) # pyright: ignore[reportPossiblyUnboundVariable] if exception_group is None: exception_group = len(other_errors) > 0 @@ -291,7 +514,7 @@ def _cleanup( def _invalid(out): - msg = f"Invalid yield {out}. Must be either `None`, a coroutine, or a list of coroutines." + msg = f"Invalid yield {out}. Must be either `None`, a coroutine, or a list/set of coroutines." if type(out) is tuple: # We could support this but I find the `[]` visually distinctive. msg += ( @@ -299,3 +522,10 @@ def _invalid(out): "not `yield foo, bar`." ) return RuntimeError(msg) + + +def _already_finished(out): + return RuntimeError( + f"The coroutine `{out}` has already finished. However it has not been seen by the `tinyio` loop before and as " + "such does not have any result associated with it." + ) diff --git a/tinyio/_sync.py b/tinyio/_sync.py index cd712ee..f87ee54 100644 --- a/tinyio/_sync.py +++ b/tinyio/_sync.py @@ -1,6 +1,7 @@ +import collections as co import contextlib -from ._core import Coro +from ._core import Coro, Event class Semaphore: @@ -23,24 +24,34 @@ def __init__(self, value: int): if value <= 0: raise ValueError("`tinyio.Semaphore(value=...)` must be positive.") self._value = value + self._events = co.deque[Event]() def __call__(self) -> Coro[contextlib.AbstractContextManager[None]]: - while self._value <= 0: - assert self._value >= 0 - yield + if self._value == 0: + event = Event() + self._events.append(event) + yield from event.wait() + assert self._value > 0 self._value -= 1 - return _close_semaphore(self, [False]) + return _CloseSemaphore(self, [False]) -@contextlib.contextmanager -def _close_semaphore(semaphore: Semaphore, cell: list[bool]): - if cell[0]: - raise RuntimeError("Use a new `semaphore()` call in each `with (yield semaphore())`, do not re-use it.") - cell[0] = True - try: - yield - finally: - semaphore._value += 1 +class _CloseSemaphore: + def __init__(self, semaphore: Semaphore, cell: list[bool]): + self._semaphore = semaphore + self._cell = cell + + def __enter__(self): + if self._cell[0]: + raise RuntimeError("Use a new `semaphore()` call in each `with (yield semaphore())`, do not re-use it.") + self._cell[0] = True + + def __exit__(self, exc_type, exc_value, exc_tb): + del exc_type, exc_value, exc_tb + self._semaphore._value += 1 + if len(self._semaphore._events) > 0: + event = self._semaphore._events.popleft() + event.set() class Lock: @@ -59,27 +70,12 @@ class Barrier: def __init__(self, value: int): self._count = 0 self._value = value + self._event = Event() def wait(self): count = self._count self._count += 1 - while self._count < self._value: - yield + if self._count == self._value: + self._event.set() + yield from self._event.wait() return count - - -class Event: - """A marker than something has happened.""" - - def __init__(self): - self._set = False - - def is_set(self): - return self._set - - def set(self): - self._set = True - - def wait(self): - while not self._set: - yield diff --git a/tinyio/_thread.py b/tinyio/_thread.py index a111615..81e0d52 100644 --- a/tinyio/_thread.py +++ b/tinyio/_thread.py @@ -1,9 +1,10 @@ +import contextlib import ctypes import threading from collections.abc import Callable, Iterable from typing import ParamSpec, TypeVar, cast -from ._core import CancelledError, Coro +from ._core import CancelledError, Coro, Event from ._sync import Semaphore @@ -31,6 +32,7 @@ def run_in_thread(fn: Callable[_Params, _Return], /, *args: _Params.args, **kwar is_exception = None result = None + event = Event() def target(): nonlocal result, is_exception @@ -38,26 +40,51 @@ def target(): result = fn(*args, **kwargs) is_exception = False except BaseException as e: - result = e - is_exception = True + try: + result = e + is_exception = True + except BaseException: + # We have an `except` here just in case we were already within the `except` block due to an error from + # within the thread, whilst our `ctypes` error below triggers. + result = e + is_exception = True + raise + finally: + event.set() t = threading.Thread(target=target) try: t.start() - while is_exception is None: - yield + yield from event.wait() except BaseException as e: # We can end up here if an `tinyio.CancelledError` arises out of the `yield`, or from an exogeneous - # `KeyboardInterrupt`, or from re-raising the error out of our thread. - thread_id = t.ident - assert thread_id is not None - # Raise a `CancelledError` in the thread that is running the task. This allows the thread to do any cleanup. - # This is not readily supported and needs to be done via ctypes, see: https://gist.github.com/liuw/2407154. - ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), ctypes.py_object(CancelledError)) - t.join() + # `KeyboardInterrupt`. + + # First check whether we have a race condition: that our thread itself produced an error whilst we were being + # cancelled due to another error. If this is the case then we suppress the warning in the core event loop about + # resource cleanup. + # This is best-effort as our thread may still crash with its own exception between now and the end of this + # function. + already_error = is_exception is True + + # Second, cancel the thread if necessary. This `event.is_set()` isn't load-bearing, it's just a code cleanliness + # thing, as raising the `CancelledError` in the thread is a no-op if the thread has already terminated. + # (And in principle the thread may terminate after we check the event but before we try raising the exception.) + if not event.is_set(): + thread_id = t.ident + assert thread_id is not None + # Raise a `CancelledError` in the thread that is running the task. This allows the thread to do any cleanup. + # This is not readily supported and needs to be done via ctypes, see: https://gist.github.com/liuw/2407154. + ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), ctypes.py_object(CancelledError)) + t.join() + # Our thread above has now completed. - if is_exception and type(e) is CancelledError: + # We can `is_exception is True` either because the thread already crashed, or from our `ctypes` crash above. + # We can have `is_exception is False` if the the thread managed to finish successfully whilst we are in this + # `except` block. + # I don't think we can have `is_exception is None`. + if is_exception is True and type(e) is CancelledError: # We were cancelled. # # Note that we raise this regardless of whether `result` is itself a `CancelledError`. It's probably the @@ -82,11 +109,14 @@ def target(): e.__traceback__ = e.__traceback__.tb_next e.__context__ = context e.__cause__ = cause + with contextlib.suppress(Exception): + e.__tinyio_no_warn__ = already_error # pyright: ignore[reportAttributeAccessIssue] raise else: # Probably a `KeyboardInterrupt`, forward it on. raise else: + assert is_exception is not None if is_exception: try: raise cast(BaseException, result) diff --git a/tinyio/_time.py b/tinyio/_time.py index 1188e44..1e13320 100644 --- a/tinyio/_time.py +++ b/tinyio/_time.py @@ -1,10 +1,8 @@ import contextlib -import time from typing import TypeVar from ._background import add_done_callback -from ._core import Coro -from ._sync import Event +from ._core import Coro, Event _T = TypeVar("_T") @@ -21,9 +19,7 @@ def sleep(delay_in_seconds: int | float) -> Coro[None]: A coroutine that just sleeps. """ - timeout = time.monotonic() + delay_in_seconds - while time.monotonic() <= timeout: - yield + yield from Event().wait(delay_in_seconds) class TimeoutError(BaseException): @@ -47,13 +43,18 @@ def timeout(coro: Coro[_T], timeout_in_seconds: int | float) -> Coro[tuple[None corresponding to whether `coro` completed within the timeout or not. """ done = Event() - timeout = time.monotonic() + timeout_in_seconds - yield {add_done_callback(coro, lambda _: done.set())} - while time.monotonic() <= timeout and not done.is_set(): - yield - if done.is_set(): - return (yield coro), True - else: + outs = [] + + def callback(out): + outs.append(out) + done.set() + + yield {add_done_callback(coro, callback)} + yield from done.wait(timeout_in_seconds) + if len(outs) == 0: with contextlib.suppress(TimeoutError): coro.throw(TimeoutError) return None, False + else: + [out] = outs + return out, True