Add project and metadata fields to a workflow #105
Conversation
Pull request overview
Adds first-class project and metadata fields to workflows end-to-end (DB → server API → SDKs/CLI → docs), enabling grouping/categorization and attaching arbitrary JSON metadata.
Changes:
- Add `project` + `metadata` columns to the `workflow` table and plumb them through create/list/get/update.
- Extend the Rust model + WorkflowSpec + CLI commands to accept/display the new fields (including JSON output parsing for `metadata`).
- Add integration tests and new documentation pages describing usage.
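As a rough illustration, a workflow spec using both fields might look like the following. This is a sketch only: the YAML format and exact field placement are assumptions based on the field names in this PR, not copied from the repo's workflow-spec reference.

```yaml
# Hypothetical workflow spec using the new fields (placement assumed)
name: demo-workflow
project: climate-2026        # new: groups related workflows
metadata:                    # new: arbitrary JSON metadata
  owner: analysis-team
  priority: high
jobs:
  - name: step1
    command: echo hello
```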
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| migrations/20260206000000_add_metadata_and_project.up.sql | Adds project and metadata columns to workflow. |
| migrations/20260206000000_add_metadata_and_project.down.sql | Removes the added columns on rollback. |
| src/server/api/workflows.rs | Includes project/metadata in SELECT/INSERT/UPDATE and response mapping. |
| src/models.rs | Extends WorkflowModel with project/metadata + parsing/defaults updates. |
| src/client/workflow_spec.rs | Supports specifying project/metadata in workflow specs and forwards them on create. |
| src/client/commands/workflows.rs | Adds CLI flags for update and shows project/metadata in list output; parses metadata for JSON output. |
| api/openapi.yaml | Adds project/metadata to workflow_model schema. |
| python_client/.../workflow_model.py | Adds new fields to Python SDK model (regen output). |
| julia_client/.../WorkflowModel.md | Documents new fields in Julia SDK docs (regen output). |
| julia_client/.../model_WorkflowModel.jl | Adds new fields to Julia SDK model (regen output). |
| docs/src/core/workflows/organizing-workflows.md | New user guide for organizing workflows using project/metadata. |
| docs/src/core/workflows/index.md | Links the new organizing-workflows doc. |
| docs/src/core/reference/workflow-spec.md | Documents project/metadata in the workflow spec reference + examples. |
| tests/test_workflow_metadata_project.rs | Adds integration tests for create/update behavior of the new fields. |
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 4 comments.
Pull request overview
Copilot reviewed 31 out of 31 changed files in this pull request and generated 13 comments.
Pull request overview
Copilot reviewed 30 out of 30 changed files in this pull request and generated 5 comments.
Pull request overview
Copilot reviewed 32 out of 32 changed files in this pull request and generated 6 comments.
torc-dash/src/main.rs
Outdated
```rust
// Couldn't extract workflow ID, delete temp file
let _ = tokio::fs::remove_file(&temp_path).await;
```
The inline-spec create path deletes the spec file if extract_workflow_id(&result.stdout) fails, even when workflow creation succeeded. This regresses previous behavior where the spec would still be saved (just without an ID suffix) and makes success dependent on CLI stdout formatting. Consider keeping the file (e.g., fall back to a name-only filename or leave the UUID filename) and returning/logging the actual saved path.
Suggested change:

```rust
// Couldn't extract workflow ID; keep the temporary spec file so it isn't lost
info!(
    "Workflow created but could not extract ID from CLI output; keeping spec at: {}",
    temp_path
);
```
```rust
let final_path = format!("{}_{}{}", sanitized_name, workflow_id, file_extension);
let _ = tokio::fs::rename(&temp_path, &final_path).await;
info!("Saved workflow spec to: {}", final_path);
} else {
```
tokio::fs::rename(&temp_path, &final_path).await ignores errors but still logs "Saved workflow spec to: ...". If rename fails (permissions, cross-device, collision), the log is misleading and the temp file may be left behind. Handle the rename result explicitly (log errors, and decide whether to keep/remove the temp file or fall back to copying).
```rust
// Apply CPU violation fix
// Note: CPU corrections use memory_multiplier (capacity multiplier) like memory corrections,
// not runtime_multiplier (time multiplier). This ensures consistent safety margins for resources.
if adjustment.has_cpu_violation
    && let Some(max_peak_cpu) = adjustment.max_peak_cpu_percent
{
    // peak_cpu_percent is the total percentage for all CPUs
    // e.g., 501.4% with 3 CPUs allocated (300%)
    // Calculate required CPUs using capacity multiplier for safety margin
    let required_cpus = (max_peak_cpu / 100.0 * memory_multiplier).ceil() as i64;
    let new_cpus = std::cmp::max(required_cpus, 1); // At least 1 CPU

    if new_cpus > adjustment.current_cpus {
        let job_count = adjustment.job_ids.len();
        if job_count > 1 {
            info!(
                "{} job(s) with RR {}: CPU over-utilization detected, peak {}% -> allocating {} CPUs ({:.1}x safety margin)",
                job_count, rr_id, max_peak_cpu, new_cpus, memory_multiplier
            );
        } else if let (Some(job_id), Some(job_name)) =
            (adjustment.job_ids.first(), adjustment.job_names.first())
        {
            info!(
                "Job {} ({}): CPU over-utilization detected, peak {}% -> allocating {} CPUs ({:.1}x safety margin)",
                job_id, job_name, max_peak_cpu, new_cpus, memory_multiplier
            );
        }

        new_rr.num_cpus = new_cpus;
        updated = true;
    }
}

// Update resource requirements if changed (only once per rr_id)
#[allow(clippy::collapsible_if)]
if updated {
    if !dry_run {
        if let Err(e) = default_api::update_resource_requirements(config, rr_id, new_rr) {
            warn!(
                "Warning: failed to update resource requirements {}: {}",
                rr_id, e
            );
        }
    }

    // Create adjustment report for JSON output
    adjustment_reports.push(ResourceAdjustmentReport {
        resource_requirements_id: rr_id,
        job_ids: adjustment.job_ids.clone(),
        job_names: adjustment.job_names.clone(),
        memory_adjusted,
        original_memory,
        new_memory: new_memory_str,
        max_peak_memory_bytes: adjustment.max_peak_memory_bytes,
        runtime_adjusted,
        original_runtime,
        new_runtime: new_runtime_str,
    });
```
ResourceAdjustmentReport / ResourceCorrectionResult don't capture CPU adjustments, but apply_resource_corrections can update new_rr.num_cpus. As a result, CPU corrections are applied silently (no counts, no JSON fields, and the CLI summary can't report them). Add CPU fields (e.g., cpu_adjusted, original_cpus, new_cpus, max_peak_cpu_percent) and a cpu_corrections counter so dry-run/JSON output accurately reflects what changed.
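A minimal sketch of the suggested report extension. The field names follow the suggestion in this comment; the surrounding struct shape is assumed, not copied from the repo, and unrelated fields are omitted.

```rust
/// Sketch: ResourceAdjustmentReport extended with CPU fields so dry-run/JSON
/// output can reflect CPU changes (memory/runtime fields omitted for brevity).
#[derive(Debug, Clone, Default)]
struct ResourceAdjustmentReport {
    resource_requirements_id: i64,
    cpu_adjusted: bool,
    original_cpus: Option<i64>,
    new_cpus: Option<i64>,
    max_peak_cpu_percent: Option<f64>,
}

/// A cpu_corrections counter can then be derived when building the summary.
fn count_cpu_corrections(reports: &[ResourceAdjustmentReport]) -> usize {
    reports.iter().filter(|r| r.cpu_adjusted).count()
}

fn main() {
    let reports = vec![
        ResourceAdjustmentReport {
            resource_requirements_id: 1,
            cpu_adjusted: true,
            original_cpus: Some(3),
            new_cpus: Some(7),
            max_peak_cpu_percent: Some(501.4),
        },
        ResourceAdjustmentReport {
            resource_requirements_id: 2,
            ..Default::default()
        },
    ];
    assert_eq!(count_cpu_corrections(&reports), 1);
}
```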
```diff
@@ -408,24 +415,29 @@ fn check_resource_utilization(
     likely_timeout,
     timeout_reason,
     runtime_utilization,
+    likely_cpu_violation: false,
+    peak_cpu_percent: None,
+    likely_runtime_violation: false,
 });
```
ResourceViolationInfo now has likely_cpu_violation, peak_cpu_percent, and likely_runtime_violation, but check_resource_utilization always sets them to false/None. This makes the new fields misleading/inconsistent with the rest of the report (which already analyzes CPU/runtime over-utilization). Populate these fields based on result.peak_cpu_percent and runtime comparisons (similar to how memory/timeout are derived), or remove them from this code path until they're supported.
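One way to populate the CPU flag from the observed peak, as the comment suggests — a sketch under assumed names, where peak_cpu_percent is the total percentage across all allocated CPUs (as in the correction code above):

```rust
/// Sketch: derive likely_cpu_violation / peak_cpu_percent from the observed
/// peak instead of hard-coding false/None. The peak is a total % across all
/// CPUs, so 3 allocated CPUs correspond to a 300.0 budget.
fn derive_cpu_violation(peak_cpu_percent: Option<f64>, allocated_cpus: i64) -> (bool, Option<f64>) {
    match peak_cpu_percent {
        Some(peak) => (peak > allocated_cpus as f64 * 100.0, Some(peak)),
        None => (false, None),
    }
}

fn main() {
    // e.g., a peak of 501.4% with 3 CPUs allocated (300%) is a violation
    assert_eq!(derive_cpu_violation(Some(501.4), 3), (true, Some(501.4)));
    assert_eq!(derive_cpu_violation(Some(120.0), 2), (false, Some(120.0)));
    assert_eq!(derive_cpu_violation(None, 3), (false, None));
}
```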
torc-dash/src/main.rs
Outdated
```rust
let _ = tokio::fs::rename(&temp_path, &final_path).await;
info!("Saved workflow spec to: {}", final_path);
} else {
    // Couldn't extract workflow ID, delete temp file
    let _ = tokio::fs::remove_file(&temp_path).await;
```
Same issue as cli_create_handler: on success, if extract_workflow_id(&result.stdout) fails, the spec file is deleted, and rename errors are ignored while still logging a successful save. This makes successful creates depend on stdout parsing and can leave behind temp files or misreport the saved path. Consider preserving the file on ID parse failure and handling rename errors explicitly.
Suggested change:

```rust
match tokio::fs::rename(&temp_path, &final_path).await {
    Ok(()) => {
        info!("Saved workflow spec to: {}", final_path);
    }
    Err(e) => {
        info!(
            "Workflow created (ID: {}) but failed to move spec from '{}' to '{}': {}. \
             The spec remains at the temporary path.",
            workflow_id, temp_path, final_path, e
        );
    }
}
} else {
    // Couldn't extract workflow ID; keep temp file so the spec isn't lost
    info!(
        "Workflow created but could not extract workflow ID from CLI output. \
         Keeping workflow spec at temporary path: {}",
        temp_path
    );
```
```rust
        std::process::exit(1);
    }
}
print_json_wrapped("workflows", &workflows_json, "workflows");
```
workflows list -f json output changed from a top-level JSON array to an object wrapper ({"workflows": [...]}) via print_json_wrapped. This is a breaking change for existing consumers and will also break several existing tests/helpers that deserialize the output as an array (e.g., tests/test_workflows.rs). Either keep the legacy array output for this command or update all callers/tests and document the new wrapped shape consistently across list commands.
Suggested change:

```rust
// Preserve legacy behavior: output a top-level JSON array of workflows
let json_output = serde_json::to_string_pretty(&workflows_json)
    .unwrap_or_else(|_| "[]".to_string());
println!("{}", json_output);
```
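For downstream consumers that cannot be updated in lockstep, a defensive reader can accept both shapes. The sketch below only classifies the payload by its first JSON token; a real consumer would deserialize with serde_json, and the helper name here is made up:

```rust
/// Sketch: classify `torc workflows list -f json` output as either the
/// legacy top-level array or the new {"workflows": [...]} object wrapper.
fn workflows_payload_is_wrapped(output: &str) -> Option<bool> {
    match output.trim_start().chars().next()? {
        '[' => Some(false), // legacy: top-level JSON array
        '{' => Some(true),  // new: object wrapper
        _ => None,          // not a shape we recognize
    }
}

fn main() {
    assert_eq!(workflows_payload_is_wrapped("[{\"id\": 1}]"), Some(false));
    assert_eq!(workflows_payload_is_wrapped("{\"workflows\": []}"), Some(true));
    assert_eq!(workflows_payload_is_wrapped("not json"), None);
}
```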
Pull request overview
Copilot reviewed 33 out of 33 changed files in this pull request and generated 7 comments.
```diff
 // Spec is inline content - write to current directory with random name
 let unique_id = uuid::Uuid::new_v4();
-let temp_path = format!("/tmp/torc_spec_{}{}", unique_id, file_extension);
-if let Err(e) = tokio::fs::write(&temp_path, &req.spec).await {
+let temp_path = format!("torc_spec_{}{}", unique_id, file_extension);
+if let Err(e) = tokio::fs::write(&temp_path, &spec_content).await {
     return Json(CliResponse {
```
Inline specs are now written into the server process working directory and preserved on success. This can break deployments where the CWD is not writable, and it can accumulate files over time. Consider using std::env::temp_dir() (like cli_validate_handler) and/or making persistence explicit/configurable (e.g., a configured export directory) while still cleaning up by default.
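A minimal sketch of the temp-dir approach this comment suggests. The filename pattern mirrors the handler's torc_spec_&lt;uuid&gt;&lt;ext&gt; naming; the function name and synchronous std::fs calls are illustrative assumptions (the handler itself uses tokio::fs).

```rust
use std::env;
use std::fs;
use std::io;
use std::path::PathBuf;

/// Sketch: write inline spec content under std::env::temp_dir() rather than
/// the server's working directory, which may not be writable.
fn write_spec_to_temp(unique_id: &str, ext: &str, content: &str) -> io::Result<PathBuf> {
    let path = env::temp_dir().join(format!("torc_spec_{}{}", unique_id, ext));
    fs::write(&path, content)?;
    Ok(path)
}

fn main() -> io::Result<()> {
    let path = write_spec_to_temp("demo", ".yaml", "name: example\n")?;
    assert!(path.starts_with(env::temp_dir()));
    assert_eq!(fs::read_to_string(&path)?, "name: example\n");
    fs::remove_file(&path)?; // clean up by default, as the comment recommends
    Ok(())
}
```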
```diff
 // Spec is inline content - write to current directory with random name
 let unique_id = uuid::Uuid::new_v4();
-let temp_path = format!("/tmp/torc_spec_{}{}", unique_id, file_extension);
-if let Err(e) = tokio::fs::write(&temp_path, &req.spec).await {
+let temp_path = format!("torc_spec_{}{}", unique_id, file_extension);
+if let Err(e) = tokio::fs::write(&temp_path, &spec_content).await {
     return Json(CliResponse {
```
Same as cli_create_handler: writing inline specs to the current working directory and preserving them on success can fail if the service CWD is not writable and can leak/accumulate specs on the server. Align with the temp-file approach used elsewhere (e.g., cli_validate_handler) or store in a dedicated configurable directory.
src/client/resource_correction.rs
Outdated
```rust
/// Parse memory string (e.g., "8g", "512m", "1024k", "512b") to bytes
pub fn parse_memory_bytes(mem: &str) -> Option<u64> {
    let mem = mem.trim().to_lowercase();
    let (num_str, multiplier) = if mem.ends_with("gb") {
        (mem.trim_end_matches("gb"), 1024u64 * 1024 * 1024)
    } else if mem.ends_with("g") {
        (mem.trim_end_matches("g"), 1024u64 * 1024 * 1024)
    } else if mem.ends_with("mb") {
        (mem.trim_end_matches("mb"), 1024u64 * 1024)
    } else if mem.ends_with("m") {
        (mem.trim_end_matches("m"), 1024u64 * 1024)
    } else if mem.ends_with("kb") {
        (mem.trim_end_matches("kb"), 1024u64)
    } else if mem.ends_with("k") {
        (mem.trim_end_matches("k"), 1024u64)
    } else if mem.ends_with("b") {
        (mem.trim_end_matches("b"), 1u64)
    } else {
        (mem.as_str(), 1u64)
    };
    num_str
        .parse::<f64>()
        .ok()
        .map(|n| (n * multiplier as f64) as u64)
}
```
parse_memory_bytes duplicates memory_utils::memory_string_to_bytes and currently doesn't support common units like t/T (and will return None for inputs like 1t). Also, converting f64 to u64 via cast truncates, which can under-allocate after parsing (especially with decimals / floating rounding). Consider reusing memory_string_to_bytes (or extending it) and rounding up (ceil) when converting parsed values to bytes.
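A sketch of a table-driven variant that adds t/tb support and rounds up, per the comment. Reusing memory_string_to_bytes would be preferable if its behavior fits; this standalone version is only for illustration.

```rust
/// Sketch: parse "8g", "1.5gb", "2t", "512m", "512b" into bytes, rounding up
/// so fractional values never under-allocate. Suffixes are checked longest
/// first so "gb" is not consumed by the bare "b" case.
fn parse_memory_bytes(mem: &str) -> Option<u64> {
    let mem = mem.trim().to_lowercase();
    const K: u64 = 1024;
    let suffixes: [(&str, u64); 10] = [
        ("tb", K * K * K * K),
        ("t", K * K * K * K),
        ("gb", K * K * K),
        ("g", K * K * K),
        ("mb", K * K),
        ("m", K * K),
        ("kb", K),
        ("k", K),
        ("b", 1),
        ("", 1), // bare number means bytes
    ];
    let (suffix, multiplier) = suffixes.iter().find(|(s, _)| mem.ends_with(s))?;
    let num_str = &mem[..mem.len() - suffix.len()];
    num_str
        .parse::<f64>()
        .ok()
        .filter(|n| n.is_finite() && *n >= 0.0)
        .map(|n| (n * *multiplier as f64).ceil() as u64) // ceil: round up
}

fn main() {
    assert_eq!(parse_memory_bytes("8g"), Some(8 * 1024 * 1024 * 1024));
    assert_eq!(parse_memory_bytes("1t"), Some(1024u64.pow(4)));
    assert_eq!(parse_memory_bytes("1.5m"), Some(1_572_864));
    assert_eq!(parse_memory_bytes("bogus"), None);
}
```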
```rust
/// Result of applying resource corrections
#[derive(Debug, Clone, Serialize)]
pub struct ResourceCorrectionResult {
    pub resource_requirements_updated: usize,
    pub jobs_analyzed: usize,
    pub memory_corrections: usize,
    pub runtime_corrections: usize,
    /// Detailed adjustment reports for JSON output
```
CPU corrections are applied (cpu_adjusted, new_cpus) but ResourceCorrectionResult doesn't expose any CPU correction counts, and the CLI summary currently only reports memory/runtime. Consider adding a cpu_corrections (or similar) field and incrementing it when CPU adjustments occur so both JSON and human output can accurately summarize what changed.
```rust
println!();
println!("Resource Correction Summary:");
println!("  Workflow: {}", selected_workflow_id);
println!("  Jobs analyzed: {}", result.jobs_analyzed);
println!(
    "  Resource requirements updated: {}",
    result.resource_requirements_updated
);
println!("  Memory corrections: {}", result.memory_corrections);
println!("  Runtime corrections: {}", result.runtime_corrections);

// Print details if any corrections were made
if !result.adjustments.is_empty() {
    println!();
    println!("Adjustment Details:");
    for adj in &result.adjustments {
        println!(
            "  RR {}: {} job(s)",
            adj.resource_requirements_id,
            adj.job_ids.len()
        );
        if let (Some(old_mem), Some(new_mem)) =
            (&adj.original_memory, &adj.new_memory)
        {
            println!("    Memory: {} -> {}", old_mem, new_mem);
        }
        if let (Some(old_rt), Some(new_rt)) =
            (&adj.original_runtime, &adj.new_runtime)
        {
            println!("    Runtime: {} -> {}", old_rt, new_rt);
        }
```
The correct-resources output summary/details omit CPU adjustments even though the correction logic can update num_cpus. This makes it hard to tell what changed in non-JSON mode; consider printing CPU changes alongside memory/runtime when adj.cpu_adjusted is true.
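The missing CPU branch could mirror the Memory/Runtime detail lines. A sketch, assuming the cpu_adjusted/original_cpus/new_cpus field names proposed in review rather than any existing code:

```rust
/// Sketch: format a CPU adjustment detail line alongside Memory/Runtime.
/// Returns None when no CPU change was applied, matching how the memory and
/// runtime branches only print when both old and new values are present.
fn format_cpu_detail(cpu_adjusted: bool, original: Option<i64>, new: Option<i64>) -> Option<String> {
    match (cpu_adjusted, original, new) {
        (true, Some(old), Some(new)) => Some(format!("    CPUs: {} -> {}", old, new)),
        _ => None,
    }
}

fn main() {
    assert_eq!(
        format_cpu_detail(true, Some(3), Some(7)).as_deref(),
        Some("    CPUs: 3 -> 7")
    );
    assert_eq!(format_cpu_detail(false, Some(3), Some(7)), None);
}
```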
```rust
/// Correct resource requirements based on actual job usage (proactive optimization)
///
/// Analyzes completed jobs and adjusts resource requirements to better match actual usage.
/// Unlike `torc recover`, this command does NOT reset or rerun jobs - it only updates
/// resource requirements for future runs.
#[command(
    name = "correct-resources",
    after_long_help = "\
EXAMPLES:
    # Preview corrections (dry-run)
    torc workflows correct-resources 123 --dry-run

    # Apply corrections to all over-utilized jobs
    torc workflows correct-resources 123

    # Apply corrections only to specific jobs
    torc workflows correct-resources 123 --job-ids 45,67,89

    # Use custom multipliers
    torc workflows correct-resources 123 --memory-multiplier 1.5 --runtime-multiplier 1.4

    # Output as JSON for programmatic use
    torc -f json workflows correct-resources 123 --dry-run
"
)]
CorrectResources {
    /// ID of the workflow to analyze (optional - will prompt if not provided)
    #[arg()]
    workflow_id: Option<i64>,
    /// Memory multiplier for jobs that exceeded memory (default: 1.2)
    #[arg(long, default_value = "1.2")]
    memory_multiplier: f64,
    /// Runtime multiplier for jobs that exceeded runtime (default: 1.2)
    #[arg(long, default_value = "1.2")]
    runtime_multiplier: f64,
    /// Only correct resource requirements for specific jobs (comma-separated IDs)
    #[arg(long, value_delimiter = ',')]
    job_ids: Option<Vec<i64>>,
    /// Show what would be changed without applying (default: false)
    #[arg(long)]
    dry_run: bool,
},
```
New workflows correct-resources command introduces substantial behavior (including DB updates). The repo has many CLI integration tests for workflows, but there doesn't appear to be a CLI-level test exercising this subcommand (dry-run vs apply, JSON output shape, and that CPU/memory/runtime adjustments are reflected). Consider adding an integration test similar to the other tests/test_workflows*.rs coverage.
```diff
 let over_count = json
     .get("over_utilization_count")
     .and_then(|v| v.as_i64())
     .unwrap_or(0);
-let failed_count = json
-    .get("failed_jobs")
+let resource_violations_count = json
+    .get("resource_violations")
     .and_then(|v| v.as_array())
     .map(|a| a.len())
     .unwrap_or(0);

 if over_count > 0 {
     response.push_str("\n\n[RECOVERABLE RESOURCE ISSUES DETECTED!");
     response.push_str(&format!(
-        "\n{} job(s) exceeded resource limits (OOM or timeout).",
+        "\n{} job(s) exceeded their resource allocations.",
         over_count
     ));
```
over_utilization_count comes from the CLI report and is a count of violations/rows, not unique jobs (a single job can contribute Memory + CPU + Runtime). The message currently says "job(s)", which can be misleading; consider either deduplicating by job_id before printing or changing the wording to "violation(s)".
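If the dedup route is taken, counting unique job IDs before printing is cheap. A sketch with an assumed (job_id, violation_kind) row shape, which is not the CLI's actual report structure:

```rust
use std::collections::HashSet;

/// Sketch: one job can contribute Memory + CPU + Runtime violation rows, so
/// dedupe by job_id before reporting "N job(s)".
fn unique_violating_jobs(violations: &[(i64, &str)]) -> usize {
    violations
        .iter()
        .map(|(job_id, _)| *job_id)
        .collect::<HashSet<_>>()
        .len()
}

fn main() {
    let rows = [(45, "Memory"), (45, "CPU"), (67, "Runtime")];
    // 3 violation rows, but only 2 distinct jobs
    assert_eq!(unique_violating_jobs(&rows), 2);
}
```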