From 8736dee19cc26e871565a63c45bb1040bf0a1472 Mon Sep 17 00:00:00 2001 From: Michel Van den Bergh Date: Mon, 12 Aug 2024 08:12:09 +0000 Subject: [PATCH] Faster worker stop on terminated task. Instead of always waiting for the next update_task, we allow the heart beat api to stop handling a task in the worker. --- server/fishtest/api.py | 2 +- server/tests/test_api.py | 2 +- worker/games.py | 12 ++++++++---- worker/sri.txt | 2 +- worker/worker.py | 4 ++++ 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/server/fishtest/api.py b/server/fishtest/api.py index 22755e5fa..f0af700a8 100644 --- a/server/fishtest/api.py +++ b/server/fishtest/api.py @@ -316,7 +316,7 @@ def beat(self): task = self.task() task["last_updated"] = datetime.now(timezone.utc) self.request.rundb.buffer(run, False) - return self.add_time({}) + return self.add_time({"task_alive": task["active"]}) @view_config(route_name="api_request_spsa") def request_spsa(self): diff --git a/server/tests/test_api.py b/server/tests/test_api.py index fb017fb2f..c324fee79 100644 --- a/server/tests/test_api.py +++ b/server/tests/test_api.py @@ -484,7 +484,7 @@ def test_beat(self): request = self.correct_password_request({"run_id": run_id, "task_id": 0}) response = WorkerApi(request).beat() response.pop("duration", None) - self.assertEqual(response, {}) + self.assertEqual(response, {"task_alive": True}) class TestRunFinished(unittest.TestCase): diff --git a/worker/games.py b/worker/games.py index 64463a3c5..6a4fd3b36 100644 --- a/worker/games.py +++ b/worker/games.py @@ -961,6 +961,9 @@ def results_to_score(results): def parse_cutechess_output( p, current_state, remote, result, spsa_tuning, games_to_play, batch_size, tc_limit ): + finished_task_message = ( + "The server told us that no more games are needed for the current task." + ) hash_pattern = re.compile(r"(Base|New)-[a-f0-9]+") def shorten_hash(match): @@ -981,6 +984,10 @@ def shorten_hash(match): num_games_updated = 0 while datetime.now(timezone.utc) < end_time: + if current_state["task_id"] is None: + # This task is no longer necessary + print(finished_task_message) + return False try: line = q.get_nowait().strip() except Empty: @@ -1097,10 +1104,7 @@ def shorten_hash(match): else: if not response["task_alive"]: # This task is no longer necessary - print( - "The server told us that no more games" - " are needed for the current task." - ) + print(finished_task_message) return False update_succeeded = True num_games_updated = num_games_finished diff --git a/worker/sri.txt b/worker/sri.txt index ee2cad5b6..c2d7a5a31 100644 --- a/worker/sri.txt +++ b/worker/sri.txt @@ -1 +1 @@ -{"__version": 241, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "BMuQUpxZAKF0aP6ByTZY1r06MfPoIbdG2xraTrDQQRKgvhzJo6CKmeX2P8vX/QDm", "games.py": "9dFaa914vpqT7q4LLx2LlDdYwK6QFVX3h7+XRt18ATX0lt737rvFeBIiqakkttNC"} +{"__version": 241, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "0tqGx4eOPC34V6ByfwNwT8lkHpTjjs5EygX5wobSQI4vbhL5AH7t+q6jb4BOi52Q", "games.py": "e9OsVWGcgVoxQos7rBZgyy/ttjd99PYrdKqTq+hQksvvzA2ydi4Fnk36OvixV4fE"} diff --git a/worker/worker.py b/worker/worker.py index bfc40c95d..098398c36 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -1226,6 +1226,10 @@ def heartbeat(worker_info, password, remote, current_state): else: if "error" not in req: print("(received)") + task_alive = req.get("task_alive", True) + if not task_alive: + current_state["task_id"] = None + current_state["run"] = None else: print("Heartbeat stopped")