Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customization of run_process cancellation behavior #1525

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
("py:mod", "trio.abc"),
("py:class", "math.inf"),
("py:exc", "Anything else"),
("py:class", "async function"),
]
autodoc_inherit_docstrings = False
default_role = "obj"
Expand Down
7 changes: 7 additions & 0 deletions newsfragments/1104.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Previously, when `trio.run_process` was cancelled, it always killed
the subprocess immediately. Now, on Unix, it first gives the process a
chance to clean up by sending ``SIGTERM``, and only escalates to
``SIGKILL`` if the process is still running after 5 seconds. But if
you prefer the old behavior, or want to adjust the timeout, then don't
worry: you can now pass a custom ``deliver_cancel=`` argument to
define your own process killing policy.
87 changes: 86 additions & 1 deletion trio/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from typing import Optional
from functools import partial
import warnings

from ._abc import AsyncResource, SendStream, ReceiveStream
from ._highlevel_generic import StapledStream
Expand Down Expand Up @@ -377,13 +378,42 @@ async def open_process(
return Process._create(popen, trio_stdin, trio_stdout, trio_stderr)


async def _windows_deliver_cancel(p):
try:
p.terminate()
except OSError as exc:
warnings.warn(
RuntimeWarning(f"TerminateProcess on {p!r} failed with: {exc!r}")
)


async def _posix_deliver_cancel(p):
try:
p.terminate()
await trio.sleep(5)
warnings.warn(
RuntimeWarning(
f"process {p!r} ignored SIGTERM for 5 seconds. "
f"(Maybe you should custom deliver_cancel?) Trying SIGKILL."
njsmith marked this conversation as resolved.
Show resolved Hide resolved
)
)
p.kill()
except OSError as exc:
warnings.warn(
RuntimeWarning(
f"tried to kill process {p!r}, but failed with: {exc!r}"
)
)


async def run_process(
command,
*,
stdin=b"",
capture_stdout=False,
capture_stderr=False,
check=True,
deliver_cancel=None,
**options
):
"""Run ``command`` in a subprocess, wait for it to complete, and
Expand Down Expand Up @@ -441,6 +471,7 @@ async def run_process(
``**options``, or on Windows, ``command`` may alternatively
be a string, which will be parsed following platform-dependent
:ref:`quoting rules <subprocess-quoting>`.

stdin (:obj:`bytes`, file descriptor, or None): The bytes to provide to
the subprocess on its standard input stream, or ``None`` if the
subprocess's standard input should come from the same place as
Expand All @@ -449,18 +480,52 @@ async def run_process(
file descriptor or an object with a ``fileno()`` method,
in which case the subprocess's standard input will come from
that file.

capture_stdout (bool): If true, capture the bytes that the subprocess
writes to its standard output stream and return them in the
:attr:`~subprocess.CompletedProcess.stdout` attribute
of the returned :class:`~subprocess.CompletedProcess` object.

capture_stderr (bool): If true, capture the bytes that the subprocess
writes to its standard error stream and return them in the
:attr:`~subprocess.CompletedProcess.stderr` attribute
of the returned :class:`~subprocess.CompletedProcess` object.

check (bool): If false, don't validate that the subprocess exits
successfully. You should be sure to check the
``returncode`` attribute of the returned object if you pass
``check=False``, so that errors don't pass silently.

deliver_cancel (async function or None): If `run_process` is cancelled,
then it needs to kill the child process. There are multiple ways to
do this, so we let you customize it.

If you pass None (the default), then the behavior depends on the
platform:

- On Windows, Trio calls ``TerminateProcess``, which should kill the
process immediately.

- On Unix-likes, the default behavior is to send a ``SIGTERM``, wait
5 seconds, and send a ``SIGKILL``.

Alternatively, you can customize this behavior by passing in an
arbitrary async function, which will be called with the `Process`
object as an argument. For example, the default Unix behavior could
be implemented like this::

async def my_deliver_cancel(process):
process.send_signal(signal.SIGTERM)
await trio.sleep(5)
process.send_signal(signal.SIGKILL)

When the process actually exits, the ``deliver_cancel`` function
will automatically be cancelled – so if the process exits after
``SIGTERM``, then we'll never reach the ``SIGKILL``.

In any case, `run_process` will always wait for the child process to
exit before raising `Cancelled`.

**options: :func:`run_process` also accepts any :ref:`general subprocess
options <subprocess-options>` and passes them on to the
:class:`~trio.Process` constructor. This includes the
Expand Down Expand Up @@ -518,6 +583,13 @@ async def run_process(
raise ValueError("can't specify both stderr and capture_stderr")
options["stderr"] = subprocess.PIPE

if deliver_cancel is None:
if os.name == "nt":
deliver_cancel = _windows_deliver_cancel
else:
assert os.name == "posix"
deliver_cancel = _posix_deliver_cancel

stdout_chunks = []
stderr_chunks = []

Expand All @@ -542,7 +614,20 @@ async def read_output(stream, chunks):
nursery.start_soon(read_output, proc.stdout, stdout_chunks)
if proc.stderr is not None:
nursery.start_soon(read_output, proc.stderr, stderr_chunks)
await proc.wait()
try:
await proc.wait()
except trio.Cancelled:
with trio.CancelScope(shield=True):
killer_cscope = trio.CancelScope(shield=True)

async def killer():
with killer_cscope:
await deliver_cancel(proc)
njsmith marked this conversation as resolved.
Show resolved Hide resolved

nursery.start_soon(killer)
await proc.wait()
killer_cscope.cancel()
raise

stdout = b"".join(stdout_chunks) if proc.stdout is not None else None
stderr = b"".join(stderr_chunks) if proc.stderr is not None else None
Expand Down
54 changes: 54 additions & 0 deletions trio/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import pytest
import random
from functools import partial

from .. import (
_core, move_on_after, fail_after, sleep, sleep_forever, Process,
Expand Down Expand Up @@ -425,3 +426,56 @@ def on_alarm(sig, frame):
sleeper.kill()
sleeper.wait()
signal.signal(signal.SIGALRM, old_sigalrm)


async def test_custom_deliver_cancel():
custom_deliver_cancel_called = False

async def custom_deliver_cancel(proc):
nonlocal custom_deliver_cancel_called
custom_deliver_cancel_called = True
proc.terminate()
# Make sure this does get cancelled when the process exits, and that
# the process really exited.
try:
await sleep_forever()
finally:
assert proc.returncode is not None

async with _core.open_nursery() as nursery:
nursery.start_soon(
partial(
run_process, SLEEP(9999), deliver_cancel=custom_deliver_cancel
)
)
await wait_all_tasks_blocked()
nursery.cancel_scope.cancel()

assert custom_deliver_cancel_called


async def test_warn_on_failed_cancel_terminate(monkeypatch):
original_terminate = Process.terminate

def broken_terminate(self):
original_terminate(self)
raise OSError("whoops")

monkeypatch.setattr(Process, "terminate", broken_terminate)

with pytest.warns(RuntimeWarning, match=".*whoops.*"):
async with _core.open_nursery() as nursery:
nursery.start_soon(run_process, SLEEP(9999))
await wait_all_tasks_blocked()
nursery.cancel_scope.cancel()


@pytest.mark.skipif(os.name != "posix", reason="posix only")
async def test_warn_on_cancel_SIGKILL_escalation(autojump_clock, monkeypatch):
oremanj marked this conversation as resolved.
Show resolved Hide resolved
monkeypatch.setattr(Process, "terminate", lambda *args: None)

with pytest.warns(RuntimeWarning, match=".*ignored SIGTERM.*"):
async with _core.open_nursery() as nursery:
nursery.start_soon(run_process, SLEEP(9999))
await wait_all_tasks_blocked()
nursery.cancel_scope.cancel()