Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort other fetches when resolution fails #111

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions micropip/_compat_in_pyodide.py
Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that the original implementation can only abort in await pyfetch(...) stage, but we need to abort await response.bytes() too, so I changed the approach.

Now I used a decorator to dependency-inject an AbortSignal into the decorated function, and pass that to the call to pyfetch. After decorating, the signature of them are the same as before. But maybe this needs a re-review.

Copy link
Sponsor Member Author

@CNSeniorious000 CNSeniorious000 Jun 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I still changed fetch_bytes and fetch_string_and_headers. Maybe you think that isn't elegant.

In fact, through a more hacking way, it is possible to only decorate it, without dependency injection, and no longer need to pass signal=signal in pyfetch themselves, which enables this decorater to be used elsewhere too. (Maybe after #112, there would be more resolution/downloading implementations, and they can simply decorate their fetching function with this decorator to ensure the aborting-when-cancelled behavior)

In the _abort_on_cancel decorator, replace the input function's __locals__ with a ChainMap. In that context, insert a _signal into that namespace, and replace the pyfetch in that namespace to partial(pyfetch, signal=_signal). Then we can simplify the patching code:

@_abort_on_cancel
async def fetch_string_and_headers(
-   signal: AbortSignal, url: str, kwargs: dict[str, str]
+   url: str, kwargs: dict[str, str]
) -> tuple[str, dict[str, str]]:
-   response = await pyfetch(url, **kwargs, signal=signal)
+   response = await pyfetch(url, **kwargs)
    ...

Potential downsides:

  1. the code may become a little confusing
  2. if the user don't use the name pyfetch, but using other names, it won't be patched as partial(pyfetch, signal=_signal
  3. ChainMap and partial may have small runtime overhead

Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from asyncio import CancelledError
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import TYPE_CHECKING, Concatenate, ParamSpec, TypeVar
from urllib.parse import urlparse

from pyodide._package_loader import get_dynlibs
Expand All @@ -7,7 +10,7 @@

try:
import pyodide_js
from js import Object
from js import AbortController, AbortSignal, Object
from pyodide_js import loadedPackages, loadPackage
from pyodide_js._api import ( # type: ignore[import]
loadBinaryFile,
Expand All @@ -21,21 +24,45 @@
raise
# Otherwise, this is pytest test collection so let it go.

if IN_BROWSER or TYPE_CHECKING:
P = ParamSpec("P")
T = TypeVar("T")

async def fetch_bytes(url: str, kwargs: dict[str, str]) -> bytes:
def _abort_on_cancel(
func: Callable[Concatenate[AbortSignal, P], Awaitable[T]],
) -> Callable[P, Awaitable[T]]:
"""inject an AbortSignal as the first argument"""

async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
controller = AbortController.new()
try:
return await func(controller.signal, *args, **kwargs)
except CancelledError:
controller.abort()
raise

return wrapper

else:
_abort_on_cancel = lambda func: lambda *args, **kwargs: func(None, *args, **kwargs)


@_abort_on_cancel
async def fetch_bytes(signal: AbortSignal, url: str, kwargs: dict[str, str]) -> bytes:
parsed_url = urlparse(url)
if parsed_url.scheme == "emfs":
return Path(parsed_url.path).read_bytes()
if parsed_url.scheme == "file":
return (await loadBinaryFile(parsed_url.path)).to_bytes()

return await (await pyfetch(url, **kwargs)).bytes()
return await (await pyfetch(url, **kwargs, signal=signal)).bytes()


@_abort_on_cancel
async def fetch_string_and_headers(
url: str, kwargs: dict[str, str]
signal: AbortSignal, url: str, kwargs: dict[str, str]
) -> tuple[str, dict[str, str]]:
response = await pyfetch(url, **kwargs)
response = await pyfetch(url, **kwargs, signal=signal)

content = await response.string()
# TODO: replace with response.headers when pyodide>= 0.24 is released
Expand Down
2 changes: 1 addition & 1 deletion micropip/_compat_not_in_pyodide.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __get__(self, attr):
REPODATA_INFO: dict[str, str] = {}


def loadPackage(packages: str | list[str]) -> None:
async def loadPackage(packages: str | list[str]) -> None:
pass


Expand Down
16 changes: 11 additions & 5 deletions micropip/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ async def gather_requirements(
self,
requirements: list[str] | list[Requirement],
) -> None:
requirement_promises = []
for requirement in requirements:
requirement_promises.append(self.add_requirement(requirement))

await asyncio.gather(*requirement_promises)
futures: list[asyncio.Future] = []
try:
for requirement in requirements:
futures.append(asyncio.ensure_future(self.add_requirement(requirement)))
await asyncio.gather(*futures)
except ValueError:
if not self.keep_going:
for future in futures:
if not future.done():
future.cancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't investigated how cancellation interacts with our event loop and I'm slightly worried that it could not interact well. But if this passes the tests then we should merge.

Copy link
Sponsor Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait a second, I just find that this part seems to raises several asyncio.exceptions.InvalidStateError: invalid state exceptions

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zac-HD any tips on how to make my event loop support cancellation? Where to look who to ask etc?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyways I'll open a separate issue about this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you try using a TaskGroup? Maybe that will fix our problems.
https://docs.python.org/3/library/asyncio-task.html#asyncio.TaskGroup

Copy link
Sponsor Member Author

@CNSeniorious000 CNSeniorious000 Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I've refactored this section using TaskGroup, now all the futures should be gathered in one top-level TaskGroup.

Note that this makes the traceback looks a bit different because TaskGroup would group all the exceptions into a ExceptionGroup. Here comes a little inconsistency because if specified keep_going=True, this will only raise ValueError.

And asyncio.exceptions.InvalidStateError: invalid state stays still.

Copy link
Sponsor Member Author

@CNSeniorious000 CNSeniorious000 Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test fails because the raised error is not a ValueError but a ExceptionGroup containing a ValueError.

It's possible to recover the original ValueError by wrap the async with block with a try-except:

...
    try:
        async with asyncio.TaskGroup() as tg:
            self.tg = tg  # only one task group from the top level
            for requirement in requirements:
                tg.create_task(self.add_requirement(requirement))
    except ExceptionGroup as e:
        raise e.exceptions[-1] from None
...

But I am not sure whether this is the desired behavior.

Maybe it is better to name a new error like ResolutionFailedError(ValueError) to wrap both type of error.

raise

async def add_requirement(self, req: str | Requirement) -> None:
if isinstance(req, Requirement):
Expand Down
Loading