Do not flag a worker as a duplicate of itself.
A worker may fail to update a task because of server issues.
It retries 5 times and then asks for a new task. The server,
however, has not marked the original task as inactive, so
before this PR the worker was flagged as a duplicate of itself
and denied a new task for some time.

In this PR we simply mark the original task as inactive
and continue.
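
The check described above can be sketched in isolation. This is a minimal, simplified illustration and not the server code: the function name `check_duplicate`, the flat task dictionaries (with `name` and `unique_key` stored directly rather than inside `worker_info`), and the worker name `alice-4cores` are all assumptions made for the example. Only the 120-second heartbeat period and the `unique_key` comparison come from the diff.

```python
from datetime import datetime, timedelta

# Heartbeat period of the worker in seconds (value taken from the diff's comment).
HEARTBEAT_PERIOD = 120


def check_duplicate(active_tasks, my_name, unique_key, now):
    """Return an error string if another live worker shares my_name, else None.

    Hypothetical helper: each task is a flat dict with "name", "unique_key",
    "active" and "last_updated" keys, which simplifies the real data layout.
    """
    for task in active_tasks:
        if task["name"] != my_name:
            continue
        if task["unique_key"] == unique_key:
            # Same worker instance: the old task is stale, so deactivate it
            # instead of reporting the worker as a duplicate of itself.
            task["active"] = False
            continue
        last_update = (now - task["last_updated"]).seconds
        if last_update <= HEARTBEAT_PERIOD:
            return f'There is already a worker running with name "{my_name}"'
    return None


# Usage: the same worker asks for a second task 30 seconds after its last update.
now = datetime(2024, 5, 4, 12, 0, 0)
stale = {
    "name": "alice-4cores",  # hypothetical worker name
    "unique_key": "key-1",
    "active": True,
    "last_updated": now - timedelta(seconds=30),
}
error = check_duplicate([stale], "alice-4cores", "key-1", now)
```

In this sketch a matching `unique_key` is treated as proof that the request comes from the same worker instance, so the stale task is deactivated and no error is returned. A different key with a recent update still produces the duplicate error.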
vdbergh authored and ppigazzini committed May 4, 2024
1 parent 2737c82 commit b8469b3
Showing 2 changed files with 14 additions and 3 deletions.
14 changes: 12 additions & 2 deletions server/fishtest/rundb.py
@@ -866,11 +866,21 @@ def priority(run):  # lower is better
 for task in active_tasks:
     task_name = worker_name(task["worker_info"], short=True)
     if my_name == task_name:
+        task_name_long = worker_name(task["worker_info"])
+        my_name_long = worker_name(worker_info)
+        task_unique_key = task["worker_info"]["unique_key"]
+        if unique_key == task_unique_key:
+            # It seems that this worker was unable to update an old task
+            # (perhaps because of server issues).
+            print(
+                f'Stale active task detected for worker "{my_name_long}". Correcting...',
+                flush=True,
+            )
+            task["active"] = False
+            continue
         last_update = (now - task["last_updated"]).seconds
         # 120 = period of heartbeat in worker.
         if last_update <= 120:
-            task_name_long = worker_name(task["worker_info"])
-            my_name_long = worker_name(worker_info)
             error = (
                 f'Request_task: There is already a worker running with name "{task_name_long}" '
                 f'which sent an update {last_update} seconds ago (my name is "{my_name_long}")'
3 changes: 2 additions & 1 deletion server/tests/test_api.py
@@ -541,7 +541,8 @@ def test_duplicate_workers(self):
         # Request task 2 of 2
         request = self.correct_password_request()
         response = ApiView(request).request_task()
-        self.assertTrue("error" in response)
+        self.assertFalse("error" in response)
+        # TODO Add test for a different worker connecting
 
     def test_auto_purge_runs(self):
         stop_all_runs(self)
