Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 149 additions & 18 deletions xfel/ui/components/submission_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,56 @@

from libtbx import easy_run

# Mapping from raw Slurm (sacct) job states to cctbx job statuses.
SLURM_STATUS_MAP = {'COMPLETED': 'DONE',
'COMPLETING': 'RUN',
'FAILED': 'EXIT',
'PENDING': 'PEND',
'PREEMPTED': 'SUSP',
'RUNNING': 'RUN',
'SUSPENDED': 'SUSP',
'STOPPED': 'SUSP',
'CANCELLED': 'EXIT',
'TIMEOUT': 'TIMEOUT',
'OUT_OF_ME': 'EXIT', # truncated 'OUT_OF_MEMORY'
'OUT_OF_MEMORY': 'EXIT',
}

def _map_slurm_state(raw):
"""Normalize a raw sacct state string and map it to a cctbx job status.

Handles both truncated forms (e.g. 'CANCELLED+', 'OUT_OF_ME') and full forms
with trailing detail (e.g. 'CANCELLED by 12345')."""
status = raw.split()[0].rstrip('+') if raw else ''
if status not in SLURM_STATUS_MAP:
print('Unknown job status', status)
return SLURM_STATUS_MAP.get(status, 'UNKWN')

# Per-chunk statuses that are terminal and stable, so they never need to be
# re-queried. 'UNKWN' is deliberately excluded (it is transient -- a job not yet
# registered with the scheduler reads as UNKWN until it appears).
CACHEABLE_TERMINAL = frozenset(["DONE", "EXIT", "DELETED", "ERR", "SUBMIT_FAIL", "TIMEOUT"])

def _aggregate_ensemble(statuses):
"""Collapse the statuses of a job's chunks (comma-separated submission ids,
e.g. ensemble refinement) into one overall status. Active states win so the
job keeps being tracked; once nothing is active, a single failed chunk marks
the whole job EXIT, and only an all-clear set is DONE."""
s = [st for st in statuses if st is not None]
if not s:
return "UNKWN"
if "RUN" in s:
return "RUN"
if "PEND" in s or "SUBMITTED" in s:
return "PEND"
if "SUSP" in s:
return "SUSP"
if any(st in ("EXIT", "ERR", "TIMEOUT") for st in s):
return "EXIT"
if "UNKWN" in s:
return "UNKWN"
return "DONE"

class JobStopper(object):
def __init__(self, queueing_system):
self.queueing_system = queueing_system
Expand Down Expand Up @@ -91,21 +141,7 @@ def query(self, submission_id):
# submission tracker.
result = easy_run.fully_buffered(command=self.command%submission_id)
if len(result.stdout_lines) == 0: return 'UNKWN'
status = result.stdout_lines[0].strip().rstrip('+')
statuses = {'COMPLETED': 'DONE',
'COMPLETING': 'RUN',
'FAILED': 'EXIT',
'PENDING': 'PEND',
'PREEMPTED': 'SUSP',
'RUNNING': 'RUN',
'SUSPENDED': 'SUSP',
'STOPPED': 'SUSP',
'CANCELLED': 'EXIT',
'TIMEOUT': 'TIMEOUT',
'OUT_OF_ME': 'EXIT',
}
if status not in statuses: print('Unknown job status', status)
return statuses[status] if status in statuses else 'UNKWN'
return _map_slurm_state(result.stdout_lines[0].strip())
elif self.queueing_system == 'htcondor':
# (copied from the man page)
# H = on hold, R = running, I = idle (waiting for a machine to execute on), C = completed,
Expand Down Expand Up @@ -135,6 +171,55 @@ def query(self, submission_id):
else:
return status

def query_many(self, submission_ids):
"""Query many slurm/shifter jobs with minimal load on the accounting DB.

Live state comes from the controller (slurmctld) via a single `squeue` call,
which is cheap and in-memory. Only jobs that have already left the queue are
looked up in the accounting DB (slurmdbd) via a single batched `sacct`, to
classify their terminal state once. Returns {submission_id: cctbx status};
ids unknown to both map to 'UNKWN'. Slurm/shifter only."""
assert self.queueing_system in ('slurm', 'shifter')
ids = [str(sid) for sid in submission_ids if sid]
result = {}
if not ids:
return result
# 1) Controller state -- avoids slurmdbd entirely. --states=all is needed so
# that jobs still held in the controller in a finished state are reported.
squeue_command = "squeue --jobs=%s --noheader --format='%%i|%%T' --states=all" \
% ",".join(ids)
squeue_result = easy_run.fully_buffered(command=squeue_command)
for line in squeue_result.stdout_lines:
fields = line.split('|')
if len(fields) < 2:
continue
jobid, state = fields[0].strip(), fields[1].strip()
if '.' in jobid: # array/step sub-entries
continue
result[jobid] = _map_slurm_state(state)
# 2) Jobs no longer known to the controller have finished and left the queue;
# resolve their terminal state once from the accounting DB, batched.
missing = [sid for sid in ids if sid not in result]
if missing:
sacct_command = "sacct --jobs=%s --format=JobID,State --noheader -P" \
% ",".join(missing)
sacct_result = easy_run.fully_buffered(command=sacct_command)
for line in sacct_result.stdout_lines:
fields = line.split('|')
if len(fields) < 2:
continue
jobid, state = fields[0].strip(), fields[1].strip()
# Skip step rows (e.g. '12345.batch', '12345.extern', '12345.0').
if '.' in jobid:
continue
if jobid in missing:
result[jobid] = _map_slurm_state(state)
# 3) Anything still unresolved (e.g. very recently submitted, not yet
# registered with either the controller or the accounting DB).
for sid in ids:
result.setdefault(sid, 'UNKWN')
return result

def get_mysql_server_hostname(self, submission_id):
if self.queueing_system in ["mpi", "lsf"]:
print("method to obtain hostname running MySQL server not implemented for ", self.queueing_system)
Expand Down Expand Up @@ -191,10 +276,17 @@ def track(self, submission_id, log_path):
if submission_id is None:
return "UNKWN"
all_statuses = [self._track(sid, log_path) for sid in submission_id.split(',')]
if all_statuses and all([all_statuses[0] == s for s in all_statuses[1:]]):
# Collapse a job's (comma-separated) submission ids into one status. All ids
# must agree, otherwise the job is considered UNKWN (e.g. an ensemble job
# whose chunks are still in mixed states).
if all_statuses and all(all_statuses[0] == s for s in all_statuses[1:]):
return all_statuses[0]
else:
return "UNKWN"
return "UNKWN"

def track_many(self, submission_ids, log_paths):
"""Return a list of statuses, one per job. Default implementation simply loops
over track(); SlurmSubmissionTracker overrides this to batch into one query."""
return [self.track(sid, lp) for sid, lp in zip(submission_ids, log_paths)]

def _track(self, submission_id, log_path):
raise NotImplementedError("Override me!")
Expand Down Expand Up @@ -243,9 +335,48 @@ def _track(self, submission_id, log_path):
print("Found an unknown status", status)

class SlurmSubmissionTracker(SubmissionTracker):
def __init__(self, params):
super(SlurmSubmissionTracker, self).__init__(params)
# Cache of terminal per-chunk statuses, so finished chunks (e.g. of an
# ensemble job) are never re-queried for the life of this tracker.
self._chunk_cache = {}

def _track(self, submission_id, log_path):
return self.interrogator.query(submission_id)

def track_many(self, submission_ids, log_paths):
# Collect every not-yet-terminal chunk id across all jobs, query them in a
# single squeue (+ at most one sacct) call, and aggregate per job. Chunks
# already known to be terminal are served from the cache and not re-queried.
to_query = set()
for submission_id in submission_ids:
if submission_id is None:
continue
for cid in submission_id.split(','):
if cid and cid not in self._chunk_cache:
to_query.add(cid)
fresh = {}
if to_query:
fresh = self.interrogator.query_many(sorted(to_query))
for cid, status in fresh.items():
if status in CACHEABLE_TERMINAL:
self._chunk_cache[cid] = status
def chunk_status(cid):
return self._chunk_cache.get(cid) or fresh.get(cid, "UNKWN")
results = []
for submission_id in submission_ids:
if submission_id is None:
results.append("UNKWN")
continue
statuses = [chunk_status(cid) for cid in submission_id.split(',') if cid]
# Single-chunk jobs report their exact status (preserving e.g. TIMEOUT);
# multi-chunk (ensemble) jobs are collapsed by precedence.
if len(statuses) == 1:
results.append(statuses[0])
else:
results.append(_aggregate_ensemble(statuses))
return results

class HTCondorSubmissionTracker(SubmissionTracker):
def _track(self, submission_id, log_path):
return self.interrogator.query(submission_id)
Expand Down
45 changes: 31 additions & 14 deletions xfel/ui/components/xfel_gui_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
from xfel.ui.db import get_run_path
from xfel.ui.db.xfel_db import xfel_db_application

from prime.postrefine.mod_gui_frames import PRIMEInputWindow, PRIMERunWindow
from prime.postrefine.mod_input import master_phil
#from prime.postrefine.mod_gui_frames import PRIMEInputWindow, PRIMERunWindow
#from prime.postrefine.mod_input import master_phil
from iota.utils.utils import Capturing, set_base_dir

icons = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'icons/')
Expand Down Expand Up @@ -342,13 +342,15 @@ def __init__(self,
self.parent = parent
self.active = active
self.only_active_jobs = True
# {job.id: status} from the previous cycle, used to repaint only on a delta.
self._last_snapshot = None

def post_refresh(self, trials = None, jobs = None):
evt = MonitorJobs(tp_EVT_JOB_MONITOR, -1, trials, jobs)
wx.PostEvent(self.parent.run_window.jobs_tab, evt)

def run(self):
from xfel.ui.components.submission_tracker import TrackerFactory
from xfel.ui.components.submission_tracker import TrackerFactory, CACHEABLE_TERMINAL

# one time post for an initial update
self.post_refresh()
Expand All @@ -363,19 +365,34 @@ def run(self):
trials = db.get_all_trials()
jobs = db.get_all_jobs(active = self.only_active_jobs)

for job in jobs:
if job.status in ['DONE', 'EXIT', 'SUBMIT_FAIL', 'DELETED']:
# Collect the jobs that still need tracking, then query the queueing system
# in a single batched call (one squeue per cycle) rather than once per job.
# Skip the terminal statuses (CACHEABLE_TERMINAL, which includes ERR and
# TIMEOUT); UNKWN is intentionally NOT in that set because it is transient
# for slurm (e.g. ensemble jobs with mixed states), so it keeps tracking.
active_jobs = [job for job in jobs if job.status not in CACHEABLE_TERMINAL]
new_statuses = tracker.track_many(
[job.submission_id for job in active_jobs],
[job.get_log_path() for job in active_jobs])
updates = []
for job, new_status in zip(active_jobs, new_statuses):
# A just-submitted job may not be visible to the scheduler yet; keep it
# SUBMITTED rather than flipping it to a transient ERR/UNKWN.
if job.status == "SUBMITTED" and new_status in ("ERR", "UNKWN"):
continue
new_status = tracker.track(job.submission_id, job.get_log_path())
# Handle the case where the job was submitted but no status is available yet
if job.status == "SUBMITTED" and new_status == "ERR":
pass
elif job.status != new_status:
job.status = new_status

self.post_refresh(trials, jobs)
if job.status != new_status:
updates.append((job, new_status))
# Persist all status changes in one UPDATE (also syncs the Job objects).
db.update_job_statuses(updates)

# Repaint only when something actually changed (a status delta, or the
# set of active jobs changed), to avoid needless UI churn.
snapshot = {job.id: job.status for job in jobs}
if snapshot != self._last_snapshot:
self.post_refresh(trials, jobs)
self._last_snapshot = snapshot
wx.CallAfter(self.parent.run_window.jmn_light.change_status, 'on')
time.sleep(5)
time.sleep(10)
except Exception as e:
print(e)
wx.CallAfter(self.parent.run_window.jmn_light.change_status, 'alert')
Expand Down
18 changes: 18 additions & 0 deletions xfel/ui/db/xfel_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,24 @@ def get_all_jobs(self, active = False, where = None):
(Task, 'task', False),
(Dataset, 'dataset', False)], where = where)

def update_job_statuses(self, job_status_pairs):
"""Persist new statuses for several jobs in a single UPDATE and sync the
in-memory Job objects. job_status_pairs is a list of (Job, new_status).
Avoids one committed UPDATE round-trip per changed job."""
job_status_pairs = [(job, status) for job, status in job_status_pairs
if job.status != status]
if not job_status_pairs:
return
table_name = job_status_pairs[0][0].table_name
cases = " ".join("WHEN %d THEN '%s'" % (job.id, status)
for job, status in job_status_pairs)
ids = ",".join("%d" % job.id for job, _ in job_status_pairs)
query = "UPDATE `%s` SET status = CASE id %s END WHERE id IN (%s)" % (
table_name, cases, ids)
self.execute_query(query, commit=True)
for job, status in job_status_pairs:
job._db_dict['status'] = status

def delete_job(self, job = None, job_id = None):
assert [job, job_id].count(None) == 1
if job_id is None:
Expand Down
Loading