Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 115 additions & 4 deletions src/job_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,124 @@ impl JobWatcher {
.map(|s| s.to_owned() + ":" + output_separator)
.join(",");

let fields_sacct = [
"jobid",
"jobname",
"state",
"user",
"Elapsed",
"AllocTRES",
"Partition",
"NodeList",
"SubmitLine",
"Reason",
"WorkDir",
];
let output_format_sacct = fields_sacct.join(",");

loop {
let jobs: Vec<Job> = Command::new("squeue")
let mut jobs_sacct: Vec<Job> = Command::new("sacct")
// .arg("--starttime ") todo make an option
.arg("-P")
.arg("--noheader")
.arg("--delimiter=###turm###")
.arg("--format")
.arg(&output_format_sacct)
.output()
.expect("failed to execute sacct process")
.stdout
.lines()
.map(|l| l.unwrap().trim().to_string())
.filter_map(|l| {
let parts: Vec<_> = l.split(output_separator).collect();

if parts.len() != 11 {
return None;
}

// jobid 0, jobname 1, state 2, user 3, Elapsed 4, AllocTRES 5, Partition 6, NodeList 7, SubmitLine 8, Reason 9, WorkDir 10
let id = parts[0];
let name = parts[1];
let state = parts[2];

// remove the .batch and .extern jobs from sacct
if id.contains(".") {
return None;
}
// do not print running jobs, handled by squeue
if state == "RUNNING" {
return None;
}

let user = parts[3];
let time = parts[4];
let tres = parts[5];
let partition = parts[6];
let nodelist = parts[7];
// let stdout = parts[8];
// let stderr = parts[9];
let command = parts[8];
let state_compact = &parts[2][0..1];
let reason = parts[9];

let array_job_id = "N/A";
let array_task_id = "N/A";
let node_list = parts[7];
let working_dir = parts[10];

Some(Job {
job_id: id.to_owned(),
array_id: array_job_id.to_owned(),
array_step: match array_task_id {
"N/A" => None,
_ => Some(array_task_id.to_owned()),
},
name: name.to_owned(),
state: state.to_owned(),
state_compact: state_compact.to_owned(),
reason: if reason == "None" {
None
} else {
Some(reason.to_owned())
},
user: user.to_owned(),
time: time.to_owned(),
tres: tres.to_owned(),
partition: partition.to_owned(),
nodelist: nodelist.to_owned(),
command: command.to_owned(),
stdout: Self::resolve_path(
"",
array_job_id,
array_task_id,
id,
node_list,
user,
name,
working_dir,
),
stderr: Self::resolve_path(
"",
array_job_id,
array_task_id,
id,
node_list,
user,
name,
working_dir,
), // TODO fill all fields
})
})
.collect();


let mut jobs_squeue: Vec<Job> = Command::new("squeue")
.args(&self.squeue_args)
.arg("--array")
.arg("--noheader")
.arg("--Format")
.arg(&output_format)
.output()
.expect("failed to execute process")
.output().expect("failed to execute squeue process")
.stdout
.lines()
.map(|l| l.unwrap().trim().to_string())
Expand Down Expand Up @@ -131,7 +240,9 @@ impl JobWatcher {
})
})
.collect();
self.app.send(AppMessage::Jobs(jobs)).unwrap();

jobs_squeue.append(&mut jobs_sacct);
self.app.send(AppMessage::Jobs(jobs_squeue)).unwrap();
thread::sleep(self.interval);
}
}
Expand Down