Skip to content

Commit

Permalink
Faster worker stop on terminated task.
Browse files Browse the repository at this point in the history
Instead of always waiting for the next update_task call, the heartbeat
API can now tell the worker to stop handling a task that is no longer alive.
  • Loading branch information
vdbergh committed Aug 14, 2024
1 parent 12981ff commit 8736dee
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 7 deletions.
2 changes: 1 addition & 1 deletion server/fishtest/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def beat(self):
task = self.task()
task["last_updated"] = datetime.now(timezone.utc)
self.request.rundb.buffer(run, False)
return self.add_time({})
return self.add_time({"task_alive": task["active"]})

@view_config(route_name="api_request_spsa")
def request_spsa(self):
Expand Down
2 changes: 1 addition & 1 deletion server/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def test_beat(self):
request = self.correct_password_request({"run_id": run_id, "task_id": 0})
response = WorkerApi(request).beat()
response.pop("duration", None)
self.assertEqual(response, {})
self.assertEqual(response, {"task_alive": True})


class TestRunFinished(unittest.TestCase):
Expand Down
12 changes: 8 additions & 4 deletions worker/games.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,9 @@ def results_to_score(results):
def parse_cutechess_output(
p, current_state, remote, result, spsa_tuning, games_to_play, batch_size, tc_limit
):
finished_task_message = (
"The server told us that no more games are needed for the current task."
)
hash_pattern = re.compile(r"(Base|New)-[a-f0-9]+")

def shorten_hash(match):
Expand All @@ -981,6 +984,10 @@ def shorten_hash(match):

num_games_updated = 0
while datetime.now(timezone.utc) < end_time:
if current_state["task_id"] is None:
# This task is no longer necessary
print(finished_task_message)
return False
try:
line = q.get_nowait().strip()
except Empty:
Expand Down Expand Up @@ -1097,10 +1104,7 @@ def shorten_hash(match):
else:
if not response["task_alive"]:
# This task is no longer necessary
print(
"The server told us that no more games"
" are needed for the current task."
)
print(finished_task_message)
return False
update_succeeded = True
num_games_updated = num_games_finished
Expand Down
2 changes: 1 addition & 1 deletion worker/sri.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"__version": 241, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "BMuQUpxZAKF0aP6ByTZY1r06MfPoIbdG2xraTrDQQRKgvhzJo6CKmeX2P8vX/QDm", "games.py": "9dFaa914vpqT7q4LLx2LlDdYwK6QFVX3h7+XRt18ATX0lt737rvFeBIiqakkttNC"}
{"__version": 241, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "0tqGx4eOPC34V6ByfwNwT8lkHpTjjs5EygX5wobSQI4vbhL5AH7t+q6jb4BOi52Q", "games.py": "e9OsVWGcgVoxQos7rBZgyy/ttjd99PYrdKqTq+hQksvvzA2ydi4Fnk36OvixV4fE"}
4 changes: 4 additions & 0 deletions worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,10 @@ def heartbeat(worker_info, password, remote, current_state):
else:
if "error" not in req:
print("(received)")
task_alive = req.get("task_alive", True)
if not task_alive:
current_state["task_id"] = None
current_state["run"] = None
else:
print("Heartbeat stopped")

Expand Down

0 comments on commit 8736dee

Please sign in to comment.