Merge pull request #9 from juaml/fix/race_condition
Handle exception on timestamp parsing from .run file
fraimondo authored Oct 25, 2024
2 parents 488214a + 04d3ce6 commit 1bd253c
Showing 2 changed files with 76 additions and 60 deletions.
1 change: 1 addition & 0 deletions changelog.d/9.fixed.md
@@ -0,0 +1 @@
+Fix a bug in which the watcher thread might crash due to an empty `.run` file
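
For context, the race being fixed: the `.run` file is created before its timestamp content is written, so a watcher polling in between sees a file that exists but is empty. Below is a minimal sketch of that failure mode, assuming for illustration that the timestamp is stored as an ISO string (the actual on-disk format and file layout are not shown in this diff):

```python
from datetime import datetime
from pathlib import Path

# Writer side: the file is created first; its content arrives later.
run_fname = Path("task-1.run")  # illustrative name only
run_fname.touch()               # file now exists, but is still empty

# Watcher side: a poll landing between touch() and the write reads "".
try:
    timestamp = datetime.fromisoformat(run_fname.read_text())
except ValueError as e:
    print(f"Watcher would have crashed here: {e}")

# Writer side: only now does the timestamp land on disk.
run_fname.write_text(datetime.now().isoformat())
```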
135 changes: 75 additions & 60 deletions joblib_htcondor/backend.py
@@ -10,6 +10,7 @@
 import signal
 import sys
 import time
+import traceback
 from collections import deque
 from concurrent.futures import Future, ThreadPoolExecutor
 from dataclasses import dataclass, field
@@ -145,6 +146,10 @@ def update_run_from_file(self, run_fname: Path) -> bool:
             logger.warning(
                 "Error reading run timestamp from " f"{run_fname}: {e}"
             )
+        except ValueError as e:
+            logger.warning(
+                "Error parsing run timestamp from " f"{run_fname}: {e}"
+            )
         return out
 
     @classmethod
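
With this hunk, `update_run_from_file` degrades gracefully on both failure modes: `OSError` when the file cannot be read at all, and now `ValueError` when its content cannot be parsed as a timestamp (e.g. because the file is still empty). A hedged sketch of the same defensive-read pattern in isolation; the function name and return convention below are illustrative, not the method's actual body:

```python
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


def read_run_timestamp(run_fname: Path) -> Optional[datetime]:
    """Return the parsed run timestamp, or None if it cannot be obtained."""
    try:
        return datetime.fromisoformat(run_fname.read_text())
    except OSError as e:  # file vanished, permission or I/O error
        logger.warning(f"Error reading run timestamp from {run_fname}: {e}")
    except ValueError as e:  # file exists but is empty or half-written
        logger.warning(f"Error parsing run timestamp from {run_fname}: {e}")
    return None
```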
@@ -867,68 +872,78 @@ def _watcher(self) -> None:
logger.info(f"Polling every {self._poll_interval} seconds.")
logger.info(f"Throttle set to {throttle}.")
while self._continue:
if (time.time() - last_poll) > self._poll_interval:
# Enough time passed, poll the jobs
n_running, update_meta = self._poll_jobs()
last_poll = time.time()
if n_running < throttle:
# We don't have enough jobs int he condor queue, submit
newly_queued = 0
max_to_queue = throttle - n_running
# First check if there are any queued jobs to be submitted
while (
self._queued_jobs_list and newly_queued < max_to_queue
):
to_submit = self._queued_jobs_list.popleft()
logger.log(
level=9, msg=f"Dumping pickle file {to_submit}"
)
# Dump pickle file
dumped = to_submit.delayed_submission.dump(
to_submit.pickle_fname
)
if not dumped:
# Something went wrong, continue and submit this
# later
logger.debug("Could not dump pickle file.")
continue
# Submit job
logger.log(level=9, msg=f"Submitting job {to_submit}")
to_submit.htcondor_submit_result = self._client.submit(
to_submit.htcondor_submit,
count=1,
)
logger.log(level=9, msg="Getting cluster id.")
# Set the cluster id
to_submit.cluster_id = ( # type: ignore
to_submit.htcondor_submit_result.cluster()
)
logger.log(level=9, msg="Job submitted.")
# Update the sent timestamp and cluster id
logger.log(
level=9, msg="Updating task status timestamp."
)
if self._export_metadata:
self._backend_meta.task_status[ # type: ignore
to_submit.task_id - 1
].sent_timestamp = datetime.now()

try:
if (time.time() - last_poll) > self._poll_interval:
# Enough time passed, poll the jobs
n_running, update_meta = self._poll_jobs()
last_poll = time.time()
if n_running < throttle:
# We don't have enough jobs int he condor queue, submit
newly_queued = 0
max_to_queue = throttle - n_running
# 1) check if there are any queued jobs to be submitted
while (
self._queued_jobs_list
and newly_queued < max_to_queue
):
to_submit = self._queued_jobs_list.popleft()
logger.log(
level=9, msg="Updating task status cluster id."
level=9, msg=f"Dumping pickle file {to_submit}"
)
self._backend_meta.task_status[ # type: ignore
to_submit.task_id - 1
].cluster_id = to_submit.cluster_id

logger.log(level=9, msg="Task status updated")
# Move to waiting jobs
self._waiting_jobs_deque.append(to_submit)
newly_queued += 1
update_meta = True
if update_meta and self._export_metadata:
self.write_metadata()
# logger.debug("Waiting 0.1 seconds")
time.sleep(0.1)
# Dump pickle file
dumped = to_submit.delayed_submission.dump(
to_submit.pickle_fname
)
if not dumped:
# Something went wrong
# continue and submit this later
logger.debug("Could not dump pickle file.")
continue
# Submit job
logger.log(
level=9, msg=f"Submitting job {to_submit}"
)
to_submit.htcondor_submit_result = (
self._client.submit(
to_submit.htcondor_submit,
count=1,
)
)
logger.log(level=9, msg="Getting cluster id.")
# Set the cluster id
to_submit.cluster_id = ( # type: ignore
to_submit.htcondor_submit_result.cluster()
)
logger.log(level=9, msg="Job submitted.")
# Update the sent timestamp and cluster id
logger.log(
level=9, msg="Updating task status timestamp."
)
if self._export_metadata:
self._backend_meta.task_status[ # type: ignore
to_submit.task_id - 1
].sent_timestamp = datetime.now()

logger.log(
level=9,
msg="Updating task status cluster id.",
)
self._backend_meta.task_status[ # type: ignore
to_submit.task_id - 1
].cluster_id = to_submit.cluster_id

logger.log(level=9, msg="Task status updated")
# Move to waiting jobs
self._waiting_jobs_deque.append(to_submit)
newly_queued += 1
update_meta = True
if update_meta and self._export_metadata:
self.write_metadata()
# logger.debug("Waiting 0.1 seconds")
time.sleep(0.1)
except Exception as e: # noqa: BLE001
logger.error(f"Error in HTCondor backend watcher: {e}")
logger.error(traceback.format_exc())

def _poll_jobs(self) -> tuple[int, bool]: # noqa: C901
"""Poll the schedd for job status.
