diff --git a/src/psij/executors/batch/batch_scheduler_executor.py b/src/psij/executors/batch/batch_scheduler_executor.py index 0c7db560..7b344977 100644 --- a/src/psij/executors/batch/batch_scheduler_executor.py +++ b/src/psij/executors/batch/batch_scheduler_executor.py @@ -281,6 +281,50 @@ def cancel(self, job: Job) -> None: # re-raise raise + def hold(self, job: Job) -> None: + """Holds a job if it has not otherwise executed. + + A command is constructed using :func:`~get_hold_command` and executed in order to hold + the job. Also see :func:`~psij.JobExecutor.hold`. + """ + if job.native_id is None: + raise SubmitException('Job does not have a native ID.') + if job.status.state.final: + return + try: + self._run_command(self.get_hold_command(job.native_id)) + except subprocess.CalledProcessError as ex: + try: + self.process_hold_command_output(ex.returncode, ex.output) + except InvalidJobStateError: + # do nothing; the job has started or completed anyway + pass + except SubmitException: + # re-raise + raise + + def release(self, job: Job) -> None: + """Releases a job if it has been held. + + A command is constructed using :func:`~get_release_command` and executed in order to + release the job. Also see :func:`~psij.JobExecutor.release`. + """ + if job.native_id is None: + raise SubmitException('Job does not have a native ID.') + if job.status.state.final: + return + try: + self._run_command(self.get_release_command(job.native_id)) + except subprocess.CalledProcessError as ex: + try: + self.process_release_command_output(ex.returncode, ex.output) + except InvalidJobStateError: + # do nothing; the job has started or completed anyway + pass + except SubmitException: + # re-raise + raise + def attach(self, job: Job, native_id: str) -> None: """Attaches a job to a native job. @@ -302,6 +346,37 @@ def attach(self, job: Job, native_id: str) -> None: job.executor = self self._queue_poll_thread.register_job(job) + def info(self, jobs: Optional[List[Job]] = None, owner: Optional[str] = None) -> List[Job]: + """Retrieves information about jobs. + + If `jobs` is not specified, the information for all jobs known to the executor is + retrieved. Otherwise, the information for the specified job is retrieved. + The information is obtained by executing the command returned by + :func:`~get_info_command` and parsing the output using + :func:`~parse_info_output`. + Parameters + ---------- + job + The job to retrieve information for. If `None`, all jobs are retrieved. + owner + The owner of the job. If `None`, the current user is used. + """ + try: + id_list = [] + # create a list of native ids to query + if jobs is not None: + for job in jobs: + if job.native_id is not None: + id_list.append(job.native_id) + out = self._run_command(self.get_info_command(native_ids=id_list, owner=owner)) + except subprocess.CalledProcessError as ex: + out = ex.output + exit_code = ex.returncode + else: + exit_code = 0 + + return self.parse_info_command_output(exit_code, out, jobs) + @abstractmethod def generate_submit_script(self, job: Job, context: Dict[str, object], submit_file: IO[str]) -> None: @@ -415,6 +490,112 @@ def process_cancel_command_output(self, exit_code: int, out: str) -> None: """ pass + @abstractmethod + def get_hold_command(self, native_id: str) -> List[str]: + """Constructs a command to hold a batch scheduler job. + + Concrete implementations of batch scheduler executors must override this method. + + Parameters + ---------- + native_id + The native id of the job being held. + + Returns + ------- + A list of strings representing the command and arguments to execute in order to hold + the job, such as, e.g., `['qrls', native_id]`. + """ + pass + + @abstractmethod + def process_hold_command_output(self, exit_code: int, out: str) -> str: + """Handle output from a failed hold command. + + The main purpose of this method is to help distinguish between the cancel command + failing due to an invalid job state (such as the job having completed before the hold + command was invoked) and other types of errors. Since job state errors are ignored, there + are two options: + + 1. Instruct the hold command to not fail on invalid state errors and have this + method always raise a :class:`~psij.exceptions.SubmitException`, since it is only invoked + on "other" errors. + + 2. Have the hold command fail on both invalid state errors and other errors and + interpret the output from the hold command to distinguish between the two and raise + the appropriate exception. + + Parameters + ---------- + exit_code + The exit code from the hold command. + out + The output from the hold command. + + Raises + ------ + InvalidJobStateError + Raised if the job holding has failed because the job was in a completed or failed + state at the time when the holding command was invoked. + SubmitException + Raised for all other reasons. + + """ + pass + + @abstractmethod + def get_release_command(self, native_id: str) -> List[str]: + """Constructs a command to release a batch scheduler job. + + Concrete implementations of batch scheduler executors must override this method. + + Parameters + ---------- + native_id + The native id of the job being released. + + Returns + ------- + A list of strings representing the command and arguments to execute in order to release + the job, such as, e.g.,`['qrls', native_id]`. + """ + pass + + @abstractmethod + def process_release_command_output(self, exit_code: int, out: str) -> str: + """Handle output from a failed release command. + + The main purpose of this method is to help distinguish between the release command + failing due to an invalid job state (such as the job having completed before the release + command was invoked) and other types of errors. Since job state errors are ignored, there + are two options: + + 1. Instruct the release command to not fail on invalid state errors and have this + method always raise a :class:`~psij.exceptions.SubmitException`, since it is only invoked + on "other" errors. + + 2. Have the release command fail on both invalid state errors and other errors and + interpret the output from the release command to distinguish between the two and raise + the appropriate exception. + + Parameters + ---------- + exit_code + The exit code from the release command. + + out + The output from the release command. + + Raises + ------ + InvalidJobStateError + Raised if the job releasing has failed because the job was in a completed or failed + state at the time when the releasing command was invoked. + SubmitException + Raised for all other reasons. + """ + pass + @abstractmethod def get_status_command(self, native_ids: Collection[str]) -> List[str]: """Constructs a command to retrieve the status of a list of jobs. @@ -488,6 +669,49 @@ def parse_list_output(self, out: str) -> List[str]: """ return [s.strip() for s in out.splitlines()] + @abstractmethod + def get_info_command(self, native_ids: Optional[List[str]] = None, + owner: Optional[str] = None) -> List[str]: + """Constructs a command to retrieve information about a job. + + Concrete implementations of batch scheduler executors must override this method. The + command should be able to retrieve information about the job with the specified native id + and, optionally, the owner of the job. + + Parameters + ---------- + native_ids + The native id of the jobs to retrieve information for. + owner + The owner of the job. If `None`, the current user is used. + + Returns + ------- + A list of strings representing the command and arguments to execute in order to get + information about the job, such as `['qstat', '-u', owner, native_id]`. + """ + pass + + @abstractmethod + def parse_info_command_output(self, exit_code: int, out: str, + jobs: Optional[List[Job]] = None) -> List[Job]: + """Parses the output of a job information command. + + Concrete implementations of batch scheduler executors must override this method. The output + is meant to have been produced by the command generated by :func:`~get_info_command`. + + Parameters + ---------- + out + The string output of the information command as prescribed by :func:`~get_info_command`. + jobs + A list of jobs to retrieve information for. If `None`, all jobs are retrieved. + Returns + ------- + A list of dictionaries containing information about the job, if jobs are not specified. + """ + pass + def _create_script_context(self, job: Job) -> Dict[str, object]: launcher = self._get_launcher_from_job(job) if isinstance(launcher, ScriptBasedLauncher) and logger.isEnabledFor(logging.DEBUG): diff --git a/src/psij/executors/batch/slurm.py b/src/psij/executors/batch/slurm.py index 6f0963ee..a0cabf5e 100644 --- a/src/psij/executors/batch/slurm.py +++ b/src/psij/executors/batch/slurm.py @@ -1,8 +1,9 @@ -from datetime import timedelta +from datetime import timedelta, datetime from pathlib import Path -from typing import Optional, Collection, List, Dict, IO +from typing import Optional, Collection, List, Dict, IO, Union +import re -from psij import Job, JobStatus, JobState, SubmitException +from psij import Job, JobStatus, JobState, SubmitException, JobSpec, ResourceSpecV1 from psij.executors.batch.batch_scheduler_executor import BatchSchedulerExecutor, \ BatchSchedulerExecutorConfig, check_status_exit_code from psij.executors.batch.script_generator import TemplatedScriptGenerator @@ -145,6 +146,26 @@ def process_cancel_command_output(self, exit_code: int, out: str) -> None: """See :meth:`~.BatchSchedulerExecutor.process_cancel_command_output`.""" raise SubmitException('Failed job cancel job: %s' % out) + def get_hold_command(self, native_id: str) -> List[str]: + """See :meth:`~.BatchSchedulerExecutor.get_hold_command`.""" + return ['scontrol', 'hold', native_id] + + def process_hold_command_output(self, exit_code: int, out: str) -> str: + """See :meth:`~.BatchSchedulerExecutor.process_hold_command_output`.""" + if exit_code != 0: + raise SubmitException('Failed job hold: %s' % out) + return out + + def get_release_command(self, native_id: str) -> List[str]: + """See :meth:`~.BatchSchedulerExecutor.get_release_command`.""" + return ['scontrol', 'release', native_id] + + def process_release_command_output(self, exit_code: int, out: str) -> str: + """See :meth:`~.BatchSchedulerExecutor.process_release_command_output`.""" + if exit_code != 0: + raise SubmitException('Failed job release: %s' % out) + return out + def get_status_command(self, native_ids: Collection[str]) -> List[str]: """See :meth:`~.BatchSchedulerExecutor.get_status_command`.""" # we're not really using job arrays, so this is equivalent to the job ID. However, if @@ -199,3 +220,150 @@ def _format_duration(self, d: timedelta) -> str: def _clean_submit_script(self, job: Job) -> None: super()._clean_submit_script(job) self._delete_aux_file(job, '.nodefile') + + def get_info_command(self, native_ids: Optional[List[str]] = None, + owner: Optional[str] = None) -> List[str]: + """See :meth:`~.BatchSchedulerExecutor.get_info_command`.""" + args = [_SQUEUE_COMMAND, '-ho', '%A %a %C %l %j %M %P %t %V %Z %S %u %N'] + + # Create commna-separated list of job IDs. + if native_ids is not None and len(native_ids) > 0: + jobids = "" + for id in native_ids: + if len(id) == 0: + continue + if jobids is None: + jobids = id + else: + jobids += ',' + id + args.append('--job') + args.append(jobids) + + if owner is not None: + args.append('--user') + args.append(owner) + + return args + + def _parse_nodes(self, node_list: str, cpu_per_node: int) -> List[Dict[str, Union[str, int]]]: + """ + Parse the node list of squeue, and return a list of dictionaries with + node names and CPU counts. + """ + result: List[Dict[str, Union[str, int]]] = [] + for match in re.findall(r'([^,\[]+)(?:\[([^\]]+)\])?', str(node_list)): + prefix, range_str = match + if range_str: + for x in range_str.split(","): + if re.match(r'^(\d+)-(\d+)$', x): + match = re.match(r'^(\d+)-(\d+)$', x) + if match is not None: + start, end = map(int, match.groups()) + range_values = range(start, end + 1) + result.extend({"name": prefix + str(n), + "procs": cpu_per_node} for n in range_values) + else: + result.append({"name": prefix + x, "procs": cpu_per_node}) + elif prefix: + result.append({"name": prefix, "procs": cpu_per_node}) + return result + + def _parse_duration(self, duration: str) -> timedelta: + """Parse the duration string of squeue, and return a timedelta object.""" + # Acceptable time formats include "minutes", "minutes:seconds", "hours:minutes:seconds", + # "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds". + # The default time format is "minutes:seconds". + if '-' in duration: + days, time = duration.split('-') + days_val = int(days) + hours, minutes, seconds = map(int, time.split(':')) + return timedelta(days=days_val, hours=hours, minutes=minutes, seconds=seconds) + else: + time_parts = duration.split(':') + if len(time_parts) == 1: + minutes = int(time_parts[0]) + return timedelta(minutes=minutes) + elif len(time_parts) == 2: + minutes, seconds = map(int, time_parts) + return timedelta(minutes=minutes, seconds=seconds) + elif len(time_parts) == 3: + hours, minutes, seconds = map(int, time_parts) + return timedelta(hours=hours, minutes=minutes, seconds=seconds) + else: + raise ValueError(f"Invalid duration format: {duration}") + + def parse_info_command_output(self, exit_code: int, out: str, + jobs: Optional[List[Job]] = None) -> List[Job]: + """See :meth:`~.BatchSchedulerExecutor.parse_info_output`.""" + check_status_exit_code(_SQUEUE_COMMAND, exit_code, out) + """ Output of example: Spaces added for clarity. + $ squeue -o "%A %a %C %l %j %M %P %t %V %Z %S %u %B %N" + cols[0] [1] [2] [3] [4] [5] [6] [7] [8] + JOBID ACCOUNT CPUS TIME_LIMIT NAME TIME PARTITION ST SUBMIT_TIME + 1418 (null) 1 UNLIMITED sbatch 0:00 aaa PD 2025-03-31T13:07:28 + 1424 (null) 1 UNLIMITED sbatch 0:08 aaa R 2025-04-07T11:34:38 + [9] [10] [11] [12] [13] + WORK_DIR START_TIME USER EXEC_HOST NODELIST + /home/XYZ N/A XYZ N/A n/a + /home/XYZ 2025-04-07T11:34:38 XYZ ehost ehost + """ + lines = iter(out.split('\n')) + job_list = [] + for line in lines: + if not line: + continue + cols = line.split() + assert len(cols) > 10 + native_id = cols[0] + # Search the job which have native id. + job = None + if jobs is not None: + for job in jobs: + if job.native_id == native_id: + break + if job is None: + # Create a new job object + spec = JobSpec() + job = Job(spec=spec) + + if job.executor is None: + job.executor = self + + if job.spec is None: + job.spec = JobSpec() + + job._native_id = native_id + spec = job.spec + if cols[1] != "(null)": + spec.attributes.account = cols[1] + else: + spec.attributes.account = None + if spec.resources is None: + spec.resources = ResourceSpecV1(process_count=int(cols[2])) + elif isinstance(spec.resources, ResourceSpecV1): + spec.resources.process_count = int(cols[2]) + if cols[3] == "UNLIMITED": + spec.attributes.duration = timedelta(seconds=0) + else: + spec.attributes.duration = self._parse_duration(cols[3]) + spec.name = cols[4] + job.current_info.wall_time = int(self._parse_duration(cols[5]).total_seconds()) + spec.attributes.queue_name = cols[6] + job.status.state = self._get_state(cols[7]) + job.current_info.submission_time = datetime.fromisoformat(cols[8]) + spec.directory = Path(cols[9]) + if cols[10] == "N/A": + job.current_info.dispatch_time = None + else: + job.current_info.dispatch_time = datetime.fromisoformat(cols[10]) + job.current_info.owner = cols[11] + job.current_info.resourcelist = [] + if len(cols) > 12: + job.current_info.resourcelist = self._parse_nodes(cols[12], int(cols[2])) + + job.current_info.submit_host = None # Can not get submit host from squeue + job.current_info.cpu_time = None # Can not get CPU time from squeue + + job_list.append(job) + + return job_list diff --git a/src/psij/job.py b/src/psij/job.py index b3673069..8155fb36 100644 --- a/src/psij/job.py +++ b/src/psij/job.py @@ -10,6 +10,7 @@ from psij.job_spec import JobSpec from psij.job_state import JobState, JobStateOrder from psij.job_status import JobStatus +from psij.job_info import JobInfo logger = logging.getLogger(__name__) @@ -47,6 +48,7 @@ def __init__(self, spec: Optional[JobSpec] = None) -> None: self._native_id: Optional[object] = None self._cb: Optional[JobStatusCallback] = None self._status_cv = threading.Condition() + self._current_info = JobInfo() if logger.isEnabledFor(logging.DEBUG): logger.debug('New Job: {}'.format(self)) @@ -118,6 +120,17 @@ def status(self, status: JobStatus) -> None: if self.executor: self.executor._notify_callback(self, status) + @property + def current_info(self) -> JobInfo: + """ + The current information about this job. + + This property returns the most recent job information available. The information is + obtained by calling :func:`~psij.JobExecutor.info` on the job executor that was used to + submit this job. + """ + return self._current_info + def set_job_status_callback(self, cb: Union['JobStatusCallback', Callable[['Job', 'psij.JobStatus'], None]]) -> None: @@ -156,6 +169,52 @@ def cancel(self) -> None: else: self.executor.cancel(self) + def hold(self) -> None: + """ + Holds this job. + + The job is held by calling :func:`~psij.JobExecutor.hold` on the job + executor that was used to submit this job. + :raises SubmitException: if the job has not yet been submitted. + """ + if self.status.final: + return + if not self.executor: + raise SubmitException('Cannot hold job: not bound to an executor.') + else: + self.executor.hold(self) + + def release(self) -> None: + """ + Releases this job. + + The job is released by calling :func:`~psij.JobExecutor.release` on the job + executor that was used to submit this job. + :raises SubmitException: if the job has not yet been submitted. + """ + if self.status.final: + return + if not self.executor: + raise SubmitException('Cannot release job: not bound to an executor.') + else: + self.executor.release(self) + + def info(self) -> None: + """ + Returns the information about this job. + + The job information is obtained by calling :func:`~psij.JobExecutor.info` on the job + executor that was used to submit this job. + :return: a dictionary containing information about this job + :raises SubmitException: if the job has not yet been submitted + """ + if self.status.final: + return + if not self.executor: + raise SubmitException('Cannot release job: not bound to an executor.') + else: + self.executor.info(jobs=[self]) + def _all_greater(self, states: Optional[Union[JobState, Sequence[JobState]]]) \ -> Optional[Set[JobState]]: if states is None: diff --git a/src/psij/job_executor.py b/src/psij/job_executor.py index 286fc315..650c1a59 100644 --- a/src/psij/job_executor.py +++ b/src/psij/job_executor.py @@ -159,7 +159,6 @@ def cancel(self, job: Job) -> None: underlying implementation and before the client code receives the completion notification. In such a case, the job will never enter the `CANCELED` state and `job.wait(JobState.CANCELED)` would hang indefinitely. - :param job: The job to be canceled. :raises SubmitException: Thrown if the request cannot be sent to the underlying @@ -167,6 +166,45 @@ def cancel(self, job: Job) -> None: """ pass + @abstractmethod + def hold(self, job: Job) -> None: + """ + Holds a job that has been submitted to underlying executor implementation. + + A successful return of this method only indicates that the request for hold has been + communicated to the underlying implementation. + The job on hold will not be executed until it is released. The job must be in the + :attr:`~psij.JobState.HELD` state. The job will be put on hold at the discretion of + the implementation, which may be at some later time. A successful hold is reflected in a + change of status of the respective job to :attr:`~psij.JobState.HELD`. User code can + synchronously wait until the :attr:`~psij.JobState.HELD` state is reached using + `job.wait(JobState.HELD)`. + :param job: The job to be put on hold. + + :raises SubmitException: Thrown if the request cannot be sent to the underlying + implementation. + """ + pass + + @abstractmethod + def release(self, job: Job) -> None: + """ + Relases a job that has been put on hold. + A successful return of this method only indicates that the request for release has been + communicated to the underlying implementation. + The job on hold will not be executed until it is released. The job must be in the + :attr:`~psij.JobState.QUEUED` state. The job will be released at the discretion of + the implementation, which may be at some later time. A successful release is reflected in a + change of status of the respective job to :attr:`~psij.JobState.QUEUED`. User code can + synchronously wait until the :attr:`~psij.JobState.QUEUED` state is reached using + `job.wait(JobState.QUEUED)`. + :param job: The job to be released. + + :raises SubmitException: Thrown if the request cannot be sent to the underlying + implementation. + """ + pass + @abstractmethod def list(self) -> List[str]: """List native IDs of all jobs known to the backend. @@ -186,6 +224,18 @@ def attach(self, job: Job, native_id: str) -> None: """ pass + @abstractmethod + def info(self, jobs: Optional[List[Job]] = None, owner: Optional[str] = None) -> List[Job]: + """ + Returns information about jobs. + + :param jobs: An optional list of jobs to get information about. If not specified, the method + returns information about about all jobs. + :param owner: An optional list of job owners to get information about. If not specified, + the method returns information about all jobs. + """ + pass + def set_job_status_callback(self, cb: Union[JobStatusCallback, Callable[[Job, 'psij.JobStatus'], None]]) -> None: diff --git a/src/psij/job_info.py b/src/psij/job_info.py new file mode 100644 index 00000000..d1345403 --- /dev/null +++ b/src/psij/job_info.py @@ -0,0 +1,54 @@ +from typing import Optional, List, Dict +from datetime import datetime + + +class ResourceInfo(object): + """Class to resource information about a job.""" + + def __init__(self, hostname: str, cores: int): + """ + Initialize the ResourceInfo object. + + :param name: Name of the resource. + :param cores: Number of cores allocated for the job. + """ + self.hostname = hostname + self.cores = cores + + def __repr__(self) -> str: + """Return a string representation of the ResourceInfo instance.""" + return f"ResourceInfo(name={self.hostname}, cores={self.cores})" + + +class JobInfo(object): + """Class to the information about a job.""" + + def __init__(self, wall_time: Optional[int] = None, + cpu_time: Optional[int] = None, + submission_time: Optional[datetime] = None, + dispatch_time: Optional[datetime] = None, + resources: Optional[List[Dict[str, str | int]]] = None, + owner: Optional[str] = None, submit_host: Optional[str] = None): + """ + Initialize the JobInfo object. + + :param wall_time: Wall time of the job in seconds. + :param cpu_time: CPU time of the job in seconds. + :param submission_time: Time when the job was submitted. + :param dispatch_time: Time when the job was running. + :param resources: Resource information about the job. + :param owner: Owner of the job. + """ + self.wall_time = wall_time + self.cpu_time = cpu_time + self.submission_time = submission_time + self.dispatch_time = dispatch_time + self.resourcelist = resources + self.owner = owner + self.submit_host = submit_host + + def __repr__(self) -> str: + """Return a string representation of the JobInfo instance.""" + return f"JobInfo(wall_time={self.wall_time}, cpu_time={self.cpu_time}, " \ + f"submission_time={self.submission_time}, dispatch_time={self.dispatch_time}, " \ + f"resourcelist={self.resourcelist}, owner={self.owner})" diff --git a/src/psij/job_state.py b/src/psij/job_state.py index b028f456..402daba4 100644 --- a/src/psij/job_state.py +++ b/src/psij/job_state.py @@ -6,7 +6,8 @@ class JobState(bytes, Enum): """ An enumeration holding the possible job states. - The possible states are: `NEW`, `QUEUED`, `ACTIVE`, `COMPLETED`, `FAILED`, and `CANCELED`. + The possible states are: `NEW`, `QUEUED`, `ACTIVE`, `COMPLETED`, `FAILED`, `HOLD`, + and `CANCELED`. """ def __new__(cls, index: int, order: int, name: str, final: bool) -> 'JobState': # noqa: D102 @@ -34,6 +35,11 @@ def __init__(self, *args: object) -> None: # noqa: D107 This is the state of the job after being accepted by a backend for execution, but before the execution of the job begins. """ + HELD = (1, 2, 'HELD', False) + """ + This state represents a job that has been held by the backend for execution, but before the + execution of the job begins. + """ ACTIVE = (2, 2, 'ACTIVE', False) """This state represents an actively running job.""" COMPLETED = (3, 3, 'COMPLETED', True) @@ -129,6 +135,8 @@ def prev(state: JobState) -> Optional[JobState]: return JobState.ACTIVE if state == JobState.ACTIVE: return JobState.QUEUED + if state == JobState.HELD: + return JobState.QUEUED if state == JobState.QUEUED: return JobState.NEW return None