diff --git a/changelog.d/10.fixed.md b/changelog.d/10.fixed.md new file mode 100644 index 0000000..4fcb93d --- /dev/null +++ b/changelog.d/10.fixed.md @@ -0,0 +1 @@ +Fix a bug in which a job was lost if the scheduler submit call failed diff --git a/joblib_htcondor/backend.py b/joblib_htcondor/backend.py index 0514119..91b51d1 100644 --- a/joblib_htcondor/backend.py +++ b/joblib_htcondor/backend.py @@ -903,23 +903,47 @@ def _watcher(self) -> None: logger.log( level=9, msg=f"Submitting job {to_submit}" ) - to_submit.htcondor_submit_result = ( - self._client.submit( - to_submit.htcondor_submit, - count=1, + try: + to_submit.htcondor_submit_result = ( + self._client.submit( + to_submit.htcondor_submit, + count=1, + ) ) - ) + except OSError as e: + # Something went wrong, continue and submit + # this later + logger.error(f"Error submitting job: {e}") + logger.error(traceback.format_exc()) + logger.error("Will try later.") + + # Put the job back in the queue + self._queued_jobs_list.appendleft(to_submit) + + # Delete the pickle file + to_submit.pickle_fname.unlink() + + # Wait a bit before trying again + time.sleep(1) + continue + logger.log(level=9, msg="Getting cluster id.") # Set the cluster id to_submit.cluster_id = ( # type: ignore to_submit.htcondor_submit_result.cluster() ) logger.log(level=9, msg="Job submitted.") - # Update the sent timestamp and cluster id - logger.log( - level=9, msg="Updating task status timestamp." 
- ) + # Move to waiting jobs + self._waiting_jobs_deque.append(to_submit) + newly_queued += 1 + update_meta = True + if self._export_metadata: + # Update the sent timestamp and cluster id + logger.log( + level=9, + msg="Updating task status timestamp.", + ) self._backend_meta.task_status[ # type: ignore to_submit.task_id - 1 ].sent_timestamp = datetime.now() @@ -932,11 +956,8 @@ def _watcher(self) -> None: to_submit.task_id - 1 ].cluster_id = to_submit.cluster_id - logger.log(level=9, msg="Task status updated") - # Move to waiting jobs - self._waiting_jobs_deque.append(to_submit) - newly_queued += 1 - update_meta = True + logger.log(level=9, msg="Task status updated") + if update_meta and self._export_metadata: self.write_metadata() # logger.debug("Waiting 0.1 seconds") diff --git a/joblib_htcondor/tests/test_backend.py b/joblib_htcondor/tests/test_backend.py index e254ade..fbdd4a2 100644 --- a/joblib_htcondor/tests/test_backend.py +++ b/joblib_htcondor/tests/test_backend.py @@ -37,7 +37,7 @@ def compare_backends(a, b) -> bool: def test_pickle() -> None: """Test pickling of the backend.""" - backend = _HTCondorBackend() + backend = _HTCondorBackend(request_cpus=1, request_memory="2GB") pickled_backend = pickle.loads(pickle.dumps(backend)) assert compare_backends(pickled_backend, backend)