Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ DATABASE_POOL_SIZE=10

# LLM Provider
# LLM_BACKEND=nearai # default
# Possible values: nearai, ollama, openai_compatible, openai, anthropic, tinfoil
# Possible values: nearai, ollama, openai_compatible, openai, anthropic, tinfoil, openai_codex
# LLM_REQUEST_TIMEOUT_SECS=120 # Increase for local LLMs (Ollama, vLLM, LM Studio)

# === Anthropic Direct ===
Expand Down Expand Up @@ -92,6 +92,13 @@ NEARAI_AUTH_URL=https://private.near.ai
# long = 1-hour TTL, 2.0× (200%) write surcharge
# ANTHROPIC_CACHE_RETENTION=short

# === OpenAI Codex (ChatGPT subscription, OAuth) ===
# LLM_BACKEND=openai_codex
# OPENAI_CODEX_MODEL=gpt-5.3-codex # default
# OPENAI_CODEX_CLIENT_ID=app_EMoamEEZ73f0CkXaXp7hrann # override (rare)
# OPENAI_CODEX_AUTH_URL=https://auth.openai.com # override (rare)
# OPENAI_CODEX_API_URL=https://chatgpt.com/backend-api/codex # override (rare)

# For full provider setup guide see docs/LLM_PROVIDERS.md

# Channel Configuration
Expand Down
227 changes: 227 additions & 0 deletions src/agent/job_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
//! Agent Loop
//! ```

use std::sync::Arc;

use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
use uuid::Uuid;

use crate::channels::IncomingMessage;
use crate::channels::web::types::SseEvent;
use crate::context::{ContextManager, JobState};

/// Route context for forwarding job monitor events back to the user's channel.
#[derive(Debug, Clone)]
Expand All @@ -40,10 +43,23 @@ pub struct JobMonitorRoute {
/// Tool use/result and status events are intentionally skipped (too noisy for
/// the main agent's context window).
pub fn spawn_job_monitor(
    job_id: Uuid,
    event_rx: broadcast::Receiver<(Uuid, SseEvent)>,
    inject_tx: mpsc::Sender<IncomingMessage>,
    route: JobMonitorRoute,
) -> JoinHandle<()> {
    // Pure event forwarding: passing `None` tells the shared implementation
    // to skip all ContextManager state transitions.
    spawn_job_monitor_with_context(job_id, event_rx, inject_tx, route, None)
}

/// Like `spawn_job_monitor`, but also transitions the job's in-memory state
/// when it receives a `JobResult` event. This ensures fire-and-forget sandbox
/// jobs don't stay `InProgress` forever in the `ContextManager`.
pub fn spawn_job_monitor_with_context(
job_id: Uuid,
mut event_rx: broadcast::Receiver<(Uuid, SseEvent)>,
inject_tx: mpsc::Sender<IncomingMessage>,
route: JobMonitorRoute,
context_manager: Option<Arc<ContextManager>>,
) -> JoinHandle<()> {
let short_id = job_id.to_string()[..8].to_string();

Expand Down Expand Up @@ -77,6 +93,26 @@ pub fn spawn_job_monitor(
}
}
SseEvent::JobResult { status, .. } => {
// Transition in-memory state so the job frees its
// max_jobs slot and query tools show the final state.
if let Some(ref cm) = context_manager {
let target = if status == "completed" {
JobState::Completed
} else {
JobState::Failed
};
let reason = if status != "completed" {
Some(format!("Container finished: {}", status))
} else {
None
};
let _ = cm
.update_context(job_id, |ctx| {
let _ = ctx.transition_to(target, reason);
})
.await;
}

let mut msg = IncomingMessage::new(
route.channel.clone(),
route.user_id.clone(),
Expand Down Expand Up @@ -121,6 +157,62 @@ pub fn spawn_job_monitor(
})
}

/// Lightweight watcher that only transitions ContextManager state on job
/// completion. Used when monitor routing metadata is absent (no channel to
/// inject messages into) but we still need to free the `max_jobs` slot.
pub fn spawn_completion_watcher(
job_id: Uuid,
mut event_rx: broadcast::Receiver<(Uuid, SseEvent)>,
context_manager: Arc<ContextManager>,
) -> JoinHandle<()> {
let short_id = job_id.to_string()[..8].to_string();

tokio::spawn(async move {
loop {
match event_rx.recv().await {
Ok((ev_job_id, SseEvent::JobResult { status, .. })) if ev_job_id == job_id => {
let target = if status == "completed" {
JobState::Completed
} else {
JobState::Failed
};
let reason = if status != "completed" {
Some(format!("Container finished: {}", status))
} else {
None
};
let _ = context_manager
.update_context(job_id, |ctx| {
let _ = ctx.transition_to(target, reason);
})
.await;
tracing::debug!(
job_id = %short_id,
status = %status,
"Completion watcher exiting (job finished)"
);
break;
}
Ok(_) => {}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
job_id = %short_id,
skipped = n,
"Completion watcher lagged"
);
}
Err(broadcast::error::RecvError::Closed) => {
tracing::debug!(
job_id = %short_id,
"Broadcast channel closed, stopping completion watcher"
);
break;
}
}
}
})
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -294,4 +386,139 @@ mod tests {
let msg = IncomingMessage::new("monitor", "system", "test").into_internal();
assert!(msg.is_internal);
}

// === Regression: fire-and-forget sandbox jobs must transition out of InProgress ===
// Before this fix, spawn_job_monitor only forwarded SSE messages but never
// updated ContextManager. Background sandbox jobs stayed InProgress forever,
// permanently consuming a max_jobs slot.

#[tokio::test]
async fn test_monitor_transitions_context_on_completion() {
    use crate::context::{ContextManager, JobState};

    // Register a sandbox job so an InProgress context exists to transition.
    let manager = Arc::new(ContextManager::new(5));
    let job_id = Uuid::new_v4();
    manager
        .register_sandbox_job(job_id, "user-1", "Build app", "desc")
        .await
        .unwrap();

    let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
    let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

    let monitor = spawn_job_monitor_with_context(
        job_id,
        event_tx.subscribe(),
        inject_tx,
        test_route(),
        Some(Arc::clone(&manager)),
    );

    // Simulate the container reporting success.
    let result_event = SseEvent::JobResult {
        job_id: job_id.to_string(),
        status: "completed".to_string(),
        session_id: None,
        fallback_deliverable: None,
    };
    event_tx.send((job_id, result_event)).unwrap();

    // Consume the forwarded message, then wait for the monitor task to exit.
    let one_sec = std::time::Duration::from_secs(1);
    let _ = tokio::time::timeout(one_sec, inject_rx.recv()).await;
    tokio::time::timeout(one_sec, monitor)
        .await
        .expect("monitor should exit")
        .expect("monitor should not panic");

    // The in-memory state must have left InProgress.
    let ctx = manager.get_context(job_id).await.unwrap();
    assert_eq!(ctx.state, JobState::Completed);
}

#[tokio::test]
async fn test_monitor_transitions_context_on_failure() {
    use crate::context::{ContextManager, JobState};

    // Register a sandbox job so an InProgress context exists to transition.
    let manager = Arc::new(ContextManager::new(5));
    let job_id = Uuid::new_v4();
    manager
        .register_sandbox_job(job_id, "user-1", "Build app", "desc")
        .await
        .unwrap();

    let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
    let (inject_tx, mut inject_rx) = mpsc::channel::<IncomingMessage>(16);

    let monitor = spawn_job_monitor_with_context(
        job_id,
        event_tx.subscribe(),
        inject_tx,
        test_route(),
        Some(Arc::clone(&manager)),
    );

    // Simulate the container reporting a non-"completed" terminal status.
    let result_event = SseEvent::JobResult {
        job_id: job_id.to_string(),
        status: "failed".to_string(),
        session_id: None,
        fallback_deliverable: None,
    };
    event_tx.send((job_id, result_event)).unwrap();

    // Drain the injected message and wait for the monitor task to exit.
    let one_sec = std::time::Duration::from_secs(1);
    let _ = tokio::time::timeout(one_sec, inject_rx.recv()).await;
    tokio::time::timeout(one_sec, monitor)
        .await
        .expect("monitor should exit")
        .expect("monitor should not panic");

    // A failed container must map to JobState::Failed, not InProgress.
    let ctx = manager.get_context(job_id).await.unwrap();
    assert_eq!(ctx.state, JobState::Failed);
}

// === Regression: completion watcher (no route metadata) ===
// When monitor_route_from_ctx() returns None, spawn_completion_watcher
// must still transition the job so the max_jobs slot is freed.

#[tokio::test]
async fn test_completion_watcher_transitions_on_result() {
    use crate::context::{ContextManager, JobState};

    // Register a sandbox job so the watcher has a context to transition.
    let manager = Arc::new(ContextManager::new(5));
    let job_id = Uuid::new_v4();
    manager
        .register_sandbox_job(job_id, "user-1", "Build app", "desc")
        .await
        .unwrap();

    // Subscribe before sending so the JobResult event is not missed.
    let (event_tx, _) = broadcast::channel::<(Uuid, SseEvent)>(16);
    let watcher = spawn_completion_watcher(job_id, event_tx.subscribe(), Arc::clone(&manager));

    let result_event = SseEvent::JobResult {
        job_id: job_id.to_string(),
        status: "completed".to_string(),
        session_id: None,
        fallback_deliverable: None,
    };
    event_tx.send((job_id, result_event)).unwrap();

    // The watcher should observe the result and exit on its own.
    tokio::time::timeout(std::time::Duration::from_secs(1), watcher)
        .await
        .expect("watcher should exit")
        .expect("watcher should not panic");

    let ctx = manager.get_context(job_id).await.unwrap();
    assert_eq!(ctx.state, JobState::Completed);
}
}
6 changes: 5 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,11 @@ impl AppBuilder {
// Post-init validation: if a non-nearai backend was selected but
// credentials were never resolved (deferred resolution found no keys),
// fail early with a clear error instead of a confusing runtime failure.
if self.config.llm.backend != "nearai" && self.config.llm.provider.is_none() {
if self.config.llm.backend != "nearai"
&& self.config.llm.backend != "bedrock"
&& self.config.llm.backend != "openai_codex"
&& self.config.llm.provider.is_none()
{
let backend = &self.config.llm.backend;
anyhow::bail!(
"LLM_BACKEND={backend} is configured but no credentials were found. \
Expand Down
11 changes: 11 additions & 0 deletions src/channels/wasm/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3314,6 +3314,7 @@ mod tests {
use std::sync::Arc;

use crate::channels::Channel;
use crate::channels::OutgoingResponse;
use crate::channels::wasm::capabilities::ChannelCapabilities;
use crate::channels::wasm::runtime::{
PreparedChannelModule, WasmChannelRuntime, WasmChannelRuntimeConfig,
Expand Down Expand Up @@ -3401,6 +3402,16 @@ mod tests {
assert!(channel.health_check().await.is_err());
}

#[tokio::test]
async fn test_broadcast_delegates_to_call_on_broadcast() {
    // With no WASM component loaded (`component: None`), call_on_broadcast
    // short-circuits to Ok(()), so broadcast must succeed.
    let channel = create_test_channel();
    let response = OutgoingResponse::text("hello");
    let outcome = channel.broadcast("146032821", response).await;
    assert!(outcome.is_ok());
}

#[tokio::test]
async fn test_execute_poll_no_wasm_returns_empty() {
// When there's no WASM module (None component), execute_poll
Expand Down
11 changes: 11 additions & 0 deletions src/channels/web/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ pub async fn start_server(
.route("/", get(index_handler))
.route("/style.css", get(css_handler))
.route("/app.js", get(js_handler))
.route("/theme-init.js", get(theme_init_handler))
.route("/favicon.ico", get(favicon_handler))
.route("/i18n/index.js", get(i18n_index_handler))
.route("/i18n/en.js", get(i18n_en_handler))
Expand Down Expand Up @@ -465,6 +466,16 @@ async fn js_handler() -> impl IntoResponse {
)
}

/// Serves the theme bootstrap script at `/theme-init.js`.
///
/// The file is embedded at compile time via `include_str!` and sent with
/// `Cache-Control: no-cache` so browsers revalidate it after a deploy.
async fn theme_init_handler() -> impl IntoResponse {
    let headers = [
        (header::CONTENT_TYPE, "application/javascript"),
        (header::CACHE_CONTROL, "no-cache"),
    ];
    (headers, include_str!("static/theme-init.js"))
}

async fn favicon_handler() -> impl IntoResponse {
(
[
Expand Down
Loading
Loading