From 0b74c113234496acf4e2e076749e26fc78a752b1 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Fri, 1 Aug 2025 18:58:11 +0200 Subject: [PATCH 1/9] The collection of _WaitingFor objects is now held on the _Wait insted of the Event. This is in support of potentially allowing timeouts in the future - we now distinguish specifically which _Wait we might be timing out. --- tinyio/_core.py | 75 ++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/tinyio/_core.py b/tinyio/_core.py index 40b50db..9728526 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -35,7 +35,7 @@ class Event: def __init__(self): self._value = False - self._waiting_fors = dict[Coro, _WaitingFor]() + self._waits: list[_Wait] = [] self._lock = threading.Lock() def is_set(self): @@ -46,31 +46,19 @@ def set(self): # call `.set` simultaneously. with self._lock: if not self._value: - to_delete = [] - for coro, waiting_for in self._waiting_fors.items(): - waiting_for.decrement() - if waiting_for.counter == 0: - to_delete.append(coro) - # Deleting them here isn't necessary for logical correctness, but is done just to allow things to be - # GC'd. As an `Event` is user-supplied then it could have anything holding references to it, so this is - # the appropriate point to allow our internals to be GC'd as soon as possible. - for coro in to_delete: - del self._waiting_fors[coro] + new_waits = [] + for wait in self._waits: + wait.decrement() + if wait.waiting_for is not None: + new_waits.append(wait) + self._waits = new_waits # i.e. delete all completed waits self._value = True def wait(self) -> Coro[None]: # Lie about the return type, this is an implementation detail that should otherwise feel like a coroutine. - return _Wait(self, used=False) # pyright: ignore[reportReturnType] - - def _register(self, waiting_for: "_WaitingFor") -> None: - # Here we need the lock in case `self.set()` is being called at the same time as this method. Importantly, this - # is the same lock as is used in `self.set()`. - with self._lock: - if self._value: - waiting_for.decrement() - else: - assert waiting_for.coro not in self._waiting_fors.keys() - self._waiting_fors[waiting_for.coro] = waiting_for + wait = _Wait(self) + self._waits.append(wait) + return wait # pyright: ignore[reportReturnType] def __bool__(self): raise TypeError("Cannot convert `tinyio.Event` to boolean. Did you mean `event.is_set()`?") @@ -80,6 +68,32 @@ def __bool__(self): class _Wait: event: Event used: bool + waiting_for: "None | _WaitingFor" + + def __init__(self, event: Event): + self.event = event + self.used = False + self.waiting_for = None + + def decrement(self): + if self.waiting_for is not None: + self.waiting_for.decrement() + if self.waiting_for.counter == 0: + self.waiting_for = None + + def register(self, waiting_for: "_WaitingFor") -> None: + if self.used: + e = RuntimeError("Do not yield the same `event.wait()` multiple times. Make a new `.wait()` call instead.") + waiting_for.coro.throw(e) + self.used = True + # Here we need the lock in case `self.event.set()` is being called at the same time as this method. Importantly, + # this is the same lock as is used in `self.event.set()`. + with self.event._lock: + if self.event.is_set(): + waiting_for.decrement() + else: + assert self.waiting_for is None + self.waiting_for = waiting_for @dataclasses.dataclass(frozen=False) @@ -245,7 +259,6 @@ def _step( waiting_for = _WaitingFor( len(out), todo.coro, original_out, wake_loop, self._results, queue, threading.Lock() ) - seen_events = set() for out_i in out: if isinstance(out_i, Generator): if out_i in self._results.keys(): @@ -256,21 +269,7 @@ def _step( queue.appendleft(_Todo(out_i, None)) waiting_on[out_i] = [waiting_for] elif isinstance(out_i, _Wait): - if out_i.used: - # I don't think there's any actual harm in this, but it's a weird thing to do. We - # reserve the right for this to mean something more precise in the future. - todo.coro.throw( - RuntimeError( - "Do not yield the same `event.wait()` multiple times. Make a new `.wait()` " - "call instead." - ) - ) - out_i.used = True - if out_i.event in seen_events: - waiting_for.decrement() - else: - out_i.event._register(waiting_for) - seen_events.add(out_i.event) + out_i.register(waiting_for) else: todo.coro.throw(_invalid(original_out)) case _: From 4e438842eb1c688259ac308ffe747709896eeb41 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Fri, 1 Aug 2025 21:35:12 +0200 Subject: [PATCH 2/9] Refactored to use several _Wait methods in prep for timeouts --- README.md | 8 +- tests/test_sync.py | 102 ++++++++++++++++++++++++- tinyio/_core.py | 180 +++++++++++++++++++++++++++++---------------- 3 files changed, 224 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 5602322..27d246d 100644 --- a/README.md +++ b/README.md @@ -137,7 +137,13 @@ tinyio.Lock - `tinyio.Event()` - This has a method `.wait()`, which is a coroutine you can `yield` on. This will unblock once its `.set()` method is called (typically from another coroutine). It also has a `is_set()` method for checking whether it has been set. + This is a wrapper around a boolean flag, initialised with `False`. + This has the following methods: + + - `.is_set()`: check the value of the flag. + - `.set()`: set the flag to `True`. + - `.clear()`: set the flag to `False`. + - `.wait()`, which is a coroutine you can `yield` on. This will unblock if the internal flag is `True`. (Typically this is accomplished by calling `.set()` from another coroutine or from a thread.) --- diff --git a/tests/test_sync.py b/tests/test_sync.py index 2bf7d27..f40333f 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -152,7 +152,7 @@ def test_event_run(is_set: bool): @pytest.mark.parametrize("is_set", (False, True)) -def test_event_double_wait(is_set: bool): +def test_event_repeated_wait(is_set: bool): event = tinyio.Event() if is_set: event.set() @@ -184,3 +184,103 @@ def _foo(): loop = tinyio.Loop() loop.run(_foo()) + + +@pytest.mark.parametrize("is_set", (False, True)) +def test_event_simultaneous_repeated_wait(is_set: bool): + event = tinyio.Event() + if is_set: + event.set() + + def foo(): + wait = event.wait() + if not is_set: + t = threading.Timer(0.1, lambda: event.set()) + t.start() + yield [wait, wait] + + loop = tinyio.Loop() + with pytest.raises(RuntimeError, match=re.escape("Do not yield the same `event.wait()` multiple times")): + loop.run(foo()) + + +def test_event_clear_not_strict(): + event = tinyio.Event() + event.set() + event.clear() + assert not event.is_set() + + out = [] + + def foo(): + yield event.wait() + out.append(2) + + def bar(): + yield + out.append(1) + event.set() + yield + # Even though we `clear()` the event again afterwards, both `foo()` still unblock. + event.clear() + out.append(3) + + def baz(): + yield [foo(), foo(), bar()] + + loop = tinyio.Loop() + loop.run(baz()) + assert out == [1, 2, 2, 3] + + +class _Semaphore: + def __init__(self, value): + self.value = value + self.event = tinyio.Event() + self.event.set() + + def __call__(self): + while True: + yield self.event.wait() + if self.event.is_set(): + break + assert self.value > 0 + self.value -= 1 + if self.value == 0: + self.event.clear() + return _closing(self) + + +@contextlib.contextmanager +def _closing(semaphore): + try: + yield + finally: + semaphore.value += 1 + semaphore.event.set() + + +def test_alternate_semaphore(): + """This test is useful as it makes use of `Event.clear()`.""" + + counter = 0 + + def _count(semaphore, i): + nonlocal counter + with (yield semaphore()): + counter += 1 + if counter > 2: + raise RuntimeError + yield + counter -= 1 + return i + + def _run(value): + semaphore = _Semaphore(value) + out = yield [_count(semaphore, i) for i in range(50)] + return out + + loop = tinyio.Loop() + assert loop.run(_run(2)) == list(range(50)) + with pytest.raises(RuntimeError): + loop.run(_run(3)) diff --git a/tinyio/_core.py b/tinyio/_core.py index 9728526..008acee 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -1,5 +1,6 @@ import collections as co import dataclasses +import enum import graphlib import threading import traceback @@ -30,70 +31,108 @@ class _Todo: value: Any +# We need at least some use of locks, as `Event`s are public objects that may interact with user threads. If the +# internals of our event/wait/waitingfor mechanisms are modified concurrently then it would be very easy for things to +# go wrong. +# In particular note that our event loop is one actor that is making modifications, in addition to user threads. +# For this reason it doesn't suffice to just have a lock around `Event.{set, clear}`. +# For simplicity, we simply guard all entries into the event/wait/waitingfor mechanism with a single lock. We could try +# to use some other locking strategy but that seems error-prone. +_global_event_lock = threading.RLock() + + class Event: """A marker that something has happened.""" def __init__(self): self._value = False - self._waits: list[_Wait] = [] - self._lock = threading.Lock() + self._waits = dict[_Wait, None]() def is_set(self): return self._value def set(self): - # This is a user-visible class, so they could be doing anything with it - in particular multiple threads may - # call `.set` simultaneously. - with self._lock: + with _global_event_lock: if not self._value: - new_waits = [] - for wait in self._waits: - wait.decrement() - if wait.waiting_for is not None: - new_waits.append(wait) - self._waits = new_waits # i.e. delete all completed waits - self._value = True + for wait in self._waits.copy().keys(): + wait.notify_from_event() + self._value = True + + def clear(self): + with _global_event_lock: + if self._value: + for wait in self._waits.keys(): + wait.unnotify_from_event() + self._value = False def wait(self) -> Coro[None]: # Lie about the return type, this is an implementation detail that should otherwise feel like a coroutine. - wait = _Wait(self) - self._waits.append(wait) - return wait # pyright: ignore[reportReturnType] + return _Wait(self) # pyright: ignore[reportReturnType] def __bool__(self): raise TypeError("Cannot convert `tinyio.Event` to boolean. Did you mean `event.is_set()`?") -@dataclasses.dataclass(frozen=False) -class _Wait: - event: Event - used: bool - waiting_for: "None | _WaitingFor" +class _WaitState(enum.Enum): + INITIALISED = "initialised" + REGISTERED = "registered" + NOTIFIED_EVENT = "notified_event" + NOTIFIED_TIMEOUT = "notified_timeout" + DONE = "done" - def __init__(self, event: Event): - self.event = event - self.used = False - self.waiting_for = None - def decrement(self): - if self.waiting_for is not None: - self.waiting_for.decrement() - if self.waiting_for.counter == 0: - self.waiting_for = None +class _Wait: + def __init__(self, event: Event): + self._event = event + self._waiting_for = None + self._state = _WaitState.INITIALISED + # This is basically just a second `__init__` method. We're not really initialised until this has been called + # precisely once as well. The reason we have two is that an end-user creates us during `Event.wait()`, and then we + # need to register on the event loop. def register(self, waiting_for: "_WaitingFor") -> None: - if self.used: - e = RuntimeError("Do not yield the same `event.wait()` multiple times. Make a new `.wait()` call instead.") - waiting_for.coro.throw(e) - self.used = True - # Here we need the lock in case `self.event.set()` is being called at the same time as this method. Importantly, - # this is the same lock as is used in `self.event.set()`. - with self.event._lock: - if self.event.is_set(): - waiting_for.decrement() - else: - assert self.waiting_for is None - self.waiting_for = waiting_for + with _global_event_lock: + if self._state is not _WaitState.INITIALISED: + waiting_for.coro.throw( + RuntimeError( + "Do not yield the same `event.wait()` multiple times. Make a new `.wait()` call instead." + ) + ) + self._state = _WaitState.REGISTERED + assert self._waiting_for is None + self._waiting_for = waiting_for + self._event._waits[self] = None + if self._event.is_set(): + self.notify_from_event() + + def notify_from_event(self): + with _global_event_lock: + assert self._state is _WaitState.REGISTERED + assert self._waiting_for is not None + self._state = _WaitState.NOTIFIED_EVENT + self._waiting_for.decrement() + + def notify_from_timeout(self): + with _global_event_lock: + assert self._state is _WaitState.REGISTERED + assert self._waiting_for is not None + self._state = _WaitState.NOTIFIED_TIMEOUT + self._waiting_for.decrement() + + def unnotify_from_event(self): + with _global_event_lock: + assert self._state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + if self._state is _WaitState.NOTIFIED_EVENT: + self._waiting_for.increment() + # But ignore un-notifies if we've already triggered our timeout. + + def cleanup(self): + with _global_event_lock: + assert self._state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + self._state = _WaitState.DONE + self._waiting_for = None + del self._event._waits[self] @dataclasses.dataclass(frozen=False) @@ -104,34 +143,45 @@ class _WaitingFor: wake_loop: threading.Event results: weakref.WeakKeyDictionary[Coro, Any] queue: co.deque[_Todo] - lock: threading.Lock def __post_init__(self): assert self.counter > 0 + def increment(self): + with _global_event_lock: + # This assert is valid as our only caller is `_Wait.unnotify_from_event`, which will only have a reference + # to us if we haven't completed yet -- otherwise we'd have already called its `_Wait.cleanup` method. + assert self.counter != 0 + self.counter += 1 + def decrement(self): # We need a lock here as this may be called simultaneously between our event loop and via `Event.set`. # (Though `Event.set` has its only internal lock, that doesn't cover the event loop as well.) - with self.lock: + with _global_event_lock: assert self.counter > 0 self.counter -= 1 - schedule = self.counter == 0 - if schedule: - match self.out: - case None: - result = None - case _Wait(): - result = None - case Generator(): - result = self.results[self.out] - case list(): - result = [None if isinstance(out_i, _Wait) else self.results[out_i] for out_i in self.out] - case _: - assert False - self.queue.appendleft(_Todo(self.coro, result)) - # If we're callling this function from a thread, and the main event loop is blocked, then use this to notify - # the main event loop that it can wake up. - self.wake_loop.set() + if self.counter == 0: + match self.out: + case None: + result = None + waits = [] + case _Wait(): + result = None + waits = [self.out] + case Generator(): + result = self.results[self.out] + waits = [] + case list(): + result = [None if isinstance(out_i, _Wait) else self.results[out_i] for out_i in self.out] + waits = [out_i for out_i in self.out if isinstance(out_i, _Wait)] + case _: + assert False + for wait in waits: + wait.cleanup() + self.queue.appendleft(_Todo(self.coro, result)) + # If we're callling this function from a thread, and the main event loop is blocked, then use this to + # notify the main event loop that it can wake up. + self.wake_loop.set() class Loop: @@ -218,7 +268,11 @@ def _check_cycle(self, waiting_on, coro): coro.throw(RuntimeError("Cycle detected in `tinyio` loop. Cancelling all coroutines.")) def _step( - self, todo: _Todo, queue: co.deque[_Todo], waiting_on: dict[Coro, list[_WaitingFor]], wake_loop: threading.Event + self, + todo: _Todo, + queue: co.deque[_Todo], + waiting_on: dict[Coro, list[_WaitingFor]], + wake_loop: threading.Event, ) -> None: try: out = todo.coro.send(todo.value) @@ -256,9 +310,7 @@ def _step( todo.coro.throw(_invalid(original_out)) queue.appendleft(_Todo(todo.coro, None)) case list(): - waiting_for = _WaitingFor( - len(out), todo.coro, original_out, wake_loop, self._results, queue, threading.Lock() - ) + waiting_for = _WaitingFor(len(out), todo.coro, original_out, wake_loop, self._results, queue) for out_i in out: if isinstance(out_i, Generator): if out_i in self._results.keys(): From 717627e293a0bab2b3acb7cbd8b669c47ca9bb80 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Fri, 1 Aug 2025 22:35:19 +0200 Subject: [PATCH 3/9] Added support Event.wait(timeout=...) --- tinyio/_core.py | 58 +++++++++++++++++++++++++++++++++++-------------- tinyio/_time.py | 14 ++---------- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/tinyio/_core.py b/tinyio/_core.py index 008acee..8007506 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -2,7 +2,9 @@ import dataclasses import enum import graphlib +import heapq import threading +import time import traceback import types import warnings @@ -65,9 +67,9 @@ def clear(self): wait.unnotify_from_event() self._value = False - def wait(self) -> Coro[None]: + def wait(self, timeout_in_seconds: None | int | float = None) -> Coro[None]: # Lie about the return type, this is an implementation detail that should otherwise feel like a coroutine. - return _Wait(self) # pyright: ignore[reportReturnType] + return _Wait(self, timeout_in_seconds) # pyright: ignore[reportReturnType] def __bool__(self): raise TypeError("Cannot convert `tinyio.Event` to boolean. Did you mean `event.is_set()`?") @@ -82,24 +84,28 @@ class _WaitState(enum.Enum): class _Wait: - def __init__(self, event: Event): + def __init__(self, event: Event, timeout_in_seconds: None | int | float): self._event = event self._waiting_for = None - self._state = _WaitState.INITIALISED + self.state = _WaitState.INITIALISED + if timeout_in_seconds is not None: + timeout_in_seconds = time.monotonic() + timeout_in_seconds + self.timeout_in_seconds = timeout_in_seconds # This is basically just a second `__init__` method. We're not really initialised until this has been called # precisely once as well. The reason we have two is that an end-user creates us during `Event.wait()`, and then we # need to register on the event loop. def register(self, waiting_for: "_WaitingFor") -> None: with _global_event_lock: - if self._state is not _WaitState.INITIALISED: + if self.state is not _WaitState.INITIALISED: waiting_for.coro.throw( RuntimeError( "Do not yield the same `event.wait()` multiple times. Make a new `.wait()` call instead." ) ) - self._state = _WaitState.REGISTERED assert self._waiting_for is None + assert self._event is not None + self.state = _WaitState.REGISTERED self._waiting_for = waiting_for self._event._waits[self] = None if self._event.is_set(): @@ -107,32 +113,38 @@ def register(self, waiting_for: "_WaitingFor") -> None: def notify_from_event(self): with _global_event_lock: - assert self._state is _WaitState.REGISTERED + assert self.state is _WaitState.REGISTERED assert self._waiting_for is not None - self._state = _WaitState.NOTIFIED_EVENT + self.state = _WaitState.NOTIFIED_EVENT self._waiting_for.decrement() def notify_from_timeout(self): with _global_event_lock: - assert self._state is _WaitState.REGISTERED + assert self.state is _WaitState.REGISTERED assert self._waiting_for is not None - self._state = _WaitState.NOTIFIED_TIMEOUT + self.state = _WaitState.NOTIFIED_TIMEOUT self._waiting_for.decrement() def unnotify_from_event(self): with _global_event_lock: - assert self._state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} assert self._waiting_for is not None - if self._state is _WaitState.NOTIFIED_EVENT: + if self.state is _WaitState.NOTIFIED_EVENT: self._waiting_for.increment() # But ignore un-notifies if we've already triggered our timeout. def cleanup(self): with _global_event_lock: - assert self._state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} - self._state = _WaitState.DONE + assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + assert self._event is not None + self.state = _WaitState.DONE self._waiting_for = None del self._event._waits[self] + self._event = None # For GC purposes. + + def __lt__(self, other): + return self.timeout_in_seconds < other.timeout_in_seconds @dataclasses.dataclass(frozen=False) @@ -228,6 +240,7 @@ def gen(coro=coro): queue.appendleft(_Todo(coro, None)) waiting_on = dict[Coro, list[_WaitingFor]]() waiting_on[coro] = [] + wait_heap = [] # Loop invariant: `{x.coro for x in queue}.issubset(set(waiting_on.keys()))` wake_loop = threading.Event() wake_loop.set() @@ -245,11 +258,21 @@ def gen(coro=coro): self._check_cycle(waiting_on, coro) # ...but hopefully we're just waiting on a thread or exogeneous event to unblock one of our # coroutines. - wake_loop.wait() + timeout = None + wait = None + while len(wait_heap) > 0: + wait = heapq.heappop(wait_heap) + if wait.state != _WaitState.DONE: + timeout = wait.timeout_in_seconds - time.monotonic() + break + not_timed_out = wake_loop.wait(timeout=timeout) + if not not_timed_out: + cast(_Wait, wait).notify_from_timeout() + del timeout, wait, not_timed_out wake_loop.clear() todo = queue.pop() current_coro_ref[0] = todo.coro - self._step(todo, queue, waiting_on, wake_loop) + self._step(todo, queue, waiting_on, wait_heap, wake_loop) current_coro_ref[0] = coro except BaseException as e: _cleanup(e, waiting_on, current_coro_ref, exception_group) @@ -272,6 +295,7 @@ def _step( todo: _Todo, queue: co.deque[_Todo], waiting_on: dict[Coro, list[_WaitingFor]], + wait_heap: list[_Wait], wake_loop: threading.Event, ) -> None: try: @@ -322,6 +346,8 @@ def _step( waiting_on[out_i] = [waiting_for] elif isinstance(out_i, _Wait): out_i.register(waiting_for) + if out_i.timeout_in_seconds is not None: + heapq.heappush(wait_heap, out_i) else: todo.coro.throw(_invalid(original_out)) case _: diff --git a/tinyio/_time.py b/tinyio/_time.py index f05a85b..cc38614 100644 --- a/tinyio/_time.py +++ b/tinyio/_time.py @@ -1,11 +1,8 @@ import contextlib -import threading -import time from typing import TypeVar from ._background import add_done_callback from ._core import Coro, Event -from ._thread import run_in_thread _T = TypeVar("_T") @@ -22,7 +19,7 @@ def sleep(delay_in_seconds: int | float) -> Coro[None]: A coroutine that just sleeps. """ - yield run_in_thread(time.sleep, delay_in_seconds) + yield Event().wait(delay_in_seconds) class TimeoutError(BaseException): @@ -48,19 +45,12 @@ def timeout(coro: Coro[_T], timeout_in_seconds: int | float) -> Coro[tuple[None done = Event() outs = [] - def timeout(): - time.sleep(timeout_in_seconds) - done.set() - def callback(out): outs.append(out) done.set() - # Daemon so as not to block interpreter shutdown. - t = threading.Thread(target=timeout, daemon=True) - t.start() yield {add_done_callback(coro, callback)} - yield done.wait() + yield done.wait(timeout_in_seconds) if len(outs) == 0: with contextlib.suppress(TimeoutError): coro.throw(TimeoutError) From 843f51d3c63eb05b8dfcaa66eb92bec9eae31fbc Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Sat, 2 Aug 2025 01:03:55 +0200 Subject: [PATCH 4/9] Added test for sleep times --- tests/test_time.py | 21 +++++++++++++++++++++ tinyio/_core.py | 8 +++++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/test_time.py b/tests/test_time.py index c01cfa8..9a5a11d 100644 --- a/tests/test_time.py +++ b/tests/test_time.py @@ -1,6 +1,27 @@ +import time + import tinyio +def test_sleep(): + outs = [] + + def f(): + start = time.monotonic() + yield [tinyio.sleep(0.05), tinyio.sleep(0.1)] + actual_duration = time.monotonic() - start + # Note that these are pretty inaccurate tolerances! This is about what we get with `asyncio` too. + # The reason for this seems to be the accuracy in the `threading.Event.wait()` that we bottom out in. If we need + # greater resolution than this then we could do that by using a busy-loop for the last 1e-2 seconds. + success = 0.09 < actual_duration < 0.11 + outs.append(success) + + loop = tinyio.Loop() + for _ in range(5): + loop.run(f()) + assert sum(outs) >= 4 # We allow one failure, to decrease flakiness. + + def _sleep(x): yield tinyio.sleep(x) return 3 diff --git a/tinyio/_core.py b/tinyio/_core.py index 8007506..0ee91f6 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -86,11 +86,9 @@ class _WaitState(enum.Enum): class _Wait: def __init__(self, event: Event, timeout_in_seconds: None | int | float): self._event = event + self._timeout_in_seconds = timeout_in_seconds self._waiting_for = None self.state = _WaitState.INITIALISED - if timeout_in_seconds is not None: - timeout_in_seconds = time.monotonic() + timeout_in_seconds - self.timeout_in_seconds = timeout_in_seconds # This is basically just a second `__init__` method. We're not really initialised until this has been called # precisely once as well. The reason we have two is that an end-user creates us during `Event.wait()`, and then we @@ -106,6 +104,10 @@ def register(self, waiting_for: "_WaitingFor") -> None: assert self._waiting_for is None assert self._event is not None self.state = _WaitState.REGISTERED + if self._timeout_in_seconds is None: + self.timeout_in_seconds = None + else: + self.timeout_in_seconds = time.monotonic() + self._timeout_in_seconds self._waiting_for = waiting_for self._event._waits[self] = None if self._event.is_set(): From 76360e20d254a211d82e8f92ada9dbd35ec969f2 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Sat, 2 Aug 2025 16:08:53 +0200 Subject: [PATCH 5/9] Rename test_basic -> test_core --- tests/{test_basic.py => test_core.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_basic.py => test_core.py} (100%) diff --git a/tests/test_basic.py b/tests/test_core.py similarity index 100% rename from tests/test_basic.py rename to tests/test_core.py From 3c8a5efe9eddab8ebba9f412ffaa0546623a4913 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Sat, 2 Aug 2025 16:19:01 +0200 Subject: [PATCH 6/9] Handle multiple simultaneous timeouts fairly --- tests/test_core.py | 40 ++++++++++++++++++++++++++++++++++++++++ tinyio/_core.py | 42 +++++++++++++++++++++++++++++------------- 2 files changed, 69 insertions(+), 13 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index c22f2cf..9c4df9d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -276,3 +276,43 @@ def _gc(x: int) -> tinyio.Coro[tuple[int, int]]: assert loop.run(coro) == (5, 5) gc.collect() assert set(loop._results.keys()) == {coro} + + +def test_event_fairness(): + """This checks that once one event unblocks, that we don't just keep chasing all the stuff downstream of that event, + i.e. that we do schedule work from any other event that has finished. + """ + outs = [] + + def f(): + yield tinyio.Event().wait(0) + outs.append(1) + for _ in range(20): + yield + outs.append(2) + + def g(): + yield [f(), f()] + + loop = tinyio.Loop() + loop.run(g()) + assert outs == [1, 1, 2, 2] + + +def test_event_fairness2(): + event1 = tinyio.Event() + outs = [] + + def f(): + yield event1.wait(0) + outs.append(1) + + def g(): + yield {f()} + for _ in range(20): + yield + outs.append(2) + + loop = tinyio.Loop() + loop.run(g()) + assert outs == [1, 2] diff --git a/tinyio/_core.py b/tinyio/_core.py index 0ee91f6..f56ab4e 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -242,7 +242,7 @@ def gen(coro=coro): queue.appendleft(_Todo(coro, None)) waiting_on = dict[Coro, list[_WaitingFor]]() waiting_on[coro] = [] - wait_heap = [] + wait_heap: list[_Wait] = [] # Loop invariant: `{x.coro for x in queue}.issubset(set(waiting_on.keys()))` wake_loop = threading.Event() wake_loop.set() @@ -260,18 +260,8 @@ def gen(coro=coro): self._check_cycle(waiting_on, coro) # ...but hopefully we're just waiting on a thread or exogeneous event to unblock one of our # coroutines. - timeout = None - wait = None - while len(wait_heap) > 0: - wait = heapq.heappop(wait_heap) - if wait.state != _WaitState.DONE: - timeout = wait.timeout_in_seconds - time.monotonic() - break - not_timed_out = wake_loop.wait(timeout=timeout) - if not not_timed_out: - cast(_Wait, wait).notify_from_timeout() - del timeout, wait, not_timed_out - wake_loop.clear() + self._wait(wait_heap, wake_loop) + self._clear(wait_heap, wake_loop) todo = queue.pop() current_coro_ref[0] = todo.coro self._step(todo, queue, waiting_on, wait_heap, wake_loop) @@ -292,6 +282,32 @@ def _check_cycle(self, waiting_on, coro): except graphlib.CycleError: coro.throw(RuntimeError("Cycle detected in `tinyio` loop. Cancelling all coroutines.")) + def _wait(self, wait_heap: list[_Wait], wake_loop: threading.Event): + timeout = None + while len(wait_heap) > 0: + soonest = wait_heap[0] + assert soonest.timeout_in_seconds is not None + if soonest.state == _WaitState.DONE: + heapq.heappop(wait_heap) + else: + timeout = soonest.timeout_in_seconds - time.monotonic() + break + wake_loop.wait(timeout=timeout) + + def _clear(self, wait_heap: list[_Wait], wake_loop: threading.Event): + wake_loop.clear() + now = time.monotonic() + while len(wait_heap) > 0: + soonest = wait_heap[0] + assert soonest.timeout_in_seconds is not None + if soonest.state == _WaitState.DONE: + heapq.heappop(wait_heap) + elif soonest.timeout_in_seconds < now: + heapq.heappop(wait_heap) + soonest.notify_from_timeout() + else: + break + def _step( self, todo: _Todo, From 984a102a20f6db216737d2e283431a04af769ab6 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Sat, 2 Aug 2025 16:30:21 +0200 Subject: [PATCH 7/9] Several edge-case Event fixes --- tests/test_core.py | 123 +++++++++++++++++++++++++++++++++++++++++++++ tinyio/_core.py | 37 ++++++++++---- 2 files changed, 149 insertions(+), 11 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index 9c4df9d..e753c90 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,5 +1,7 @@ import gc +import threading import time +from typing import Any import pytest import tinyio @@ -316,3 +318,124 @@ def g(): loop = tinyio.Loop() loop.run(g()) assert outs == [1, 2] + + +def test_simultaneous_set(): + event = tinyio.Event() + + def f(): + for _ in range(20): + yield + yield [tinyio.run_in_thread(event.set) for _ in range(100)] + + def g(): + yield event.wait() + + def h(): + yield [g(), f()] + + loop = tinyio.Loop() + loop.run(h()) + + +def test_timeout_then_set(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event1.set() + for _ in range(20): + yield + event2.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_set_then_timeout(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + event1.set() + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event2.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_set_then_timeout_then_clear(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + event1.set() + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event1.clear() + event2.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_set_then_timeout_then_clear_then_set(): + event1 = tinyio.Event() + event2 = tinyio.Event() + + def f(): + event1.set() + yield [event1.wait(0), event2.wait()] + + def g(): + yield {f()} + for _ in range(20): + yield + event1.clear() + event2.set() + event1.set() + return 3 + + loop = tinyio.Loop() + assert loop.run(g()) == 3 + + +def test_timeout_as_part_of_group_and_only_coroutine(): + event1 = tinyio.Event() + event2 = tinyio.Event() + wait: Any = event1.wait(0) + wait2 = event2.wait() + + def f(): + yield [wait, wait2] + return 3 + + def set2(): + while wait.state != tinyio._core._WaitState.NOTIFIED_TIMEOUT: + pass + time.sleep(0.1) + event2.set() + + t = threading.Thread(target=set2) + t.start() + loop = tinyio.Loop() + assert loop.run(f()) == 3 diff --git a/tinyio/_core.py b/tinyio/_core.py index f56ab4e..1dc24cf 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -115,25 +115,33 @@ def register(self, waiting_for: "_WaitingFor") -> None: def notify_from_event(self): with _global_event_lock: - assert self.state is _WaitState.REGISTERED + # We cannot have `NOTIFIED_EVENT` as our event will have toggled its internal state to `True` as part of + # calling us, and so future `Event.set()` calls will not call `.notify_from_event`. + # We cannot have `DONE` as this is only set during `.cleanup()`, and at that point we deregister from + # `self._event._waits`. + assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_TIMEOUT} assert self._waiting_for is not None - self.state = _WaitState.NOTIFIED_EVENT - self._waiting_for.decrement() + if self.state == _WaitState.REGISTERED: + self.state = _WaitState.NOTIFIED_EVENT + self._waiting_for.decrement() def notify_from_timeout(self): with _global_event_lock: - assert self.state is _WaitState.REGISTERED + assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_EVENT} assert self._waiting_for is not None - self.state = _WaitState.NOTIFIED_TIMEOUT - self._waiting_for.decrement() + is_registered = self.state == _WaitState.REGISTERED + self.state = _WaitState.NOTIFIED_TIMEOUT # Override `NOTIFIED_EVENT` in case we `unnotify_from_event` later + if is_registered: + self._waiting_for.decrement() def unnotify_from_event(self): with _global_event_lock: assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} assert self._waiting_for is not None + # But ignore un-notifies if we've already triggered our timeout. if self.state is _WaitState.NOTIFIED_EVENT: + self.state = _WaitState.REGISTERED self._waiting_for.increment() - # But ignore un-notifies if we've already triggered our timeout. def cleanup(self): with _global_event_lock: @@ -145,6 +153,7 @@ def cleanup(self): del self._event._waits[self] self._event = None # For GC purposes. + # For `heapq` to work. def __lt__(self, other): return self.timeout_in_seconds < other.timeout_in_seconds @@ -260,8 +269,15 @@ def gen(coro=coro): self._check_cycle(waiting_on, coro) # ...but hopefully we're just waiting on a thread or exogeneous event to unblock one of our # coroutines. - self._wait(wait_heap, wake_loop) - self._clear(wait_heap, wake_loop) + while len(queue) == 0: + self._wait(wait_heap, wake_loop) + self._clear(wait_heap, wake_loop) + # This whole block needs to be wrapped in a `len(queue)` check, as just because we've + # unblocked doesn't necessarily mean that we're ready to schedule a coroutine: we could have + # something like `yield [event1.wait(...), event2.wait(...)]`, and only one of the two has + # unblocked. + else: + self._clear(wait_heap, wake_loop) todo = queue.pop() current_coro_ref[0] = todo.coro self._step(todo, queue, waiting_on, wait_heap, wake_loop) @@ -296,13 +312,12 @@ def _wait(self, wait_heap: list[_Wait], wake_loop: threading.Event): def _clear(self, wait_heap: list[_Wait], wake_loop: threading.Event): wake_loop.clear() - now = time.monotonic() while len(wait_heap) > 0: soonest = wait_heap[0] assert soonest.timeout_in_seconds is not None if soonest.state == _WaitState.DONE: heapq.heappop(wait_heap) - elif soonest.timeout_in_seconds < now: + elif soonest.timeout_in_seconds <= time.monotonic(): heapq.heappop(wait_heap) soonest.notify_from_timeout() else: From 40266e7940da3748d6efe1db38a18203da4a8df6 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Sat, 2 Aug 2025 17:45:24 +0200 Subject: [PATCH 8/9] Reorder contents of _core.py --- tinyio/_core.py | 394 ++++++++++++++++++++++++------------------------ 1 file changed, 201 insertions(+), 193 deletions(-) diff --git a/tinyio/_core.py b/tinyio/_core.py index 1dc24cf..69e8b5e 100644 --- a/tinyio/_core.py +++ b/tinyio/_core.py @@ -14,12 +14,13 @@ # -# Loop implementation +# Public API: loop implementation # -# The main logic is that coroutines produce `_WaitingFor` objects which schedule them back on the loop once all the -# coroutines they are waiting on have notified them of completion. -# In addition we special-case `Event`s in the loop, so that threads can use them to notify the loop of completion, -# without the loop needing to poll for this. +# The main logic is that each time coroutine yields, we create a `_WaitingFor` object which holds a counter for how many +# things it is waiting on before it can wake up. Once this counter hits zero, the `_WaitingFor` object schedules the +# coroutine back on the loop. +# Counters can be decremented in three ways: another coroutine finishes, an `Event.set()` is triggered, or a timeout in +# `Event.wait(timeout=...)` is triggered. # @@ -27,186 +28,6 @@ Coro: TypeAlias = Generator[Any, Any, _Return] -@dataclasses.dataclass(frozen=True) -class _Todo: - coro: Coro - value: Any - - -# We need at least some use of locks, as `Event`s are public objects that may interact with user threads. If the -# internals of our event/wait/waitingfor mechanisms are modified concurrently then it would be very easy for things to -# go wrong. -# In particular note that our event loop is one actor that is making modifications, in addition to user threads. -# For this reason it doesn't suffice to just have a lock around `Event.{set, clear}`. -# For simplicity, we simply guard all entries into the event/wait/waitingfor mechanism with a single lock. We could try -# to use some other locking strategy but that seems error-prone. -_global_event_lock = threading.RLock() - - -class Event: - """A marker that something has happened.""" - - def __init__(self): - self._value = False - self._waits = dict[_Wait, None]() - - def is_set(self): - return self._value - - def set(self): - with _global_event_lock: - if not self._value: - for wait in self._waits.copy().keys(): - wait.notify_from_event() - self._value = True - - def clear(self): - with _global_event_lock: - if self._value: - for wait in self._waits.keys(): - wait.unnotify_from_event() - self._value = False - - def wait(self, timeout_in_seconds: None | int | float = None) -> Coro[None]: - # Lie about the return type, this is an implementation detail that should otherwise feel like a coroutine. - return _Wait(self, timeout_in_seconds) # pyright: ignore[reportReturnType] - - def __bool__(self): - raise TypeError("Cannot convert `tinyio.Event` to boolean. Did you mean `event.is_set()`?") - - -class _WaitState(enum.Enum): - INITIALISED = "initialised" - REGISTERED = "registered" - NOTIFIED_EVENT = "notified_event" - NOTIFIED_TIMEOUT = "notified_timeout" - DONE = "done" - - -class _Wait: - def __init__(self, event: Event, timeout_in_seconds: None | int | float): - self._event = event - self._timeout_in_seconds = timeout_in_seconds - self._waiting_for = None - self.state = _WaitState.INITIALISED - - # This is basically just a second `__init__` method. We're not really initialised until this has been called - # precisely once as well. The reason we have two is that an end-user creates us during `Event.wait()`, and then we - # need to register on the event loop. - def register(self, waiting_for: "_WaitingFor") -> None: - with _global_event_lock: - if self.state is not _WaitState.INITIALISED: - waiting_for.coro.throw( - RuntimeError( - "Do not yield the same `event.wait()` multiple times. Make a new `.wait()` call instead." - ) - ) - assert self._waiting_for is None - assert self._event is not None - self.state = _WaitState.REGISTERED - if self._timeout_in_seconds is None: - self.timeout_in_seconds = None - else: - self.timeout_in_seconds = time.monotonic() + self._timeout_in_seconds - self._waiting_for = waiting_for - self._event._waits[self] = None - if self._event.is_set(): - self.notify_from_event() - - def notify_from_event(self): - with _global_event_lock: - # We cannot have `NOTIFIED_EVENT` as our event will have toggled its internal state to `True` as part of - # calling us, and so future `Event.set()` calls will not call `.notify_from_event`. - # We cannot have `DONE` as this is only set during `.cleanup()`, and at that point we deregister from - # `self._event._waits`. - assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_TIMEOUT} - assert self._waiting_for is not None - if self.state == _WaitState.REGISTERED: - self.state = _WaitState.NOTIFIED_EVENT - self._waiting_for.decrement() - - def notify_from_timeout(self): - with _global_event_lock: - assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_EVENT} - assert self._waiting_for is not None - is_registered = self.state == _WaitState.REGISTERED - self.state = _WaitState.NOTIFIED_TIMEOUT # Override `NOTIFIED_EVENT` in case we `unnotify_from_event` later - if is_registered: - self._waiting_for.decrement() - - def unnotify_from_event(self): - with _global_event_lock: - assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} - assert self._waiting_for is not None - # But ignore un-notifies if we've already triggered our timeout. - if self.state is _WaitState.NOTIFIED_EVENT: - self.state = _WaitState.REGISTERED - self._waiting_for.increment() - - def cleanup(self): - with _global_event_lock: - assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} - assert self._waiting_for is not None - assert self._event is not None - self.state = _WaitState.DONE - self._waiting_for = None - del self._event._waits[self] - self._event = None # For GC purposes. - - # For `heapq` to work. - def __lt__(self, other): - return self.timeout_in_seconds < other.timeout_in_seconds - - -@dataclasses.dataclass(frozen=False) -class _WaitingFor: - counter: int - coro: Coro - out: _Wait | Coro | list[_Wait | Coro] - wake_loop: threading.Event - results: weakref.WeakKeyDictionary[Coro, Any] - queue: co.deque[_Todo] - - def __post_init__(self): - assert self.counter > 0 - - def increment(self): - with _global_event_lock: - # This assert is valid as our only caller is `_Wait.unnotify_from_event`, which will only have a reference - # to us if we haven't completed yet -- otherwise we'd have already called its `_Wait.cleanup` method. - assert self.counter != 0 - self.counter += 1 - - def decrement(self): - # We need a lock here as this may be called simultaneously between our event loop and via `Event.set`. - # (Though `Event.set` has its only internal lock, that doesn't cover the event loop as well.) - with _global_event_lock: - assert self.counter > 0 - self.counter -= 1 - if self.counter == 0: - match self.out: - case None: - result = None - waits = [] - case _Wait(): - result = None - waits = [self.out] - case Generator(): - result = self.results[self.out] - waits = [] - case list(): - result = [None if isinstance(out_i, _Wait) else self.results[out_i] for out_i in self.out] - waits = [out_i for out_i in self.out if isinstance(out_i, _Wait)] - case _: - assert False - for wait in waits: - wait.cleanup() - self.queue.appendleft(_Todo(self.coro, result)) - # If we're callling this function from a thread, and the main event loop is blocked, then use this to - # notify the main event loop that it can wake up. - self.wake_loop.set() - - class Loop: """Event loop for running `tinyio`-style coroutines.""" @@ -287,8 +108,8 @@ def gen(coro=coro): raise # if not raising an `exception_group` return self._results[coro] - def _check_cycle(self, waiting_on, coro): - del self + @staticmethod + def _check_cycle(waiting_on, coro): sorter = graphlib.TopologicalSorter() for k, v in waiting_on.items(): for vi in v: @@ -298,7 +119,8 @@ def _check_cycle(self, waiting_on, coro): except graphlib.CycleError: coro.throw(RuntimeError("Cycle detected in `tinyio` loop. Cancelling all coroutines.")) - def _wait(self, wait_heap: list[_Wait], wake_loop: threading.Event): + @staticmethod + def _wait(wait_heap: list["_Wait"], wake_loop: threading.Event): timeout = None while len(wait_heap) > 0: soonest = wait_heap[0] @@ -310,7 +132,8 @@ def _wait(self, wait_heap: list[_Wait], wake_loop: threading.Event): break wake_loop.wait(timeout=timeout) - def _clear(self, wait_heap: list[_Wait], wake_loop: threading.Event): + @staticmethod + def _clear(wait_heap: list["_Wait"], wake_loop: threading.Event): wake_loop.clear() while len(wait_heap) > 0: soonest = wait_heap[0] @@ -325,10 +148,10 @@ def _clear(self, wait_heap: list[_Wait], wake_loop: threading.Event): def _step( self, - todo: _Todo, - queue: co.deque[_Todo], - waiting_on: dict[Coro, list[_WaitingFor]], - wait_heap: list[_Wait], + todo: "_Todo", + queue: co.deque["_Todo"], + waiting_on: dict[Coro, list["_WaitingFor"]], + wait_heap: list["_Wait"], wake_loop: threading.Event, ) -> None: try: @@ -394,6 +217,191 @@ class CancelledError(BaseException): CancelledError.__module__ = "tinyio" +# +# Loop internals, in particular events and waiting +# + + +@dataclasses.dataclass(frozen=True) +class _Todo: + coro: Coro + value: Any + + +# We need at least some use of locks, as `Event`s are public objects that may interact with user threads. If the +# internals of our event/wait/waitingfor mechanisms are modified concurrently then it would be very easy for things to +# go wrong. +# In particular note that our event loop is one actor that is making modifications, in addition to user threads. +# For this reason it doesn't suffice to just have a lock around `Event.{set, clear}`. +# For simplicity, we simply guard all entries into the event/wait/waitingfor mechanism with a single lock. We could try +# to use some other locking strategy but that seems error-prone. +_global_event_lock = threading.RLock() + + +@dataclasses.dataclass(frozen=False) +class _WaitingFor: + counter: int + coro: Coro + out: "_Wait | Coro | list[_Wait | Coro]" + wake_loop: threading.Event + results: weakref.WeakKeyDictionary[Coro, Any] + queue: co.deque[_Todo] + + def __post_init__(self): + assert self.counter > 0 + + def increment(self): + with _global_event_lock: + # This assert is valid as our only caller is `_Wait.unnotify_from_event`, which will only have a reference + # to us if we haven't completed yet -- otherwise we'd have already called its `_Wait.cleanup` method. + assert self.counter != 0 + self.counter += 1 + + def decrement(self): + # We need a lock here as this may be called simultaneously between our event loop and via `Event.set`. + # (Though `Event.set` has its only internal lock, that doesn't cover the event loop as well.) + with _global_event_lock: + assert self.counter > 0 + self.counter -= 1 + if self.counter == 0: + match self.out: + case None: + result = None + waits = [] + case _Wait(): + result = None + waits = [self.out] + case Generator(): + result = self.results[self.out] + waits = [] + case list(): + result = [None if isinstance(out_i, _Wait) else self.results[out_i] for out_i in self.out] + waits = [out_i for out_i in self.out if isinstance(out_i, _Wait)] + case _: + assert False + for wait in waits: + wait.cleanup() + self.queue.appendleft(_Todo(self.coro, result)) + # If we're callling this function from a thread, and the main event loop is blocked, then use this to + # notify the main event loop that it can wake up. + self.wake_loop.set() + + +class _WaitState(enum.Enum): + INITIALISED = "initialised" + REGISTERED = "registered" + NOTIFIED_EVENT = "notified_event" + NOTIFIED_TIMEOUT = "notified_timeout" + DONE = "done" + + +class _Wait: + def __init__(self, event: "Event", timeout_in_seconds: None | int | float): + self._event = event + self._timeout_in_seconds = timeout_in_seconds + self._waiting_for = None + self.state = _WaitState.INITIALISED + + # This is basically just a second `__init__` method. We're not really initialised until this has been called + # precisely once as well. The reason we have two is that an end-user creates us during `Event.wait()`, and then we + # need to register on the event loop. + def register(self, waiting_for: "_WaitingFor") -> None: + with _global_event_lock: + if self.state is not _WaitState.INITIALISED: + waiting_for.coro.throw( + RuntimeError( + "Do not yield the same `event.wait()` multiple times. Make a new `.wait()` call instead." + ) + ) + assert self._waiting_for is None + assert self._event is not None + self.state = _WaitState.REGISTERED + if self._timeout_in_seconds is None: + self.timeout_in_seconds = None + else: + self.timeout_in_seconds = time.monotonic() + self._timeout_in_seconds + self._waiting_for = waiting_for + self._event._waits[self] = None + if self._event.is_set(): + self.notify_from_event() + + def notify_from_event(self): + with _global_event_lock: + # We cannot have `NOTIFIED_EVENT` as our event will have toggled its internal state to `True` as part of + # calling us, and so future `Event.set()` calls will not call `.notify_from_event`. + # We cannot have `DONE` as this is only set during `.cleanup()`, and at that point we deregister from + # `self._event._waits`. + assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + if self.state == _WaitState.REGISTERED: + self.state = _WaitState.NOTIFIED_EVENT + self._waiting_for.decrement() + + def notify_from_timeout(self): + with _global_event_lock: + assert self.state in {_WaitState.REGISTERED, _WaitState.NOTIFIED_EVENT} + assert self._waiting_for is not None + is_registered = self.state == _WaitState.REGISTERED + self.state = _WaitState.NOTIFIED_TIMEOUT # Override `NOTIFIED_EVENT` in case we `unnotify_from_event` later + if is_registered: + self._waiting_for.decrement() + + def unnotify_from_event(self): + with _global_event_lock: + assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + # But ignore un-notifies if we've already triggered our timeout. + if self.state is _WaitState.NOTIFIED_EVENT: + self.state = _WaitState.REGISTERED + self._waiting_for.increment() + + def cleanup(self): + with _global_event_lock: + assert self.state in {_WaitState.NOTIFIED_EVENT, _WaitState.NOTIFIED_TIMEOUT} + assert self._waiting_for is not None + assert self._event is not None + self.state = _WaitState.DONE + self._waiting_for = None + del self._event._waits[self] + self._event = None # For GC purposes. + + # For `heapq` to work. + def __lt__(self, other): + return self.timeout_in_seconds < other.timeout_in_seconds + + +class Event: + """A marker that something has happened.""" + + def __init__(self): + self._value = False + self._waits = dict[_Wait, None]() + + def is_set(self): + return self._value + + def set(self): + with _global_event_lock: + if not self._value: + for wait in self._waits.copy().keys(): + wait.notify_from_event() + self._value = True + + def clear(self): + with _global_event_lock: + if self._value: + for wait in self._waits.keys(): + wait.unnotify_from_event() + self._value = False + + def wait(self, timeout_in_seconds: None | int | float = None) -> Coro[None]: + # Lie about the return type, this is an implementation detail that should otherwise feel like a coroutine. + return _Wait(self, timeout_in_seconds) # pyright: ignore[reportReturnType] + + def __bool__(self): + raise TypeError("Cannot convert `tinyio.Event` to boolean. Did you mean `event.is_set()`?") + + # # Error handling # From 8701778ab60f55cb06208fafaccaed28819caa26 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Sat, 2 Aug 2025 17:46:10 +0200 Subject: [PATCH 9/9] Bump the number to, uh, 300 lines. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 27d246d..d8123ce 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@