diff --git a/changelog.d/9.fixed.md b/changelog.d/9.fixed.md
new file mode 100644
index 0000000..00b69e9
--- /dev/null
+++ b/changelog.d/9.fixed.md
@@ -0,0 +1 @@
+Fix a bug in which the watcher thread might crash due to an empty `.run` file
diff --git a/joblib_htcondor/backend.py b/joblib_htcondor/backend.py
index 479e7ff..0514119 100644
--- a/joblib_htcondor/backend.py
+++ b/joblib_htcondor/backend.py
@@ -10,6 +10,7 @@
 import signal
 import sys
 import time
+import traceback
 from collections import deque
 from concurrent.futures import Future, ThreadPoolExecutor
 from dataclasses import dataclass, field
@@ -145,6 +146,10 @@ def update_run_from_file(self, run_fname: Path) -> bool:
             logger.warning(
                 "Error reading run timestamp from " f"{run_fname}: {e}"
             )
+        except ValueError as e:
+            logger.warning(
+                "Error parsing run timestamp from " f"{run_fname}: {e}"
+            )
         return out
 
     @classmethod
@@ -867,68 +872,78 @@ def _watcher(self) -> None:
         logger.info(f"Polling every {self._poll_interval} seconds.")
         logger.info(f"Throttle set to {throttle}.")
         while self._continue:
-            if (time.time() - last_poll) > self._poll_interval:
-                # Enough time passed, poll the jobs
-                n_running, update_meta = self._poll_jobs()
-                last_poll = time.time()
-                if n_running < throttle:
-                    # We don't have enough jobs int he condor queue, submit
-                    newly_queued = 0
-                    max_to_queue = throttle - n_running
-                    # First check if there are any queued jobs to be submitted
-                    while (
-                        self._queued_jobs_list and newly_queued < max_to_queue
-                    ):
-                        to_submit = self._queued_jobs_list.popleft()
-                        logger.log(
-                            level=9, msg=f"Dumping pickle file {to_submit}"
-                        )
-                        # Dump pickle file
-                        dumped = to_submit.delayed_submission.dump(
-                            to_submit.pickle_fname
-                        )
-                        if not dumped:
-                            # Something went wrong, continue and submit this
-                            # later
-                            logger.debug("Could not dump pickle file.")
-                            continue
-                        # Submit job
-                        logger.log(level=9, msg=f"Submitting job {to_submit}")
-                        to_submit.htcondor_submit_result = self._client.submit(
-                            to_submit.htcondor_submit,
-                            count=1,
-                        )
-                        logger.log(level=9, msg="Getting cluster id.")
-                        # Set the cluster id
-                        to_submit.cluster_id = (  # type: ignore
-                            to_submit.htcondor_submit_result.cluster()
-                        )
-                        logger.log(level=9, msg="Job submitted.")
-                        # Update the sent timestamp and cluster id
-                        logger.log(
-                            level=9, msg="Updating task status timestamp."
-                        )
-                        if self._export_metadata:
-                            self._backend_meta.task_status[  # type: ignore
-                                to_submit.task_id - 1
-                            ].sent_timestamp = datetime.now()
-
+            try:
+                if (time.time() - last_poll) > self._poll_interval:
+                    # Enough time passed, poll the jobs
+                    n_running, update_meta = self._poll_jobs()
+                    last_poll = time.time()
+                    if n_running < throttle:
+                        # We don't have enough jobs int he condor queue, submit
+                        newly_queued = 0
+                        max_to_queue = throttle - n_running
+                        # 1) check if there are any queued jobs to be submitted
+                        while (
+                            self._queued_jobs_list
+                            and newly_queued < max_to_queue
+                        ):
+                            to_submit = self._queued_jobs_list.popleft()
                             logger.log(
-                                level=9, msg="Updating task status cluster id."
+                                level=9, msg=f"Dumping pickle file {to_submit}"
                             )
-                            self._backend_meta.task_status[  # type: ignore
-                                to_submit.task_id - 1
-                            ].cluster_id = to_submit.cluster_id
-
-                            logger.log(level=9, msg="Task status updated")
-                        # Move to waiting jobs
-                        self._waiting_jobs_deque.append(to_submit)
-                        newly_queued += 1
-                        update_meta = True
-            if update_meta and self._export_metadata:
-                self.write_metadata()
-            # logger.debug("Waiting 0.1 seconds")
-            time.sleep(0.1)
+                            # Dump pickle file
+                            dumped = to_submit.delayed_submission.dump(
+                                to_submit.pickle_fname
+                            )
+                            if not dumped:
+                                # Something went wrong
+                                # continue and submit this later
+                                logger.debug("Could not dump pickle file.")
+                                continue
+                            # Submit job
+                            logger.log(
+                                level=9, msg=f"Submitting job {to_submit}"
+                            )
+                            to_submit.htcondor_submit_result = (
+                                self._client.submit(
+                                    to_submit.htcondor_submit,
+                                    count=1,
+                                )
+                            )
+                            logger.log(level=9, msg="Getting cluster id.")
+                            # Set the cluster id
+                            to_submit.cluster_id = (  # type: ignore
+                                to_submit.htcondor_submit_result.cluster()
+                            )
+                            logger.log(level=9, msg="Job submitted.")
+                            # Update the sent timestamp and cluster id
+                            logger.log(
+                                level=9, msg="Updating task status timestamp."
+                            )
+                            if self._export_metadata:
+                                self._backend_meta.task_status[  # type: ignore
+                                    to_submit.task_id - 1
+                                ].sent_timestamp = datetime.now()
+
+                                logger.log(
+                                    level=9,
+                                    msg="Updating task status cluster id.",
+                                )
+                                self._backend_meta.task_status[  # type: ignore
+                                    to_submit.task_id - 1
+                                ].cluster_id = to_submit.cluster_id
+
+                                logger.log(level=9, msg="Task status updated")
+                            # Move to waiting jobs
+                            self._waiting_jobs_deque.append(to_submit)
+                            newly_queued += 1
+                            update_meta = True
+                if update_meta and self._export_metadata:
+                    self.write_metadata()
+                # logger.debug("Waiting 0.1 seconds")
+                time.sleep(0.1)
+            except Exception as e:  # noqa: BLE001
+                logger.error(f"Error in HTCondor backend watcher: {e}")
+                logger.error(traceback.format_exc())
 
     def _poll_jobs(self) -> tuple[int, bool]:  # noqa: C901
         """Poll the schedd for job status.
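Context for the fix above (not part of the patch): the watcher thread reads a task's `.run` file to pick up its run timestamp, and a file that exists but is still empty makes the parse raise `ValueError`, which previously escaped `update_run_from_file` and crashed the watcher. The sketch below is a minimal illustration only, assuming an ISO-formatted timestamp and a hypothetical `read_run_timestamp` helper; the actual parsing call inside `update_run_from_file` is not shown in this hunk.

```python
from __future__ import annotations

import logging
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)


def read_run_timestamp(run_fname: Path) -> datetime | None:
    """Hypothetical helper mirroring the guarded parsing in the patch."""
    try:
        raw = run_fname.read_text().strip()  # an empty .run file yields ""
        # Parsing "" raises ValueError; without the new except clause the
        # exception would propagate and take down the watcher thread.
        return datetime.fromisoformat(raw)
    except OSError as e:
        logger.warning(f"Error reading run timestamp from {run_fname}: {e}")
    except ValueError as e:
        logger.warning(f"Error parsing run timestamp from {run_fname}: {e}")
    return None
```

The second hunk applies the same idea one level up: the polling and submission body of `_watcher` now runs inside a broad `try`/`except Exception` that logs the error and its traceback instead of letting the thread die.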