From 958f64f78705b4c6c922580bc9de9e4f798b49ef Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 30 Jun 2025 16:13:56 +0200 Subject: [PATCH 01/20] readd cancel on disconnect decorator --- .../src/simcore_service_api_server/api/routes/files.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/files.py b/services/api-server/src/simcore_service_api_server/api/routes/files.py index 69f779f13e19..a220edacb232 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/files.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/files.py @@ -287,6 +287,7 @@ async def upload_files(files: list[UploadFile] = FileParam(...)): response_model=ClientFileUploadData, responses=_FILE_STATUS_CODES, ) +@cancel_on_disconnect async def get_upload_links( request: Request, client_file: UserFileToProgramJob | UserFile, @@ -421,6 +422,7 @@ async def abort_multipart_upload( response_model=OutputFile, responses=_FILE_STATUS_CODES, ) +@cancel_on_disconnect async def complete_multipart_upload( request: Request, file_id: UUID, From 176775f365b7fa323a70a0d8622a140d0e8708b2 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 30 Jun 2025 16:15:54 +0200 Subject: [PATCH 02/20] add asyncio.Event for killing poller task --- .../src/servicelib/fastapi/requests_decorators.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index ae5f1ea047c6..f812437270c6 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -13,8 +13,7 @@ class _HandlerWithRequestArg(Protocol): __name__: str - async def __call__(self, request: Request, *args: Any, **kwargs: Any) -> Any: - ... + async def __call__(self, request: Request, *args: Any, **kwargs: Any) -> Any: ... def _validate_signature(handler: _HandlerWithRequestArg): @@ -36,13 +35,15 @@ def _validate_signature(handler: _HandlerWithRequestArg): _POLL_INTERVAL_S: float = 0.01 -async def _disconnect_poller(request: Request, result: Any): +async def _disconnect_poller(close_event: asyncio.Event, request: Request, result: Any): """ Poll for a disconnect. If the request disconnects, stop polling and return. """ while not await request.is_disconnected(): await asyncio.sleep(_POLL_INTERVAL_S) + if close_event.is_set(): + break return result @@ -59,8 +60,9 @@ async def wrapper(request: Request, *args, **kwargs): # Create two tasks: # one to poll the request and check if the client disconnected + kill_poller_event = asyncio.Event() poller_task = asyncio.create_task( - _disconnect_poller(request, sentinel), + _disconnect_poller(kill_poller_event, request, sentinel), name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}", ) # , and another which is the request handler @@ -72,6 +74,7 @@ async def wrapper(request: Request, *args, **kwargs): done, pending = await asyncio.wait( [poller_task, handler_task], return_when=asyncio.FIRST_COMPLETED ) + kill_poller_event.set() # One has completed, cancel the other for t in pending: From c376bab4e9c2b8a446544c47ea7db5b4a87f970f Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 30 Jun 2025 16:49:12 +0200 Subject: [PATCH 03/20] add comment --- .../src/servicelib/fastapi/requests_decorators.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index f812437270c6..f508ed6b3f9e 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -60,6 +60,9 @@ async def wrapper(request: Request, *args, **kwargs): # Create two tasks: # one to poll the request and check if the client disconnected + # sometimes canceling a task doesn't cancel the task immediately. If the poller task is not "killed" immediately, the client doesn't + # get a response, and the request "hangs". For this reason, we use an event to signal the poller task to stop. + # See: https://github.com/ITISFoundation/osparc-issues/issues/1922 kill_poller_event = asyncio.Event() poller_task = asyncio.create_task( _disconnect_poller(kill_poller_event, request, sentinel), From 8812a8a29a0c067276ea873b318e7499c08bd75c Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 1 Jul 2025 07:02:36 +0200 Subject: [PATCH 04/20] add test for request handler `cancel_on_disconnect` @pcrespov --- .../tests/fastapi/test_request_decorators.py | 168 +++++++----------- 1 file changed, 61 insertions(+), 107 deletions(-) diff --git a/packages/service-library/tests/fastapi/test_request_decorators.py b/packages/service-library/tests/fastapi/test_request_decorators.py index 18f6267cf33a..4a5200cbe937 100644 --- a/packages/service-library/tests/fastapi/test_request_decorators.py +++ b/packages/service-library/tests/fastapi/test_request_decorators.py @@ -2,121 +2,75 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable + import asyncio -import subprocess -import sys -import time -from collections.abc import Callable, Iterator -from contextlib import contextmanager -from pathlib import Path -from typing import NamedTuple +from collections.abc import Awaitable, Callable +from typing import Any +from unittest.mock import AsyncMock import pytest -import requests -from fastapi import FastAPI, Query, Request +from fastapi import Request from servicelib.fastapi.requests_decorators import cancel_on_disconnect -CURRENT_FILE = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve() -CURRENT_DIR = CURRENT_FILE.parent +POLLER_CLEANUP_DELAY_S = 100.0 + +@pytest.fixture +def long_running_poller_mock( + monkeypatch: pytest.MonkeyPatch, +) -> Callable[[asyncio.Event, Request, Any], Awaitable]: -mock_app = FastAPI(title="Disconnect example") + async def _mock_disconnect_poller( + close_event: asyncio.Event, request: Request, result: Any + ): + _mock_disconnect_poller.called = True + while not await request.is_disconnected(): + await asyncio.sleep(0.01) + if close_event.is_set(): + # Simulate a long cleanup procedure + await asyncio.sleep(POLLER_CLEANUP_DELAY_S) + break + return result -MESSAGE_ON_HANDLER_CANCELLATION = "Request was cancelled!!" + monkeypatch.setattr( + "servicelib.fastapi.requests_decorators._disconnect_poller", + _mock_disconnect_poller, + ) + return _mock_disconnect_poller -@mock_app.get("/example") -@cancel_on_disconnect -async def example( - request: Request, - wait: float = Query(..., description="Time to wait, in seconds"), +async def test_decorator_waits_for_poller_cleanup( + long_running_poller_mock: Callable[[asyncio.Event, Request, Any], Awaitable], ): - try: - print(f"Sleeping for {wait:.2f}") - await asyncio.sleep(wait) - print("Sleep not cancelled") - return f"I waited for {wait:.2f}s and now this is the result" - except asyncio.CancelledError: - print(MESSAGE_ON_HANDLER_CANCELLATION) - raise - - -class ServerInfo(NamedTuple): - url: str - proc: subprocess.Popen - - -@contextmanager -def server_lifetime(port: int) -> Iterator[ServerInfo]: - with subprocess.Popen( - [ - "uvicorn", - f"{CURRENT_FILE.stem}:mock_app", - "--port", - f"{port}", - ], - cwd=f"{CURRENT_DIR}", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) as proc: - - url = f"http://127.0.0.1:{port}" - print("\nStarted", proc.args) - - # some time to start - time.sleep(2) - - # checks started successfully - assert proc.stdout - assert not proc.poll(), proc.stdout.read().decode("utf-8") - print("server is up and waiting for requests...") - yield ServerInfo(url, proc) - print("server is closing...") - proc.terminate() - print("server terminated") - - -def test_cancel_on_disconnect(get_unused_port: Callable[[], int]): - - with server_lifetime(port=get_unused_port()) as server: - url, proc = server - print("--> testing server") - response = requests.get(f"{server.url}/example?wait=0", timeout=2) - print(response.url, "->", response.text) - response.raise_for_status() - print("<-- server responds") - - print("--> testing server correctly cancels") - with pytest.raises(requests.exceptions.ReadTimeout): - response = requests.get(f"{server.url}/example?wait=2", timeout=0.5) - print("<-- testing server correctly cancels done") - - print("--> testing server again") - # NOTE: the timeout here appears to be sensitive. if it is set <5 the test hangs from time to time - response = requests.get(f"{server.url}/example?wait=1", timeout=5) - print(response.url, "->", response.text) - response.raise_for_status() - print("<-- testing server again done") - - # kill service - server.proc.terminate() - assert server.proc.stdout - server_log = server.proc.stdout.read().decode("utf-8") - print( - f"{server.url=} stdout", - "-" * 10, - "\n", - server_log, - "-" * 30, - ) - # server.url=http://127.0.0.1:44077 stdout ---------- - # Sleeping for 0.00 - # Sleep not cancelled - # INFO: 127.0.0.1:35114 - "GET /example?wait=0 HTTP/1.1" 200 OK - # Sleeping for 2.00 - # Exiting on cancellation - # Sleeping for 1.00 - # Sleep not cancelled - # INFO: 127.0.0.1:35134 - "GET /example?wait=1 HTTP/1.1" 200 OK - - assert MESSAGE_ON_HANDLER_CANCELLATION in server_log + """ + Tests that the decorator's wrapper waits for the poller task to finish + its cleanup, even if the handler finishes first, without needing a full server. + """ + long_running_poller_mock.called = False + handler_was_called = False + + @cancel_on_disconnect + async def my_handler(request: Request): + nonlocal handler_was_called + handler_was_called = True + await asyncio.sleep(0.1) # Simulate quick work + return "Success" + + # Mock a fastapi.Request object + mock_request = AsyncMock(spec=Request) + mock_request.is_disconnected.return_value = False + + # --- + tasks_before = asyncio.all_tasks() + + # Call the decorated handler + _ = await my_handler(mock_request) + + tasks_after = asyncio.all_tasks() + # --- + + assert handler_was_called + assert long_running_poller_mock.called == True + + # Check that no background tasks were left orphaned + assert tasks_before == tasks_after From a80f4e199e6085b21869cfe6be9f55b972e66a9a Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 1 Jul 2025 07:12:02 +0200 Subject: [PATCH 05/20] use taskgroup for error handling --- .../servicelib/fastapi/requests_decorators.py | 106 ++++++------------ 1 file changed, 34 insertions(+), 72 deletions(-) diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index f508ed6b3f9e..062494d30a1d 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -29,96 +29,58 @@ def _validate_signature(handler: _HandlerWithRequestArg): # -# cancel_on_disconnect/disconnect_poller based -# on https://github.com/RedRoserade/fastapi-disconnect-example/blob/main/app.py +# cancel_on_disconnect based on TaskGroup # _POLL_INTERVAL_S: float = 0.01 -async def _disconnect_poller(close_event: asyncio.Event, request: Request, result: Any): +class _ClientDisconnectedError(Exception): + """Internal exception raised by the poller task when the client disconnects.""" + + +async def _disconnect_poller_for_task_group(request: Request): """ - Poll for a disconnect. - If the request disconnects, stop polling and return. + Polls for client disconnection and raises _ClientDisconnectedError if it occurs. """ while not await request.is_disconnected(): await asyncio.sleep(_POLL_INTERVAL_S) - if close_event.is_set(): - break - return result + raise _ClientDisconnectedError() def cancel_on_disconnect(handler: _HandlerWithRequestArg): """ - After client disconnects, handler gets cancelled in ~<3 secs + Decorator that cancels the request handler if the client disconnects. + + Uses a TaskGroup to manage the handler and a poller task concurrently. + If the client disconnects, the poller raises an exception, which is + caught and translated into a 503 Service Unavailable response. """ _validate_signature(handler) @wraps(handler) async def wrapper(request: Request, *args, **kwargs): - sentinel = object() - - # Create two tasks: - # one to poll the request and check if the client disconnected - # sometimes canceling a task doesn't cancel the task immediately. If the poller task is not "killed" immediately, the client doesn't - # get a response, and the request "hangs". For this reason, we use an event to signal the poller task to stop. - # See: https://github.com/ITISFoundation/osparc-issues/issues/1922 - kill_poller_event = asyncio.Event() - poller_task = asyncio.create_task( - _disconnect_poller(kill_poller_event, request, sentinel), - name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}", - ) - # , and another which is the request handler - handler_task = asyncio.create_task( - handler(request, *args, **kwargs), - name=f"cancel_on_disconnect/handler/{handler.__name__}/{id(sentinel)}", - ) - - done, pending = await asyncio.wait( - [poller_task, handler_task], return_when=asyncio.FIRST_COMPLETED - ) - kill_poller_event.set() - - # One has completed, cancel the other - for t in pending: - t.cancel() - - try: - await asyncio.wait_for(t, timeout=3) - - except asyncio.CancelledError: - pass - except Exception: # pylint: disable=broad-except - if t is handler_task: - raise - finally: - assert t.done() # nosec - - # Return the result if the handler finished first - if handler_task in done: - assert poller_task.done() # nosec - return await handler_task - - # Otherwise, raise an exception. This is not exactly needed, - # but it will prevent validation errors if your request handler - # is supposed to return something. - logger.warning( - "Request %s %s cancelled since client %s disconnected:\n - %s\n - %s", - request.method, - request.url, - request.client, - f"{poller_task=}", - f"{handler_task=}", - ) - - assert poller_task.done() # nosec - assert handler_task.done() # nosec - - # NOTE: uvicorn server fails with 499 - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=f"client disconnected from {request=}", - ) + try: + async with asyncio.TaskGroup() as tg: + handler_task = tg.create_task(handler(request, *args, **kwargs)) + tg.create_task(_disconnect_poller_for_task_group(request)) + + return handler_task.result() + + except* _ClientDisconnectedError as eg: + logger.info( + "Request %s %s cancelled since client %s disconnected.", + request.method, + request.url, + request.client, + ) + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Client disconnected", + ) from eg + + except* Exception as eg: + raise eg.exceptions[0] return wrapper From 60e99d04c61dec0efcaf28f0ed897dac2ed2877d Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 1 Jul 2025 07:47:44 +0200 Subject: [PATCH 06/20] add old names to tasks --- .../src/servicelib/fastapi/requests_decorators.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index 062494d30a1d..70c938aa77b5 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -60,10 +60,17 @@ def cancel_on_disconnect(handler: _HandlerWithRequestArg): @wraps(handler) async def wrapper(request: Request, *args, **kwargs): + sentinel = object() try: async with asyncio.TaskGroup() as tg: - handler_task = tg.create_task(handler(request, *args, **kwargs)) - tg.create_task(_disconnect_poller_for_task_group(request)) + handler_task = tg.create_task( + handler(request, *args, **kwargs), + name=f"cancel_on_disconnect/handler/{handler.__name__}/{id(sentinel)}", + ) + tg.create_task( + _disconnect_poller_for_task_group(request), + name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}", + ) return handler_task.result() From e6c42f71d12bc820b59a652c7d07b5b837d015a8 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 1 Jul 2025 15:33:36 +0200 Subject: [PATCH 07/20] use event to kill poller task to be sure it terminates --- .../servicelib/fastapi/requests_decorators.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index 70c938aa77b5..2cdf373f7c52 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -38,12 +38,16 @@ class _ClientDisconnectedError(Exception): """Internal exception raised by the poller task when the client disconnects.""" -async def _disconnect_poller_for_task_group(request: Request): +async def _disconnect_poller_for_task_group( + close_event: asyncio.Event, request: Request +): """ Polls for client disconnection and raises _ClientDisconnectedError if it occurs. """ while not await request.is_disconnected(): await asyncio.sleep(_POLL_INTERVAL_S) + if close_event.is_set(): + return raise _ClientDisconnectedError() @@ -61,16 +65,20 @@ def cancel_on_disconnect(handler: _HandlerWithRequestArg): @wraps(handler) async def wrapper(request: Request, *args, **kwargs): sentinel = object() + kill_poller_task_event = asyncio.Event() try: async with asyncio.TaskGroup() as tg: + + tg.create_task( + _disconnect_poller_for_task_group(kill_poller_task_event, request), + name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}", + ) handler_task = tg.create_task( handler(request, *args, **kwargs), name=f"cancel_on_disconnect/handler/{handler.__name__}/{id(sentinel)}", ) - tg.create_task( - _disconnect_poller_for_task_group(request), - name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}", - ) + await handler_task + kill_poller_task_event.set() return handler_task.result() From 1229fa333f1f7a38a409c6467f74fd7fcab4df7a Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 2 Jul 2025 09:15:51 +0200 Subject: [PATCH 08/20] pylint --- .../src/servicelib/fastapi/requests_decorators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index 2cdf373f7c52..daf4922fc182 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -95,7 +95,7 @@ async def wrapper(request: Request, *args, **kwargs): ) from eg except* Exception as eg: - raise eg.exceptions[0] + raise eg.exceptions[0] # pylint: disable=no-member return wrapper From 4cbc9dc0845eb41439e23f49ee32411aef7fcf9e Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Wed, 2 Jul 2025 09:19:56 +0200 Subject: [PATCH 09/20] fix test --- .../tests/fastapi/test_request_decorators.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/packages/service-library/tests/fastapi/test_request_decorators.py b/packages/service-library/tests/fastapi/test_request_decorators.py index 4a5200cbe937..19f899fad9d6 100644 --- a/packages/service-library/tests/fastapi/test_request_decorators.py +++ b/packages/service-library/tests/fastapi/test_request_decorators.py @@ -5,7 +5,6 @@ import asyncio from collections.abc import Awaitable, Callable -from typing import Any from unittest.mock import AsyncMock import pytest @@ -18,29 +17,24 @@ @pytest.fixture def long_running_poller_mock( monkeypatch: pytest.MonkeyPatch, -) -> Callable[[asyncio.Event, Request, Any], Awaitable]: +) -> Callable[[asyncio.Event, Request], Awaitable]: - async def _mock_disconnect_poller( - close_event: asyncio.Event, request: Request, result: Any - ): + async def _mock_disconnect_poller(close_event: asyncio.Event, request: Request): _mock_disconnect_poller.called = True while not await request.is_disconnected(): - await asyncio.sleep(0.01) + await asyncio.sleep(2) if close_event.is_set(): - # Simulate a long cleanup procedure - await asyncio.sleep(POLLER_CLEANUP_DELAY_S) break - return result monkeypatch.setattr( - "servicelib.fastapi.requests_decorators._disconnect_poller", + "servicelib.fastapi.requests_decorators._disconnect_poller_for_task_group", _mock_disconnect_poller, ) return _mock_disconnect_poller async def test_decorator_waits_for_poller_cleanup( - long_running_poller_mock: Callable[[asyncio.Event, Request, Any], Awaitable], + long_running_poller_mock: Callable[[asyncio.Event, Request], Awaitable], ): """ Tests that the decorator's wrapper waits for the poller task to finish From e22808ab71a4f5e4a747f6861b051cca052b4276 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 14:23:31 +0200 Subject: [PATCH 10/20] @pcrespov readd comments --- .../src/servicelib/fastapi/requests_decorators.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index daf4922fc182..9f88406b33a3 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -67,12 +67,15 @@ async def wrapper(request: Request, *args, **kwargs): sentinel = object() kill_poller_task_event = asyncio.Event() try: + # Create two tasks in a TaskGroup async with asyncio.TaskGroup() as tg: + # One to poll the `Request` object to check for client disconnection tg.create_task( _disconnect_poller_for_task_group(kill_poller_task_event, request), name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}", ) + # The other to run the actual request handler handler_task = tg.create_task( handler(request, *args, **kwargs), name=f"cancel_on_disconnect/handler/{handler.__name__}/{id(sentinel)}", From 5399294d2bdbf8cd471039fd587e90b5b312350b Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 14:29:51 +0200 Subject: [PATCH 11/20] readd old tests --- .../tests/fastapi/test_request_decorators.py | 120 +++++++++++++++++- 1 file changed, 118 insertions(+), 2 deletions(-) diff --git a/packages/service-library/tests/fastapi/test_request_decorators.py b/packages/service-library/tests/fastapi/test_request_decorators.py index 19f899fad9d6..49646047169d 100644 --- a/packages/service-library/tests/fastapi/test_request_decorators.py +++ b/packages/service-library/tests/fastapi/test_request_decorators.py @@ -4,16 +4,132 @@ import asyncio -from collections.abc import Awaitable, Callable +import subprocess +import sys +import time +from collections.abc import Awaitable, Callable, Iterator +from contextlib import contextmanager +from pathlib import Path +from typing import NamedTuple from unittest.mock import AsyncMock import pytest -from fastapi import Request +import requests +from fastapi import FastAPI, Query, Request from servicelib.fastapi.requests_decorators import cancel_on_disconnect POLLER_CLEANUP_DELAY_S = 100.0 +CURRENT_FILE = Path(sys.argv[0] if __name__ == "__main__" else __file__).resolve() +CURRENT_DIR = CURRENT_FILE.parent + + +mock_app = FastAPI(title="Disconnect example") + +MESSAGE_ON_HANDLER_CANCELLATION = "Request was cancelled!!" + + +@mock_app.get("/example") +@cancel_on_disconnect +async def example( + request: Request, + wait: float = Query(..., description="Time to wait, in seconds"), +): + try: + print(f"Sleeping for {wait:.2f}") + await asyncio.sleep(wait) + print("Sleep not cancelled") + return f"I waited for {wait:.2f}s and now this is the result" + except asyncio.CancelledError: + print(MESSAGE_ON_HANDLER_CANCELLATION) + raise + + +class ServerInfo(NamedTuple): + url: str + proc: subprocess.Popen + + +@contextmanager +def server_lifetime(port: int) -> Iterator[ServerInfo]: + with subprocess.Popen( + [ + "uvicorn", + f"{CURRENT_FILE.stem}:mock_app", + "--port", + f"{port}", + ], + cwd=f"{CURRENT_DIR}", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as proc: + + url = f"http://127.0.0.1:{port}" + print("\nStarted", proc.args) + + # some time to start + time.sleep(2) + + # checks started successfully + assert proc.stdout + assert not proc.poll(), proc.stdout.read().decode("utf-8") + print("server is up and waiting for requests...") + yield ServerInfo(url, proc) + print("server is closing...") + proc.terminate() + print("server terminated") + + +def test_cancel_on_disconnect(get_unused_port: Callable[[], int]): + + with server_lifetime(port=get_unused_port()) as server: + url, proc = server + print("--> testing server") + response = requests.get(f"{server.url}/example?wait=0", timeout=2) + print(response.url, "->", response.text) + response.raise_for_status() + print("<-- server responds") + + print("--> testing server correctly cancels") + with pytest.raises(requests.exceptions.ReadTimeout): + response = requests.get(f"{server.url}/example?wait=2", timeout=0.5) + print("<-- testing server correctly cancels done") + + print("--> testing server again") + # NOTE: the timeout here appears to be sensitive. if it is set <5 the test hangs from time to time + response = requests.get(f"{server.url}/example?wait=1", timeout=5) + print(response.url, "->", response.text) + response.raise_for_status() + print("<-- testing server again done") + + # kill service + server.proc.terminate() + assert server.proc.stdout + server_log = server.proc.stdout.read().decode("utf-8") + print( + f"{server.url=} stdout", + "-" * 10, + "\n", + server_log, + "-" * 30, + ) + # server.url=http://127.0.0.1:44077 stdout ---------- + # Sleeping for 0.00 + # Sleep not cancelled + # INFO: 127.0.0.1:35114 - "GET /example?wait=0 HTTP/1.1" 200 OK + # Sleeping for 2.00 + # Exiting on cancellation + # Sleeping for 1.00 + # Sleep not cancelled + # INFO: 127.0.0.1:35134 - "GET /example?wait=1 HTTP/1.1" 200 OK + + assert MESSAGE_ON_HANDLER_CANCELLATION in server_log + + +# =================================================================================================== + + @pytest.fixture def long_running_poller_mock( monkeypatch: pytest.MonkeyPatch, From 97afe884816bdb30f1d81b21cdede15b8786de43 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 14:30:15 +0200 Subject: [PATCH 12/20] readd --- .../service-library/tests/fastapi/test_request_decorators.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/service-library/tests/fastapi/test_request_decorators.py b/packages/service-library/tests/fastapi/test_request_decorators.py index 49646047169d..d711a930631a 100644 --- a/packages/service-library/tests/fastapi/test_request_decorators.py +++ b/packages/service-library/tests/fastapi/test_request_decorators.py @@ -127,9 +127,6 @@ def test_cancel_on_disconnect(get_unused_port: Callable[[], int]): assert MESSAGE_ON_HANDLER_CANCELLATION in server_log -# =================================================================================================== - - @pytest.fixture def long_running_poller_mock( monkeypatch: pytest.MonkeyPatch, From 373ad5c6b512c74dd4e779fede62028e533f79f1 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 15:19:16 +0200 Subject: [PATCH 13/20] factor out core funcionality into run_until_cancelled --- .../src/servicelib/async_utils.py | 59 ++++++++++++++++++- .../servicelib/fastapi/requests_decorators.py | 55 ++++------------- 2 files changed, 69 insertions(+), 45 deletions(-) diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index d2c62ba55ffb..424156acfdb6 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -1,7 +1,7 @@ import asyncio import logging from collections import deque -from collections.abc import Awaitable, Callable +from collections.abc import Awaitable, Callable, Coroutine from contextlib import suppress from dataclasses import dataclass from functools import wraps @@ -210,3 +210,60 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None: return wrapper return decorator + + +TaskCancelCallback = Callable[[], Awaitable[bool]] +_POLL_INTERVAL_S: float = 0.01 + + +class TaskCancelled(Exception): + """Internal exception raised by the poller task when the client disconnects.""" + + +async def _poller_for_task_group( + close_event: asyncio.Event, cancel_awaitable: TaskCancelCallback +): + """ + Polls for cancellation via the callback and raises TaskCancelled if it occurs. + """ + while not await cancel_awaitable(): + await asyncio.sleep(_POLL_INTERVAL_S) + if close_event.is_set(): + return + raise TaskCancelled() + + +async def run_until_cancelled( + *, coro: Coroutine, cancel_callback: TaskCancelCallback +) -> Any: + """ + Runs the given coroutine until it completes or cancellation is requested. + + This function executes the provided coroutine and periodically checks the given + cancel_callback. If cancellation is requested (i.e., cancel_callback returns True), + the coroutine is cancelled and a TaskCancelled exception is raised. If the coroutine + completes first, its result (or exception) is returned/reraised. + """ + sentinel = object() + close_poller_event = asyncio.Event() + try: + # Create two tasks in a TaskGroup + async with asyncio.TaskGroup() as tg: + + # One to poll for cancellation + tg.create_task( + _poller_for_task_group(close_poller_event, cancel_callback), + name=f"run_until_cancelled/poller/{coro.__name__}/{id(sentinel)}", + ) + # The other to run the actual coroutine + coro_task = tg.create_task( + coro, + name=f"run_until_cancelled/coroutine/{coro.__name__}/{id(sentinel)}", + ) + await coro_task + close_poller_event.set() + + return coro_task.result() + + except* Exception as eg: + raise eg.exceptions[0] # pylint: disable=no-member diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index 9f88406b33a3..761c81be7061 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -1,4 +1,3 @@ -import asyncio import inspect import logging from functools import wraps @@ -7,6 +6,8 @@ from fastapi import Request, status from fastapi.exceptions import HTTPException +from ..async_utils import TaskCancelled, run_until_cancelled + logger = logging.getLogger(__name__) @@ -31,24 +32,6 @@ def _validate_signature(handler: _HandlerWithRequestArg): # # cancel_on_disconnect based on TaskGroup # -_POLL_INTERVAL_S: float = 0.01 - - -class _ClientDisconnectedError(Exception): - """Internal exception raised by the poller task when the client disconnects.""" - - -async def _disconnect_poller_for_task_group( - close_event: asyncio.Event, request: Request -): - """ - Polls for client disconnection and raises _ClientDisconnectedError if it occurs. - """ - while not await request.is_disconnected(): - await asyncio.sleep(_POLL_INTERVAL_S) - if close_event.is_set(): - return - raise _ClientDisconnectedError() def cancel_on_disconnect(handler: _HandlerWithRequestArg): @@ -64,28 +47,15 @@ def cancel_on_disconnect(handler: _HandlerWithRequestArg): @wraps(handler) async def wrapper(request: Request, *args, **kwargs): - sentinel = object() - kill_poller_task_event = asyncio.Event() + try: - # Create two tasks in a TaskGroup - async with asyncio.TaskGroup() as tg: - - # One to poll the `Request` object to check for client disconnection - tg.create_task( - _disconnect_poller_for_task_group(kill_poller_task_event, request), - name=f"cancel_on_disconnect/poller/{handler.__name__}/{id(sentinel)}", - ) - # The other to run the actual request handler - handler_task = tg.create_task( - handler(request, *args, **kwargs), - name=f"cancel_on_disconnect/handler/{handler.__name__}/{id(sentinel)}", - ) - await handler_task - kill_poller_task_event.set() - - return handler_task.result() - - except* _ClientDisconnectedError as eg: + await run_until_cancelled( + coro=handler(request, *args, **kwargs), + cancel_callback=request.is_disconnected, + ) + + except TaskCancelled as exc: + logger.info( "Request %s %s cancelled since client %s disconnected.", request.method, @@ -95,10 +65,7 @@ async def wrapper(request: Request, *args, **kwargs): raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Client disconnected", - ) from eg - - except* Exception as eg: - raise eg.exceptions[0] # pylint: disable=no-member + ) from exc return wrapper From ef3911e50c7d9d08ed30a13562cb7f1fa8900d81 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 15:27:11 +0200 Subject: [PATCH 14/20] fix tests --- .../service-library/tests/fastapi/test_request_decorators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-library/tests/fastapi/test_request_decorators.py b/packages/service-library/tests/fastapi/test_request_decorators.py index d711a930631a..2b271b6219f0 100644 --- a/packages/service-library/tests/fastapi/test_request_decorators.py +++ b/packages/service-library/tests/fastapi/test_request_decorators.py @@ -140,7 +140,7 @@ async def _mock_disconnect_poller(close_event: asyncio.Event, request: Request): break monkeypatch.setattr( - "servicelib.fastapi.requests_decorators._disconnect_poller_for_task_group", + "servicelib.async_utils._poller_for_task_group", _mock_disconnect_poller, ) return _mock_disconnect_poller From a626a294a583869e6e74f39905301af3286f5186 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 22:11:27 +0200 Subject: [PATCH 15/20] ensure function decorator works on local deployment --- packages/service-library/src/servicelib/async_utils.py | 2 +- .../src/servicelib/fastapi/requests_decorators.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index 424156acfdb6..68fda7dff267 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -230,7 +230,7 @@ async def _poller_for_task_group( await asyncio.sleep(_POLL_INTERVAL_S) if close_event.is_set(): return - raise TaskCancelled() + raise TaskCancelled async def run_until_cancelled( diff --git a/packages/service-library/src/servicelib/fastapi/requests_decorators.py b/packages/service-library/src/servicelib/fastapi/requests_decorators.py index 761c81be7061..b9c6f4941202 100644 --- a/packages/service-library/src/servicelib/fastapi/requests_decorators.py +++ b/packages/service-library/src/servicelib/fastapi/requests_decorators.py @@ -1,6 +1,6 @@ import inspect import logging -from functools import wraps +from functools import partial, wraps from typing import Any, Protocol from fastapi import Request, status @@ -34,6 +34,10 @@ def _validate_signature(handler: _HandlerWithRequestArg): # +async def _is_client_disconnected(request: Request): + return await request.is_disconnected() + + def cancel_on_disconnect(handler: _HandlerWithRequestArg): """ Decorator that cancels the request handler if the client disconnects. @@ -49,9 +53,9 @@ def cancel_on_disconnect(handler: _HandlerWithRequestArg): async def wrapper(request: Request, *args, **kwargs): try: - await run_until_cancelled( + return await run_until_cancelled( coro=handler(request, *args, **kwargs), - cancel_callback=request.is_disconnected, + cancel_callback=partial(_is_client_disconnected, request), ) except TaskCancelled as exc: From edd826e5f63a0d193071370277539a0dc238bafe Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 23:18:06 +0200 Subject: [PATCH 16/20] migrate middleware to new implementation --- .../src/servicelib/async_utils.py | 17 +++-- .../fastapi/cancellation_middleware.py | 68 +++++++++---------- 2 files changed, 44 insertions(+), 41 deletions(-) diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index 68fda7dff267..896fa13be214 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -7,6 +7,8 @@ from functools import wraps from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar +from pydantic import NonNegativeFloat + from . import tracing from .utils_profiling_middleware import dont_profile, is_profiling, profile_context @@ -221,20 +223,25 @@ class TaskCancelled(Exception): async def _poller_for_task_group( - close_event: asyncio.Event, cancel_awaitable: TaskCancelCallback + close_event: asyncio.Event, + cancel_awaitable: TaskCancelCallback, + poll_interval: NonNegativeFloat, ): """ Polls for cancellation via the callback and raises TaskCancelled if it occurs. """ while not await cancel_awaitable(): - await asyncio.sleep(_POLL_INTERVAL_S) + await asyncio.sleep(poll_interval) if close_event.is_set(): return raise TaskCancelled async def run_until_cancelled( - *, coro: Coroutine, cancel_callback: TaskCancelCallback + *, + coro: Coroutine, + cancel_callback: TaskCancelCallback, + poll_interval: NonNegativeFloat = _POLL_INTERVAL_S, ) -> Any: """ Runs the given coroutine until it completes or cancellation is requested. @@ -252,7 +259,9 @@ async def run_until_cancelled( # One to poll for cancellation tg.create_task( - _poller_for_task_group(close_poller_event, cancel_callback), + _poller_for_task_group( + close_poller_event, cancel_callback, poll_interval + ), name=f"run_until_cancelled/poller/{coro.__name__}/{id(sentinel)}", ) # The other to run the actual coroutine diff --git a/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py b/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py index 8116869af5db..3d69c72d8ab1 100644 --- a/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py +++ b/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py @@ -1,7 +1,8 @@ import asyncio import logging -from typing import NoReturn +from functools import partial +from servicelib.async_utils import TaskCancelled, run_until_cancelled from starlette.requests import Request from starlette.types import ASGIApp, Message, Receive, Scope, Send @@ -10,29 +11,23 @@ _logger = logging.getLogger(__name__) -class _TerminateTaskGroupError(Exception): - pass - - -async def _message_poller( - request: Request, queue: asyncio.Queue, receive: Receive -) -> NoReturn: - while True: - message = await receive() - if message["type"] == "http.disconnect": - _logger.debug( - "client disconnected, terminating request to %s!", request.url - ) - raise _TerminateTaskGroupError - - # Puts the message in the queue - await queue.put(message) - - async def _handler( app: ASGIApp, scope: Scope, queue: asyncio.Queue[Message], send: Send ) -> None: - return await app(scope, queue.get, send) + await app(scope, queue.get, send) + + +async def _is_client_disconnected( + receive: Receive, queue: asyncio.Queue[Message], request: Request +) -> bool: + message = await receive() + if message["type"] == "http.disconnect": + _logger.debug("client disconnected, terminating request to %s!", request.url) + return True + + # Puts the message in the queue + await queue.put(message) + return False class RequestCancellationMiddleware: @@ -58,23 +53,22 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: return # Let's make a shared queue for the request messages - queue: asyncio.Queue[Message] = asyncio.Queue() request = Request(scope) + queue: asyncio.Queue[Message] = asyncio.Queue() with log_context(_logger, logging.DEBUG, f"cancellable request {request.url}"): try: - async with asyncio.TaskGroup() as tg: - handler_task = tg.create_task( - _handler(self.app, scope, queue, send) - ) - poller_task = tg.create_task( - _message_poller(request, queue, receive) - ) - await handler_task - poller_task.cancel() - except* _TerminateTaskGroupError: - if not handler_task.done(): - _logger.info( - "The client disconnected. request to %s was cancelled.", - request.url, - ) + await run_until_cancelled( + coro=_handler(self.app, scope, queue, send), + cancel_callback=partial( + _is_client_disconnected, receive, queue, request + ), + poll_interval=0.0, + ) + return + + except TaskCancelled: + _logger.info( + "The client disconnected. request to %s was cancelled.", + request.url, + ) From 241a0e9565a626e2b81ebabda0829c16cdf2cc45 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Fri, 4 Jul 2025 23:32:52 +0200 Subject: [PATCH 17/20] improve types --- packages/service-library/src/servicelib/async_utils.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index 896fa13be214..c8892fba010a 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -7,8 +7,6 @@ from functools import wraps from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar -from pydantic import NonNegativeFloat - from . import tracing from .utils_profiling_middleware import dont_profile, is_profiling, profile_context @@ -225,7 +223,7 @@ class TaskCancelled(Exception): async def _poller_for_task_group( close_event: asyncio.Event, cancel_awaitable: TaskCancelCallback, - poll_interval: NonNegativeFloat, + poll_interval: float, ): """ Polls for cancellation via the callback and raises TaskCancelled if it occurs. @@ -241,7 +239,7 @@ async def run_until_cancelled( *, coro: Coroutine, cancel_callback: TaskCancelCallback, - poll_interval: NonNegativeFloat = _POLL_INTERVAL_S, + poll_interval: float = _POLL_INTERVAL_S, ) -> Any: """ Runs the given coroutine until it completes or cancellation is requested. From 431df54c0d72496e382132a1c7f1ab13adfd6655 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 7 Jul 2025 06:25:55 +0200 Subject: [PATCH 18/20] fix request cancellation test --- .../tests/fastapi/test_request_decorators.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/service-library/tests/fastapi/test_request_decorators.py b/packages/service-library/tests/fastapi/test_request_decorators.py index 2b271b6219f0..968534c36383 100644 --- a/packages/service-library/tests/fastapi/test_request_decorators.py +++ b/packages/service-library/tests/fastapi/test_request_decorators.py @@ -16,6 +16,7 @@ import pytest import requests from fastapi import FastAPI, Query, Request +from servicelib.async_utils import TaskCancelCallback from servicelib.fastapi.requests_decorators import cancel_on_disconnect POLLER_CLEANUP_DELAY_S = 100.0 @@ -130,11 +131,15 @@ def test_cancel_on_disconnect(get_unused_port: Callable[[], int]): @pytest.fixture def long_running_poller_mock( monkeypatch: pytest.MonkeyPatch, -) -> Callable[[asyncio.Event, Request], Awaitable]: +) -> Callable[[asyncio.Event, TaskCancelCallback, float], Awaitable]: - async def _mock_disconnect_poller(close_event: asyncio.Event, request: Request): + async def _mock_disconnect_poller( + close_event: asyncio.Event, + cancel_awaitable: TaskCancelCallback, + poll_interval: float, + ): _mock_disconnect_poller.called = True - while not await request.is_disconnected(): + while not await cancel_awaitable(): await asyncio.sleep(2) if close_event.is_set(): break @@ -147,7 +152,9 @@ async def _mock_disconnect_poller(close_event: asyncio.Event, request: Request): async def test_decorator_waits_for_poller_cleanup( - long_running_poller_mock: Callable[[asyncio.Event, Request], Awaitable], + long_running_poller_mock: Callable[ + [asyncio.Event, TaskCancelCallback, float], Awaitable + ], ): """ Tests that the decorator's wrapper waits for the poller task to finish From fe5b4ca6b023420fafca37c222deed47b15f1b02 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 7 Jul 2025 06:43:49 +0200 Subject: [PATCH 19/20] improve naming --- packages/service-library/src/servicelib/async_utils.py | 8 ++++---- .../src/servicelib/fastapi/cancellation_middleware.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index c8892fba010a..0229495442be 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -223,13 +223,13 @@ class TaskCancelled(Exception): async def _poller_for_task_group( close_event: asyncio.Event, cancel_awaitable: TaskCancelCallback, - poll_interval: float, + poll_interval_s: float, ): """ Polls for cancellation via the callback and raises TaskCancelled if it occurs. """ while not await cancel_awaitable(): - await asyncio.sleep(poll_interval) + await asyncio.sleep(poll_interval_s) if close_event.is_set(): return raise TaskCancelled @@ -239,7 +239,7 @@ async def run_until_cancelled( *, coro: Coroutine, cancel_callback: TaskCancelCallback, - poll_interval: float = _POLL_INTERVAL_S, + poll_interval_s: float = _POLL_INTERVAL_S, ) -> Any: """ Runs the given coroutine until it completes or cancellation is requested. @@ -258,7 +258,7 @@ async def run_until_cancelled( # One to poll for cancellation tg.create_task( _poller_for_task_group( - close_poller_event, cancel_callback, poll_interval + close_poller_event, cancel_callback, poll_interval_s ), name=f"run_until_cancelled/poller/{coro.__name__}/{id(sentinel)}", ) diff --git a/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py b/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py index 3d69c72d8ab1..fe62d6af7930 100644 --- a/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py +++ b/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py @@ -63,7 +63,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: cancel_callback=partial( _is_client_disconnected, receive, queue, request ), - poll_interval=0.0, + poll_interval_s=0.0, ) return From 62808a82931eba020f64346e5c38495919499bd0 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 7 Jul 2025 07:10:00 +0200 Subject: [PATCH 20/20] Revert "improve naming" This reverts commit fe5b4ca6b023420fafca37c222deed47b15f1b02. --- packages/service-library/src/servicelib/async_utils.py | 8 ++++---- .../src/servicelib/fastapi/cancellation_middleware.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index 0229495442be..c8892fba010a 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -223,13 +223,13 @@ class TaskCancelled(Exception): async def _poller_for_task_group( close_event: asyncio.Event, cancel_awaitable: TaskCancelCallback, - poll_interval_s: float, + poll_interval: float, ): """ Polls for cancellation via the callback and raises TaskCancelled if it occurs. """ while not await cancel_awaitable(): - await asyncio.sleep(poll_interval_s) + await asyncio.sleep(poll_interval) if close_event.is_set(): return raise TaskCancelled @@ -239,7 +239,7 @@ async def run_until_cancelled( *, coro: Coroutine, cancel_callback: TaskCancelCallback, - poll_interval_s: float = _POLL_INTERVAL_S, + poll_interval: float = _POLL_INTERVAL_S, ) -> Any: """ Runs the given coroutine until it completes or cancellation is requested. @@ -258,7 +258,7 @@ async def run_until_cancelled( # One to poll for cancellation tg.create_task( _poller_for_task_group( - close_poller_event, cancel_callback, poll_interval_s + close_poller_event, cancel_callback, poll_interval ), name=f"run_until_cancelled/poller/{coro.__name__}/{id(sentinel)}", ) diff --git a/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py b/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py index fe62d6af7930..3d69c72d8ab1 100644 --- a/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py +++ b/packages/service-library/src/servicelib/fastapi/cancellation_middleware.py @@ -63,7 +63,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: cancel_callback=partial( _is_client_disconnected, receive, queue, request ), - poll_interval_s=0.0, + poll_interval=0.0, ) return