Merged
2 changes: 1 addition & 1 deletion api/openapi.yaml
@@ -5994,7 +5994,7 @@ components:
type: string

compute_node_expiration_buffer_seconds:
default: 60
default: 180
description: "Inform all compute nodes to shut down this number of seconds\
\ before the expiration time. This allows torc to send SIGTERM to all\
\ job processes and set all statuses to terminated. Increase the time\
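The description above says each compute node begins shutting down `compute_node_expiration_buffer_seconds` before its allocation expires so torc can send SIGTERM to job processes and record their statuses as terminated. A minimal sketch of that arithmetic with the new 180-second default; the `shutdown_deadline` helper is invented for illustration and is not torc's actual code:

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: when should a compute node begin shutting down,
/// given its allocation end time and the workflow's expiration buffer?
fn shutdown_deadline(allocation_end: SystemTime, buffer_seconds: u64) -> SystemTime {
    // The node stops picking up jobs and SIGTERMs running processes at this
    // point, leaving `buffer_seconds` for them to exit and for statuses to be
    // set to terminated before the allocation is reclaimed.
    allocation_end - Duration::from_secs(buffer_seconds)
}

fn main() {
    let allocation_end = SystemTime::now() + Duration::from_secs(3600); // 1-hour allocation
    let deadline = shutdown_deadline(allocation_end, 180); // new default buffer
    println!("begin shutdown at {:?}", deadline);
}
```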
2 changes: 1 addition & 1 deletion docs/src/core/reference/cli.md
@@ -1552,7 +1552,7 @@ Schedule compute nodes using Slurm

###### **Options:**

- `-j`, `--job-prefix <JOB_PREFIX>` — Job prefix for the Slurm job names. Default: `worker`
- `-j`, `--job-prefix <JOB_PREFIX>` — Job prefix for the Slurm job names. Default: empty
- `--keep-submission-scripts` — Keep submission scripts after job submission. Default: `false`
- `-m`, `--max-parallel-jobs <MAX_PARALLEL_JOBS>` — Maximum number of parallel jobs
- `-n`, `--num-hpc-jobs <NUM_HPC_JOBS>` — Number of HPC jobs to submit. Default: `1`
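To illustrate the effect of the new empty default, here is a sketch of the job-name pattern used by `schedule_slurm_nodes` later in this diff; the `nightly_` prefix is an invented example:

```rust
fn job_name(job_prefix: &str, workflow_id: i64, job_num: u32) -> String {
    // Same pattern as schedule_slurm_nodes: <prefix>wf<workflow_id>_<pid>_<job_num>
    format!("{}wf{}_{}_{}", job_prefix, workflow_id, std::process::id(), job_num)
}

fn main() {
    // Default (empty prefix): e.g. "wf42_12345_1"
    println!("{}", job_name("", 42, 1));
    // With `-j nightly_`: e.g. "nightly_wf42_12345_1"
    println!("{}", job_name("nightly_", 42, 1));
}
```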
4 changes: 2 additions & 2 deletions julia_client/Torc/src/api/models/model_WorkflowModel.jl
@@ -10,7 +10,7 @@
user=nothing,
description=nothing,
timestamp=nothing,
compute_node_expiration_buffer_seconds=60,
compute_node_expiration_buffer_seconds=180,
compute_node_wait_for_new_jobs_seconds=90,
compute_node_ignore_workflow_completion=false,
compute_node_wait_for_healthy_database_minutes=20,
@@ -44,7 +44,7 @@ Base.@kwdef mutable struct WorkflowModel <: OpenAPI.APIModel
user::Union{Nothing, String} = nothing
description::Union{Nothing, String} = nothing
timestamp::Union{Nothing, String} = nothing
compute_node_expiration_buffer_seconds::Union{Nothing, Int64} = 60
compute_node_expiration_buffer_seconds::Union{Nothing, Int64} = 180
compute_node_wait_for_new_jobs_seconds::Union{Nothing, Int64} = 90
compute_node_ignore_workflow_completion::Union{Nothing, Bool} = false
compute_node_wait_for_healthy_database_minutes::Union{Nothing, Int64} = 20
10 changes: 9 additions & 1 deletion src/client/apis.rs
@@ -24,7 +24,15 @@ impl<T> fmt::Display for Error<T> {
Error::Reqwest(e) => ("reqwest", e.to_string()),
Error::Serde(e) => ("serde", e.to_string()),
Error::Io(e) => ("IO", e.to_string()),
Error::ResponseError(e) => ("response", format!("status code {}", e.status)),
Error::ResponseError(e) => {
// Include response content to show the actual error message from the server
let msg = if e.content.is_empty() {
format!("status code {}", e.status)
} else {
format!("status code {}: {}", e.status, e.content)
};
("response", msg)
}
};
write!(f, "error in {}: {}", module, e)
}
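A self-contained sketch of the same formatting logic, using stand-in types rather than the generated client's real `ResponseContent` and `Error` enum; the 409 body is invented for illustration:

```rust
use std::fmt;

// Hypothetical stand-ins for the generated client's types, shown only to
// illustrate the formatting change above.
struct ResponseContent {
    status: u16,
    content: String,
}

struct ResponseError(ResponseContent);

impl fmt::Display for ResponseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same idea as the diff: fall back to the status code alone when the
        // server returned an empty body, otherwise append the body so the
        // user sees the server's actual error message.
        if self.0.content.is_empty() {
            write!(f, "error in response: status code {}", self.0.status)
        } else {
            write!(
                f,
                "error in response: status code {}: {}",
                self.0.status, self.0.content
            )
        }
    }
}

fn main() {
    let err = ResponseError(ResponseContent {
        status: 409,
        content: "workflow is archived".to_string(),
    });
    println!("{}", err); // error in response: status code 409: workflow is archived
}
```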
113 changes: 34 additions & 79 deletions src/client/commands/slurm.rs
@@ -365,7 +365,7 @@ EXAMPLES:
#[arg()]
workflow_id: Option<i64>,
/// Job prefix for the Slurm job names
#[arg(short, long, default_value = "worker")]
#[arg(short, long, default_value = "")]
job_prefix: String,
/// Keep submission scripts after job submission
#[arg(long, default_value = "false")]
@@ -1391,34 +1391,14 @@ pub fn schedule_slurm_nodes(

std::fs::create_dir_all(output)?;

for _job_num in 1..num_hpc_jobs + 1 {
// Create the scheduled compute node record first so we can use its ID in the Slurm job name.
// This allows us to correlate the Slurm job with the scheduled compute node record.
// We use scheduler_id=0 as a placeholder since we don't have the Slurm job ID yet.
let scheduled_compute_node = models::ScheduledComputeNodesModel::new(
for job_num in 1..num_hpc_jobs + 1 {
let job_name = format!(
"{}wf{}_{}_{}",
job_prefix,
workflow_id,
0, // Placeholder - will be updated after submission
scheduler_config_id,
"slurm".to_string(),
"pending".to_string(),
std::process::id(),
job_num
);
let created_scn = match utils::send_with_retries(
config,
|| default_api::create_scheduled_compute_node(config, scheduled_compute_node.clone()),
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
Ok(scn) => scn,
Err(e) => {
error!("Failed to create scheduled compute node: {}", e);
return Err(format!("Failed to create scheduled compute node: {}", e).into());
}
};
let scn_id = created_scn
.id
.expect("Created scheduled compute node should have an ID");

// Use the scheduled compute node ID in the job name for correlation
let job_name = format!("{}_{}", job_prefix, scn_id);
let script_path = format!("{}/{}.sh", output, job_name);

if let Err(e) = slurm_interface.create_submission_script(
@@ -1433,80 +1413,55 @@
start_one_worker_per_node,
) {
error!("Error creating submission script: {}", e);
// Clean up the scheduled compute node record since submission failed
if let Err(del_err) = utils::send_with_retries(
config,
|| default_api::delete_scheduled_compute_node(config, scn_id, None),
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
error!(
"Failed to delete scheduled compute node after script creation failure: {}",
del_err
);
}
return Err(e.into());
}

match slurm_interface.submit(Path::new(&script_path)) {
Ok((return_code, slurm_job_id, stderr)) => {
if return_code != 0 {
error!("Error submitting job: {}", stderr);
// Clean up the scheduled compute node record since submission failed
if let Err(del_err) = utils::send_with_retries(
config,
|| default_api::delete_scheduled_compute_node(config, scn_id, None),
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
error!(
"Failed to delete scheduled compute node after submission failure: {}",
del_err
);
}
return Err(format!("Job submission failed: {}", stderr).into());
}
let slurm_job_id_int: i64 = slurm_job_id
.parse()
.unwrap_or_else(|_| panic!("Failed to parse Slurm job ID {}", slurm_job_id));
info!(
"Submitted Slurm job name={} with ID={} (scheduled_compute_node_id={})",
job_name, slurm_job_id_int, scn_id
);

// Update the scheduled compute node with the actual Slurm job ID
let mut updated_scn = created_scn.clone();
updated_scn.scheduler_id = slurm_job_id_int;
if let Err(e) = utils::send_with_retries(
// Create the scheduled compute node record only after we have a valid Slurm job ID
let scheduled_compute_node = models::ScheduledComputeNodesModel::new(
workflow_id,
slurm_job_id_int,
scheduler_config_id,
"slurm".to_string(),
"pending".to_string(),
);
let created_scn = match utils::send_with_retries(
config,
|| {
default_api::update_scheduled_compute_node(
default_api::create_scheduled_compute_node(
config,
scn_id,
updated_scn.clone(),
scheduled_compute_node.clone(),
)
},
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
error!(
"Failed to update scheduled compute node with Slurm job ID: {}",
e
);
}

// Event is now broadcast via SSE from the server when the scheduled compute node is created
Ok(scn) => scn,
Err(e) => {
error!("Failed to create scheduled compute node: {}", e);
return Err(
format!("Failed to create scheduled compute node: {}", e).into()
);
}
};
let scn_id = created_scn
.id
.expect("Created scheduled compute node should have an ID");
info!(
"Submitted Slurm job name={} with ID={} (scheduled_compute_node_id={})",
job_name, slurm_job_id_int, scn_id
);
}
Err(e) => {
error!("Error submitting job: {}", e);
// Clean up the scheduled compute node record since submission failed
if let Err(del_err) = utils::send_with_retries(
config,
|| default_api::delete_scheduled_compute_node(config, scn_id, None),
WAIT_FOR_HEALTHY_DATABASE_MINUTES,
) {
error!(
"Failed to delete scheduled compute node after submission error: {}",
del_err
);
}
return Err(e.into());
}
}
@@ -3500,7 +3455,7 @@ fn handle_regenerate(
workflow_id,
scheduler_info.id,
scheduler_info.num_allocations as i32,
"worker",
"",
output_dir.to_str().unwrap_or("output"),
poll_interval,
None, // max_parallel_jobs
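In summary, the rewritten loop above reverses the original ordering: instead of creating a placeholder scheduled-compute-node record (scheduler_id=0) before submission and then updating or deleting it, it submits the sbatch script first and creates the record only once a real Slurm job ID exists. A condensed sketch of the new control flow, with hypothetical stand-ins for torc's helpers:

```rust
/// Sketch only: `submit_to_slurm` and `create_scheduled_compute_node_record`
/// stand in for SlurmInterface::submit and default_api::create_scheduled_compute_node.
fn schedule_one_node(workflow_id: i64, job_prefix: &str, job_num: u32) -> Result<i64, String> {
    let job_name = format!(
        "{}wf{}_{}_{}",
        job_prefix,
        workflow_id,
        std::process::id(),
        job_num
    );

    // 1. Write and submit the sbatch script; bail out on any submission error.
    let slurm_job_id = submit_to_slurm(&job_name)?;

    // 2. Only now create the scheduled-compute-node record, with the real
    //    Slurm job ID. There is no placeholder to update and nothing to
    //    delete if submission fails, which removes the cleanup paths that
    //    existed in the old version.
    create_scheduled_compute_node_record(workflow_id, slurm_job_id)
}

fn submit_to_slurm(_job_name: &str) -> Result<i64, String> {
    unimplemented!("stand-in for script creation + sbatch submission")
}

fn create_scheduled_compute_node_record(_workflow_id: i64, _slurm_job_id: i64) -> Result<i64, String> {
    unimplemented!("stand-in for the torc API call; returns the new record's ID")
}
```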
50 changes: 47 additions & 3 deletions src/client/commands/workflows.rs
@@ -84,6 +84,20 @@ struct WorkflowTableRowNoUser {
timestamp: String,
}

#[derive(Tabled)]
struct WorkflowTableRow {
#[tabled(rename = "ID")]
id: i64,
#[tabled(rename = "User")]
user: String,
#[tabled(rename = "Name")]
name: String,
#[tabled(rename = "Description")]
description: String,
#[tabled(rename = "Timestamp")]
timestamp: String,
}

#[derive(Tabled)]
struct WorkflowActionTableRow {
#[tabled(rename = "ID")]
@@ -255,6 +269,9 @@ EXAMPLES:
# Show archived workflows
torc workflows list --archived-only
torc workflows list --include-archived

# Show workflows from all users
torc workflows list --all-users
"
)]
List {
@@ -276,6 +293,9 @@ EXAMPLES:
/// Include both archived and non-archived workflows
#[arg(long, default_value = "false")]
include_archived: bool,
/// Show workflows from all users (filtered by access when authentication is enabled)
#[arg(long, default_value = "false")]
all_users: bool,
},
/// Get a specific workflow by ID
#[command(
@@ -2258,14 +2278,19 @@ fn handle_list(
reverse_sort: bool,
archived_only: bool,
include_archived: bool,
all_users: bool,
format: &str,
) {
// Use pagination utility to get all workflows
let mut params = WorkflowListParams::new()
.with_offset(offset)
.with_limit(limit)
.with_reverse_sort(reverse_sort)
.with_user(user.to_string());
.with_reverse_sort(reverse_sort);

// When --all-users is not set, filter by current user (default behavior)
if !all_users {
params = params.with_user(user.to_string());
}

// Handle archive filtering:
// - include_archived: show both archived and non-archived (is_archived = None)
@@ -2316,7 +2341,24 @@
}
}
} else if workflows.is_empty() {
println!("No workflows found for user: {}", user);
if all_users {
println!("No workflows found.");
} else {
println!("No workflows found for user: {}", user);
}
} else if all_users {
println!("All workflows:");
let rows: Vec<WorkflowTableRow> = workflows
.iter()
.map(|workflow| WorkflowTableRow {
id: workflow.id.unwrap_or(-1),
user: workflow.user.clone(),
name: workflow.name.clone(),
description: workflow.description.as_deref().unwrap_or("").to_string(),
timestamp: workflow.timestamp.as_deref().unwrap_or("").to_string(),
})
.collect();
display_table_with_count(&rows, "workflows");
} else {
println!("Workflows for user {}:", user);
let rows: Vec<WorkflowTableRowNoUser> = workflows
@@ -2914,6 +2956,7 @@ pub fn handle_workflow_commands(config: &Configuration, command: &WorkflowComman
reverse_sort,
archived_only,
include_archived,
all_users,
} => {
handle_list(
config,
@@ -2924,6 +2967,7 @@
*reverse_sort,
*archived_only,
*include_archived,
*all_users,
format,
);
}
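The core of the change above is that `--all-users` simply omits the user filter from the listing parameters, leaving any access filtering to the server when authentication is enabled. A simplified, self-contained sketch of that decision; the builder here is a stand-in, not the real `WorkflowListParams`:

```rust
// Minimal stand-in for the listing parameters; only the part relevant to
// --all-users is reproduced.
#[derive(Default)]
struct WorkflowListParams {
    user: Option<String>,
}

impl WorkflowListParams {
    fn with_user(mut self, user: String) -> Self {
        self.user = Some(user);
        self
    }
}

fn build_params(all_users: bool, current_user: &str) -> WorkflowListParams {
    let params = WorkflowListParams::default();
    if all_users {
        // Leave `user` unset so the server returns every workflow the caller
        // is allowed to see.
        params
    } else {
        // Default behavior: restrict the listing to the current user.
        params.with_user(current_user.to_string())
    }
}

fn main() {
    assert!(build_params(true, "alice").user.is_none());
    assert_eq!(build_params(false, "alice").user.as_deref(), Some("alice"));
}
```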
2 changes: 1 addition & 1 deletion src/client/job_runner.rs
@@ -1787,7 +1787,7 @@ impl JobRunner {
self.workflow_id,
scheduler_id,
num_allocations,
"worker",
"",
"output",
self.torc_config.client.slurm.poll_interval,
max_parallel_jobs,
2 changes: 1 addition & 1 deletion src/client/workflow_manager.rs
@@ -266,7 +266,7 @@ impl WorkflowManager {
self.workflow_id,
scheduler_id,
num_allocations,
"worker",
"",
"output",
self.torc_config.client.slurm.poll_interval,
max_parallel_jobs,
2 changes: 1 addition & 1 deletion src/models.rs
@@ -9762,7 +9762,7 @@ impl WorkflowModel {
user,
description: None,
timestamp: None,
compute_node_expiration_buffer_seconds: Some(60),
compute_node_expiration_buffer_seconds: Some(180),
compute_node_wait_for_new_jobs_seconds: Some(0),
compute_node_ignore_workflow_completion: Some(false),
compute_node_wait_for_healthy_database_minutes: Some(20),