Show when runs are being processed #322

Open
wants to merge 8 commits into master
2 changes: 2 additions & 0 deletions damnit/backend/db.py
@@ -431,6 +431,8 @@ class MsgKind(Enum):
#run_deleted = 'run_deleted'
#standalone_comment_set = 'standalone_comment_set'
#standalone_comment_deleted = 'standalone_comment_deleted'
processing_running = 'processing_running' # Supports status indicators
processing_finished = 'processing_finished'
# Commented out options are not implemented yet

def msg_dict(kind: MsgKind, data: dict):
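The two new message kinds back the status indicators: processing_running is sent, and periodically re-sent, while a run is being processed, and processing_finished is sent with the same processing_id once it completes. As a rough sketch of the payloads (field names taken from RunExtractor.running_msg in extract_data.py below; the values are invented and the on-the-wire envelope is whatever msg_dict() wraps around the data):

# Illustrative only: field names mirror RunExtractor.running_msg; the
# values are made up, and the final message shape is whatever
# msg_dict(kind, data) returns.
from damnit.backend.db import MsgKind, msg_dict

running = msg_dict(MsgKind.processing_running, {
    'processing_id': '0f8b6c1e-...',   # uuid4 picked by the extractor
    'proposal': 1234,
    'run': 56,
    'data': 'all',                     # a RunData value
    'hostname': 'exflonc42',           # hypothetical host name
    'username': 'someuser',
    'slurm_cluster': 'maxwell',        # None when not running under Slurm
    'slurm_job_id': '1234567',
})
finished = msg_dict(MsgKind.processing_finished, {
    'processing_id': '0f8b6c1e-...',
})
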
175 changes: 112 additions & 63 deletions damnit/backend/extract_data.py
@@ -13,8 +13,10 @@
import socket
import subprocess
import sys
from getpass import getuser
from pathlib import Path
from tempfile import TemporaryDirectory
from uuid import uuid4

import h5py
import numpy as np
@@ -29,45 +31,15 @@
log = logging.getLogger(__name__)


def run_in_subprocess(args, **kwargs):
def prepare_env():
# Ensure subprocess can import ctxrunner & damnit_ctx
env = os.environ.copy()
ctxsupport_dir = str(Path(__file__).parents[1] / 'ctxsupport')
env['PYTHONPATH'] = ctxsupport_dir + (
os.pathsep + env['PYTHONPATH'] if 'PYTHONPATH' in env else ''
)
return env

return subprocess.run(args, env=env, **kwargs)


def extract_in_subprocess(
proposal, run, out_path, cluster=False, run_data=RunData.ALL, match=(),
variables=(), python_exe=None, mock=False
):
if not python_exe:
python_exe = sys.executable

args = [python_exe, '-m', 'ctxrunner', 'exec', str(proposal), str(run), run_data.value,
'--save', out_path]
if cluster:
args.append('--cluster-job')
if mock:
args.append("--mock")
if variables:
for v in variables:
args.extend(['--var', v])
else:
for m in match:
args.extend(['--match', m])

with TemporaryDirectory() as td:
# Save a separate copy of the reduced data, so we can send an update
# with only the variables that we've extracted.
reduced_out_path = Path(td, 'reduced.h5')
args.extend(['--save-reduced', str(reduced_out_path)])

run_in_subprocess(args, check=True)

return load_reduced_data(reduced_out_path)

class ContextFileUnpickler(pickle.Unpickler):
"""
@@ -95,8 +67,8 @@
else:
with TemporaryDirectory() as d:
out_file = Path(d) / "context.pickle"
run_in_subprocess([context_python, "-m", "ctxrunner", "ctx", str(ctx_path), str(out_file)],
cwd=db_dir, check=True)
subprocess.run([context_python, "-m", "ctxrunner", "ctx", str(ctx_path), str(out_file)],
cwd=db_dir, env=prepare_env(), check=True)

with out_file.open("rb") as f:
unpickler = ContextFileUnpickler(f)
@@ -197,51 +169,127 @@
))
self.kafka_prd.flush()

def extract_and_ingest(self, proposal, run, cluster=False,
run_data=RunData.ALL, match=(), variables=(), mock=False):
if proposal is None:
proposal = self.db.metameta['proposal']
class RunExtractor(Extractor):
def __init__(self, proposal, run, cluster=False, run_data=RunData.ALL,
match=(), variables=(), mock=False):
super().__init__()
self.proposal = proposal
self.run = run
self.cluster = cluster
self.run_data = run_data
self.match = match
self.variables = variables
self.mock = mock
self.uuid = str(uuid4())
self.running_msg = msg_dict(MsgKind.processing_running, {
'processing_id': self.uuid,
'proposal': proposal,
'run': run,
'data': run_data.value,
'hostname': socket.gethostname(),
'username': getuser(),
'slurm_cluster': self._slurm_cluster(),
'slurm_job_id': os.environ.get('SLURM_JOB_ID', ''),
})

@staticmethod
def _slurm_cluster():
# For some reason, SLURM_CLUSTER_NAME is '(null)'. This is a workaround:
if not os.environ.get('SLURM_JOB_ID', ''):
return None
partition = os.environ.get('SLURM_JOB_PARTITION', '')
return 'solaris' if (partition == 'solcpu') else 'maxwell'

@property
def out_path(self):
return Path('extracted_data', f'p{self.proposal}_r{self.run}.h5')

def _notify_running(self):
self.kafka_prd.send(self.db.kafka_topic, self.running_msg)

def _notify_finished(self):
self.kafka_prd.send(self.db.kafka_topic, msg_dict(
MsgKind.processing_finished, {'processing_id': self.uuid}
))

def extract_in_subprocess(self):
python_exe = self.db.metameta.get('context_python', '') or sys.executable

args = [python_exe, '-m', 'ctxrunner', 'exec', str(self.proposal), str(self.run),
self.run_data.value, '--save', self.out_path]
if self.cluster:
args.append('--cluster-job')
if self.mock:
args.append("--mock")
if self.variables:
for v in self.variables:
args.extend(['--var', v])
else:
for m in self.match:
args.extend(['--match', m])

out_path = Path('extracted_data', f'p{proposal}_r{run}.h5')
out_path.parent.mkdir(parents=True, exist_ok=True)
if out_path.parent.stat().st_uid == os.getuid():
os.chmod(out_path.parent, 0o777)
with TemporaryDirectory() as td:
# Save a separate copy of the reduced data, so we can send an update
# with only the variables that we've extracted.
reduced_out_path = Path(td, 'reduced.h5')
args.extend(['--save-reduced', str(reduced_out_path)])

python_exe = self.db.metameta.get('context_python', '')
reduced_data = extract_in_subprocess(
proposal, run, out_path, cluster=cluster, run_data=run_data,
match=match, variables=variables, python_exe=python_exe, mock=mock,
)
p = subprocess.Popen(args, env=prepare_env(), stdin=subprocess.DEVNULL)

while True:
try:
retcode = p.wait(timeout=10)
break
except subprocess.TimeoutExpired:
self._notify_running()

if retcode:
raise subprocess.CalledProcessError(retcode, p.args)

return load_reduced_data(reduced_out_path)

def extract_and_ingest(self):
self._notify_running()
self.out_path.parent.mkdir(parents=True, exist_ok=True)
if self.out_path.parent.stat().st_uid == os.getuid():
os.chmod(self.out_path.parent, 0o777)

reduced_data = self.extract_in_subprocess()
log.info("Reduced data has %d fields", len(reduced_data))
add_to_db(reduced_data, self.db, proposal, run)
add_to_db(reduced_data, self.db, self.proposal, self.run)

# Send all the updates for scalars
image_values = { name: reduced for name, reduced in reduced_data.items()
if isinstance(reduced.value, bytes) }
update_msg = msg_dict(MsgKind.run_values_updated, {
'run': run, 'proposal': proposal, 'values': { name: reduced.value for name, reduced in reduced_data.items()
if name not in image_values }})
'run': self.run, 'proposal': self.proposal, 'values': {
name: reduced.value for name, reduced in reduced_data.items()
if name not in image_values
}
})
self.kafka_prd.send(self.db.kafka_topic, update_msg).get(timeout=30)

# And each image update separately so we don't hit any size limits
for name, reduced in image_values.items():
update_msg = msg_dict(MsgKind.run_values_updated, {
'run': run, 'proposal': proposal, 'values': { name: reduced.value }})
'run': self.run, 'proposal': self.proposal, 'values': { name: reduced.value }})
self.kafka_prd.send(self.db.kafka_topic, update_msg).get(timeout=30)

log.info("Sent Kafka updates to topic %r", self.db.kafka_topic)

self._notify_finished()

# Launch a Slurm job if there are any 'cluster' variables to evaluate
if not cluster:
if not self.cluster:
ctx_slurm = self.ctx_whole.filter(
run_data=run_data, name_matches=match, variables=variables, cluster=True
run_data=self.run_data, name_matches=self.match, variables=self.variables, cluster=True
)
ctx_no_slurm = ctx_slurm.filter(cluster=False)
if set(ctx_slurm.vars) > set(ctx_no_slurm.vars):
submitter = ExtractionSubmitter(Path.cwd(), self.db)
cluster_req = ExtractionRequest(
run, proposal, mock=mock,
run_data=run_data, cluster=True, match=match, variables=variables
self.run, self.proposal, mock=self.mock, run_data=self.run_data,
cluster=True, match=self.match, variables=self.variables
)
submitter.submit(cluster_req)

@@ -273,16 +321,17 @@
log.info("Extracting cluster variables in Slurm job %s on %s",
os.environ.get('SLURM_JOB_ID', '?'), socket.gethostname())

extr = Extractor()
extr = RunExtractor(args.proposal, args.run,
cluster=args.cluster_job,
run_data=RunData(args.run_data),
match=args.match,
variables=args.var,
mock=args.mock)
if args.update_vars:
extr.update_db_vars()

extr.extract_and_ingest(args.proposal, args.run,
cluster=args.cluster_job,
run_data=RunData(args.run_data),
match=args.match,
variables=args.var,
mock=args.mock)
extr.extract_and_ingest()
extr.kafka_prd.flush(timeout=10)


if __name__ == '__main__':
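The status reporting above hinges on a heartbeat loop around Popen.wait: the parent waits for the ctxrunner child in 10-second slices and re-sends the processing_running message on each timeout, so listeners keep seeing fresh messages for as long as the job is alive. Stripped of the surrounding class, the pattern is roughly this (the function name and the notify callback are placeholders for this sketch):

import subprocess

# Minimal sketch of the heartbeat pattern in RunExtractor.extract_in_subprocess:
# wait for the child in short slices and call the 'running' notifier every
# time the wait times out, then raise if the child exited with an error.
def run_with_heartbeat(args, notify_running, interval=10):
    notify_running()                    # announce the job up front
    p = subprocess.Popen(args, stdin=subprocess.DEVNULL)
    while True:
        try:
            retcode = p.wait(timeout=interval)
            break
        except subprocess.TimeoutExpired:
            notify_running()            # still alive, refresh the status
    if retcode:
        raise subprocess.CalledProcessError(retcode, p.args)

Re-sending rather than announcing once presumably lets a listener that starts or reconnects mid-run still discover the job, and gives the tracker a recent snapshot to compare against.
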
70 changes: 70 additions & 0 deletions damnit/backend/extraction_control.py
@@ -282,3 +282,73 @@
submitter.execute_in_slurm(req)
else:
submitter.submit(req)


class ExtractionJobTracker:
"""Track running extraction jobs using their running/finished messages"""
def __init__(self):
self.jobs = {} # keyed by processing_id

def on_processing_running(self, info):
proc_id = info['processing_id']
if info != self.jobs.get(proc_id, None):
self.jobs[proc_id] = info
self.on_run_jobs_changed(info['proposal'], info['run'])
log.debug("Processing running for p%s r%s on %s (%s)",
info['proposal'], info['run'], info['hostname'], proc_id)

def on_processing_finished(self, info):
proc_id = info['processing_id']
info = self.jobs.pop(proc_id, None)
if info is not None:
self.on_run_jobs_changed(info['proposal'], info['run'])
log.debug("Processing finished for p%s r%s (%s)",
info['proposal'], info['run'], proc_id)

def on_run_jobs_changed(self, proposal, run):
pass # Implement in subclass

def check_slurm_jobs(self):
"""Check for any Slurm jobs that exited without a 'finished' message"""
jobs_by_cluster = {}
for info in self.jobs.values():
if cluster := info['slurm_cluster']:
jobs_by_cluster.setdefault(cluster, []).append(info)

for cluster, infos in jobs_by_cluster.items():
jids = [i['slurm_job_id'] for i in infos]

# Passing 1 Job ID can give an 'Invalid job id' error if it has
# already left the queue. With multiple, we always get a list back.
if len(jids) == 1:
jids.append("1")

cmd = ["squeue", "--clusters", cluster, "--jobs=" + ",".join(jids),

Check warning on line 325 in damnit/backend/extraction_control.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extraction_control.py#L325

Added line #L325 was not covered by tests
"--format=%i %T", "--noheader"]
self.squeue_check_jobs(cmd, infos)

# Running the squeue subprocess is separated here so GUI code can override
# it, to avoid blocking the event loop if squeue is slow for any reason.
def squeue_check_jobs(self, cmd, jobs_to_check):
res = subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
if res.returncode != 0:
log.warning("Error calling squeue")
return

self.process_squeue_output(res.stdout, jobs_to_check)

def process_squeue_output(self, stdout: str, jobs_to_check):
"""Inspect squeue output to clean up crashed jobs"""
still_running = set()
for line in stdout.splitlines():
job_id, status = line.strip().split()
if status == 'RUNNING':
still_running.add(job_id)

for info in jobs_to_check:
proc_id = info['processing_id']
job_id = info['slurm_job_id']
if (proc_id in self.jobs) and (job_id not in still_running):
del self.jobs[proc_id]
self.on_run_jobs_changed(info['proposal'], info['run'])
log.info("Slurm job %s on %s (%s) crashed or was cancelled",

Check warning on line 353 in damnit/backend/extraction_control.py

View check run for this annotation

Codecov / codecov/patch

damnit/backend/extraction_control.py#L347-L353

Added lines #L347 - L353 were not covered by tests
info['slurm_job_id'], info['slurm_cluster'], proc_id)
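
ExtractionJobTracker is the consuming side of these messages: hand it the decoded processing_running and processing_finished payloads and it keeps self.jobs keyed by processing_id, with check_slurm_jobs() as a safety net for Slurm jobs that died without sending a finished message. A hypothetical wiring, assuming the caller has already decoded each Kafka message back into a MsgKind plus its data dict (StatusPrinter and handle_message are invented names for this sketch):

from damnit.backend.db import MsgKind
from damnit.backend.extraction_control import ExtractionJobTracker

# Hypothetical consumer: print a per-run job count whenever the set of
# jobs for a run changes. The GUI presumably does something similar to
# drive its status indicators.
class StatusPrinter(ExtractionJobTracker):
    def on_run_jobs_changed(self, proposal, run):
        n = sum(1 for info in self.jobs.values()
                if info['proposal'] == proposal and info['run'] == run)
        print(f"p{proposal} r{run}: {n} extraction job(s) in progress")

tracker = StatusPrinter()

def handle_message(kind: MsgKind, data: dict):
    if kind == MsgKind.processing_running:
        tracker.on_processing_running(data)
    elif kind == MsgKind.processing_finished:
        tracker.on_processing_finished(data)

# Call tracker.check_slurm_jobs() periodically to drop entries whose Slurm
# job vanished without a 'finished' message.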
4 changes: 4 additions & 0 deletions damnit/gui/main_window.py
@@ -489,6 +489,10 @@
)
elif msg_kind == MsgKind.variable_set:
self.table.handle_variable_set(data)
elif msg_kind == MsgKind.processing_running:
self.table.handle_processing_running(data)
elif msg_kind == MsgKind.processing_finished:
self.table.handle_processing_finished(data)

def handle_run_values_updated(self, proposal, run, values: dict):
self.table.handle_run_values_changed(proposal, run, values)