diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index aaaad879d..d924f1ee9 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -97,6 +97,8 @@ pub struct AgentDeps { pub transcription: Option>, /// Document text extraction middleware for PDF, DOCX, PPTX, etc. pub document_extraction: Option>, + /// Software builder for self-repair tool rebuilding. + pub builder: Option>, } /// The main agent that coordinates all components. @@ -285,11 +287,18 @@ impl Agent { let mut message_stream = self.channels.start_all().await?; // Start self-repair task with notification forwarding - let repair = Arc::new(DefaultSelfRepair::new( + let mut self_repair = DefaultSelfRepair::new( self.context_manager.clone(), self.config.stuck_threshold, self.config.max_repair_attempts, - )); + ); + if let Some(ref store) = self.deps.store { + self_repair = self_repair.with_store(Arc::clone(store)); + } + if let Some(ref builder) = self.deps.builder { + self_repair = self_repair.with_builder(Arc::clone(builder), Arc::clone(self.tools())); + } + let repair = Arc::new(self_repair); let repair_interval = self.config.repair_check_interval; let repair_channels = self.channels.clone(); let repair_owner_id = self.owner_id().to_string(); diff --git a/src/agent/dispatcher.rs b/src/agent/dispatcher.rs index 9be0d654d..49387e835 100644 --- a/src/agent/dispatcher.rs +++ b/src/agent/dispatcher.rs @@ -1197,6 +1197,7 @@ mod tests { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; Agent::new( @@ -2037,6 +2038,7 @@ mod tests { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; Agent::new( @@ -2155,6 +2157,7 @@ mod tests { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; Agent::new( diff --git a/src/agent/self_repair.rs b/src/agent/self_repair.rs index a67fe23eb..db491194f 100644 --- a/src/agent/self_repair.rs +++ b/src/agent/self_repair.rs @@ -66,14 +66,10 @@ pub trait SelfRepair: Send + Sync { /// Default self-repair implementation. pub struct DefaultSelfRepair { context_manager: Arc, - // TODO: use for time-based stuck detection (currently only max_repair_attempts is checked) - #[allow(dead_code)] stuck_threshold: Duration, max_repair_attempts: u32, store: Option>, builder: Option>, - // TODO: use for tool hot-reload after repair - #[allow(dead_code)] tools: Option>, } @@ -95,15 +91,13 @@ impl DefaultSelfRepair { } /// Add a Store for tool failure tracking. - #[allow(dead_code)] // TODO: wire up in main.rs when persistence is needed - pub(crate) fn with_store(mut self, store: Arc) -> Self { + pub fn with_store(mut self, store: Arc) -> Self { self.store = Some(store); self } /// Add a Builder and ToolRegistry for automatic tool repair. - #[allow(dead_code)] // TODO: wire up in main.rs when auto-repair is needed - pub(crate) fn with_builder( + pub fn with_builder( mut self, builder: Arc, tools: Arc, @@ -124,18 +118,30 @@ impl SelfRepair for DefaultSelfRepair { if let Ok(ctx) = self.context_manager.get_context(job_id).await && ctx.state == JobState::Stuck { - let stuck_duration = ctx - .started_at - .map(|start| { - let now = Utc::now(); - let duration = now.signed_duration_since(start); + // Measure stuck_duration from the most recent Stuck transition, + // not from started_at (which reflects when the job first ran). + let stuck_since = ctx + .transitions + .iter() + .rev() + .find(|t| t.to == JobState::Stuck) + .map(|t| t.timestamp); + + let stuck_duration = stuck_since + .map(|ts| { + let duration = Utc::now().signed_duration_since(ts); Duration::from_secs(duration.num_seconds().max(0) as u64) }) .unwrap_or_default(); + // Only report jobs that have been stuck long enough + if stuck_duration < self.stuck_threshold { + continue; + } + stuck_jobs.push(StuckJob { job_id, - last_activity: ctx.started_at.unwrap_or(ctx.created_at), + last_activity: stuck_since.unwrap_or(ctx.created_at), stuck_duration, last_error: None, repair_attempts: ctx.repair_attempts, @@ -273,9 +279,8 @@ impl SelfRepair for DefaultSelfRepair { tracing::warn!("Failed to mark tool as repaired: {}", e); } - // Log if the tool was auto-registered if result.registered { - tracing::info!("Repaired tool '{}' auto-registered", tool.name); + tracing::info!("Repaired tool '{}' auto-registered by builder", tool.name); } Ok(RepairResult::Success { @@ -417,7 +422,8 @@ mod tests { .unwrap() .unwrap(); - let repair = DefaultSelfRepair::new(cm, Duration::from_secs(60), 3); + // Use zero threshold so the just-stuck job is detected immediately. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(0), 3); let stuck = repair.detect_stuck_jobs().await; assert_eq!(stuck.len(), 1); assert_eq!(stuck[0].job_id, job_id); @@ -483,6 +489,98 @@ mod tests { ); } + #[tokio::test] + async fn detect_stuck_jobs_filters_by_threshold() { + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("Stuck job", "desc").await.unwrap(); + + // Transition to InProgress, then to Stuck. + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("timed out".to_string())) + }) + .await + .unwrap() + .unwrap(); + + // Use a very large threshold (1 hour). Job just became stuck, so + // stuck_duration < threshold. It should be filtered out. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(3600), 3); + let stuck = repair.detect_stuck_jobs().await; + assert!( + stuck.is_empty(), + "Job stuck for <1s should be filtered by 1h threshold" + ); + } + + #[tokio::test] + async fn detect_stuck_jobs_includes_when_over_threshold() { + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("Stuck job", "desc").await.unwrap(); + + // Transition to InProgress, then to Stuck. + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("timed out".to_string())) + }) + .await + .unwrap() + .unwrap(); + + // Use a zero threshold -- any stuck duration should be included. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(0), 3); + let stuck = repair.detect_stuck_jobs().await; + assert_eq!(stuck.len(), 1, "Job should be detected with zero threshold"); + assert_eq!(stuck[0].job_id, job_id); + } + + /// Regression: stuck_duration must be measured from the Stuck transition, + /// not from started_at. A job that ran for 2 hours before becoming stuck + /// should NOT immediately exceed a 5-minute threshold. + #[tokio::test] + async fn stuck_duration_measured_from_stuck_transition_not_started_at() { + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("Long runner", "desc").await.unwrap(); + + // Transition to InProgress (sets started_at to now). + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + + // Backdate started_at to 2 hours ago to simulate a long-running job. + cm.update_context(job_id, |ctx| { + ctx.started_at = Some(Utc::now() - chrono::Duration::hours(2)); + Ok::<(), crate::error::Error>(()) + }) + .await + .unwrap() + .unwrap(); + + // Now transition to Stuck (stuck transition timestamp is ~now). + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("wedged".into())) + }) + .await + .unwrap() + .unwrap(); + + // With a 5-minute threshold, the job JUST became stuck — should NOT be detected. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(300), 3); + let stuck = repair.detect_stuck_jobs().await; + assert!( + stuck.is_empty(), + "Job stuck for <1s should not exceed 5min threshold, \ + but stuck_duration was computed from started_at (2h ago)" + ); + } + #[tokio::test] async fn detect_broken_tools_returns_empty_without_store() { let cm = Arc::new(ContextManager::new(10)); @@ -515,4 +613,148 @@ mod tests { result ); } + + /// Mock SoftwareBuilder that returns a successful build result. + struct MockBuilder { + build_count: std::sync::atomic::AtomicU32, + } + + impl MockBuilder { + fn new() -> Self { + Self { + build_count: std::sync::atomic::AtomicU32::new(0), + } + } + + fn builds(&self) -> u32 { + self.build_count.load(std::sync::atomic::Ordering::Relaxed) + } + } + + #[async_trait] + impl crate::tools::SoftwareBuilder for MockBuilder { + async fn analyze( + &self, + _description: &str, + ) -> Result { + Ok(crate::tools::BuildRequirement { + name: "mock-tool".to_string(), + description: "mock".to_string(), + software_type: crate::tools::SoftwareType::WasmTool, + language: crate::tools::Language::Rust, + input_spec: None, + output_spec: None, + dependencies: vec![], + capabilities: vec![], + }) + } + + async fn build( + &self, + requirement: &crate::tools::BuildRequirement, + ) -> Result { + self.build_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Ok(crate::tools::BuildResult { + build_id: Uuid::new_v4(), + requirement: requirement.clone(), + artifact_path: std::path::PathBuf::from("/tmp/mock.wasm"), + logs: vec![], + success: true, + error: None, + started_at: Utc::now(), + completed_at: Utc::now(), + iterations: 1, + validation_warnings: vec![], + tests_passed: 1, + tests_failed: 0, + registered: true, + }) + } + + async fn repair( + &self, + _result: &crate::tools::BuildResult, + _error: &str, + ) -> Result { + unimplemented!("not needed for this test") + } + } + + /// E2E test: stuck job detected -> repaired -> transitions back to InProgress, + /// and broken tool detected -> builder invoked -> tool marked repaired. + #[cfg(feature = "libsql")] + #[tokio::test] + async fn e2e_stuck_job_repair_and_tool_rebuild() { + // --- Setup --- + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("E2E stuck job", "desc").await.unwrap(); + + // Transition job: Pending -> InProgress -> Stuck + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("deadlocked".to_string())) + }) + .await + .unwrap() + .unwrap(); + + // Create a mock builder and a real test database (for store) + let builder = Arc::new(MockBuilder::new()); + let tools = Arc::new(ToolRegistry::new()); + let (db, _tmp_dir) = crate::testing::test_db().await; + + // Create self-repair with zero threshold (detect immediately), + // wired with store, builder, and tools. + let repair = DefaultSelfRepair::new(Arc::clone(&cm), Duration::from_secs(0), 3) + .with_store(Arc::clone(&db)) + .with_builder( + Arc::clone(&builder) as Arc, + tools, + ); + + // --- Phase 1: Detect and repair stuck job --- + let stuck_jobs = repair.detect_stuck_jobs().await; + assert_eq!(stuck_jobs.len(), 1, "Should detect the stuck job"); + assert_eq!(stuck_jobs[0].job_id, job_id); + + let result = repair.repair_stuck_job(&stuck_jobs[0]).await.unwrap(); + assert!( + matches!(result, RepairResult::Success { .. }), + "Job repair should succeed: {:?}", + result + ); + + // Verify job transitioned back to InProgress + let ctx = cm.get_context(job_id).await.unwrap(); + assert_eq!( + ctx.state, + JobState::InProgress, + "Job should be back to InProgress after repair" + ); + + // --- Phase 2: Repair a broken tool via builder --- + let broken = BrokenTool { + name: "broken-wasm-tool".to_string(), + failure_count: 10, + last_error: Some("panic in tool execution".to_string()), + first_failure: Utc::now() - chrono::Duration::hours(1), + last_failure: Utc::now(), + last_build_result: None, + repair_attempts: 0, + }; + + let tool_result = repair.repair_broken_tool(&broken).await.unwrap(); + assert!( + matches!(tool_result, RepairResult::Success { .. }), + "Tool repair should succeed with mock builder: {:?}", + tool_result + ); + + // Verify builder was actually invoked + assert_eq!(builder.builds(), 1, "Builder should have been called once"); + } } diff --git a/src/app.rs b/src/app.rs index 0ffe78206..fa6675bfa 100644 --- a/src/app.rs +++ b/src/app.rs @@ -56,6 +56,7 @@ pub struct AppComponents { pub session: Arc, pub catalog_entries: Vec, pub dev_loaded_tool_names: Vec, + pub builder: Option>, } /// Options that control optional init phases. @@ -280,6 +281,7 @@ impl AppBuilder { Arc, Option>, Option>, + Option>, ), anyhow::Error, > { @@ -367,16 +369,19 @@ impl AppBuilder { } // Register builder tool if enabled - if self.config.builder.enabled + let builder = if self.config.builder.enabled && (self.config.agent.allow_local_tools || !self.config.sandbox.enabled) { - tools + let b = tools .register_builder_tool(llm.clone(), Some(self.config.builder.to_builder_config())) .await; - tracing::debug!("Builder mode enabled"); - } + tracing::info!("Builder mode enabled"); + Some(b) + } else { + None + }; - Ok((safety, tools, embeddings, workspace)) + Ok((safety, tools, embeddings, workspace, builder)) } /// Phase 5: Load WASM tools, MCP servers, and create extension manager. @@ -699,7 +704,7 @@ impl AppBuilder { } else { self.init_llm().await? }; - let (safety, tools, embeddings, workspace) = self.init_tools(&llm).await?; + let (safety, tools, embeddings, workspace, builder) = self.init_tools(&llm).await?; // Create hook registry early so runtime extension activation can register hooks. let hooks = Arc::new(HookRegistry::new()); @@ -819,6 +824,7 @@ impl AppBuilder { session: self.session, catalog_entries, dev_loaded_tool_names, + builder, }) } } diff --git a/src/main.rs b/src/main.rs index ae864bed9..3a098395a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -727,6 +727,7 @@ async fn async_main() -> anyhow::Result<()> { document_extraction: Some(Arc::new( ironclaw::document_extraction::DocumentExtractionMiddleware::new(), )), + builder: components.builder, }; let mut agent = Agent::new( diff --git a/src/testing/mod.rs b/src/testing/mod.rs index ff522e3ad..4f51e321f 100644 --- a/src/testing/mod.rs +++ b/src/testing/mod.rs @@ -456,6 +456,7 @@ impl TestHarnessBuilder { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; TestHarness { diff --git a/src/tools/registry.rs b/src/tools/registry.rs index 754869c8c..a419b1957 100644 --- a/src/tools/registry.rs +++ b/src/tools/registry.rs @@ -13,7 +13,9 @@ use crate::orchestrator::job_manager::ContainerJobManager; use crate::secrets::SecretsStore; use crate::skills::catalog::SkillCatalog; use crate::skills::registry::SkillRegistry; -use crate::tools::builder::{BuildSoftwareTool, BuilderConfig, LlmSoftwareBuilder}; +use crate::tools::builder::{ + BuildSoftwareTool, BuilderConfig, LlmSoftwareBuilder, SoftwareBuilder, +}; use crate::tools::builtin::{ ApplyPatchTool, CancelJobTool, CreateJobTool, EchoTool, ExtensionInfoTool, HttpTool, JobEventsTool, JobPromptTool, JobStatusTool, JsonTool, ListDirTool, ListJobsTool, @@ -580,22 +582,23 @@ impl ToolRegistry { self: &Arc, llm: Arc, config: Option, - ) { + ) -> Arc { // First register dev tools needed by the builder self.register_dev_tools(); // Create the builder (arg order: config, llm, tools) - let builder = Arc::new(LlmSoftwareBuilder::new( + let builder: Arc = Arc::new(LlmSoftwareBuilder::new( config.unwrap_or_default(), llm, Arc::clone(self), )); // Register the build_software tool - self.register(Arc::new(BuildSoftwareTool::new(builder))) + self.register(Arc::new(BuildSoftwareTool::new(Arc::clone(&builder)))) .await; - tracing::debug!("Registered software builder tool"); + tracing::info!("Registered software builder tool"); + builder } /// Register a WASM tool from bytes. diff --git a/tests/support/gateway_workflow_harness.rs b/tests/support/gateway_workflow_harness.rs index a4d737b52..1751faa54 100644 --- a/tests/support/gateway_workflow_harness.rs +++ b/tests/support/gateway_workflow_harness.rs @@ -256,6 +256,7 @@ impl GatewayWorkflowHarness { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }, channels, None, diff --git a/tests/support/test_rig.rs b/tests/support/test_rig.rs index 8549a21cb..7adc31a57 100644 --- a/tests/support/test_rig.rs +++ b/tests/support/test_rig.rs @@ -642,6 +642,7 @@ impl TestRigBuilder { }, transcription: None, document_extraction: None, + builder: None, }; // 7. Create TestChannel and ChannelManager.