Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<h1 align="center">tinyio</h1>
<h2 align="center">A tiny (~300 lines) event loop for Python</h2>
<h2 align="center">A tiny (~400 lines) event loop for Python</h2>
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D:

we started at 200, what is the world coming to.

Maybe it's time to rename to mediumio


_Ever used `asyncio` and wished you hadn't?_

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "tinyio"
readme = "README.md"
requires-python = ">=3.11"
urls = {repository = "https://github.com/patrick-kidger/tinyio"}
version = "0.2.1"
version = "0.3.0"

[project.optional-dependencies]
dev = ["pre-commit"]
Expand Down
51 changes: 51 additions & 0 deletions tests/test_errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import os
import pickle
import signal
import threading
import time

import pytest
Expand Down Expand Up @@ -368,3 +371,51 @@ def _h():
keyboard = catcher.value
assert type(keyboard) is KeyboardInterrupt
assert _flat_tb(keyboard) == ["test_keyboard_interrupt_within_loop", "_f", "_g", "_invalid"]


@pytest.mark.parametrize("exception_group", (None, False, True))
def test_keyboard_interrupt_while_waiting(exception_group):
"""Tests an error occurring from within the loop itself while waiting for a thread."""

# This sends a keyboard interrupt signal to the main thread
# as soon as the main thread has started the loop.
ev = threading.Event()

def foo():
ev.wait()
os.kill(os.getpid(), signal.SIGINT)

t = threading.Thread(target=foo)
t.start()

# This simulates work in a blocking thread
did_cleanup = False

def _blocking_work():
nonlocal did_cleanup
try:
ev.set()
for _ in range(20):
time.sleep(0.1)
except tinyio.CancelledError:
did_cleanup = True
raise

# The tinyio entry point that will wait for the blocking work
def _f():
yield [tinyio.run_in_thread(_blocking_work)]

loop = tinyio.Loop()
with pytest.raises(BaseException) as catcher:
loop.run(_f(), exception_group)

t.join()

assert did_cleanup
if exception_group is True:
assert type(catcher.value) is BaseExceptionGroup
[keyboard] = catcher.value.exceptions
assert type(keyboard) is KeyboardInterrupt
elif exception_group is None:
keyboard = catcher.value
assert type(keyboard) is KeyboardInterrupt
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,22 @@ async def f():
return out

assert asyncio.run(f()) == 9


def test_other_asyncio_can_run():
event = tinyio.Event()

def _add_one(x: int) -> tinyio.Coro[int]:
yield event.wait()
return x + 1

async def g():
for _ in range(20):
await asyncio.sleep(0)
event.set()

async def f():
await asyncio.gather(g(), tinyio.to_asyncio(_add_one(1)))
return 5

assert asyncio.run(f()) == 5
21 changes: 21 additions & 0 deletions tests/test_trio_integration.py → tests/test_integration_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,24 @@ async def f():
return out

assert trio.run(f) == 9


def test_other_trio_can_run():
event = tinyio.Event()

def _add_one(x: int) -> tinyio.Coro[int]:
yield event.wait()
return x + 1

async def g():
for _ in range(20):
await trio.sleep(0)
event.set()

async def f():
async with trio.open_nursery() as n:
n.start_soon(g)
n.start_soon(lambda: tinyio.to_trio(_add_one(1)))
return 5

assert trio.run(f) == 5
153 changes: 77 additions & 76 deletions tinyio/_core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import collections as co
import contextlib
import dataclasses
import enum
import graphlib
Expand All @@ -12,6 +13,8 @@
from collections.abc import Callable, Generator
from typing import Any, TypeAlias, TypeVar

from ._utils import EventWithFileno, SimpleContextManager


#
# Public API: loop implementation
Expand Down Expand Up @@ -61,25 +64,23 @@ def run(self, coro: Coro[_Return], exception_group: None | bool = None) -> _Retu

The final `return` from `coro`.
"""
gen = self.runtime(coro, exception_group)
while True:
try:
wait = next(gen)
except StopIteration as e:
return e.value
except BaseException as e:
_strip_frames(e, 1)
raise
if wait is not None:
wait()
with self.runtime(coro, exception_group) as gen:
while True:
try:
wait = next(gen)
except StopIteration as e:
return e.value
if wait is not None:
wait()

def runtime(
self, coro: Coro[_Return], exception_group: None | bool
) -> Generator[None | Callable[[], None], None, _Return]:
) -> contextlib.AbstractContextManager[Generator[None | Callable[[], None], None, _Return]]:
"""The generator for driving the event loop. This is low-level functionality that makes it possible to iterate
the loop by just a single step at a time. This is typically useful for integrating with another event loop.

See `tinyio.Loop.run` for an example of how to iterate through this until completion.
See the source code for `tinyio.Loop.run`, or `tinyio.to_asyncio`, for an example of how to iterate through this
until completion.

Yields `None` after each step to cede control, or a callable indicating the loop is blocked waiting for an event
or timeout.
Expand All @@ -89,73 +90,72 @@ def runtime(
if self._running:
raise RuntimeError("Cannot call `tinyio.Loop().run` whilst the loop is currently running.")
self._running = True
try:
return (yield from self._runtime(coro, exception_group))
except BaseException as e:
_strip_frames(e, 1)
raise
finally:
wake_loop = EventWithFileno()
wake_loop.set()
waiting_on = dict[Coro, list[_WaitingFor]]()
waiting_on[coro] = []
current_coro_ref = [coro]

enter = self._runtime(coro, waiting_on, current_coro_ref, wake_loop)

def exit(e: None | BaseException):
wake_loop.close()
assert self._running
self._running = False
if e is not None:
_cleanup(e, waiting_on, current_coro_ref[0], exception_group)

return SimpleContextManager(enter, exit)

def _runtime(
self, coro: Coro[_Return], exception_group: None | bool
self,
coro: Coro[_Return],
waiting_on: dict[Coro, list["_WaitingFor"]],
current_coro_ref: list[Coro],
wake_loop: EventWithFileno,
) -> Generator[None | Callable[[], None], None, _Return]:
if coro in self._results.keys():
return self._results[coro]
queue: co.deque[_Todo] = co.deque()
queue.appendleft(_Todo(coro, None))
waiting_on = dict[Coro, list[_WaitingFor]]()
waiting_on[coro] = []
# Loop invariant: `{x.coro for x in queue}.issubset(set(waiting_on.keys()))`
wait_heap: list[_Wait] = []
wake_loop = threading.Event()
wake_loop.set()
current_coro_ref = [coro]
# Loop invariant: `len(current_coro_ref) == 1`. It's not really load-bearing, it's just used for making a nice
# traceback when we get an error.
try:
while True:
if len(queue) == 0:
if len(waiting_on) == 0:
# We're done.
break
else:
# We might have a cycle bug...
self._check_cycle(waiting_on, coro)
# ...but hopefully we're just waiting on a thread or exogeneous event to unblock one of our
# coroutines.
while len(queue) == 0:
timeout = None
while len(wait_heap) > 0:
soonest = wait_heap[0]
assert soonest.timeout_in_seconds is not None
if soonest.state == _WaitState.DONE:
heapq.heappop(wait_heap)
else:
timeout = soonest.timeout_in_seconds - time.monotonic()
break

def wait():
wake_loop.wait(timeout=timeout)

yield wait
self._clear(wait_heap, wake_loop)
# These lines needs to be wrapped in a `len(queue)` check, as just because we've unblocked
# doesn't necessarily mean that we're ready to schedule a coroutine: we could have something
# like `yield [event1.wait(...), event2.wait(...)]`, and only one of the two has unblocked.
while True:
if len(queue) == 0:
if len(waiting_on) == 0:
# We're done.
break
else:
self._clear(wait_heap, wake_loop)
todo = queue.pop()
current_coro_ref[0] = todo.coro
self._step(todo, queue, waiting_on, wait_heap, wake_loop)
yield
current_coro_ref[0] = coro
except BaseException as e:
_cleanup(e, waiting_on, current_coro_ref, exception_group)
raise # if not raising an `exception_group`
out = self._results[coro]
return out
# We might have a cycle bug...
self._check_cycle(waiting_on, coro)
# ...but hopefully we're just waiting on a thread or exogeneous event to unblock one of our
# coroutines.
while len(queue) == 0:
timeout = None
while len(wait_heap) > 0:
soonest = wait_heap[0]
assert soonest.timeout_in_seconds is not None
if soonest.state == _WaitState.DONE:
heapq.heappop(wait_heap)
else:
timeout = soonest.timeout_in_seconds - time.monotonic()
break

def wait():
wake_loop.wait(timeout=timeout)

yield wait
self._clear(wait_heap, wake_loop)
# These lines needs to be wrapped in a `len(queue)` check, as just because we've unblocked
# doesn't necessarily mean that we're ready to schedule a coroutine: we could have something
# like `yield [event1.wait(...), event2.wait(...)]`, and only one of the two has unblocked.
else:
self._clear(wait_heap, wake_loop)
todo = queue.pop()
current_coro_ref[0] = todo.coro
self._step(todo, queue, waiting_on, wait_heap, wake_loop)
yield
return self._results[coro]

@staticmethod
def _check_cycle(waiting_on, coro):
Expand All @@ -169,7 +169,7 @@ def _check_cycle(waiting_on, coro):
coro.throw(RuntimeError("Cycle detected in `tinyio` loop. Cancelling all coroutines."))

@staticmethod
def _clear(wait_heap: list["_Wait"], wake_loop: threading.Event):
def _clear(wait_heap: list["_Wait"], wake_loop: EventWithFileno):
wake_loop.clear()
while len(wait_heap) > 0:
soonest = wait_heap[0]
Expand All @@ -188,7 +188,7 @@ def _step(
queue: co.deque["_Todo"],
waiting_on: dict[Coro, list["_WaitingFor"]],
wait_heap: list["_Wait"],
wake_loop: threading.Event,
wake_loop: EventWithFileno,
) -> None:
try:
out = todo.coro.send(todo.value)
Expand Down Expand Up @@ -275,7 +275,7 @@ class _WaitingFor:
counter: int
coro: Coro
out: "_Wait | Coro | list[_Wait | Coro]"
wake_loop: threading.Event
wake_loop: EventWithFileno
results: weakref.WeakKeyDictionary[Coro, Any]
queue: co.deque[_Todo]

Expand Down Expand Up @@ -444,13 +444,13 @@ def _strip_frames(e: BaseException, n: int):
def _cleanup(
base_e: BaseException,
waiting_on: dict[Coro, list[_WaitingFor]],
current_coro_ref: list[Coro],
current_coro: Coro,
exception_group: None | bool,
):
# Oh no! Time to shut everything down. We can get here in two different ways:
# - One of our coroutines raised an error internally (including being interrupted with a `KeyboardInterrupt`).
# - An exogenous `KeyboardInterrupt` occurred whilst we were within the loop itself.
[current_coro] = current_coro_ref

# First, stop all the coroutines.
cancellation_errors: dict[Coro, BaseException] = {}
other_errors: dict[Coro, BaseException] = {}
Expand Down Expand Up @@ -497,11 +497,12 @@ def _cleanup(
else:
module_e = tb.tb_frame.f_globals.get("__name__", "")
if not module_e.startswith("tinyio."):
# 2 skipped frames:
# 3 skipped frames:
# `self.run`
# `self._runtime`
# `self._step`
# either `coro.throw(...)` or `todo.coro.send(todo.value)`
# Don't skip them if the error was an internal error in tinyio, or a KeyboardInterrupt.
_strip_frames(base_e, 2) # pyright: ignore[reportPossiblyUnboundVariable]
_strip_frames(base_e, 3) # pyright: ignore[reportPossiblyUnboundVariable]
# Next: bit of a heuristic, but it is pretty common to only have one thing waiting on you, so stitch together
# their tracebacks as far as we can. Thinking about specifically `current_coro`:
#
Expand Down
Loading