From e141295b3c4e95443a15825a1bd73f25acf89f91 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Fri, 11 Oct 2019 09:22:41 +0200 Subject: [PATCH] Fix exception reporting in run_parallel() (#118) Fix run_parallel --- poucave/utils.py | 15 +++++++++++---- tests/test_utils.py | 15 ++++++++++++++- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/poucave/utils.py b/poucave/utils.py index 0089c16d..22bb1210 100644 --- a/poucave/utils.py +++ b/poucave/utils.py @@ -95,9 +95,11 @@ async def run_parallel(*futures): async def worker(results_by_index, queue): while True: i, future = await queue.get() - result = await future - results_by_index[i] = result - queue.task_done() + try: + result = await future + results_by_index[i] = result + finally: + queue.task_done() # Pre-allocate a list of results. results_by_index = {} @@ -119,7 +121,12 @@ async def worker(results_by_index, queue): # Stop workers and wait until done. for task in worker_tasks: task.cancel() - await asyncio.gather(*worker_tasks, return_exceptions=True) + + # If some errors happened in the workers, re-raise here. + errors = await asyncio.gather(*worker_tasks, return_exceptions=True) + real_errors = [e for e in errors if not isinstance(e, asyncio.CancelledError)] + if len(real_errors) > 0: + raise real_errors[0] return [results_by_index[k] for k in sorted(results_by_index.keys())] diff --git a/tests/test_utils.py b/tests/test_utils.py index 7ac84a27..8ed4304c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,4 +1,6 @@ -from poucave.utils import Cache, fetch_redash +import pytest + +from poucave.utils import Cache, fetch_redash, run_parallel def test_cache_set_get(): @@ -29,3 +31,14 @@ async def test_fetch_redash(mock_aioresponses): rows = await fetch_redash(query_id=64921, api_key="abc") assert rows == [row] + + +async def test_run_parallel(): + async def success(): + return 42 + + async def failure(): + raise ValueError() + + with pytest.raises(ValueError): + await run_parallel(success(), failure(), success())