Skip to content

Commit

Permalink
Merge pull request #10 from juaml/fix/schedd_connect_failed
Browse files Browse the repository at this point in the history
Consider errors from htcondor scheduler
  • Loading branch information
fraimondo authored Oct 28, 2024
2 parents 1bd253c + 5879835 commit 134f644
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog.d/10.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug in which the scheduler submit call might fail and the job would be lost
49 changes: 35 additions & 14 deletions joblib_htcondor/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,23 +903,47 @@ def _watcher(self) -> None:
logger.log(
level=9, msg=f"Submitting job {to_submit}"
)
to_submit.htcondor_submit_result = (
self._client.submit(
to_submit.htcondor_submit,
count=1,
try:
to_submit.htcondor_submit_result = (
self._client.submit(
to_submit.htcondor_submit,
count=1,
)
)
)
except OSError as e:
# Something went wrong, continue and submit
# this later
logger.error(f"Error submitting job: {e}")
logger.error(traceback.format_exc())
logger.error("Will try later.")

# Put the job back in the queue
self._queued_jobs_list.appendleft(to_submit)

# Delete the pickle file
to_submit.pickle_fname.unlink()

# Wait a bit before trying again
time.sleep(1)
continue

logger.log(level=9, msg="Getting cluster id.")
# Set the cluster id
to_submit.cluster_id = ( # type: ignore
to_submit.htcondor_submit_result.cluster()
)
logger.log(level=9, msg="Job submitted.")
# Update the sent timestamp and cluster id
logger.log(
level=9, msg="Updating task status timestamp."
)
# Move to waiting jobs
self._waiting_jobs_deque.append(to_submit)
newly_queued += 1
update_meta = True

if self._export_metadata:
# Update the sent timestamp and cluster id
logger.log(
level=9,
msg="Updating task status timestamp.",
)
self._backend_meta.task_status[ # type: ignore
to_submit.task_id - 1
].sent_timestamp = datetime.now()
Expand All @@ -932,11 +956,8 @@ def _watcher(self) -> None:
to_submit.task_id - 1
].cluster_id = to_submit.cluster_id

logger.log(level=9, msg="Task status updated")
# Move to waiting jobs
self._waiting_jobs_deque.append(to_submit)
newly_queued += 1
update_meta = True
logger.log(level=9, msg="Task status updated")

if update_meta and self._export_metadata:
self.write_metadata()
# logger.debug("Waiting 0.1 seconds")
Expand Down
2 changes: 1 addition & 1 deletion joblib_htcondor/tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def compare_backends(a, b) -> bool:

def test_pickle() -> None:
"""Test pickling of the backend."""
backend = _HTCondorBackend()
backend = _HTCondorBackend(request_cpus=1, request_memory="2GB")
pickled_backend = pickle.loads(pickle.dumps(backend))
assert compare_backends(pickled_backend, backend)

Expand Down

0 comments on commit 134f644

Please sign in to comment.