diff --git a/api/openapi.yaml b/api/openapi.yaml
index 538a57ab..8a7187b2 100644
--- a/api/openapi.yaml
+++ b/api/openapi.yaml
@@ -5994,7 +5994,7 @@ components:
         type: string
       compute_node_expiration_buffer_seconds:
-        default: 60
+        default: 180
         description: "Inform all compute nodes to shut down this number of seconds\
           \ before the expiration time. This allows torc to send SIGTERM to all\
           \ job processes and set all statuses to terminated. Increase the time\
diff --git a/docs/src/core/reference/cli.md b/docs/src/core/reference/cli.md
index b1630cd7..3c95af66 100644
--- a/docs/src/core/reference/cli.md
+++ b/docs/src/core/reference/cli.md
@@ -1552,7 +1552,7 @@ Schedule compute nodes using Slurm

 ###### **Options:**

-- `-j`, `--job-prefix <JOB_PREFIX>` — Job prefix for the Slurm job names. Default: `worker`
+- `-j`, `--job-prefix <JOB_PREFIX>` — Job prefix for the Slurm job names. Default: empty
 - `--keep-submission-scripts` — Keep submission scripts after job submission. Default: `false`
 - `-m`, `--max-parallel-jobs <MAX_PARALLEL_JOBS>` — Maximum number of parallel jobs
 - `-n`, `--num-hpc-jobs <NUM_HPC_JOBS>` — Number of HPC jobs to submit. Default: `1`
diff --git a/julia_client/Torc/src/api/models/model_WorkflowModel.jl b/julia_client/Torc/src/api/models/model_WorkflowModel.jl
index 0bad87ce..c2cac6ee 100644
--- a/julia_client/Torc/src/api/models/model_WorkflowModel.jl
+++ b/julia_client/Torc/src/api/models/model_WorkflowModel.jl
@@ -10,7 +10,7 @@
     user=nothing,
     description=nothing,
     timestamp=nothing,
-    compute_node_expiration_buffer_seconds=60,
+    compute_node_expiration_buffer_seconds=180,
     compute_node_wait_for_new_jobs_seconds=90,
     compute_node_ignore_workflow_completion=false,
     compute_node_wait_for_healthy_database_minutes=20,
@@ -44,7 +44,7 @@ Base.@kwdef mutable struct WorkflowModel <: OpenAPI.APIModel
     user::Union{Nothing, String} = nothing
     description::Union{Nothing, String} = nothing
     timestamp::Union{Nothing, String} = nothing
-    compute_node_expiration_buffer_seconds::Union{Nothing, Int64} = 60
+    compute_node_expiration_buffer_seconds::Union{Nothing, Int64} = 180
     compute_node_wait_for_new_jobs_seconds::Union{Nothing, Int64} = 90
     compute_node_ignore_workflow_completion::Union{Nothing, Bool} = false
     compute_node_wait_for_healthy_database_minutes::Union{Nothing, Int64} = 20
diff --git a/src/client/apis.rs b/src/client/apis.rs
index 65079f6b..313f5413 100644
--- a/src/client/apis.rs
+++ b/src/client/apis.rs
@@ -24,7 +24,15 @@ impl<T> fmt::Display for Error<T> {
             Error::Reqwest(e) => ("reqwest", e.to_string()),
             Error::Serde(e) => ("serde", e.to_string()),
             Error::Io(e) => ("IO", e.to_string()),
-            Error::ResponseError(e) => ("response", format!("status code {}", e.status)),
+            Error::ResponseError(e) => {
+                // Include response content to show the actual error message from the server
+                let msg = if e.content.is_empty() {
+                    format!("status code {}", e.status)
+                } else {
+                    format!("status code {}: {}", e.status, e.content)
+                };
+                ("response", msg)
+            }
         };
         write!(f, "error in {}: {}", module, e)
     }
diff --git a/src/client/commands/slurm.rs b/src/client/commands/slurm.rs
index 239e9a4b..71202c58 100644
--- a/src/client/commands/slurm.rs
+++ b/src/client/commands/slurm.rs
@@ -365,7 +365,7 @@ EXAMPLES:
     #[arg()]
     workflow_id: Option<i64>,
     /// Job prefix for the Slurm job names
-    #[arg(short, long, default_value = "worker")]
+    #[arg(short, long, default_value = "")]
     job_prefix: String,
     /// Keep submission scripts after job submission
     #[arg(long, default_value = "false")]
@@ -1391,34 +1391,14 @@ pub fn schedule_slurm_nodes(

     std::fs::create_dir_all(output)?;

-    for _job_num in 1..num_hpc_jobs + 1 {
-        // Create the scheduled compute node record first so we can use its ID in the Slurm job name.
-        // This allows us to correlate the Slurm job with the scheduled compute node record.
-        // We use scheduler_id=0 as a placeholder since we don't have the Slurm job ID yet.
-        let scheduled_compute_node = models::ScheduledComputeNodesModel::new(
+    for job_num in 1..num_hpc_jobs + 1 {
+        let job_name = format!(
+            "{}wf{}_{}_{}",
+            job_prefix,
             workflow_id,
-            0, // Placeholder - will be updated after submission
-            scheduler_config_id,
-            "slurm".to_string(),
-            "pending".to_string(),
+            std::process::id(),
+            job_num
         );
-        let created_scn = match utils::send_with_retries(
-            config,
-            || default_api::create_scheduled_compute_node(config, scheduled_compute_node.clone()),
-            WAIT_FOR_HEALTHY_DATABASE_MINUTES,
-        ) {
-            Ok(scn) => scn,
-            Err(e) => {
-                error!("Failed to create scheduled compute node: {}", e);
-                return Err(format!("Failed to create scheduled compute node: {}", e).into());
-            }
-        };
-        let scn_id = created_scn
-            .id
-            .expect("Created scheduled compute node should have an ID");
-
-        // Use the scheduled compute node ID in the job name for correlation
-        let job_name = format!("{}_{}", job_prefix, scn_id);
         let script_path = format!("{}/{}.sh", output, job_name);

         if let Err(e) = slurm_interface.create_submission_script(
@@ -1433,17 +1413,6 @@
             start_one_worker_per_node,
         ) {
             error!("Error creating submission script: {}", e);
-            // Clean up the scheduled compute node record since submission failed
-            if let Err(del_err) = utils::send_with_retries(
-                config,
-                || default_api::delete_scheduled_compute_node(config, scn_id, None),
-                WAIT_FOR_HEALTHY_DATABASE_MINUTES,
-            ) {
-                error!(
-                    "Failed to delete scheduled compute node after script creation failure: {}",
-                    del_err
-                );
-            }
             return Err(e.into());
         }

@@ -1451,62 +1420,48 @@
                 Ok((return_code, slurm_job_id, stderr)) => {
                     if return_code != 0 {
                         error!("Error submitting job: {}", stderr);
-                        // Clean up the scheduled compute node record since submission failed
-                        if let Err(del_err) = utils::send_with_retries(
-                            config,
-                            || default_api::delete_scheduled_compute_node(config, scn_id, None),
-                            WAIT_FOR_HEALTHY_DATABASE_MINUTES,
-                        ) {
-                            error!(
-                                "Failed to delete scheduled compute node after submission failure: {}",
-                                del_err
-                            );
-                        }
                         return Err(format!("Job submission failed: {}", stderr).into());
                     }

                     let slurm_job_id_int: i64 = slurm_job_id
                         .parse()
                         .unwrap_or_else(|_| panic!("Failed to parse Slurm job ID {}", slurm_job_id));
-                    info!(
-                        "Submitted Slurm job name={} with ID={} (scheduled_compute_node_id={})",
-                        job_name, slurm_job_id_int, scn_id
-                    );

-                    // Update the scheduled compute node with the actual Slurm job ID
-                    let mut updated_scn = created_scn.clone();
-                    updated_scn.scheduler_id = slurm_job_id_int;
-                    if let Err(e) = utils::send_with_retries(
+                    // Create the scheduled compute node record only after we have a valid Slurm job ID
+                    let scheduled_compute_node = models::ScheduledComputeNodesModel::new(
+                        workflow_id,
+                        slurm_job_id_int,
+                        scheduler_config_id,
+                        "slurm".to_string(),
+                        "pending".to_string(),
+                    );
+                    let created_scn = match utils::send_with_retries(
                         config,
                         || {
-                            default_api::update_scheduled_compute_node(
+                            default_api::create_scheduled_compute_node(
                                 config,
-                                scn_id,
-                                updated_scn.clone(),
+                                scheduled_compute_node.clone(),
                             )
                         },
                         WAIT_FOR_HEALTHY_DATABASE_MINUTES,
                     ) {
-                        error!(
-                            "Failed to update scheduled compute node with Slurm job ID: {}",
-                            e
-                        );
-                    }
-
-                    // Event is now broadcast via SSE from the server when the scheduled compute node is created
+                        Ok(scn) => scn,
+                        Err(e) => {
+                            error!("Failed to create scheduled compute node: {}", e);
+                            return Err(
+                                format!("Failed to create scheduled compute node: {}", e).into()
+                            );
+                        }
+                    };
+                    let scn_id = created_scn
+                        .id
+                        .expect("Created scheduled compute node should have an ID");
+                    info!(
+                        "Submitted Slurm job name={} with ID={} (scheduled_compute_node_id={})",
+                        job_name, slurm_job_id_int, scn_id
+                    );
                 }
                 Err(e) => {
                     error!("Error submitting job: {}", e);
-                    // Clean up the scheduled compute node record since submission failed
-                    if let Err(del_err) = utils::send_with_retries(
-                        config,
-                        || default_api::delete_scheduled_compute_node(config, scn_id, None),
-                        WAIT_FOR_HEALTHY_DATABASE_MINUTES,
-                    ) {
-                        error!(
-                            "Failed to delete scheduled compute node after submission error: {}",
-                            del_err
-                        );
-                    }
                     return Err(e.into());
                 }
             }
@@ -3500,7 +3455,7 @@ fn handle_regenerate(
         workflow_id,
         scheduler_info.id,
         scheduler_info.num_allocations as i32,
-        "worker",
+        "",
         output_dir.to_str().unwrap_or("output"),
         poll_interval,
         None, // max_parallel_jobs
diff --git a/src/client/commands/workflows.rs b/src/client/commands/workflows.rs
index d7a3f892..0017151d 100644
--- a/src/client/commands/workflows.rs
+++ b/src/client/commands/workflows.rs
@@ -84,6 +84,20 @@ struct WorkflowTableRowNoUser {
     timestamp: String,
 }

+#[derive(Tabled)]
+struct WorkflowTableRow {
+    #[tabled(rename = "ID")]
+    id: i64,
+    #[tabled(rename = "User")]
+    user: String,
+    #[tabled(rename = "Name")]
+    name: String,
+    #[tabled(rename = "Description")]
+    description: String,
+    #[tabled(rename = "Timestamp")]
+    timestamp: String,
+}
+
 #[derive(Tabled)]
 struct WorkflowActionTableRow {
     #[tabled(rename = "ID")]
@@ -255,6 +269,9 @@ EXAMPLES:
   # Show archived workflows
   torc workflows list --archived-only
   torc workflows list --include-archived
+
+  # Show workflows from all users
+  torc workflows list --all-users
 "
     )]
     List {
@@ -276,6 +293,9 @@
         /// Include both archived and non-archived workflows
        #[arg(long, default_value = "false")]
         include_archived: bool,
+        /// Show workflows from all users (filtered by access when authentication is enabled)
+        #[arg(long, default_value = "false")]
+        all_users: bool,
     },
     /// Get a specific workflow by ID
     #[command(
@@ -2258,14 +2278,19 @@ fn handle_list(
     reverse_sort: bool,
     archived_only: bool,
     include_archived: bool,
+    all_users: bool,
     format: &str,
 ) {
     // Use pagination utility to get all workflows
     let mut params = WorkflowListParams::new()
         .with_offset(offset)
         .with_limit(limit)
-        .with_reverse_sort(reverse_sort)
-        .with_user(user.to_string());
+        .with_reverse_sort(reverse_sort);
+
+    // When --all-users is not set, filter by current user (default behavior)
+    if !all_users {
+        params = params.with_user(user.to_string());
+    }

     // Handle archive filtering:
     // - include_archived: show both archived and non-archived (is_archived = None)
@@ -2316,7 +2341,24 @@
             }
         }
     } else if workflows.is_empty() {
-        println!("No workflows found for user: {}", user);
+        if all_users {
+            println!("No workflows found.");
+        } else {
+            println!("No workflows found for user: {}", user);
+        }
+    } else if all_users {
+        println!("All workflows:");
+        let rows: Vec<WorkflowTableRow> = workflows
+            .iter()
+            .map(|workflow| WorkflowTableRow {
+                id: workflow.id.unwrap_or(-1),
+                user: workflow.user.clone(),
+                name: workflow.name.clone(),
+                description: workflow.description.as_deref().unwrap_or("").to_string(),
+                timestamp: workflow.timestamp.as_deref().unwrap_or("").to_string(),
+            })
+            .collect();
+        display_table_with_count(&rows, "workflows");
     } else {
         println!("Workflows for user {}:", user);
         let rows: Vec<WorkflowTableRowNoUser> = workflows
@@ -2914,6 +2956,7 @@ pub fn handle_workflow_commands(config: &Configuration, command: &WorkflowCommands)
             reverse_sort,
             archived_only,
             include_archived,
+            all_users,
         } => {
             handle_list(
                 config,
@@ -2924,6 +2967,7 @@
                 *reverse_sort,
                 *archived_only,
                 *include_archived,
+                *all_users,
                 format,
             );
         }
diff --git a/src/client/job_runner.rs b/src/client/job_runner.rs
index ad3d3c8c..975d122d 100644
--- a/src/client/job_runner.rs
+++ b/src/client/job_runner.rs
@@ -1787,7 +1787,7 @@ impl JobRunner {
             self.workflow_id,
             scheduler_id,
             num_allocations,
-            "worker",
+            "",
             "output",
             self.torc_config.client.slurm.poll_interval,
             max_parallel_jobs,
diff --git a/src/client/workflow_manager.rs b/src/client/workflow_manager.rs
index 62fbe9e4..5a7226f7 100644
--- a/src/client/workflow_manager.rs
+++ b/src/client/workflow_manager.rs
@@ -266,7 +266,7 @@ impl WorkflowManager {
             self.workflow_id,
             scheduler_id,
             num_allocations,
-            "worker",
+            "",
             "output",
             self.torc_config.client.slurm.poll_interval,
             max_parallel_jobs,
diff --git a/src/models.rs b/src/models.rs
index 13308869..b482b918 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -9762,7 +9762,7 @@ impl WorkflowModel {
             user,
             description: None,
             timestamp: None,
-            compute_node_expiration_buffer_seconds: Some(60),
+            compute_node_expiration_buffer_seconds: Some(180),
             compute_node_wait_for_new_jobs_seconds: Some(0),
             compute_node_ignore_workflow_completion: Some(false),
             compute_node_wait_for_healthy_database_minutes: Some(20),
diff --git a/src/server/api/workflows.rs b/src/server/api/workflows.rs
index b0d2e0e1..a5a8b12e 100644
--- a/src/server/api/workflows.rs
+++ b/src/server/api/workflows.rs
@@ -150,6 +150,310 @@ impl WorkflowsApiImpl {
     pub fn new(context: ApiContext) -> Self {
         Self { context }
     }
+
+    /// List workflows with optional access control filtering.
+    ///
+    /// When `accessible_ids` is `Some(ids)`, only workflows with IDs in the list are returned.
+    /// When `accessible_ids` is `None`, no ID-based filtering is applied.
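+    ///
+    /// A hypothetical call is sketched below; the argument values are
+    /// illustrative only and do not come from real torc usage:
+    ///
+    /// ```ignore
+    /// let resp = api
+    ///     .list_workflows_filtered(
+    ///         0,                   // offset
+    ///         None,                // sort_by
+    ///         None,                // reverse_sort
+    ///         100,                 // limit
+    ///         None,                // name
+    ///         None,                // user
+    ///         None,                // description
+    ///         Some(false),         // is_archived: exclude archived workflows
+    ///         Some(vec![1, 2, 3]), // accessible_ids: only these may be returned
+    ///         &ctx,
+    ///     )
+    ///     .await?;
+    /// ```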
+    pub async fn list_workflows_filtered<C>(
+        &self,
+        offset: i64,
+        sort_by: Option<String>,
+        reverse_sort: Option<bool>,
+        limit: i64,
+        name: Option<String>,
+        user: Option<String>,
+        description: Option<String>,
+        is_archived: Option<bool>,
+        accessible_ids: Option<Vec<i64>>,
+        context: &C,
+    ) -> Result<ListWorkflowsResponse, ApiError>
+    where
+        C: Has<XSpanIdString> + Send + Sync,
+    {
+        debug!(
+            "list_workflows_filtered({}, {:?}, {:?}, {}, {:?}, {:?}, {:?}, {:?}, accessible_ids={:?}) - X-Span-ID: {:?}",
+            offset,
+            sort_by,
+            reverse_sort,
+            limit,
+            name,
+            user,
+            description,
+            is_archived,
+            accessible_ids.as_ref().map(|ids| ids.len()),
+            context.get().0.clone()
+        );
+
+        // Build base query - join with workflow_status if is_archived filter is needed
+        let base_query = if is_archived.is_some() {
+            "
+            SELECT
+                w.id
+                ,w.name
+                ,w.user
+                ,w.description
+                ,w.timestamp
+                ,w.compute_node_expiration_buffer_seconds
+                ,w.compute_node_wait_for_new_jobs_seconds
+                ,w.compute_node_ignore_workflow_completion
+                ,w.compute_node_wait_for_healthy_database_minutes
+                ,w.compute_node_min_time_for_new_jobs_seconds
+                ,w.jobs_sort_method
+                ,w.resource_monitor_config
+                ,w.slurm_defaults
+                ,w.status_id
+            FROM workflow w
+            INNER JOIN workflow_status ws ON w.status_id = ws.id
+            "
+            .to_string()
+        } else {
+            "
+            SELECT
+                id
+                ,name
+                ,user
+                ,description
+                ,timestamp
+                ,compute_node_expiration_buffer_seconds
+                ,compute_node_wait_for_new_jobs_seconds
+                ,compute_node_ignore_workflow_completion
+                ,compute_node_wait_for_healthy_database_minutes
+                ,compute_node_min_time_for_new_jobs_seconds
+                ,jobs_sort_method
+                ,resource_monitor_config
+                ,slurm_defaults
+                ,status_id
+            FROM workflow
+            "
+            .to_string()
+        };
+
+        // Build WHERE clause conditions
+        let mut where_conditions = Vec::new();
+
+        // Use table prefix when joining with workflow_status
+        let table_prefix = if is_archived.is_some() { "w." } else { "" };
+
+        if name.is_some() {
+            where_conditions.push(format!("{}name = ?", table_prefix));
+        }
+
+        if user.is_some() {
+            where_conditions.push(format!("{}user = ?", table_prefix));
+        }
+
+        if description.is_some() {
+            where_conditions.push(format!("{}description LIKE ?", table_prefix));
+        }
+
+        if let Some(archived) = is_archived {
+            if archived {
+                where_conditions.push("ws.is_archived = 1".to_string());
+            } else {
+                where_conditions.push("(ws.is_archived IS NULL OR ws.is_archived = 0)".to_string());
+            }
+        }
+
+        // Access control filtering: restrict to accessible workflow IDs
+        if let Some(ref ids) = accessible_ids {
+            if ids.is_empty() {
+                // No accessible workflows - return empty result
+                return Ok(ListWorkflowsResponse::SuccessfulResponse(
+                    models::ListWorkflowsResponse {
+                        items: Some(Vec::new()),
+                        offset,
+                        max_limit: MAX_RECORD_TRANSFER_COUNT,
+                        count: 0,
+                        total_count: 0,
+                        has_more: false,
+                    },
+                ));
+            }
+            let placeholders: Vec<String> = ids.iter().map(|_| "?".to_string()).collect();
+            where_conditions.push(format!(
+                "{}id IN ({})",
+                table_prefix,
+                placeholders.join(", ")
+            ));
+        }
+
+        let where_clause = if where_conditions.is_empty() {
+            String::new()
+        } else {
+            where_conditions.join(" AND ")
+        };
+
+        // Build the complete query with pagination and sorting
+        // Use table prefix for default sort column when joining
+        let default_sort_column = if is_archived.is_some() { "w.id" } else { "id" };
+
+        // Add table prefix to sort_by column if joining and column is ambiguous
+        let prefixed_sort_by = if is_archived.is_some() {
+            sort_by.as_ref().map(|col| {
+                // If the column is "id" (ambiguous), prefix it with "w."
+                // For other workflow columns, also add prefix to be consistent
+                if col == "id" || !col.contains('.') {
+                    format!("w.{}", col)
+                } else {
+                    col.clone()
+                }
+            })
+        } else {
+            sort_by.clone()
+        };
+
+        let query = if where_clause.is_empty() {
+            SqlQueryBuilder::new(base_query)
+                .with_pagination_and_sorting(
+                    offset,
+                    limit,
+                    prefixed_sort_by,
+                    reverse_sort,
+                    default_sort_column,
+                )
+                .build()
+        } else {
+            SqlQueryBuilder::new(base_query)
+                .with_where(where_clause.clone())
+                .with_pagination_and_sorting(
+                    offset,
+                    limit,
+                    prefixed_sort_by,
+                    reverse_sort,
+                    default_sort_column,
+                )
+                .build()
+        };
+
+        debug!("Executing query: {}", query);
+
+        // Execute the query
+        let mut sqlx_query = sqlx::query(&query);
+
+        // Bind optional parameters in order
+        if let Some(workflow_name) = &name {
+            sqlx_query = sqlx_query.bind(workflow_name);
+        }
+        if let Some(workflow_user) = &user {
+            sqlx_query = sqlx_query.bind(workflow_user);
+        }
+        if let Some(workflow_description) = &description {
+            sqlx_query = sqlx_query.bind(format!("%{}%", workflow_description));
+        }
+        // Bind accessible IDs
+        if let Some(ref ids) = accessible_ids {
+            for id in ids {
+                sqlx_query = sqlx_query.bind(id);
+            }
+        }
+
+        let records = match sqlx_query.fetch_all(self.context.pool.as_ref()).await {
+            Ok(recs) => recs,
+            Err(e) => {
+                error!("Database error: {}", e);
+                return Err(database_error(e));
+            }
+        };
+
+        let mut items: Vec<models::WorkflowModel> = Vec::new();
+        for record in records {
+            let jobs_sort_method_str: String = record.get("jobs_sort_method");
+            let sort_method = jobs_sort_method_str
+                .parse::<models::JobsSortMethod>()
+                .ok();
+            items.push(models::WorkflowModel {
+                id: Some(record.get("id")),
+                name: record.get("name"),
+                user: record.get("user"),
+                description: record.get("description"),
+                timestamp: Some(record.get("timestamp")),
+                compute_node_expiration_buffer_seconds: Some(
+                    record.get("compute_node_expiration_buffer_seconds"),
+                ),
+                compute_node_wait_for_new_jobs_seconds: Some(
+                    record.get("compute_node_wait_for_new_jobs_seconds"),
+                ),
+                compute_node_ignore_workflow_completion: Some(
+                    record.get::<i64, _>("compute_node_ignore_workflow_completion") != 0,
+                ),
+                compute_node_wait_for_healthy_database_minutes: Some(
+                    record.get("compute_node_wait_for_healthy_database_minutes"),
+                ),
+                compute_node_min_time_for_new_jobs_seconds: Some(
+                    record.get("compute_node_min_time_for_new_jobs_seconds"),
+                ),
+                jobs_sort_method: sort_method,
+                resource_monitor_config: record.get("resource_monitor_config"),
+                slurm_defaults: record.get("slurm_defaults"),
+                use_pending_failed: record
+                    .try_get::<Option<i64>, _>("use_pending_failed")
+                    .ok()
+                    .flatten()
+                    .map(|v| v != 0),
+                status_id: Some(record.get("status_id")),
+            });
+        }
+
+        // For proper pagination, we should get the total count without LIMIT/OFFSET
+        let count_base_query = if is_archived.is_some() {
+            "SELECT COUNT(*) as total FROM workflow w INNER JOIN workflow_status ws ON w.status_id = ws.id"
+        } else {
+            "SELECT COUNT(*) as total FROM workflow"
+        };
+        let count_query = if where_clause.is_empty() {
+            count_base_query.to_string()
+        } else {
+            format!("{} WHERE {}", count_base_query, where_clause)
+        };
+
+        let mut count_sqlx_query = sqlx::query(&count_query);
+        if let Some(workflow_name) = &name {
+            count_sqlx_query = count_sqlx_query.bind(workflow_name);
+        }
+        if let Some(workflow_user) = &user {
+            count_sqlx_query = count_sqlx_query.bind(workflow_user);
+        }
+        if let Some(workflow_description) = &description {
+            count_sqlx_query = count_sqlx_query.bind(format!("%{}%", workflow_description));
+        }
+        // Bind accessible IDs for count query
+        if let Some(ref ids) = accessible_ids {
+            for id in ids {
+                count_sqlx_query = count_sqlx_query.bind(id);
+            }
+        }
+
+        let total_count = match count_sqlx_query.fetch_one(self.context.pool.as_ref()).await {
+            Ok(row) => row.get::<i64, _>("total"),
+            Err(e) => {
+                error!("Database error getting count: {}", e);
+                return Err(database_error(e));
+            }
+        };
+
+        let current_count = items.len() as i64;
+        let offset_val = offset;
+        let has_more = offset_val + current_count < total_count;
+
+        debug!(
+            "list_workflows_filtered({}/{}) - X-Span-ID: {:?}",
+            current_count,
+            total_count,
+            context.get().0.clone()
+        );
+
+        Ok(ListWorkflowsResponse::SuccessfulResponse(
+            models::ListWorkflowsResponse {
+                items: Some(items),
+                offset: offset_val,
+                max_limit: MAX_RECORD_TRANSFER_COUNT,
+                count: current_count,
+                total_count,
+                has_more,
+            },
+        ))
+    }
 }

 #[async_trait]
@@ -217,7 +521,7 @@ where
             .unwrap_or_else(|| "gpus_runtime_memory".to_string());

         let compute_node_expiration_buffer_seconds =
-            body.compute_node_expiration_buffer_seconds.unwrap_or(60);
+            body.compute_node_expiration_buffer_seconds.unwrap_or(180);
         // Default must be >= completion_check_interval_secs + job_completion_poll_interval
         // to avoid workers exiting before dependent jobs are unblocked.
         let compute_node_wait_for_new_jobs_seconds =
@@ -748,8 +1052,7 @@
         is_archived: Option<bool>,
         context: &C,
     ) -> Result<ListWorkflowsResponse, ApiError> {
-        debug!(
-            "list_workflows({}, {:?}, {:?}, {}, {:?}, {:?}, {:?}, {:?}) - X-Span-ID: {:?}",
+        self.list_workflows_filtered(
             offset,
             sort_by,
             reverse_sort,
@@ -758,246 +1061,10 @@
             user,
             description,
             is_archived,
-            context.get().0.clone()
-        );
-
-        // Build base query - join with workflow_status if is_archived filter is needed
-        let base_query = if is_archived.is_some() {
-            "
-            SELECT
-                w.id
-                ,w.name
-                ,w.user
-                ,w.description
-                ,w.timestamp
-                ,w.compute_node_expiration_buffer_seconds
-                ,w.compute_node_wait_for_new_jobs_seconds
-                ,w.compute_node_ignore_workflow_completion
-                ,w.compute_node_wait_for_healthy_database_minutes
-                ,w.compute_node_min_time_for_new_jobs_seconds
-                ,w.jobs_sort_method
-                ,w.resource_monitor_config
-                ,w.slurm_defaults
-                ,w.status_id
-            FROM workflow w
-            INNER JOIN workflow_status ws ON w.status_id = ws.id
-            "
-            .to_string()
-        } else {
-            "
-            SELECT
-                id
-                ,name
-                ,user
-                ,description
-                ,timestamp
-                ,compute_node_expiration_buffer_seconds
-                ,compute_node_wait_for_new_jobs_seconds
-                ,compute_node_ignore_workflow_completion
-                ,compute_node_wait_for_healthy_database_minutes
-                ,compute_node_min_time_for_new_jobs_seconds
-                ,jobs_sort_method
-                ,resource_monitor_config
-                ,slurm_defaults
-                ,status_id
-            FROM workflow
-            "
-            .to_string()
-        };
-
-        // Build WHERE clause conditions
-        let mut where_conditions = Vec::new();
-        let mut bind_values: Vec<Box<dyn sqlx::Encode<'_, Sqlite> + Send>> = Vec::new();
-
-        // Use table prefix when joining with workflow_status
-        let table_prefix = if is_archived.is_some() { "w." } else { "" };
-
-        if let Some(workflow_name) = &name {
-            where_conditions.push(format!("{}name = ?", table_prefix));
-            bind_values.push(Box::new(workflow_name.clone()));
-        }
-
-        if let Some(workflow_user) = &user {
-            where_conditions.push(format!("{}user = ?", table_prefix));
-            bind_values.push(Box::new(workflow_user.clone()));
-        }
-
-        if let Some(workflow_description) = &description {
-            where_conditions.push(format!("{}description LIKE ?", table_prefix));
-            bind_values.push(Box::new(format!("%{}%", workflow_description)));
-        }
-
-        if let Some(archived) = is_archived {
-            if archived {
-                where_conditions.push("ws.is_archived = 1".to_string());
-            } else {
-                where_conditions.push("(ws.is_archived IS NULL OR ws.is_archived = 0)".to_string());
-            }
-        }
-
-        let where_clause = if where_conditions.is_empty() {
-            String::new()
-        } else {
-            where_conditions.join(" AND ")
-        };
-
-        // Build the complete query with pagination and sorting
-        // Use table prefix for default sort column when joining
-        let default_sort_column = if is_archived.is_some() { "w.id" } else { "id" };
-
-        // Add table prefix to sort_by column if joining and column is ambiguous
-        let prefixed_sort_by = if is_archived.is_some() {
-            sort_by.as_ref().map(|col| {
-                // If the column is "id" (ambiguous), prefix it with "w."
-                // For other workflow columns, also add prefix to be consistent
-                if col == "id" || !col.contains('.') {
-                    format!("w.{}", col)
-                } else {
-                    col.clone()
-                }
-            })
-        } else {
-            sort_by.clone()
-        };
-
-        let query = if where_clause.is_empty() {
-            SqlQueryBuilder::new(base_query)
-                .with_pagination_and_sorting(
-                    offset,
-                    limit,
-                    prefixed_sort_by,
-                    reverse_sort,
-                    default_sort_column,
-                )
-                .build()
-        } else {
-            SqlQueryBuilder::new(base_query)
-                .with_where(where_clause.clone())
-                .with_pagination_and_sorting(
-                    offset,
-                    limit,
-                    prefixed_sort_by,
-                    reverse_sort,
-                    default_sort_column,
-                )
-                .build()
-        };
-
-        debug!("Executing query: {}", query);
-
-        // Execute the query
-        let mut sqlx_query = sqlx::query(&query);
-
-        // Bind optional parameters in order
-        if let Some(workflow_name) = &name {
-            sqlx_query = sqlx_query.bind(workflow_name);
-        }
-        if let Some(workflow_user) = &user {
-            sqlx_query = sqlx_query.bind(workflow_user);
-        }
-        if let Some(workflow_description) = &description {
-            sqlx_query = sqlx_query.bind(format!("%{}%", workflow_description));
-        }
-
-        let records = match sqlx_query.fetch_all(self.context.pool.as_ref()).await {
-            Ok(recs) => recs,
-            Err(e) => {
-                error!("Database error: {}", e);
-                return Err(database_error(e));
-            }
-        };
-
-        let mut items: Vec<models::WorkflowModel> = Vec::new();
-        for record in records {
-            let jobs_sort_method_str: String = record.get("jobs_sort_method");
-            let sort_method = jobs_sort_method_str
-                .parse::<models::JobsSortMethod>()
-                .ok();
-            items.push(models::WorkflowModel {
-                id: Some(record.get("id")),
-                name: record.get("name"),
-                user: record.get("user"),
-                description: record.get("description"),
-                timestamp: Some(record.get("timestamp")),
-                compute_node_expiration_buffer_seconds: Some(
-                    record.get("compute_node_expiration_buffer_seconds"),
-                ),
-                compute_node_wait_for_new_jobs_seconds: Some(
-                    record.get("compute_node_wait_for_new_jobs_seconds"),
-                ),
-                compute_node_ignore_workflow_completion: Some(
-                    record.get::<i64, _>("compute_node_ignore_workflow_completion") != 0,
-                ),
-                compute_node_wait_for_healthy_database_minutes: Some(
-                    record.get("compute_node_wait_for_healthy_database_minutes"),
-                ),
-                compute_node_min_time_for_new_jobs_seconds: Some(
-                    record.get("compute_node_min_time_for_new_jobs_seconds"),
-                ),
-                jobs_sort_method: sort_method,
-                resource_monitor_config: record.get("resource_monitor_config"),
-                slurm_defaults: record.get("slurm_defaults"),
-                use_pending_failed: record
-                    .try_get::<Option<i64>, _>("use_pending_failed")
-                    .ok()
-                    .flatten()
-                    .map(|v| v != 0),
-                status_id: Some(record.get("status_id")),
-            });
-        }
-
-        // For proper pagination, we should get the total count without LIMIT/OFFSET
-        let count_base_query = if is_archived.is_some() {
-            "SELECT COUNT(*) as total FROM workflow w INNER JOIN workflow_status ws ON w.status_id = ws.id"
-        } else {
-            "SELECT COUNT(*) as total FROM workflow"
-        };
-        let count_query = if where_clause.is_empty() {
-            count_base_query.to_string()
-        } else {
-            format!("{} WHERE {}", count_base_query, where_clause)
-        };
-
-        let mut count_sqlx_query = sqlx::query(&count_query);
-        if let Some(workflow_name) = &name {
-            count_sqlx_query = count_sqlx_query.bind(workflow_name);
-        }
-        if let Some(workflow_user) = &user {
-            count_sqlx_query = count_sqlx_query.bind(workflow_user);
-        }
-        if let Some(workflow_description) = &description {
-            count_sqlx_query = count_sqlx_query.bind(format!("%{}%", workflow_description));
-        }
-
-        let total_count = match count_sqlx_query.fetch_one(self.context.pool.as_ref()).await {
-            Ok(row) => row.get::<i64, _>("total"),
-            Err(e) => {
-                error!("Database error getting count: {}", e);
-                return Err(database_error(e));
-            }
-        };
-
-        let current_count = items.len() as i64;
-        let offset_val = offset;
-        let has_more = offset_val + current_count < total_count;
-
-        debug!(
-            "list_workflows({}/{}) - X-Span-ID: {:?}",
-            current_count,
-            total_count,
-            context.get().0.clone()
-        );
-
-        Ok(ListWorkflowsResponse::SuccessfulResponse(
-            models::ListWorkflowsResponse {
-                items: Some(items),
-                offset: offset_val,
-                max_limit: MAX_RECORD_TRANSFER_COUNT,
-                count: current_count,
-                total_count,
-                has_more,
-            },
-        ))
+            None, // no access control filtering
+            context,
+        )
+        .await
     }

     /// Update a workflow.
diff --git a/tests/test_access_groups.rs b/tests/test_access_groups.rs
index 82c57af5..067a1b4e 100644
--- a/tests/test_access_groups.rs
+++ b/tests/test_access_groups.rs
@@ -1521,3 +1521,105 @@ fn test_comprehensive_access_control_workflow_execution(
         "shared_user should see job names"
     );
 }
+
+/// Test that `workflows list --all-users` with access control returns only workflows
+/// the authenticated user can access (owned + group-shared), not all workflows.
+#[rstest]
+fn test_workflows_list_all_users_with_access_control(
+    start_server_with_access_control: &AccessControlServerProcess,
+) {
+    let admin_config = &start_server_with_access_control.config;
+    let password = "password";
+
+    // Create configs for different users
+    let wf_user_config = config_with_auth(admin_config, "wf-user");
+    let wf_user_2_config = config_with_auth(admin_config, "wf-user-2");
+    let wf_user_3_config = config_with_auth(admin_config, "wf-user-3");
+
+    let counter = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
+
+    // Create workflow A owned by "wf-user"
+    let wf_a = create_workflow_with_user(
+        &wf_user_config,
+        &format!("all-users-test-wf-a-{}", counter),
+        "wf-user",
+    );
+    let wf_a_id = wf_a.id.unwrap();
+
+    // Create workflow B owned by "wf-user-2"
+    let wf_b = create_workflow_with_user(
+        &wf_user_2_config,
+        &format!("all-users-test-wf-b-{}", counter),
+        "wf-user-2",
+    );
+    let wf_b_id = wf_b.id.unwrap();
+
+    // Create workflow C owned by "wf-user-3" (not shared with wf-user)
+    let wf_c = create_workflow_with_user(
+        &wf_user_3_config,
+        &format!("all-users-test-wf-c-{}", counter),
+        "wf-user-3",
+    );
+    let wf_c_id = wf_c.id.unwrap();
+
+    // Create an access group and add wf-user to it (admin can create groups)
+    let group = models::AccessGroupModel {
+        id: None,
+        name: format!("all-users-test-group-{}", counter),
+        description: Some("Test group for all-users listing".to_string()),
+        created_at: None,
+    };
+    let created_group =
+        default_api::create_access_group(admin_config, group).expect("Failed to create group");
+    let group_id = created_group.id.unwrap();
+
+    // Add wf-user to the group
+    let membership = models::UserGroupMembershipModel {
+        id: None,
+        user_name: "wf-user".to_string(),
+        group_id,
+        role: "member".to_string(),
+        created_at: None,
+    };
+    default_api::add_user_to_group(admin_config, group_id, membership)
+        .expect("Failed to add wf-user to group");
+
+    // Share workflow B with the group (so wf-user can access it)
+    default_api::add_workflow_to_group(admin_config, wf_b_id, group_id)
+        .expect("Failed to share workflow B with group");
+
+    // Verify wf-user can see workflow A (owned) and workflow B (group-shared)
+    // but NOT workflow C (no access)
+    let output = run_cli_command_with_auth(
+        &["--format", "json", "workflows", "list", "--all-users"],
+        start_server_with_access_control,
+        "wf-user",
+        password,
+    )
+    .expect("Failed to run workflows list --all-users as wf-user");
+
+    let json_output: serde_json::Value =
+        serde_json::from_str(&output).expect("Failed to parse JSON output");
+    let workflows_array = json_output.as_array().expect("Expected JSON array");
+
+    let found_ids: Vec<i64> = workflows_array
+        .iter()
+        .filter_map(|w| w.get("id").and_then(|id| id.as_i64()))
+        .collect();
+
+    assert!(
+        found_ids.contains(&wf_a_id),
+        "wf-user should see workflow A (owned), found_ids={:?}",
+        found_ids
+    );
+    assert!(
+        found_ids.contains(&wf_b_id),
+        "wf-user should see workflow B (group-shared), found_ids={:?}",
+        found_ids
+    );
+    assert!(
+        !found_ids.contains(&wf_c_id),
+        "wf-user should NOT see workflow C (no access), found_ids={:?}",
+        found_ids
+    );
+}
diff --git a/tests/test_workflows.rs b/tests/test_workflows.rs
index 8808ccd2..773e1876 100644
--- a/tests/test_workflows.rs
+++ b/tests/test_workflows.rs
@@ -7,6 +7,7 @@ use common::{
 use rstest::rstest;
 use serde_json::json;
 use torc::client::default_api;
+use torc::models;

 #[rstest]
 fn test_workflows_add_command_json(start_server: &ServerProcess) {
@@ -1382,3 +1383,101 @@ fn test_archive_multiple_workflows(start_server: &ServerProcess) {
         );
     }
 }
+
+#[rstest]
+fn test_workflows_list_all_users_no_auth(start_server: &ServerProcess) {
+    let config = &start_server.config;
+
+    // Create workflows for 3 different users
+    let wf_a = default_api::create_workflow(
+        config,
+        models::WorkflowModel::new(
+            "all_users_test_wf_a".to_string(),
+            "all_users_user_a".to_string(),
+        ),
+    )
+    .expect("Failed to create workflow for user_a");
+
+    let wf_b = default_api::create_workflow(
+        config,
+        models::WorkflowModel::new(
+            "all_users_test_wf_b".to_string(),
+            "all_users_user_b".to_string(),
+        ),
+    )
+    .expect("Failed to create workflow for user_b");
+
+    let wf_c = default_api::create_workflow(
+        config,
+        models::WorkflowModel::new(
+            "all_users_test_wf_c".to_string(),
+            "all_users_user_c".to_string(),
+        ),
+    )
+    .expect("Failed to create workflow for user_c");
+
+    // Run `torc workflows list --all-users` with JSON output
+    let args = ["workflows", "list", "--all-users"];
+    let json_output = run_cli_with_json(&args, start_server, Some("all_users_user_a"))
+        .expect("Failed to run workflows list --all-users");
+
+    // Should return an array containing workflows from all users
+    assert!(
+        json_output.is_array(),
+        "Workflows list should return an array"
+    );
+    let workflows_array = json_output.as_array().unwrap();
+
+    // Find our test workflows by ID
+    let wf_a_id = wf_a.id.unwrap();
+    let wf_b_id = wf_b.id.unwrap();
+    let wf_c_id = wf_c.id.unwrap();
+
+    let found_ids: Vec<i64> = workflows_array
+        .iter()
+        .filter_map(|w| w.get("id").and_then(|id| id.as_i64()))
+        .collect();
+
+    assert!(
+        found_ids.contains(&wf_a_id),
+        "Should contain user_a's workflow (id={})",
+        wf_a_id
+    );
+    assert!(
+        found_ids.contains(&wf_b_id),
+        "Should contain user_b's workflow (id={})",
+        wf_b_id
+    );
+    assert!(
+        found_ids.contains(&wf_c_id),
+        "Should contain user_c's workflow (id={})",
+        wf_c_id
+    );
+
+    // Verify each workflow has a user field
+    for wf in workflows_array {
+        assert!(
+            wf.get("user").is_some(),
+            "Each workflow should have a 'user' field"
+        );
+    }
+
+    // Without --all-users, user_a should only see their own workflows
+    let args_no_all = ["workflows", "list"];
+    let json_filtered = run_cli_with_json(&args_no_all, start_server, Some("all_users_user_a"))
+        .expect("Failed to run workflows list without --all-users");
+
+    let filtered_array = json_filtered.as_array().unwrap();
+    let filtered_users: Vec<&str> = filtered_array
+        .iter()
+        .filter_map(|w| w.get("user").and_then(|u| u.as_str()))
+        .collect();
+
+    // All returned workflows should belong to user_a
+    for u in &filtered_users {
+        assert_eq!(
+            *u, "all_users_user_a",
+            "Without --all-users, should only return current user's workflows"
+        );
+    }
+}
diff --git a/torc-server/src/server.rs b/torc-server/src/server.rs
index b3dfdc1d..657a049c 100644
--- a/torc-server/src/server.rs
+++ b/torc-server/src/server.rs
@@ -2704,8 +2704,30 @@ where
         context: &C,
     ) -> Result<ListWorkflowsResponse, ApiError> {
        let (processed_offset, processed_limit) = process_pagination_params(offset, limit)?;
+
+        // When access control is enforced and no user filter is provided,
+        // restrict results to workflows the authenticated user can access.
+        let accessible_ids = if self.authorization_service.is_enforced() && user.is_none() {
+            let auth: Option<AuthData> = Has::<Option<AuthData>>::get(context).clone();
+            match self
+                .authorization_service
+                .get_accessible_workflow_ids(&auth)
+                .await
+            {
+                Ok(ids) => ids,
+                Err(e) => {
+                    return Err(ApiError(format!(
+                        "Failed to get accessible workflows: {}",
+                        e
+                    )));
+                }
+            }
+        } else {
+            None
+        };
+
         self.workflows_api
-            .list_workflows(
+            .list_workflows_filtered(
                 processed_offset,
                 sort_by,
                 reverse_sort,
@@ -2714,6 +2736,7 @@
                 user,
                 description,
                 is_archived,
+                accessible_ids,
                 context,
             )
             .await
diff --git a/torc-slurm-job-runner/src/main.rs b/torc-slurm-job-runner/src/main.rs
index d78ec174..bcd43bec 100644
--- a/torc-slurm-job-runner/src/main.rs
+++ b/torc-slurm-job-runner/src/main.rs
@@ -171,7 +171,8 @@ mod unix_main {
         let expiration_buffer_seconds = workflow
             .compute_node_expiration_buffer_seconds
-            .unwrap_or(300);
+            .unwrap_or(180)
+            .max(120);
         info!("Expiration buffer seconds: {}", expiration_buffer_seconds);

         let job_end_time = match slurm_interface.get_job_end_time() {
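For reference, the dynamic `IN (?, ?, ...)` pattern that `list_workflows_filtered` uses for access-control filtering can be exercised in isolation. The sketch below is illustrative only, not part of the patch: it assumes `sqlx` with the `sqlite` feature plus `tokio` as the async runtime, and reduces the schema to the two columns the example needs.

```rust
use sqlx::{sqlite::SqlitePoolOptions, Row};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // In-memory database standing in for the server's connection pool.
    let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?;
    sqlx::query("CREATE TABLE workflow (id INTEGER PRIMARY KEY, user TEXT)")
        .execute(&pool)
        .await?;
    for (id, user) in [(1_i64, "a"), (2, "b"), (3, "c")] {
        sqlx::query("INSERT INTO workflow (id, user) VALUES (?, ?)")
            .bind(id)
            .bind(user)
            .execute(&pool)
            .await?;
    }

    // IDs an authorization service might report as accessible.
    let accessible_ids: Vec<i64> = vec![1, 3];

    // One "?" placeholder per ID, then bind each value in the same order.
    let placeholders: Vec<String> = accessible_ids.iter().map(|_| "?".to_string()).collect();
    let query = format!(
        "SELECT id FROM workflow WHERE id IN ({}) ORDER BY id",
        placeholders.join(", ")
    );
    let mut q = sqlx::query(&query);
    for id in &accessible_ids {
        q = q.bind(id);
    }

    let ids: Vec<i64> = q
        .fetch_all(&pool)
        .await?
        .iter()
        .map(|row| row.get("id"))
        .collect();
    assert_eq!(ids, vec![1, 3]);
    Ok(())
}
```

Note that the empty-list case never reaches SQL: as in the patch, an empty `accessible_ids` short-circuits to an empty response before any query is built, which also avoids generating an invalid `IN ()` clause.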