Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split flush_buffers() #2048

Merged
merged 2 commits into from
Jun 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 54 additions & 54 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def schedule_tasks(self):
if self.scheduler is None:
self.scheduler = Scheduler(jitter=0.05)
self.scheduler.add_task(1.0, self.flush_buffers)
self.scheduler.add_task(60.0, self.clean_cache)
self.scheduler.add_task(60.0, self.scavenge_dead_tasks)
self.scheduler.add_task(180.0, self.validate_random_run)

def validate_random_run(self):
Expand Down Expand Up @@ -477,70 +479,68 @@ def flush_all(self):
print("done", flush=True)

# For documentation of the cache format see "cache_schema" in schemas.py.

def flush_buffers(self):
    """Persist the single most stale modified cache entry to the database.

    Scans the run cache for entries flagged ``is_changed`` and writes the
    one with the oldest ``last_sync_time`` back to the ``runs`` collection,
    clearing its dirty flag.  Writing at most one run per invocation keeps
    each scheduler tick short.  Scavenging and cache eviction are handled
    separately by scavenge_dead_tasks() and clean_cache().
    """
    oldest_entry = None
    old = float("inf")
    with self.run_cache_lock:
        # Find the dirty entry that has gone longest without a sync.
        for cache_entry in self.run_cache.values():
            if cache_entry["is_changed"] and cache_entry["last_sync_time"] < old:
                old = cache_entry["last_sync_time"]
                oldest_entry = cache_entry
        if oldest_entry is not None:
            oldest_run = oldest_entry["run"]
            # Mark clean before the write so a concurrent change re-dirties it.
            oldest_entry["is_changed"] = False
            oldest_entry["last_sync_time"] = time.time()
            with self.run_cache_write_lock:
                self.runs.replace_one({"_id": oldest_run["_id"]}, oldest_run)

def scavenge(self, run):
    """Deactivate tasks of *run* that have not reported for six minutes.

    Returns True if at least one task was declared dead, False otherwise.
    Does nothing during the first 300 seconds after server boot so that
    workers have time to reconnect.
    """
    # Grace period right after boot: don't declare anything dead yet.
    if datetime.now(timezone.utc) < boot_time + timedelta(seconds=300):
        return False
    stale_cutoff = datetime.now(timezone.utc) - timedelta(minutes=6)
    found_dead = False
    for task_id, task in enumerate(run["tasks"]):
        if not task["active"] or task["last_updated"] >= stale_cutoff:
            continue
        self.set_inactive_task(task_id, run)
        found_dead = True
        print(
            "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format(
                run["_id"], task_id, worker_name(task["worker_info"])
            ),
            flush=True,
        )
        self.handle_crash_or_time(run, task_id)
        self.actiondb.dead_task(
            username=task["worker_info"]["username"],
            run=run,
            task_id=task_id,
        )
    return found_dead

def clean_cache(self):
    """Evict finished, unmodified runs that were not accessed for 5 minutes.

    An entry is evicted only when it is clean (``is_changed`` is False),
    its run no longer holds cores or is finished, and its last access is
    older than 300 seconds.
    """
    cutoff = time.time() - 300
    with self.run_cache_lock:
        # Collect ids first: we must not mutate run_cache while iterating it.
        stale_ids = [
            r_id
            for r_id, entry in self.run_cache.items()
            if not entry["is_changed"]
            # Presently run["finished"] implies run["cores"]==0 but
            # this was not always true in the past.
            and (entry["run"]["cores"] <= 0 or entry["run"]["finished"])
            and entry["last_access_time"] < cutoff
        ]
        for r_id in stale_ids:
            del self.run_cache[r_id]

def scavenge_dead_tasks(self):
    """Find and deactivate tasks that have not reported for six minutes.

    Phase 1 (under ``run_cache_lock``): scan unfinished, clean cache
    entries for active tasks whose ``last_updated`` is older than 360
    seconds and mark their cache entry dirty so flush_buffers() will
    persist the change.
    Phase 2 (lock released): log each dead task, record the crash/time
    statistics and the action-db event, and finally deactivate the task.
    """
    now = time.time()
    dead_tasks = []
    with self.run_cache_lock:
        for cache_entry in self.run_cache.values():
            run = cache_entry["run"]
            if not cache_entry["is_changed"] and not run["finished"]:
                for task_id, task in enumerate(run["tasks"]):
                    if (
                        task["active"]
                        and task["last_updated"].timestamp() < now - 360
                    ):
                        # Ensure the modified run gets flushed to the db.
                        cache_entry["is_changed"] = True
                        dead_tasks.append((task_id, run))
    # We release the lock to avoid deadlock: the handlers below may
    # re-enter code that takes run_cache_lock themselves.
    for task_id, run in dead_tasks:
        task = run["tasks"][task_id]
        print(
            "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format(
                run["_id"], task_id, worker_name(task["worker_info"])
            ),
            flush=True,
        )
        self.handle_crash_or_time(run, task_id)
        self.actiondb.dead_task(
            username=task["worker_info"]["username"],
            run=run,
            task_id=task_id,
        )
        self.set_inactive_task(task_id, run)

def get_unfinished_runs_id(self):
with self.run_cache_write_lock:
Expand Down
Loading