Skip to content
13 changes: 11 additions & 2 deletions src/agent/agent_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub struct AgentDeps {
pub transcription: Option<Arc<crate::transcription::TranscriptionMiddleware>>,
/// Document text extraction middleware for PDF, DOCX, PPTX, etc.
pub document_extraction: Option<Arc<crate::document_extraction::DocumentExtractionMiddleware>>,
/// Software builder for self-repair tool rebuilding.
pub builder: Option<Arc<dyn crate::tools::SoftwareBuilder>>,
}

/// The main agent that coordinates all components.
Expand Down Expand Up @@ -285,11 +287,18 @@ impl Agent {
let mut message_stream = self.channels.start_all().await?;

// Start self-repair task with notification forwarding
let repair = Arc::new(DefaultSelfRepair::new(
let mut self_repair = DefaultSelfRepair::new(
self.context_manager.clone(),
self.config.stuck_threshold,
self.config.max_repair_attempts,
));
);
if let Some(ref store) = self.deps.store {
self_repair = self_repair.with_store(Arc::clone(store));
}
if let Some(ref builder) = self.deps.builder {
self_repair = self_repair.with_builder(Arc::clone(builder), Arc::clone(self.tools()));
}
let repair = Arc::new(self_repair);
let repair_interval = self.config.repair_check_interval;
let repair_channels = self.channels.clone();
let repair_owner_id = self.owner_id().to_string();
Expand Down
3 changes: 3 additions & 0 deletions src/agent/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ mod tests {
http_interceptor: None,
transcription: None,
document_extraction: None,
builder: None,
};

Agent::new(
Expand Down Expand Up @@ -2037,6 +2038,7 @@ mod tests {
http_interceptor: None,
transcription: None,
document_extraction: None,
builder: None,
};

Agent::new(
Expand Down Expand Up @@ -2155,6 +2157,7 @@ mod tests {
http_interceptor: None,
transcription: None,
document_extraction: None,
builder: None,
};

Agent::new(
Expand Down
276 changes: 259 additions & 17 deletions src/agent/self_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,10 @@ pub trait SelfRepair: Send + Sync {
/// Default self-repair implementation.
pub struct DefaultSelfRepair {
context_manager: Arc<ContextManager>,
// TODO: use for time-based stuck detection (currently only max_repair_attempts is checked)
#[allow(dead_code)]
stuck_threshold: Duration,
max_repair_attempts: u32,
store: Option<Arc<dyn Database>>,
builder: Option<Arc<dyn SoftwareBuilder>>,
// TODO: use for tool hot-reload after repair
#[allow(dead_code)]
tools: Option<Arc<ToolRegistry>>,
}

Expand All @@ -95,15 +91,13 @@ impl DefaultSelfRepair {
}

/// Add a Store for tool failure tracking.
#[allow(dead_code)] // TODO: wire up in main.rs when persistence is needed
pub(crate) fn with_store(mut self, store: Arc<dyn Database>) -> Self {
pub fn with_store(mut self, store: Arc<dyn Database>) -> Self {
self.store = Some(store);
self
}

/// Add a Builder and ToolRegistry for automatic tool repair.
#[allow(dead_code)] // TODO: wire up in main.rs when auto-repair is needed
pub(crate) fn with_builder(
pub fn with_builder(
mut self,
builder: Arc<dyn SoftwareBuilder>,
tools: Arc<ToolRegistry>,
Expand All @@ -124,18 +118,30 @@ impl SelfRepair for DefaultSelfRepair {
if let Ok(ctx) = self.context_manager.get_context(job_id).await
&& ctx.state == JobState::Stuck
{
let stuck_duration = ctx
.started_at
.map(|start| {
let now = Utc::now();
let duration = now.signed_duration_since(start);
// Measure stuck_duration from the most recent Stuck transition,
// not from started_at (which reflects when the job first ran).
let stuck_since = ctx
.transitions
.iter()
.rev()
.find(|t| t.to == JobState::Stuck)
.map(|t| t.timestamp);

let stuck_duration = stuck_since
.map(|ts| {
let duration = Utc::now().signed_duration_since(ts);
Duration::from_secs(duration.num_seconds().max(0) as u64)
})
.unwrap_or_default();

// Only report jobs that have been stuck long enough
if stuck_duration < self.stuck_threshold {
continue;
}

stuck_jobs.push(StuckJob {
job_id,
last_activity: ctx.started_at.unwrap_or(ctx.created_at),
last_activity: stuck_since.unwrap_or(ctx.created_at),
stuck_duration,
last_error: None,
repair_attempts: ctx.repair_attempts,
Expand Down Expand Up @@ -273,9 +279,8 @@ impl SelfRepair for DefaultSelfRepair {
tracing::warn!("Failed to mark tool as repaired: {}", e);
}

// Log if the tool was auto-registered
if result.registered {
tracing::info!("Repaired tool '{}' auto-registered", tool.name);
tracing::info!("Repaired tool '{}' auto-registered by builder", tool.name);
}

Ok(RepairResult::Success {
Expand Down Expand Up @@ -417,7 +422,8 @@ mod tests {
.unwrap()
.unwrap();

let repair = DefaultSelfRepair::new(cm, Duration::from_secs(60), 3);
// Use zero threshold so the just-stuck job is detected immediately.
let repair = DefaultSelfRepair::new(cm, Duration::from_secs(0), 3);
let stuck = repair.detect_stuck_jobs().await;
assert_eq!(stuck.len(), 1);
assert_eq!(stuck[0].job_id, job_id);
Expand Down Expand Up @@ -483,6 +489,98 @@ mod tests {
);
}

#[tokio::test]
async fn detect_stuck_jobs_filters_by_threshold() {
    // A freshly-stuck job must be filtered out when the configured
    // stuck threshold is far larger than the time spent in Stuck.
    let manager = Arc::new(ContextManager::new(10));
    let job = manager.create_job("Stuck job", "desc").await.unwrap();

    // Pending -> InProgress.
    manager
        .update_context(job, |ctx| ctx.transition_to(JobState::InProgress, None))
        .await
        .unwrap()
        .unwrap();
    // InProgress -> Stuck.
    manager
        .update_context(job, |ctx| {
            ctx.transition_to(JobState::Stuck, Some("timed out".into()))
        })
        .await
        .unwrap()
        .unwrap();

    // One-hour threshold versus a job that has been stuck for well
    // under a second: detection must not report it.
    let detector = DefaultSelfRepair::new(manager, Duration::from_secs(3600), 3);
    let detected = detector.detect_stuck_jobs().await;
    assert!(
        detected.is_empty(),
        "Job stuck for <1s should be filtered by 1h threshold"
    );
}

#[tokio::test]
async fn detect_stuck_jobs_includes_when_over_threshold() {
    // With a zero threshold, any job sitting in Stuck qualifies
    // immediately, no matter how recently it got there.
    let manager = Arc::new(ContextManager::new(10));
    let job = manager.create_job("Stuck job", "desc").await.unwrap();

    // Pending -> InProgress.
    manager
        .update_context(job, |ctx| ctx.transition_to(JobState::InProgress, None))
        .await
        .unwrap()
        .unwrap();
    // InProgress -> Stuck.
    manager
        .update_context(job, |ctx| {
            ctx.transition_to(JobState::Stuck, Some("timed out".into()))
        })
        .await
        .unwrap()
        .unwrap();

    let detector = DefaultSelfRepair::new(manager, Duration::from_secs(0), 3);
    let detected = detector.detect_stuck_jobs().await;
    assert_eq!(detected.len(), 1, "Job should be detected with zero threshold");
    assert_eq!(detected[0].job_id, job);
}

/// Regression: stuck_duration must be measured from the Stuck transition,
/// not from started_at. A job that ran for 2 hours before becoming stuck
/// should NOT immediately exceed a 5-minute threshold.
#[tokio::test]
async fn stuck_duration_measured_from_stuck_transition_not_started_at() {
    let manager = Arc::new(ContextManager::new(10));
    let job = manager.create_job("Long runner", "desc").await.unwrap();

    // Start the job; this records started_at as "now".
    manager
        .update_context(job, |ctx| ctx.transition_to(JobState::InProgress, None))
        .await
        .unwrap()
        .unwrap();

    // Pretend the job has already been running for two hours by
    // backdating started_at directly.
    manager
        .update_context(job, |ctx| {
            ctx.started_at = Some(Utc::now() - chrono::Duration::hours(2));
            Ok::<(), crate::error::Error>(())
        })
        .await
        .unwrap()
        .unwrap();

    // Only now does it become stuck, so its true stuck duration is ~0.
    manager
        .update_context(job, |ctx| {
            ctx.transition_to(JobState::Stuck, Some("wedged".into()))
        })
        .await
        .unwrap()
        .unwrap();

    // A 5-minute threshold must not be tripped by a just-stuck job,
    // regardless of how long it ran beforehand.
    let detector = DefaultSelfRepair::new(manager, Duration::from_secs(300), 3);
    let detected = detector.detect_stuck_jobs().await;
    assert!(
        detected.is_empty(),
        "Job stuck for <1s should not exceed 5min threshold, but stuck_duration was computed from started_at (2h ago)"
    );
}

#[tokio::test]
async fn detect_broken_tools_returns_empty_without_store() {
let cm = Arc::new(ContextManager::new(10));
Expand Down Expand Up @@ -515,4 +613,148 @@ mod tests {
result
);
}

/// Mock SoftwareBuilder that returns a successful build result.
///
/// Tracks how many times `build` is invoked so tests can assert that
/// the self-repair path actually reached the builder.
struct MockBuilder {
    // Atomic so the count is observable through the Arc-shared mock.
    build_count: std::sync::atomic::AtomicU32,
}

impl MockBuilder {
    /// Create a mock with a zeroed invocation counter.
    fn new() -> Self {
        MockBuilder {
            build_count: Default::default(),
        }
    }

    /// Number of times `build` has been called so far.
    fn builds(&self) -> u32 {
        use std::sync::atomic::Ordering;
        self.build_count.load(Ordering::Relaxed)
    }
}

#[async_trait]
impl crate::tools::SoftwareBuilder for MockBuilder {
    /// Returns a fixed WASM-tool requirement; the description is
    /// deliberately ignored by the mock.
    async fn analyze(
        &self,
        _description: &str,
    ) -> Result<crate::tools::BuildRequirement, crate::error::ToolError> {
        Ok(crate::tools::BuildRequirement {
            name: "mock-tool".into(),
            description: "mock".into(),
            software_type: crate::tools::SoftwareType::WasmTool,
            language: crate::tools::Language::Rust,
            input_spec: None,
            output_spec: None,
            dependencies: Vec::new(),
            capabilities: Vec::new(),
        })
    }

    /// Records the invocation, then reports a fully successful build
    /// for the given requirement.
    async fn build(
        &self,
        req: &crate::tools::BuildRequirement,
    ) -> Result<crate::tools::BuildResult, crate::error::ToolError> {
        use std::sync::atomic::Ordering;
        // Bump the counter so `builds()` reflects this call.
        self.build_count.fetch_add(1, Ordering::Relaxed);
        Ok(crate::tools::BuildResult {
            build_id: Uuid::new_v4(),
            requirement: req.clone(),
            artifact_path: std::path::PathBuf::from("/tmp/mock.wasm"),
            logs: Vec::new(),
            success: true,
            error: None,
            started_at: Utc::now(),
            completed_at: Utc::now(),
            iterations: 1,
            validation_warnings: Vec::new(),
            tests_passed: 1,
            tests_failed: 0,
            registered: true,
        })
    }

    /// The repair path is never exercised by these tests.
    async fn repair(
        &self,
        _result: &crate::tools::BuildResult,
        _error: &str,
    ) -> Result<crate::tools::BuildResult, crate::error::ToolError> {
        unimplemented!("not needed for this test")
    }
}

/// E2E test: stuck job detected -> repaired -> transitions back to InProgress,
/// and broken tool detected -> builder invoked -> tool marked repaired.
///
/// Exercises the fully-wired DefaultSelfRepair (store + builder + tools),
/// unlike the unit tests above which construct it bare.
// Gated on "libsql" — presumably because crate::testing::test_db needs that
// backend; confirm against the testing module.
#[cfg(feature = "libsql")]
#[tokio::test]
async fn e2e_stuck_job_repair_and_tool_rebuild() {
    // --- Setup ---
    let cm = Arc::new(ContextManager::new(10));
    let job_id = cm.create_job("E2E stuck job", "desc").await.unwrap();

    // Transition job: Pending -> InProgress -> Stuck
    cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None))
        .await
        .unwrap()
        .unwrap();
    cm.update_context(job_id, |ctx| {
        ctx.transition_to(JobState::Stuck, Some("deadlocked".to_string()))
    })
    .await
    .unwrap()
    .unwrap();

    // Create a mock builder and a real test database (for store)
    // NOTE: _tmp_dir is a guard that keeps the on-disk DB alive; it must
    // stay bound for the duration of the test.
    let builder = Arc::new(MockBuilder::new());
    let tools = Arc::new(ToolRegistry::new());
    let (db, _tmp_dir) = crate::testing::test_db().await;

    // Create self-repair with zero threshold (detect immediately),
    // wired with store, builder, and tools.
    // The explicit `as` cast unsizes Arc<MockBuilder> to the trait object
    // with_builder expects, while `builder` itself stays concrete so we
    // can still call builds() below.
    let repair = DefaultSelfRepair::new(Arc::clone(&cm), Duration::from_secs(0), 3)
        .with_store(Arc::clone(&db))
        .with_builder(
            Arc::clone(&builder) as Arc<dyn crate::tools::SoftwareBuilder>,
            tools,
        );

    // --- Phase 1: Detect and repair stuck job ---
    let stuck_jobs = repair.detect_stuck_jobs().await;
    assert_eq!(stuck_jobs.len(), 1, "Should detect the stuck job");
    assert_eq!(stuck_jobs[0].job_id, job_id);

    let result = repair.repair_stuck_job(&stuck_jobs[0]).await.unwrap();
    assert!(
        matches!(result, RepairResult::Success { .. }),
        "Job repair should succeed: {:?}",
        result
    );

    // Verify job transitioned back to InProgress
    let ctx = cm.get_context(job_id).await.unwrap();
    assert_eq!(
        ctx.state,
        JobState::InProgress,
        "Job should be back to InProgress after repair"
    );

    // --- Phase 2: Repair a broken tool via builder ---
    // Hand-built failure record: 10 failures over the last hour, no
    // prior build result and no prior repair attempts.
    let broken = BrokenTool {
        name: "broken-wasm-tool".to_string(),
        failure_count: 10,
        last_error: Some("panic in tool execution".to_string()),
        first_failure: Utc::now() - chrono::Duration::hours(1),
        last_failure: Utc::now(),
        last_build_result: None,
        repair_attempts: 0,
    };

    let tool_result = repair.repair_broken_tool(&broken).await.unwrap();
    assert!(
        matches!(tool_result, RepairResult::Success { .. }),
        "Tool repair should succeed with mock builder: {:?}",
        tool_result
    );

    // Verify builder was actually invoked
    assert_eq!(builder.builds(), 1, "Builder should have been called once");
}
}
Loading
Loading