diff --git a/worker/games.py b/worker/games.py index 6a85ff8c8..1caa1ed1b 100644 --- a/worker/games.py +++ b/worker/games.py @@ -902,7 +902,7 @@ def results_to_score(results): def parse_cutechess_output( - p, remote, result, spsa_tuning, games_to_play, batch_size, tc_limit + p, current_state, remote, result, spsa_tuning, games_to_play, batch_size, tc_limit ): hash_pattern = re.compile(r"(Base|New)-[a-f0-9]+") @@ -1051,6 +1051,8 @@ def shorten_hash(match): time.sleep(UPDATE_RETRY_TIME) if not update_succeeded: raise WorkerException("Too many failed update attempts") + else: + current_state["last_updated"] = datetime.now(timezone.utc) # Act on line like this: # Finished game 4 (Base-SHA vs New-SHA): 1/2-1/2 {Draw by adjudication} @@ -1065,7 +1067,7 @@ def shorten_hash(match): def launch_cutechess( - cmd, remote, result, spsa_tuning, games_to_play, batch_size, tc_limit + cmd, current_state, remote, result, spsa_tuning, games_to_play, batch_size, tc_limit ): if spsa_tuning: # Request parameters for next game. @@ -1144,6 +1146,7 @@ def launch_cutechess( try: task_alive = parse_cutechess_output( p, + current_state, remote, result, spsa_tuning, @@ -1178,7 +1181,9 @@ def launch_cutechess( return task_alive -def run_games(worker_info, password, remote, run, task_id, pgn_file, clear_binaries): +def run_games( + worker_info, current_state, password, remote, run, task_id, pgn_file, clear_binaries +): # This is the main cutechess-cli driver. # It is ok, and even expected, for this function to # raise exceptions, implicitly or explicitly, if a @@ -1548,6 +1553,7 @@ def make_player(arg): task_alive = launch_cutechess( cmd, + current_state, remote, result, spsa_tuning, diff --git a/worker/sri.txt b/worker/sri.txt index f7141f551..aa4d9791d 100644 --- a/worker/sri.txt +++ b/worker/sri.txt @@ -1 +1 @@ -{"__version": 237, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "SZXQEuoQG97IqizDgOrsEuzOl2P5hGbkAqpCaU1tiIp86pz2tq8xevf9T3Ei89zn", "games.py": "KqvcMhLOyArHNTWpD6QAx+IFdZwg8aTRcvGf7eN5kn31V5a+G9Y7137hnrtFGy3K"} +{"__version": 237, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "Yv0ObjNdzzJStMOg5VEKpm6a2+6nodXZeyJqoiZu0cDZIy8OCtMunyqUmT9z0V/9", "games.py": "6vKH51UtL56oNvA539hLXRzgE1ADXy3QZNJohoK94RntM72+iMancSJZHaNjEb5+"} diff --git a/worker/worker.py b/worker/worker.py index f64ee7c2c..33d543fb5 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -23,7 +23,7 @@ from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser from configparser import ConfigParser from contextlib import ExitStack -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from functools import partial from pathlib import Path @@ -1171,13 +1171,12 @@ def heartbeat(worker_info, password, remote, current_state): "password": password, "worker_info": worker_info, } - count = 0 while current_state["alive"]: time.sleep(1) - count += 1 - if count == 120: - count = 0 + now = datetime.now(timezone.utc) + if current_state["last_updated"] + timedelta(seconds=120) < now: print(" Send heartbeat for", worker_info["unique_key"], end=" ... ") + current_state["last_updated"] = now run = current_state["run"] payload["run_id"] = str(run["_id"]) if run else None task_id = current_state["task_id"] @@ -1419,7 +1418,16 @@ def fetch_and_handle_task( api = remote + "/api/failed_task" pgn_file = [None] try: - run_games(worker_info, password, remote, run, task_id, pgn_file, clear_binaries) + run_games( + worker_info, + current_state, + password, + remote, + run, + task_id, + pgn_file, + clear_binaries, + ) success = True except FatalException as e: message = str(e) @@ -1528,8 +1536,10 @@ def worker(): current_state = { "run": None, # the current run "task_id": None, # the id of the current task - "alive": True, # controls the main loop and - # the heartbeat loop + "alive": True, # controls the main and heartbeat loop + "last_updated": datetime.now( + timezone.utc + ), # tracks the last update to the server } # Install signal handlers.