diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 4601f9d244d..16b0d819c29 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -206,7 +206,7 @@ impl Session { async fn close_unified_exec_sessions(&self) { self.services .unified_exec_manager - .terminate_all_sessions() + .terminate_turn_sessions() .await; } diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index 00418d8a65e..8bc034b9c3a 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -36,6 +36,8 @@ struct ExecCommandArgs { shell: Option, #[serde(default = "default_login")] login: bool, + #[serde(default)] + long_running: bool, #[serde(default = "default_exec_yield_time_ms")] yield_time_ms: u64, #[serde(default)] @@ -135,6 +137,7 @@ impl ToolHandler for UnifiedExecHandler { max_output_tokens, sandbox_permissions, justification, + long_running, .. } = args; @@ -196,6 +199,7 @@ impl ToolHandler for UnifiedExecHandler { workdir, sandbox_permissions, justification, + long_running, }, &context, ) diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 0ac91755c22..8d9e5d60f10 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -196,6 +196,15 @@ fn create_exec_command_tool() -> ToolSpec { ), }, ); + properties.insert( + "long_running".to_string(), + JsonSchema::Boolean { + description: Some( + "Whether to keep the process running across turns. Max 8 long-running processes; long-running processes do not stream output deltas or end events." + .to_string(), + ), + }, + ); ToolSpec::Function(ResponsesApiTool { name: "exec_command".to_string(), @@ -1493,6 +1502,28 @@ mod tests { assert_contains_tool_names(&tools, &subset); } + #[test] + fn test_exec_command_schema_includes_long_running() { + let config = test_config(); + let model_family = ModelsManager::construct_model_family_offline("o3", &config); + let mut features = Features::with_defaults(); + features.enable(Feature::UnifiedExec); + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_family: &model_family, + features: &features, + }); + let (tools, _) = build_specs(&tools_config, None).build(); + + let tool = find_tool(&tools, "exec_command"); + let ToolSpec::Function(tool) = &tool.spec else { + panic!("expected exec_command to be a function tool"); + }; + let JsonSchema::Object { properties, .. } = &tool.parameters else { + panic!("expected exec_command schema parameters to be an object"); + }; + assert!(properties.contains_key("long_running")); + } + #[test] #[ignore] fn test_parallel_support_flags() { diff --git a/codex-rs/core/src/unified_exec/errors.rs b/codex-rs/core/src/unified_exec/errors.rs index 02031f22fab..7380486ddbd 100644 --- a/codex-rs/core/src/unified_exec/errors.rs +++ b/codex-rs/core/src/unified_exec/errors.rs @@ -8,6 +8,10 @@ pub(crate) enum UnifiedExecError { // Called "session" in the model's training. #[error("Unknown session id {process_id}")] UnknownSessionId { process_id: String }, + #[error( + "Too many long-running unified exec sessions (max {max}). Close an existing one before starting another." + )] + TooManyLongRunningSessions { max: usize }, #[error("failed to write to stdin")] WriteToStdin, #[error("missing command line for unified exec request")] diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index 2cb30e5aa39..c069bccb909 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -49,6 +49,7 @@ pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000; pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_TOKENS: usize = UNIFIED_EXEC_OUTPUT_MAX_BYTES / 4; pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64; +pub(crate) const MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS: usize = 8; // Send a warning message to the models when it reaches this number of sessions. pub(crate) const WARNING_UNIFIED_EXEC_SESSIONS: usize = 60; @@ -96,6 +97,7 @@ pub(crate) struct ExecCommandRequest { pub workdir: Option, pub sandbox_permissions: SandboxPermissions, pub justification: Option, + pub long_running: bool, } #[derive(Debug)] @@ -153,6 +155,7 @@ struct SessionEntry { process_id: String, command: Vec, last_used: tokio::time::Instant, + long_running: bool, } pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 { @@ -220,6 +223,7 @@ mod tests { workdir: None, sandbox_permissions: SandboxPermissions::UseDefault, justification: None, + long_running: false, }, &context, ) diff --git a/codex-rs/core/src/unified_exec/session_manager.rs b/codex-rs/core/src/unified_exec/session_manager.rs index c97820e4643..22ff272b1d1 100644 --- a/codex-rs/core/src/unified_exec/session_manager.rs +++ b/codex-rs/core/src/unified_exec/session_manager.rs @@ -28,6 +28,7 @@ use crate::truncate::formatted_truncate_text; use super::CommandTranscript; use super::ExecCommandRequest; +use super::MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS; use super::MAX_UNIFIED_EXEC_SESSIONS; use super::SessionEntry; use super::SessionStore; @@ -74,6 +75,7 @@ struct PreparedSessionHandles { turn_ref: Arc, command: Vec, process_id: String, + long_running: bool, } impl UnifiedExecSessionManager { @@ -140,7 +142,9 @@ impl UnifiedExecSessionManager { }; let transcript = Arc::new(tokio::sync::Mutex::new(CommandTranscript::default())); - start_streaming_output(&session, context, Arc::clone(&transcript)); + if !request.long_running { + start_streaming_output(&session, context, Arc::clone(&transcript)); + } let max_tokens = resolve_max_tokens(request.max_output_tokens); let yield_time_ms = clamp_yield_time(request.yield_time_ms); @@ -175,23 +179,35 @@ impl UnifiedExecSessionManager { // same helper as the background watcher, so all end events share // one implementation. let exit = exit_code.unwrap_or(-1); - emit_exec_end_for_unified_exec( - Arc::clone(&context.session), - Arc::clone(&context.turn), - context.call_id.clone(), - request.command.clone(), - cwd, - Some(process_id), - Arc::clone(&transcript), - output.clone(), - exit, - wall_time, - ) - .await; + if !request.long_running { + emit_exec_end_for_unified_exec( + Arc::clone(&context.session), + Arc::clone(&context.turn), + context.call_id.clone(), + request.command.clone(), + cwd, + Some(process_id), + Arc::clone(&transcript), + output.clone(), + exit, + wall_time, + ) + .await; + } self.release_process_id(&request.process_id).await; session.check_for_sandbox_denial_with_text(&text).await?; } else { + if request.long_running + && self.active_long_running_session_count().await + >= MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS + { + session.terminate(); + self.release_process_id(&request.process_id).await; + return Err(UnifiedExecError::TooManyLongRunningSessions { + max: MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS, + }); + } // Long‑lived command: persist the session so write_stdin can reuse // it, and register a background watcher that will emit // ExecCommandEnd when the PTY eventually exits (even if no further @@ -204,6 +220,7 @@ impl UnifiedExecSessionManager { start, process_id, Arc::clone(&transcript), + request.long_running, ) .await; @@ -245,6 +262,7 @@ impl UnifiedExecSessionManager { turn_ref, command: session_command, process_id, + long_running, .. } = self.prepare_session_handles(process_id.as_str()).await?; @@ -307,7 +325,7 @@ impl UnifiedExecSessionManager { session_command: Some(session_command.clone()), }; - if response.process_id.is_some() { + if response.process_id.is_some() && !long_running { Self::emit_waiting_status(&session_ref, &turn_ref, &session_command).await; } @@ -368,6 +386,7 @@ impl UnifiedExecSessionManager { turn_ref: Arc::clone(&entry.turn_ref), command: entry.command.clone(), process_id: entry.process_id.clone(), + long_running: entry.long_running, }) } @@ -391,6 +410,7 @@ impl UnifiedExecSessionManager { started_at: Instant, process_id: String, transcript: Arc>, + long_running: bool, ) { let entry = SessionEntry { session: Arc::clone(&session), @@ -400,6 +420,7 @@ impl UnifiedExecSessionManager { process_id: process_id.clone(), command: command.to_vec(), last_used: started_at, + long_running, }; let number_sessions = { let mut store = self.session_store.lock().await; @@ -418,17 +439,19 @@ impl UnifiedExecSessionManager { .await; }; - spawn_exit_watcher( - Arc::clone(&session), - Arc::clone(&context.session), - Arc::clone(&context.turn), - context.call_id.clone(), - command.to_vec(), - cwd, - process_id, - transcript, - started_at, - ); + if !long_running { + spawn_exit_watcher( + Arc::clone(&session), + Arc::clone(&context.session), + Arc::clone(&context.turn), + context.call_id.clone(), + command.to_vec(), + cwd, + process_id, + transcript, + started_at, + ); + } } async fn emit_waiting_status( @@ -583,13 +606,19 @@ impl UnifiedExecSessionManager { } fn prune_sessions_if_needed(store: &mut SessionStore) -> bool { - if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS { + let non_long_running = store + .sessions + .values() + .filter(|entry| !entry.long_running) + .count(); + if non_long_running < MAX_UNIFIED_EXEC_SESSIONS { return false; } let meta: Vec<(String, Instant, bool)> = store .sessions .iter() + .filter(|(_, entry)| !entry.long_running) .map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited())) .collect(); @@ -645,6 +674,38 @@ impl UnifiedExecSessionManager { entry.session.terminate(); } } + + pub(crate) async fn terminate_turn_sessions(&self) { + let entries: Vec = { + let mut sessions = self.session_store.lock().await; + let mut keep = HashMap::new(); + let mut to_terminate = Vec::new(); + let drained: Vec<(String, SessionEntry)> = sessions.sessions.drain().collect(); + for (process_id, entry) in drained { + if entry.long_running { + keep.insert(process_id, entry); + } else { + sessions.reserved_sessions_id.remove(&process_id); + to_terminate.push(entry); + } + } + sessions.sessions = keep; + to_terminate + }; + + for entry in entries { + entry.session.terminate(); + } + } + + async fn active_long_running_session_count(&self) -> usize { + let store = self.session_store.lock().await; + store + .sessions + .values() + .filter(|entry| entry.long_running && !entry.session.has_exited()) + .count() + } } enum SessionStatus {