From c59440fddf441cce01d0c0352f189e9be1a488ab Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 3 Oct 2024 01:38:16 -0700 Subject: [PATCH] Fix and reenable keyboard interrupt test --- ci/test_common.sh | 8 ++++---- python/distributed-ucxx/distributed_ucxx/ucxx.py | 3 +++ python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/ci/test_common.sh b/ci/test_common.sh index d7603f95..df9e7aff 100755 --- a/ci/test_common.sh +++ b/ci/test_common.sh @@ -202,10 +202,10 @@ run_distributed_ucxx_tests() { ENABLE_DELAYED_SUBMISSION=$2 ENABLE_PYTHON_FUTURE=$3 - CMD_LINE="UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests/ -k 'test_nanny_closed_by_keyboard_interrupt'" + CMD_LINE="UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests/ log_command "${CMD_LINE}" - UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests/ -k 'not test_nanny_closed_by_keyboard_interrupt' + UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests/ } run_distributed_ucxx_tests_internal() { @@ -215,8 +215,8 @@ run_distributed_ucxx_tests_internal() { ENABLE_DELAYED_SUBMISSION=$2 ENABLE_PYTHON_FUTURE=$3 - CMD_LINE="UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests_internal/ -k 'not test_nanny_closed_by_keyboard_interrupt'" + CMD_LINE="UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests_internal/ log_command "${CMD_LINE}" - UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests_internal/ -k 'not test_nanny_closed_by_keyboard_interrupt' + UCXPY_PROGRESS_MODE=${PROGRESS_MODE} UCXPY_ENABLE_DELAYED_SUBMISSION=${ENABLE_DELAYED_SUBMISSION} UCXPY_ENABLE_PYTHON_FUTURE=${ENABLE_PYTHON_FUTURE} timeout 10m python -m pytest -vs python/distributed-ucxx/distributed_ucxx/tests_internal/ } diff --git a/python/distributed-ucxx/distributed_ucxx/ucxx.py b/python/distributed-ucxx/distributed_ucxx/ucxx.py index 3b6ca89e..0fb14b8e 100644 --- a/python/distributed-ucxx/distributed_ucxx/ucxx.py +++ b/python/distributed-ucxx/distributed_ucxx/ucxx.py @@ -671,10 +671,13 @@ async def serve_forever(client_ep): await self.comm_handler(ucx) init_once() + _register_dask_resource(hash(self)) + weakref.finalize(self, _deregister_dask_resource, hash(self)) self.ucxx_server = ucxx.create_listener(serve_forever, port=self._input_port) def stop(self): self.ucxx_server = None + _deregister_dask_resource(hash(self)) def get_host_port(self): # TODO: TCP raises if this hasn't started yet. diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index 5671d0c8..9763e222 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -51,8 +51,8 @@ def __init__(self, worker, event_loop, polling_mode=False): worker.set_progress_thread_start_callback(_create_context) worker.start_progress_thread(polling_mode=polling_mode, epoll_timeout=1) - # def __del__(self): - # self.worker.stop_progress_thread() + def __del__(self): + self.worker.stop_progress_thread() class PollingMode(ProgressTask):