Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 224 additions & 0 deletions src/psij/executors/batch/batch_scheduler_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,50 @@ def cancel(self, job: Job) -> None:
# re-raise
raise

def hold(self, job: Job) -> None:
    """Holds a job through the batch scheduler.

    The command to run is obtained from :func:`~get_hold_command`. Nothing is executed for
    a job whose state is already final. Also see :func:`~psij.JobExecutor.hold`.

    Parameters
    ----------
    job
        The job to hold. It must have a native ID.

    Raises
    ------
    SubmitException
        If the job has no native ID, or if :func:`~process_hold_command_output` raises it
        while handling a failed hold command.
    """
    native_id = job.native_id
    if native_id is None:
        raise SubmitException('Job does not have a native ID.')
    if job.status.state.final:
        return
    try:
        command = self.get_hold_command(native_id)
        self._run_command(command)
    except subprocess.CalledProcessError as err:
        try:
            self.process_hold_command_output(err.returncode, err.output)
        except InvalidJobStateError:
            # The job has started or completed anyway, so there is nothing left to hold.
            # Every other exception (notably SubmitException) propagates to the caller.
            return

def release(self, job: Job) -> None:
    """Releases a held job through the batch scheduler.

    The command to run is obtained from :func:`~get_release_command`. Nothing is executed
    for a job whose state is already final. Also see :func:`~psij.JobExecutor.release`.

    Parameters
    ----------
    job
        The job to release. It must have a native ID.

    Raises
    ------
    SubmitException
        If the job has no native ID, or if :func:`~process_release_command_output` raises
        it while handling a failed release command.
    """
    native_id = job.native_id
    if native_id is None:
        raise SubmitException('Job does not have a native ID.')
    if job.status.state.final:
        return
    try:
        command = self.get_release_command(native_id)
        self._run_command(command)
    except subprocess.CalledProcessError as err:
        try:
            self.process_release_command_output(err.returncode, err.output)
        except InvalidJobStateError:
            # The job has started or completed anyway, so there is nothing to release.
            # Every other exception (notably SubmitException) propagates to the caller.
            return

def attach(self, job: Job, native_id: str) -> None:
"""Attaches a job to a native job.

Expand All @@ -302,6 +346,37 @@ def attach(self, job: Job, native_id: str) -> None:
job.executor = self
self._queue_poll_thread.register_job(job)

def info(self, jobs: Optional[List[Job]] = None, owner: Optional[str] = None) -> List[Job]:
    """Retrieves information about jobs from the batch scheduler.

    The command returned by :func:`~get_info_command` is executed and its output, together
    with its exit code, is handed to :func:`~parse_info_command_output`. A non-zero exit
    code does not raise here; it is forwarded to the parser along with the output.

    Parameters
    ----------
    jobs
        The jobs to retrieve information for. Jobs without a native id are left out of the
        query. If `None`, an empty list of native ids is passed to
        :func:`~get_info_command`.
    owner
        Passed unchanged to :func:`~get_info_command`.

    Returns
    -------
    The result of :func:`~parse_info_command_output` for the exit code and output of the
    command and the `jobs` argument.
    """
    # only jobs that already have a native id can be queried
    native_ids = [] if jobs is None else [
        job.native_id for job in jobs if job.native_id is not None
    ]
    try:
        out = self._run_command(self.get_info_command(native_ids=native_ids, owner=owner))
        exit_code = 0
    except subprocess.CalledProcessError as err:
        out, exit_code = err.output, err.returncode

    return self.parse_info_command_output(exit_code, out, jobs)

@abstractmethod
def generate_submit_script(self, job: Job, context: Dict[str, object],
submit_file: IO[str]) -> None:
Expand Down Expand Up @@ -415,6 +490,112 @@ def process_cancel_command_output(self, exit_code: int, out: str) -> None:
"""
pass

@abstractmethod
def get_hold_command(self, native_id: str) -> List[str]:
    """Constructs a command to hold a batch scheduler job.

    Concrete implementations of batch scheduler executors must override this method.

    Parameters
    ----------
    native_id
        The native id of the job being held.

    Returns
    -------
    A list of strings representing the command and arguments to execute in order to hold
    the job, such as, e.g., `['qhold', native_id]`.
    """
    pass

@abstractmethod
def process_hold_command_output(self, exit_code: int, out: str) -> str:
    """Handle output from a failed hold command.

    The main purpose of this method is to help distinguish between the hold command
    failing due to an invalid job state (such as the job having completed before the hold
    command was invoked) and other types of errors. Since job state errors are ignored, there
    are two options:

    1. Instruct the hold command to not fail on invalid state errors and have this
    method always raise a :class:`~psij.exceptions.SubmitException`, since it is only invoked
    on "other" errors.

    2. Have the hold command fail on both invalid state errors and other errors and
    interpret the output from the hold command to distinguish between the two and raise
    the appropriate exception.

    Parameters
    ----------
    exit_code
        The exit code from the hold command.
    out
        The output from the hold command.

    Raises
    ------
    InvalidJobStateError
        Raised if the job holding has failed because the job was in a completed or failed
        state at the time when the holding command was invoked.
    SubmitException
        Raised for all other reasons.
    """
    # NOTE(review): the return annotation is `str`, but the documented contract is to raise
    # and the `hold` method does not use the returned value -- confirm whether `None` was
    # intended.
    pass

@abstractmethod
def get_release_command(self, native_id: str) -> List[str]:
    """Constructs a command to release a batch scheduler job.

    Concrete implementations of batch scheduler executors must override this method.

    Parameters
    ----------
    native_id
        The native id of the job being released.

    Returns
    -------
    A list of strings representing the command and arguments to execute in order to release
    the job, such as, e.g., `['qrls', native_id]`.
    """
    pass

@abstractmethod
def process_release_command_output(self, exit_code: int, out: str) -> str:
    """Handle output from a failed release command.

    The main purpose of this method is to help distinguish between the release command
    failing due to an invalid job state (such as the job having completed before the release
    command was invoked) and other types of errors. Since job state errors are ignored, there
    are two options:

    1. Instruct the release command to not fail on invalid state errors and have this
    method always raise a :class:`~psij.exceptions.SubmitException`, since it is only invoked
    on "other" errors.

    2. Have the release command fail on both invalid state errors and other errors and
    interpret the output from the release command to distinguish between the two and raise
    the appropriate exception.

    Parameters
    ----------
    exit_code
        The exit code from the release command.
    out
        The output from the release command.

    Raises
    ------
    InvalidJobStateError
        Raised if the job releasing has failed because the job was in a completed or failed
        state at the time when the releasing command was invoked.
    SubmitException
        Raised for all other reasons.
    """
    # NOTE(review): the return annotation is `str`, but the documented contract is to raise
    # and the `release` method does not use the returned value -- confirm whether `None` was
    # intended.
    pass

@abstractmethod
def get_status_command(self, native_ids: Collection[str]) -> List[str]:
"""Constructs a command to retrieve the status of a list of jobs.
Expand Down Expand Up @@ -488,6 +669,49 @@ def parse_list_output(self, out: str) -> List[str]:
"""
return [s.strip() for s in out.splitlines()]

@abstractmethod
def get_info_command(self, native_ids: Optional[List[str]] = None,
                     owner: Optional[str] = None) -> List[str]:
    """Constructs a command to retrieve information about jobs.

    Concrete implementations of batch scheduler executors must override this method. The
    command should be able to retrieve information about the jobs with the specified native
    ids and, optionally, the owner of the jobs.

    Parameters
    ----------
    native_ids
        The native ids of the jobs to retrieve information for.
    owner
        The owner of the jobs. The original documentation states that the current user is
        used when this is `None` (NOTE(review): confirm against concrete implementations).

    Returns
    -------
    A list of strings representing the command and arguments to execute in order to get
    information about the jobs, such as `['qstat', '-u', owner, native_id]`.
    """
    pass

@abstractmethod
def parse_info_command_output(self, exit_code: int, out: str,
                              jobs: Optional[List[Job]] = None) -> List[Job]:
    """Parses the output of a job information command.

    Concrete implementations of batch scheduler executors must override this method. The output
    is meant to have been produced by the command generated by :func:`~get_info_command`.

    Parameters
    ----------
    exit_code
        The exit code of the information command; `0` when the command succeeded.
    out
        The string output of the information command as prescribed by :func:`~get_info_command`.
    jobs
        A list of jobs to retrieve information for. If `None`, all jobs are retrieved.

    Returns
    -------
    A list of jobs, per the return annotation. NOTE(review): earlier documentation described
    the result as a list of dictionaries returned when `jobs` is not specified; confirm the
    intended contract.
    """
    pass

def _create_script_context(self, job: Job) -> Dict[str, object]:
launcher = self._get_launcher_from_job(job)
if isinstance(launcher, ScriptBasedLauncher) and logger.isEnabledFor(logging.DEBUG):
Expand Down
Loading
Loading