diff --git a/channels-src/telegram/src/lib.rs b/channels-src/telegram/src/lib.rs index a095ccb3a2..f34ed68aa7 100644 --- a/channels-src/telegram/src/lib.rs +++ b/channels-src/telegram/src/lib.rs @@ -360,6 +360,8 @@ enum TelegramStatusAction { } const TELEGRAM_STATUS_MAX_CHARS: usize = 600; +/// Telegram's hard limit for message text length. +const TELEGRAM_MAX_MESSAGE_LEN: usize = 4096; fn truncate_status_message(input: &str, max_chars: usize) -> String { let mut iter = input.chars(); @@ -371,6 +373,73 @@ fn truncate_status_message(input: &str, max_chars: usize) -> String { } } +/// Split a long message into chunks that fit within Telegram's 4096-char limit. +/// +/// Tries to split at the most natural boundary available (in priority order): +/// 1. Double newline (paragraph break) +/// 2. Single newline +/// 3. Sentence end (`. `, `! `, `? `) +/// 4. Word boundary (space) +/// 5. Hard cut at the limit (last resort for pathological input) +fn split_message(text: &str) -> Vec { + if text.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN { + return vec![text.to_string()]; + } + + let mut chunks: Vec = Vec::new(); + let mut remaining = text; + + while !remaining.is_empty() { + // Count chars to find the byte offset for our window. + let window_bytes = remaining + .char_indices() + .take(TELEGRAM_MAX_MESSAGE_LEN) + .last() + .map(|(byte_idx, ch)| byte_idx + ch.len_utf8()) + .unwrap_or(remaining.len()); + + if window_bytes >= remaining.len() { + // Remainder fits entirely. + chunks.push(remaining.to_string()); + break; + } + + let window = &remaining[..window_bytes]; + + // 1. Double newline — best paragraph boundary + let split_at = window.rfind("\n\n") + // 2. Single newline + .or_else(|| window.rfind('\n')) + // 3. Sentence-ending punctuation followed by space. + // Note: this only detects ASCII punctuation (. ! ?), not CJK + // sentence-ending marks (。!?). CJK text falls through to + // word-boundary or hard-cut splitting. + .or_else(|| { + let bytes = window.as_bytes(); + // Search backwards for '. ', '! ', '? ' + (1..bytes.len()).rev().find(|&i| { + matches!(bytes[i - 1], b'.' | b'!' | b'?') && bytes[i] == b' ' + }) + }) + // 4. Word boundary (last space) + .or_else(|| window.rfind(' ')) + // 5. Hard cut + .unwrap_or(window_bytes); + + // Avoid empty chunks (e.g. text starting with \n\n). + let split_at = if split_at == 0 { window_bytes } else { split_at }; + + // Trim whitespace at chunk boundaries for clean Telegram display. + // Note: this drops leading/trailing spaces at split points, which is + // acceptable for chat messages but means the concatenation of chunks + // may not exactly equal the original text when split at spaces. + chunks.push(remaining[..split_at].trim_end().to_string()); + remaining = remaining[split_at..].trim_start(); + } + + chunks +} + fn status_message_for_user(update: &StatusUpdate) -> Option { let message = update.message.trim(); if message.is_empty() { @@ -1242,26 +1311,64 @@ fn send_response( return Ok(()); } - // Try Markdown, fall back to plain text on parse errors - match send_message( - chat_id, - &response.content, - reply_to_message_id, - Some("Markdown"), - message_thread_id, - ) { - Ok(_) => Ok(()), - Err(SendError::ParseEntities(_)) => send_message( - chat_id, - &response.content, - reply_to_message_id, - None, - message_thread_id, - ) - .map(|_| ()) - .map_err(|e| format!("Plain-text retry also failed: {}", e)), - Err(e) => Err(e.to_string()), + // Split large messages into chunks that fit Telegram's limit. + let chunks = split_message(&response.content); + let total = chunks.len(); + + // The first chunk replies to the original message; subsequent chunks + // reply to the previously sent chunk so they form a visual thread. + let mut reply_to = reply_to_message_id; + + for (i, chunk) in chunks.into_iter().enumerate() { + // Try Markdown, fall back to plain text on parse errors + let result = send_message(chat_id, &chunk, reply_to, Some("Markdown"), message_thread_id); + + let msg_id = match result { + Ok(id) => { + channel_host::log( + channel_host::LogLevel::Debug, + &format!( + "Sent message chunk {}/{} to chat {}: message_id={}", + i + 1, + total, + chat_id, + id, + ), + ); + id + } + Err(SendError::ParseEntities(detail)) => { + channel_host::log( + channel_host::LogLevel::Warn, + &format!( + "Markdown parse failed on chunk {}/{} ({}), retrying as plain text", + i + 1, + total, + detail + ), + ); + let id = send_message(chat_id, &chunk, reply_to, None, message_thread_id) + .map_err(|e| format!("Plain-text retry also failed: {}", e))?; + channel_host::log( + channel_host::LogLevel::Debug, + &format!( + "Sent plain-text chunk {}/{} to chat {}: message_id={}", + i + 1, + total, + chat_id, + id, + ), + ); + id + } + Err(e) => return Err(e.to_string()), + }; + + // Each subsequent chunk threads off the previous sent message. + reply_to = Some(msg_id); } + + Ok(()) } /// Send a single attachment, choosing sendPhoto or sendDocument based on MIME type. @@ -2043,6 +2150,102 @@ export!(TelegramChannel); mod tests { use super::*; + #[test] + fn test_split_message_short() { + let text = "Hello, world!"; + let chunks = split_message(text); + assert_eq!(chunks, vec![text]); + } + + #[test] + fn test_split_message_paragraph_boundary() { + let para_a = "A".repeat(3000); + let para_b = "B".repeat(3000); + let text = format!("{}\n\n{}", para_a, para_b); + let chunks = split_message(&text); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0], para_a); + assert_eq!(chunks[1], para_b); + } + + #[test] + fn test_split_message_word_boundary() { + // Build a string well over the limit with no newlines. + let words: Vec = (0..1000).map(|i| format!("word{:04}", i)).collect(); + let text = words.join(" "); + assert!(text.len() > TELEGRAM_MAX_MESSAGE_LEN); + let chunks = split_message(&text); + assert!(chunks.len() > 1, "expected multiple chunks"); + for chunk in &chunks { + assert!(chunk.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN); + } + // Rejoined chunks must equal the original text exactly. + let rejoined = chunks.join(" "); + assert_eq!(rejoined, text); + } + + #[test] + fn test_split_message_each_chunk_fits() { + // Stress-test: 20 000 chars of mixed text. + let text: String = (0..500) + .map(|i| format!("Sentence number {}. ", i)) + .collect(); + assert!(text.len() > TELEGRAM_MAX_MESSAGE_LEN); + let chunks = split_message(&text); + for chunk in &chunks { + assert!(chunk.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN); + } + } + + #[test] + fn test_split_message_sentence_boundary() { + // Build text that exceeds the limit, with sentence boundaries inside. + let sentence = "This is a test sentence. "; + let repeat_count = TELEGRAM_MAX_MESSAGE_LEN / sentence.len() + 5; + let text: String = sentence.repeat(repeat_count); + assert!(text.chars().count() > TELEGRAM_MAX_MESSAGE_LEN); + + let chunks = split_message(&text); + assert!(chunks.len() > 1); + // First chunk should end at a sentence boundary (trimmed) + let first = &chunks[0]; + assert!( + first.ends_with('.'), + "First chunk should end at a sentence boundary, got: ...{}", + &first[first.len().saturating_sub(20)..] + ); + } + + #[test] + fn test_split_message_hard_cut_no_spaces() { + // Pathological input: a single huge "word" with no spaces or newlines. + let text = "x".repeat(TELEGRAM_MAX_MESSAGE_LEN * 2 + 100); + let chunks = split_message(&text); + assert!(chunks.len() >= 2); + for chunk in &chunks { + assert!(chunk.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN); + } + // Rejoined must preserve all characters + let rejoined: String = chunks.concat(); + assert_eq!(rejoined, text); + } + + #[test] + fn test_split_message_multibyte_chars() { + // Emoji are 4 bytes each. Ensure we don't panic or split mid-character. + let emoji = "\u{1F600}"; // 😀 + let text: String = emoji.repeat(TELEGRAM_MAX_MESSAGE_LEN + 100); + assert!(text.chars().count() > TELEGRAM_MAX_MESSAGE_LEN); + + let chunks = split_message(&text); + assert!(chunks.len() >= 2); + for chunk in &chunks { + assert!(chunk.chars().count() <= TELEGRAM_MAX_MESSAGE_LEN); + // Every char should be a complete emoji + assert!(chunk.chars().all(|c| c == '\u{1F600}')); + } + } + #[test] fn test_clean_message_text() { // Without bot_username: strips any leading @mention diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 132ba4a1c1..1780ba9dc4 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -146,6 +146,8 @@ pub struct AgentDeps { pub transcription: Option>, /// Document text extraction middleware for PDF, DOCX, PPTX, etc. pub document_extraction: Option>, + /// Software builder for self-repair tool rebuilding. + pub builder: Option>, } /// The main agent that coordinates all components. @@ -340,11 +342,18 @@ impl Agent { let mut message_stream = self.channels.start_all().await?; // Start self-repair task with notification forwarding - let repair = Arc::new(DefaultSelfRepair::new( + let mut self_repair = DefaultSelfRepair::new( self.context_manager.clone(), self.config.stuck_threshold, self.config.max_repair_attempts, - )); + ); + if let Some(ref store) = self.deps.store { + self_repair = self_repair.with_store(Arc::clone(store)); + } + if let Some(ref builder) = self.deps.builder { + self_repair = self_repair.with_builder(Arc::clone(builder), Arc::clone(self.tools())); + } + let repair = Arc::new(self_repair); let repair_interval = self.config.repair_check_interval; let repair_channels = self.channels.clone(); let repair_owner_id = self.owner_id().to_string(); diff --git a/src/agent/dispatcher.rs b/src/agent/dispatcher.rs index 9be0d654d1..49387e8351 100644 --- a/src/agent/dispatcher.rs +++ b/src/agent/dispatcher.rs @@ -1197,6 +1197,7 @@ mod tests { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; Agent::new( @@ -2037,6 +2038,7 @@ mod tests { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; Agent::new( @@ -2155,6 +2157,7 @@ mod tests { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; Agent::new( diff --git a/src/agent/self_repair.rs b/src/agent/self_repair.rs index a67fe23eba..db491194f8 100644 --- a/src/agent/self_repair.rs +++ b/src/agent/self_repair.rs @@ -66,14 +66,10 @@ pub trait SelfRepair: Send + Sync { /// Default self-repair implementation. pub struct DefaultSelfRepair { context_manager: Arc, - // TODO: use for time-based stuck detection (currently only max_repair_attempts is checked) - #[allow(dead_code)] stuck_threshold: Duration, max_repair_attempts: u32, store: Option>, builder: Option>, - // TODO: use for tool hot-reload after repair - #[allow(dead_code)] tools: Option>, } @@ -95,15 +91,13 @@ impl DefaultSelfRepair { } /// Add a Store for tool failure tracking. - #[allow(dead_code)] // TODO: wire up in main.rs when persistence is needed - pub(crate) fn with_store(mut self, store: Arc) -> Self { + pub fn with_store(mut self, store: Arc) -> Self { self.store = Some(store); self } /// Add a Builder and ToolRegistry for automatic tool repair. - #[allow(dead_code)] // TODO: wire up in main.rs when auto-repair is needed - pub(crate) fn with_builder( + pub fn with_builder( mut self, builder: Arc, tools: Arc, @@ -124,18 +118,30 @@ impl SelfRepair for DefaultSelfRepair { if let Ok(ctx) = self.context_manager.get_context(job_id).await && ctx.state == JobState::Stuck { - let stuck_duration = ctx - .started_at - .map(|start| { - let now = Utc::now(); - let duration = now.signed_duration_since(start); + // Measure stuck_duration from the most recent Stuck transition, + // not from started_at (which reflects when the job first ran). + let stuck_since = ctx + .transitions + .iter() + .rev() + .find(|t| t.to == JobState::Stuck) + .map(|t| t.timestamp); + + let stuck_duration = stuck_since + .map(|ts| { + let duration = Utc::now().signed_duration_since(ts); Duration::from_secs(duration.num_seconds().max(0) as u64) }) .unwrap_or_default(); + // Only report jobs that have been stuck long enough + if stuck_duration < self.stuck_threshold { + continue; + } + stuck_jobs.push(StuckJob { job_id, - last_activity: ctx.started_at.unwrap_or(ctx.created_at), + last_activity: stuck_since.unwrap_or(ctx.created_at), stuck_duration, last_error: None, repair_attempts: ctx.repair_attempts, @@ -273,9 +279,8 @@ impl SelfRepair for DefaultSelfRepair { tracing::warn!("Failed to mark tool as repaired: {}", e); } - // Log if the tool was auto-registered if result.registered { - tracing::info!("Repaired tool '{}' auto-registered", tool.name); + tracing::info!("Repaired tool '{}' auto-registered by builder", tool.name); } Ok(RepairResult::Success { @@ -417,7 +422,8 @@ mod tests { .unwrap() .unwrap(); - let repair = DefaultSelfRepair::new(cm, Duration::from_secs(60), 3); + // Use zero threshold so the just-stuck job is detected immediately. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(0), 3); let stuck = repair.detect_stuck_jobs().await; assert_eq!(stuck.len(), 1); assert_eq!(stuck[0].job_id, job_id); @@ -483,6 +489,98 @@ mod tests { ); } + #[tokio::test] + async fn detect_stuck_jobs_filters_by_threshold() { + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("Stuck job", "desc").await.unwrap(); + + // Transition to InProgress, then to Stuck. + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("timed out".to_string())) + }) + .await + .unwrap() + .unwrap(); + + // Use a very large threshold (1 hour). Job just became stuck, so + // stuck_duration < threshold. It should be filtered out. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(3600), 3); + let stuck = repair.detect_stuck_jobs().await; + assert!( + stuck.is_empty(), + "Job stuck for <1s should be filtered by 1h threshold" + ); + } + + #[tokio::test] + async fn detect_stuck_jobs_includes_when_over_threshold() { + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("Stuck job", "desc").await.unwrap(); + + // Transition to InProgress, then to Stuck. + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("timed out".to_string())) + }) + .await + .unwrap() + .unwrap(); + + // Use a zero threshold -- any stuck duration should be included. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(0), 3); + let stuck = repair.detect_stuck_jobs().await; + assert_eq!(stuck.len(), 1, "Job should be detected with zero threshold"); + assert_eq!(stuck[0].job_id, job_id); + } + + /// Regression: stuck_duration must be measured from the Stuck transition, + /// not from started_at. A job that ran for 2 hours before becoming stuck + /// should NOT immediately exceed a 5-minute threshold. + #[tokio::test] + async fn stuck_duration_measured_from_stuck_transition_not_started_at() { + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("Long runner", "desc").await.unwrap(); + + // Transition to InProgress (sets started_at to now). + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + + // Backdate started_at to 2 hours ago to simulate a long-running job. + cm.update_context(job_id, |ctx| { + ctx.started_at = Some(Utc::now() - chrono::Duration::hours(2)); + Ok::<(), crate::error::Error>(()) + }) + .await + .unwrap() + .unwrap(); + + // Now transition to Stuck (stuck transition timestamp is ~now). + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("wedged".into())) + }) + .await + .unwrap() + .unwrap(); + + // With a 5-minute threshold, the job JUST became stuck — should NOT be detected. + let repair = DefaultSelfRepair::new(cm, Duration::from_secs(300), 3); + let stuck = repair.detect_stuck_jobs().await; + assert!( + stuck.is_empty(), + "Job stuck for <1s should not exceed 5min threshold, \ + but stuck_duration was computed from started_at (2h ago)" + ); + } + #[tokio::test] async fn detect_broken_tools_returns_empty_without_store() { let cm = Arc::new(ContextManager::new(10)); @@ -515,4 +613,148 @@ mod tests { result ); } + + /// Mock SoftwareBuilder that returns a successful build result. + struct MockBuilder { + build_count: std::sync::atomic::AtomicU32, + } + + impl MockBuilder { + fn new() -> Self { + Self { + build_count: std::sync::atomic::AtomicU32::new(0), + } + } + + fn builds(&self) -> u32 { + self.build_count.load(std::sync::atomic::Ordering::Relaxed) + } + } + + #[async_trait] + impl crate::tools::SoftwareBuilder for MockBuilder { + async fn analyze( + &self, + _description: &str, + ) -> Result { + Ok(crate::tools::BuildRequirement { + name: "mock-tool".to_string(), + description: "mock".to_string(), + software_type: crate::tools::SoftwareType::WasmTool, + language: crate::tools::Language::Rust, + input_spec: None, + output_spec: None, + dependencies: vec![], + capabilities: vec![], + }) + } + + async fn build( + &self, + requirement: &crate::tools::BuildRequirement, + ) -> Result { + self.build_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + Ok(crate::tools::BuildResult { + build_id: Uuid::new_v4(), + requirement: requirement.clone(), + artifact_path: std::path::PathBuf::from("/tmp/mock.wasm"), + logs: vec![], + success: true, + error: None, + started_at: Utc::now(), + completed_at: Utc::now(), + iterations: 1, + validation_warnings: vec![], + tests_passed: 1, + tests_failed: 0, + registered: true, + }) + } + + async fn repair( + &self, + _result: &crate::tools::BuildResult, + _error: &str, + ) -> Result { + unimplemented!("not needed for this test") + } + } + + /// E2E test: stuck job detected -> repaired -> transitions back to InProgress, + /// and broken tool detected -> builder invoked -> tool marked repaired. + #[cfg(feature = "libsql")] + #[tokio::test] + async fn e2e_stuck_job_repair_and_tool_rebuild() { + // --- Setup --- + let cm = Arc::new(ContextManager::new(10)); + let job_id = cm.create_job("E2E stuck job", "desc").await.unwrap(); + + // Transition job: Pending -> InProgress -> Stuck + cm.update_context(job_id, |ctx| ctx.transition_to(JobState::InProgress, None)) + .await + .unwrap() + .unwrap(); + cm.update_context(job_id, |ctx| { + ctx.transition_to(JobState::Stuck, Some("deadlocked".to_string())) + }) + .await + .unwrap() + .unwrap(); + + // Create a mock builder and a real test database (for store) + let builder = Arc::new(MockBuilder::new()); + let tools = Arc::new(ToolRegistry::new()); + let (db, _tmp_dir) = crate::testing::test_db().await; + + // Create self-repair with zero threshold (detect immediately), + // wired with store, builder, and tools. + let repair = DefaultSelfRepair::new(Arc::clone(&cm), Duration::from_secs(0), 3) + .with_store(Arc::clone(&db)) + .with_builder( + Arc::clone(&builder) as Arc, + tools, + ); + + // --- Phase 1: Detect and repair stuck job --- + let stuck_jobs = repair.detect_stuck_jobs().await; + assert_eq!(stuck_jobs.len(), 1, "Should detect the stuck job"); + assert_eq!(stuck_jobs[0].job_id, job_id); + + let result = repair.repair_stuck_job(&stuck_jobs[0]).await.unwrap(); + assert!( + matches!(result, RepairResult::Success { .. }), + "Job repair should succeed: {:?}", + result + ); + + // Verify job transitioned back to InProgress + let ctx = cm.get_context(job_id).await.unwrap(); + assert_eq!( + ctx.state, + JobState::InProgress, + "Job should be back to InProgress after repair" + ); + + // --- Phase 2: Repair a broken tool via builder --- + let broken = BrokenTool { + name: "broken-wasm-tool".to_string(), + failure_count: 10, + last_error: Some("panic in tool execution".to_string()), + first_failure: Utc::now() - chrono::Duration::hours(1), + last_failure: Utc::now(), + last_build_result: None, + repair_attempts: 0, + }; + + let tool_result = repair.repair_broken_tool(&broken).await.unwrap(); + assert!( + matches!(tool_result, RepairResult::Success { .. }), + "Tool repair should succeed with mock builder: {:?}", + tool_result + ); + + // Verify builder was actually invoked + assert_eq!(builder.builds(), 1, "Builder should have been called once"); + } } diff --git a/src/app.rs b/src/app.rs index 0ffe782064..fa6675bfad 100644 --- a/src/app.rs +++ b/src/app.rs @@ -56,6 +56,7 @@ pub struct AppComponents { pub session: Arc, pub catalog_entries: Vec, pub dev_loaded_tool_names: Vec, + pub builder: Option>, } /// Options that control optional init phases. @@ -280,6 +281,7 @@ impl AppBuilder { Arc, Option>, Option>, + Option>, ), anyhow::Error, > { @@ -367,16 +369,19 @@ impl AppBuilder { } // Register builder tool if enabled - if self.config.builder.enabled + let builder = if self.config.builder.enabled && (self.config.agent.allow_local_tools || !self.config.sandbox.enabled) { - tools + let b = tools .register_builder_tool(llm.clone(), Some(self.config.builder.to_builder_config())) .await; - tracing::debug!("Builder mode enabled"); - } + tracing::info!("Builder mode enabled"); + Some(b) + } else { + None + }; - Ok((safety, tools, embeddings, workspace)) + Ok((safety, tools, embeddings, workspace, builder)) } /// Phase 5: Load WASM tools, MCP servers, and create extension manager. @@ -699,7 +704,7 @@ impl AppBuilder { } else { self.init_llm().await? }; - let (safety, tools, embeddings, workspace) = self.init_tools(&llm).await?; + let (safety, tools, embeddings, workspace, builder) = self.init_tools(&llm).await?; // Create hook registry early so runtime extension activation can register hooks. let hooks = Arc::new(HookRegistry::new()); @@ -819,6 +824,7 @@ impl AppBuilder { session: self.session, catalog_entries, dev_loaded_tool_names, + builder, }) } } diff --git a/src/main.rs b/src/main.rs index 65c04dda8d..e7477bc35f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -748,6 +748,7 @@ async fn async_main() -> anyhow::Result<()> { document_extraction: Some(Arc::new( ironclaw::document_extraction::DocumentExtractionMiddleware::new(), )), + builder: components.builder, }; let mut agent = Agent::new( diff --git a/src/testing/fault_injection.rs b/src/testing/fault_injection.rs new file mode 100644 index 0000000000..f9f8d23bd9 --- /dev/null +++ b/src/testing/fault_injection.rs @@ -0,0 +1,432 @@ +//! Fault injection framework for testing retry, failover, and circuit breaker behavior. +//! +//! Provides [`FaultInjector`] which can be attached to [`StubLlm`](super::StubLlm) to +//! produce configurable error sequences, random failures, and delays. +//! +//! # Example +//! +//! ```rust,no_run +//! use ironclaw::testing::fault_injection::*; +//! +//! // Fail twice with transient errors, then succeed +//! let injector = FaultInjector::sequence([ +//! FaultAction::Fail(FaultType::RequestFailed), +//! FaultAction::Fail(FaultType::RateLimited { retry_after: None }), +//! FaultAction::Succeed, +//! ]); +//! ``` + +use std::sync::Mutex; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Duration; + +use crate::llm::error::LlmError; + +/// The type of fault to inject. +#[derive(Debug, Clone)] +pub enum FaultType { + /// Transient request failure (retryable). + RequestFailed, + /// Rate limited with optional retry-after duration. + RateLimited { retry_after: Option }, + /// Authentication failure (non-retryable). + AuthFailed, + /// Invalid response from provider (retryable). + InvalidResponse, + /// I/O error (retryable). + IoError, + /// Context length exceeded (non-retryable). + ContextLengthExceeded, + /// Session expired (transient for circuit breaker, not retryable). + SessionExpired, +} + +impl FaultType { + /// Convert to the corresponding `LlmError`. + pub fn to_llm_error(&self, provider: &str) -> LlmError { + match self { + FaultType::RequestFailed => LlmError::RequestFailed { + provider: provider.to_string(), + reason: "injected fault: request failed".to_string(), + }, + FaultType::RateLimited { retry_after } => LlmError::RateLimited { + provider: provider.to_string(), + retry_after: *retry_after, + }, + FaultType::AuthFailed => LlmError::AuthFailed { + provider: provider.to_string(), + }, + FaultType::InvalidResponse => LlmError::InvalidResponse { + provider: provider.to_string(), + reason: "injected fault: invalid response".to_string(), + }, + FaultType::IoError => LlmError::Io(std::io::Error::new( + std::io::ErrorKind::ConnectionReset, + "injected fault: connection reset", + )), + FaultType::ContextLengthExceeded => LlmError::ContextLengthExceeded { + used: 100_000, + limit: 50_000, + }, + FaultType::SessionExpired => LlmError::SessionExpired { + provider: provider.to_string(), + }, + } + } +} + +/// Action to take on a given call. +#[derive(Debug, Clone)] +pub enum FaultAction { + /// Return a successful response. + Succeed, + /// Return an error of the given type. + Fail(FaultType), + /// Sleep for the given duration, then succeed. + Delay(Duration), +} + +/// How the fault sequence is consumed. +#[derive(Debug, Clone)] +pub enum FaultMode { + /// Play the sequence once, then succeed for all subsequent calls. + SequenceOnce, + /// Loop the sequence forever. + SequenceLoop, + /// Fail randomly at the given rate (0.0 = never, 1.0 = always) with + /// the specified fault type. Uses a seeded RNG for reproducibility. + /// The seed is stored so that [`FaultInjector::reset()`] can re-initialize + /// the RNG for test reproducibility. + Random { + error_rate: f64, + fault: FaultType, + seed: u64, + }, +} + +/// A configurable fault injector for [`StubLlm`](super::StubLlm). +/// +/// Thread-safe: uses atomic call counter and mutex-protected RNG. +pub struct FaultInjector { + actions: Vec, + mode: FaultMode, + call_index: AtomicU32, + /// Seeded RNG for Random mode, behind Mutex for Sync. + rng_state: Mutex, +} + +impl std::fmt::Debug for FaultInjector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FaultInjector") + .field("call_index", &self.call_index.load(Ordering::Relaxed)) + .field("mode", &self.mode) + .finish() + } +} + +impl FaultInjector { + /// Create a fault injector that plays actions once, then succeeds. + pub fn sequence(actions: impl IntoIterator) -> Self { + Self { + actions: actions.into_iter().collect(), + mode: FaultMode::SequenceOnce, + call_index: AtomicU32::new(0), + rng_state: Mutex::new(0), + } + } + + /// Create a fault injector that loops the action sequence forever. + pub fn sequence_loop(actions: impl IntoIterator) -> Self { + Self { + actions: actions.into_iter().collect(), + mode: FaultMode::SequenceLoop, + call_index: AtomicU32::new(0), + rng_state: Mutex::new(0), + } + } + + /// Create a fault injector with random failures at the given rate. + /// + /// # Panics + /// + /// Panics if `error_rate` is not in `0.0..=1.0` or is NaN. + /// + /// The seed is guarded against zero, which is a fixed point for xorshift. + pub fn random(error_rate: f64, fault: FaultType, seed: u64) -> Self { + assert!( + !error_rate.is_nan() && (0.0..=1.0).contains(&error_rate), + "error_rate must be in 0.0..=1.0 and not NaN, got {error_rate}" + ); + let seed = if seed == 0 { 1 } else { seed }; + Self { + actions: Vec::new(), + mode: FaultMode::Random { + error_rate, + fault, + seed, + }, + call_index: AtomicU32::new(0), + rng_state: Mutex::new(seed), + } + } + + /// Get the action for the next call. + pub fn next_action(&self) -> FaultAction { + let index = self.call_index.fetch_add(1, Ordering::Relaxed) as usize; + + match &self.mode { + FaultMode::SequenceOnce => { + if index < self.actions.len() { + self.actions[index].clone() + } else { + FaultAction::Succeed + } + } + FaultMode::SequenceLoop => { + if self.actions.is_empty() { + FaultAction::Succeed + } else { + self.actions[index % self.actions.len()].clone() + } + } + FaultMode::Random { + error_rate, fault, .. + } => { + // Simple xorshift64 PRNG for reproducible randomness. + let random_val = { + let mut state = self.rng_state.lock().unwrap_or_else(|p| p.into_inner()); + *state ^= *state << 13; + *state ^= *state >> 7; + *state ^= *state << 17; + (*state as f64) / (u64::MAX as f64) + }; + if random_val <= *error_rate { + FaultAction::Fail(fault.clone()) + } else { + FaultAction::Succeed + } + } + } + } + + /// Get the total number of calls made. + pub fn call_count(&self) -> u32 { + self.call_index.load(Ordering::Relaxed) + } + + /// Reset the injector to its initial state. + /// + /// For `Random` mode, re-initializes the RNG from the stored seed, + /// which is useful for test reproducibility. + /// For all modes, resets the call counter to zero. + pub fn reset(&self) { + self.call_index.store(0, Ordering::Relaxed); + if let FaultMode::Random { seed, .. } = &self.mode { + let mut state = self.rng_state.lock().unwrap_or_else(|p| p.into_inner()); + *state = *seed; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sequence_once_plays_then_succeeds() { + let injector = FaultInjector::sequence([ + FaultAction::Fail(FaultType::RequestFailed), + FaultAction::Fail(FaultType::RateLimited { retry_after: None }), + FaultAction::Succeed, + ]); + + // First two calls should fail + assert!(matches!( + injector.next_action(), + FaultAction::Fail(FaultType::RequestFailed) + )); + assert!(matches!( + injector.next_action(), + FaultAction::Fail(FaultType::RateLimited { .. }) + )); + // Third call is explicit succeed + assert!(matches!(injector.next_action(), FaultAction::Succeed)); + // Beyond sequence: implicit succeed + assert!(matches!(injector.next_action(), FaultAction::Succeed)); + assert!(matches!(injector.next_action(), FaultAction::Succeed)); + assert_eq!(injector.call_count(), 5); + } + + #[test] + fn sequence_loop_repeats() { + let injector = FaultInjector::sequence_loop([ + FaultAction::Fail(FaultType::RequestFailed), + FaultAction::Succeed, + ]); + + assert!(matches!(injector.next_action(), FaultAction::Fail(_))); + assert!(matches!(injector.next_action(), FaultAction::Succeed)); + assert!(matches!(injector.next_action(), FaultAction::Fail(_))); + assert!(matches!(injector.next_action(), FaultAction::Succeed)); + } + + #[test] + fn random_mode_is_deterministic_with_seed() { + let injector1 = FaultInjector::random(0.5, FaultType::RequestFailed, 42); + let injector2 = FaultInjector::random(0.5, FaultType::RequestFailed, 42); + + let results1: Vec = (0..20) + .map(|_| matches!(injector1.next_action(), FaultAction::Fail(_))) + .collect(); + let results2: Vec = (0..20) + .map(|_| matches!(injector2.next_action(), FaultAction::Fail(_))) + .collect(); + + assert_eq!(results1, results2, "Same seed should produce same sequence"); + } + + #[test] + fn fault_type_produces_correct_llm_errors() { + let provider = "test-provider"; + + assert!(matches!( + FaultType::RequestFailed.to_llm_error(provider), + LlmError::RequestFailed { .. } + )); + assert!(matches!( + FaultType::RateLimited { + retry_after: Some(Duration::from_secs(5)) + } + .to_llm_error(provider), + LlmError::RateLimited { .. } + )); + assert!(matches!( + FaultType::AuthFailed.to_llm_error(provider), + LlmError::AuthFailed { .. } + )); + assert!(matches!( + FaultType::InvalidResponse.to_llm_error(provider), + LlmError::InvalidResponse { .. } + )); + assert!(matches!( + FaultType::IoError.to_llm_error(provider), + LlmError::Io(_) + )); + assert!(matches!( + FaultType::ContextLengthExceeded.to_llm_error(provider), + LlmError::ContextLengthExceeded { .. } + )); + assert!(matches!( + FaultType::SessionExpired.to_llm_error(provider), + LlmError::SessionExpired { .. } + )); + } + + #[test] + fn delay_action_exists() { + let injector = FaultInjector::sequence([FaultAction::Delay(Duration::from_millis(100))]); + assert!(matches!(injector.next_action(), FaultAction::Delay(_))); + } + + #[test] + fn random_seed_zero_does_not_always_fail() { + // seed=0 is a fixed point for xorshift; the constructor guards it to 1. + let injector = FaultInjector::random(0.5, FaultType::RequestFailed, 0); + let failures = (0..100) + .filter(|_| matches!(injector.next_action(), FaultAction::Fail(_))) + .count(); + assert!(failures < 100, "seed=0 must not produce stuck RNG"); + } + + #[test] + fn empty_sequence_always_succeeds() { + let injector = FaultInjector::sequence([]); + for _ in 0..10 { + assert!(matches!(injector.next_action(), FaultAction::Succeed)); + } + } + + #[test] + fn reset_restores_random_rng_from_stored_seed() { + let injector = FaultInjector::random(0.5, FaultType::RequestFailed, 42); + let run1: Vec = (0..20) + .map(|_| matches!(injector.next_action(), FaultAction::Fail(_))) + .collect(); + + injector.reset(); + assert_eq!(injector.call_count(), 0); + + let run2: Vec = (0..20) + .map(|_| matches!(injector.next_action(), FaultAction::Fail(_))) + .collect(); + + assert_eq!(run1, run2, "reset() should reproduce the same sequence"); + } + + #[test] + #[should_panic(expected = "error_rate must be in 0.0..=1.0")] + fn random_rejects_error_rate_above_one() { + FaultInjector::random(1.5, FaultType::RequestFailed, 42); + } + + #[test] + #[should_panic(expected = "error_rate must be in 0.0..=1.0")] + fn random_rejects_negative_error_rate() { + FaultInjector::random(-0.1, FaultType::RequestFailed, 42); + } + + #[test] + #[should_panic(expected = "error_rate must be in 0.0..=1.0 and not NaN")] + fn random_rejects_nan_error_rate() { + FaultInjector::random(f64::NAN, FaultType::RequestFailed, 42); + } + + #[test] + fn error_rate_one_always_fails() { + let injector = FaultInjector::random(1.0, FaultType::RequestFailed, 42); + for _ in 0..100 { + assert!( + matches!(injector.next_action(), FaultAction::Fail(_)), + "error_rate=1.0 must always produce failures" + ); + } + } + + #[test] + fn error_rate_zero_never_fails() { + let injector = FaultInjector::random(0.0, FaultType::RequestFailed, 42); + for _ in 0..100 { + assert!( + matches!(injector.next_action(), FaultAction::Succeed), + "error_rate=0.0 must never produce failures" + ); + } + } + + #[tokio::test] + async fn delay_action_pauses_execution() { + tokio::time::pause(); + let injector = FaultInjector::sequence([ + FaultAction::Delay(Duration::from_secs(10)), + FaultAction::Succeed, + ]); + + // First action is a delay + let action = injector.next_action(); + assert!(matches!(action, FaultAction::Delay(d) if d == Duration::from_secs(10))); + + // Simulate what StubLlm does: sleep then succeed + if let FaultAction::Delay(d) = action { + let start = tokio::time::Instant::now(); + tokio::time::sleep(d).await; + let elapsed = start.elapsed(); + assert!( + elapsed >= Duration::from_secs(10), + "delay should have paused for at least 10s, got {elapsed:?}" + ); + } + + // Next action succeeds + assert!(matches!(injector.next_action(), FaultAction::Succeed)); + } +} diff --git a/src/testing/mod.rs b/src/testing/mod.rs index ff522e3ad2..d55043938f 100644 --- a/src/testing/mod.rs +++ b/src/testing/mod.rs @@ -19,9 +19,11 @@ //! ``` pub mod credentials; +pub mod fault_injection; use std::sync::Arc; use std::sync::Mutex; + use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use async_trait::async_trait; @@ -84,6 +86,9 @@ pub struct StubLlm { call_count: AtomicU32, should_fail: AtomicBool, error_kind: StubErrorKind, + /// Optional fault injector for fine-grained failure control. + /// When set, takes precedence over the `should_fail` / `error_kind` fields. + fault_injector: Option>, } impl StubLlm { @@ -95,6 +100,7 @@ impl StubLlm { call_count: AtomicU32::new(0), should_fail: AtomicBool::new(false), error_kind: StubErrorKind::Transient, + fault_injector: None, } } @@ -106,6 +112,7 @@ impl StubLlm { call_count: AtomicU32::new(0), should_fail: AtomicBool::new(true), error_kind: StubErrorKind::Transient, + fault_injector: None, } } @@ -117,6 +124,7 @@ impl StubLlm { call_count: AtomicU32::new(0), should_fail: AtomicBool::new(true), error_kind: StubErrorKind::NonTransient, + fault_injector: None, } } @@ -131,11 +139,39 @@ impl StubLlm { self.call_count.load(Ordering::Relaxed) } + /// Attach a fault injector for fine-grained failure control. + /// + /// When set, the injector's `next_action()` is consulted on every call, + /// taking precedence over the `should_fail` / `error_kind` fields. + pub fn with_fault_injector(mut self, injector: Arc) -> Self { + self.fault_injector = Some(injector); + self + } + /// Toggle whether calls should fail at runtime. pub fn set_failing(&self, fail: bool) { self.should_fail.store(fail, Ordering::Relaxed); } + /// Check the fault injector or should_fail flag, returning an error if + /// the call should fail, or None if it should succeed. + async fn check_faults(&self) -> Option { + if let Some(ref injector) = self.fault_injector { + match injector.next_action() { + fault_injection::FaultAction::Fail(fault) => { + return Some(fault.to_llm_error(&self.model_name)); + } + fault_injection::FaultAction::Delay(duration) => { + tokio::time::sleep(duration).await; + } + fault_injection::FaultAction::Succeed => {} + } + } else if self.should_fail.load(Ordering::Relaxed) { + return Some(self.make_error()); + } + None + } + fn make_error(&self) -> LlmError { match self.error_kind { StubErrorKind::Transient => LlmError::RequestFailed { @@ -168,8 +204,8 @@ impl LlmProvider for StubLlm { async fn complete(&self, _request: CompletionRequest) -> Result { self.call_count.fetch_add(1, Ordering::Relaxed); - if self.should_fail.load(Ordering::Relaxed) { - return Err(self.make_error()); + if let Some(err) = self.check_faults().await { + return Err(err); } Ok(CompletionResponse { content: self.response.clone(), @@ -186,8 +222,8 @@ impl LlmProvider for StubLlm { _request: ToolCompletionRequest, ) -> Result { self.call_count.fetch_add(1, Ordering::Relaxed); - if self.should_fail.load(Ordering::Relaxed) { - return Err(self.make_error()); + if let Some(err) = self.check_faults().await { + return Err(err); } Ok(ToolCompletionResponse { content: Some(self.response.clone()), @@ -456,6 +492,7 @@ impl TestHarnessBuilder { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; TestHarness { @@ -1508,4 +1545,29 @@ mod tests { .await .expect("update actuals"); } + + #[tokio::test] + async fn stub_llm_fault_injector_sequence() { + use crate::llm::LlmProvider; + use crate::testing::fault_injection::{FaultAction, FaultInjector, FaultType}; + + let injector = Arc::new(FaultInjector::sequence([ + FaultAction::Fail(FaultType::RateLimited { retry_after: None }), + FaultAction::Succeed, + ])); + + let stub = StubLlm::new("hello").with_fault_injector(injector); + + let req = crate::llm::CompletionRequest::new(vec![crate::llm::ChatMessage::user("hi")]); + + // First call should fail with RateLimited + let result = stub.complete(req.clone()).await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), LlmError::RateLimited { .. })); + + // Second call should succeed + let result = stub.complete(req).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap().content, "hello"); + } } diff --git a/src/tools/registry.rs b/src/tools/registry.rs index f8110b4657..a68e300b2e 100644 --- a/src/tools/registry.rs +++ b/src/tools/registry.rs @@ -13,7 +13,9 @@ use crate::orchestrator::job_manager::ContainerJobManager; use crate::secrets::SecretsStore; use crate::skills::catalog::SkillCatalog; use crate::skills::registry::SkillRegistry; -use crate::tools::builder::{BuildSoftwareTool, BuilderConfig, LlmSoftwareBuilder}; +use crate::tools::builder::{ + BuildSoftwareTool, BuilderConfig, LlmSoftwareBuilder, SoftwareBuilder, +}; use crate::tools::builtin::{ ApplyPatchTool, CancelJobTool, CreateJobTool, EchoTool, ExtensionInfoTool, HttpTool, JobEventsTool, JobPromptTool, JobStatusTool, JsonTool, ListDirTool, ListJobsTool, @@ -576,22 +578,23 @@ impl ToolRegistry { self: &Arc, llm: Arc, config: Option, - ) { + ) -> Arc { // First register dev tools needed by the builder self.register_dev_tools(); // Create the builder (arg order: config, llm, tools) - let builder = Arc::new(LlmSoftwareBuilder::new( + let builder: Arc = Arc::new(LlmSoftwareBuilder::new( config.unwrap_or_default(), llm, Arc::clone(self), )); // Register the build_software tool - self.register(Arc::new(BuildSoftwareTool::new(builder))) + self.register(Arc::new(BuildSoftwareTool::new(Arc::clone(&builder)))) .await; - tracing::debug!("Registered software builder tool"); + tracing::info!("Registered software builder tool"); + builder } /// Register a WASM tool from bytes. diff --git a/tests/e2e/scenarios/test_telegram_hot_activation.py b/tests/e2e/scenarios/test_telegram_hot_activation.py index e6fa598aac..261b837eb9 100644 --- a/tests/e2e/scenarios/test_telegram_hot_activation.py +++ b/tests/e2e/scenarios/test_telegram_hot_activation.py @@ -33,17 +33,28 @@ } -async def go_to_extensions(page): - await page.locator(SEL["tab_button"].format(tab="extensions")).click() - await page.locator(SEL["tab_panel"].format(tab="extensions")).wait_for( +async def go_to_channels(page): + """Navigate to Settings → Channels subtab (where wasm_channel extensions live).""" + await page.locator(SEL["tab_button"].format(tab="settings")).click() + await page.locator(SEL["settings_subtab"].format(subtab="channels")).click() + await page.locator(SEL["settings_subpanel"].format(subtab="channels")).wait_for( state="visible", timeout=5000 ) - await page.locator( - f"{SEL['extensions_list']} .empty-state, {SEL['ext_card_installed']}" - ).first.wait_for(state="visible", timeout=8000) + # Wait for the Telegram card specifically (built-in cards render first) + await page.locator(SEL["channels_ext_card"], has_text="Telegram").wait_for( + state="visible", timeout=8000 + ) + +async def _default_gateway_status_handler(route): + await route.fulfill( + status=200, + content_type="application/json", + body=json.dumps({"enabled_channels": [], "sse_connections": 0, "ws_connections": 0}), + ) -async def mock_extension_lists(page, ext_handler): + +async def mock_extension_lists(page, ext_handler, *, gateway_status_handler=None): async def handle_ext_list(route): path = route.request.url.split("?")[0] if path.endswith("/api/extensions"): @@ -69,6 +80,10 @@ async def handle_registry(route): await page.route("**/api/extensions*", handle_ext_list) await page.route("**/api/extensions/tools", handle_tools) await page.route("**/api/extensions/registry", handle_registry) + await page.route( + "**/api/gateway/status", + gateway_status_handler or _default_gateway_status_handler, + ) async def wait_for_toast(page, text: str, *, timeout: int = 5000): @@ -106,9 +121,9 @@ async def handle_setup(route): await mock_extension_lists(page, handle_ext_list) await page.route("**/api/extensions/telegram/setup", handle_setup) - await go_to_extensions(page) + await go_to_channels(page) - card = page.locator(SEL["ext_card_installed"]).first + card = page.locator(SEL["channels_ext_card"], has_text="Telegram") await card.locator(SEL["ext_configure_btn"], has_text="Setup").click() modal = page.locator(SEL["configure_modal"]) @@ -198,9 +213,9 @@ async def handle_setup(route): await mock_extension_lists(page, handle_ext_list) await page.route("**/api/extensions/telegram/setup", handle_setup) - await go_to_extensions(page) + await go_to_channels(page) - card = page.locator(SEL["ext_card_installed"]).first + card = page.locator(SEL["channels_ext_card"], has_text="Telegram") await card.locator(SEL["ext_configure_btn"], has_text="Setup").click() modal = page.locator(SEL["configure_modal"]) diff --git a/tests/e2e_telegram_message_routing.rs b/tests/e2e_telegram_message_routing.rs index cad2387ca0..a96aabe4c2 100644 --- a/tests/e2e_telegram_message_routing.rs +++ b/tests/e2e_telegram_message_routing.rs @@ -198,6 +198,7 @@ mod tests { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }; let gateway = Arc::new(TestChannel::new()); diff --git a/tests/support/gateway_workflow_harness.rs b/tests/support/gateway_workflow_harness.rs index 13a8a54cfe..c2db4427e3 100644 --- a/tests/support/gateway_workflow_harness.rs +++ b/tests/support/gateway_workflow_harness.rs @@ -257,6 +257,7 @@ impl GatewayWorkflowHarness { http_interceptor: None, transcription: None, document_extraction: None, + builder: None, }, channels, None, diff --git a/tests/support/test_rig.rs b/tests/support/test_rig.rs index 8d41a26119..e6c4a6e2b5 100644 --- a/tests/support/test_rig.rs +++ b/tests/support/test_rig.rs @@ -642,6 +642,7 @@ impl TestRigBuilder { }, transcription: None, document_extraction: None, + builder: None, }; // 7. Create TestChannel and ChannelManager.