Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl Session {
async fn close_unified_exec_sessions(&self) {
self.services
.unified_exec_manager
.terminate_all_sessions()
.terminate_turn_sessions()
.await;
}

Expand Down
4 changes: 4 additions & 0 deletions codex-rs/core/src/tools/handlers/unified_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ struct ExecCommandArgs {
shell: Option<String>,
#[serde(default = "default_login")]
login: bool,
#[serde(default)]
long_running: bool,
#[serde(default = "default_exec_yield_time_ms")]
yield_time_ms: u64,
#[serde(default)]
Expand Down Expand Up @@ -135,6 +137,7 @@ impl ToolHandler for UnifiedExecHandler {
max_output_tokens,
sandbox_permissions,
justification,
long_running,
..
} = args;

Expand Down Expand Up @@ -196,6 +199,7 @@ impl ToolHandler for UnifiedExecHandler {
workdir,
sandbox_permissions,
justification,
long_running,
},
&context,
)
Expand Down
31 changes: 31 additions & 0 deletions codex-rs/core/src/tools/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ fn create_exec_command_tool() -> ToolSpec {
),
},
);
properties.insert(
"long_running".to_string(),
JsonSchema::Boolean {
description: Some(
"Whether to keep the process running across turns. Max 8 long-running processes; long-running processes do not stream output deltas or end events."
.to_string(),
),
},
);

ToolSpec::Function(ResponsesApiTool {
name: "exec_command".to_string(),
Expand Down Expand Up @@ -1493,6 +1502,28 @@ mod tests {
assert_contains_tool_names(&tools, &subset);
}

#[test]
fn test_exec_command_schema_includes_long_running() {
let config = test_config();
let model_family = ModelsManager::construct_model_family_offline("o3", &config);
let mut features = Features::with_defaults();
features.enable(Feature::UnifiedExec);
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
features: &features,
});
let (tools, _) = build_specs(&tools_config, None).build();

let tool = find_tool(&tools, "exec_command");
let ToolSpec::Function(tool) = &tool.spec else {
panic!("expected exec_command to be a function tool");
};
let JsonSchema::Object { properties, .. } = &tool.parameters else {
panic!("expected exec_command schema parameters to be an object");
};
assert!(properties.contains_key("long_running"));
}

#[test]
#[ignore]
fn test_parallel_support_flags() {
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/core/src/unified_exec/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ pub(crate) enum UnifiedExecError {
// Called "session" in the model's training.
#[error("Unknown session id {process_id}")]
UnknownSessionId { process_id: String },
#[error(
"Too many long-running unified exec sessions (max {max}). Close an existing one before starting another."
)]
TooManyLongRunningSessions { max: usize },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for unified exec request")]
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/core/src/unified_exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_TOKENS: usize = UNIFIED_EXEC_OUTPUT_MAX_BYTES / 4;
pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64;
pub(crate) const MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS: usize = 8;

// Send a warning message to the models when it reaches this number of sessions.
pub(crate) const WARNING_UNIFIED_EXEC_SESSIONS: usize = 60;
Expand Down Expand Up @@ -96,6 +97,7 @@ pub(crate) struct ExecCommandRequest {
pub workdir: Option<PathBuf>,
pub sandbox_permissions: SandboxPermissions,
pub justification: Option<String>,
pub long_running: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -153,6 +155,7 @@ struct SessionEntry {
process_id: String,
command: Vec<String>,
last_used: tokio::time::Instant,
long_running: bool,
}

pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
Expand Down Expand Up @@ -220,6 +223,7 @@ mod tests {
workdir: None,
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,
long_running: false,
},
&context,
)
Expand Down
115 changes: 88 additions & 27 deletions codex-rs/core/src/unified_exec/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::truncate::formatted_truncate_text;

use super::CommandTranscript;
use super::ExecCommandRequest;
use super::MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS;
use super::MAX_UNIFIED_EXEC_SESSIONS;
use super::SessionEntry;
use super::SessionStore;
Expand Down Expand Up @@ -74,6 +75,7 @@ struct PreparedSessionHandles {
turn_ref: Arc<TurnContext>,
command: Vec<String>,
process_id: String,
long_running: bool,
}

impl UnifiedExecSessionManager {
Expand Down Expand Up @@ -140,7 +142,9 @@ impl UnifiedExecSessionManager {
};

let transcript = Arc::new(tokio::sync::Mutex::new(CommandTranscript::default()));
start_streaming_output(&session, context, Arc::clone(&transcript));
if !request.long_running {
start_streaming_output(&session, context, Arc::clone(&transcript));
}

let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
Expand Down Expand Up @@ -175,23 +179,35 @@ impl UnifiedExecSessionManager {
// same helper as the background watcher, so all end events share
// one implementation.
let exit = exit_code.unwrap_or(-1);
emit_exec_end_for_unified_exec(
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
request.command.clone(),
cwd,
Some(process_id),
Arc::clone(&transcript),
output.clone(),
exit,
wall_time,
)
.await;
if !request.long_running {
emit_exec_end_for_unified_exec(
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
request.command.clone(),
cwd,
Some(process_id),
Arc::clone(&transcript),
output.clone(),
exit,
wall_time,
)
.await;
}

self.release_process_id(&request.process_id).await;
session.check_for_sandbox_denial_with_text(&text).await?;
} else {
if request.long_running
&& self.active_long_running_session_count().await
>= MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS
{
session.terminate();
self.release_process_id(&request.process_id).await;
return Err(UnifiedExecError::TooManyLongRunningSessions {
max: MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS,
});
}
// Long‑lived command: persist the session so write_stdin can reuse
// it, and register a background watcher that will emit
// ExecCommandEnd when the PTY eventually exits (even if no further
Expand All @@ -204,6 +220,7 @@ impl UnifiedExecSessionManager {
start,
process_id,
Arc::clone(&transcript),
request.long_running,
)
.await;

Expand Down Expand Up @@ -245,6 +262,7 @@ impl UnifiedExecSessionManager {
turn_ref,
command: session_command,
process_id,
long_running,
..
} = self.prepare_session_handles(process_id.as_str()).await?;

Expand Down Expand Up @@ -307,7 +325,7 @@ impl UnifiedExecSessionManager {
session_command: Some(session_command.clone()),
};

if response.process_id.is_some() {
if response.process_id.is_some() && !long_running {
Self::emit_waiting_status(&session_ref, &turn_ref, &session_command).await;
}

Expand Down Expand Up @@ -368,6 +386,7 @@ impl UnifiedExecSessionManager {
turn_ref: Arc::clone(&entry.turn_ref),
command: entry.command.clone(),
process_id: entry.process_id.clone(),
long_running: entry.long_running,
})
}

Expand All @@ -391,6 +410,7 @@ impl UnifiedExecSessionManager {
started_at: Instant,
process_id: String,
transcript: Arc<tokio::sync::Mutex<CommandTranscript>>,
long_running: bool,
) {
let entry = SessionEntry {
session: Arc::clone(&session),
Expand All @@ -400,6 +420,7 @@ impl UnifiedExecSessionManager {
process_id: process_id.clone(),
command: command.to_vec(),
last_used: started_at,
long_running,
};
let number_sessions = {
let mut store = self.session_store.lock().await;
Expand All @@ -418,17 +439,19 @@ impl UnifiedExecSessionManager {
.await;
};

spawn_exit_watcher(
Arc::clone(&session),
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
command.to_vec(),
cwd,
process_id,
transcript,
started_at,
);
if !long_running {
spawn_exit_watcher(
Arc::clone(&session),
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
command.to_vec(),
cwd,
process_id,
transcript,
started_at,
);
}
}

async fn emit_waiting_status(
Expand Down Expand Up @@ -583,13 +606,19 @@ impl UnifiedExecSessionManager {
}

fn prune_sessions_if_needed(store: &mut SessionStore) -> bool {
if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
let non_long_running = store
.sessions
.values()
.filter(|entry| !entry.long_running)
.count();
if non_long_running < MAX_UNIFIED_EXEC_SESSIONS {
return false;
}

let meta: Vec<(String, Instant, bool)> = store
.sessions
.iter()
.filter(|(_, entry)| !entry.long_running)
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
.collect();

Expand Down Expand Up @@ -645,6 +674,38 @@ impl UnifiedExecSessionManager {
entry.session.terminate();
}
}

pub(crate) async fn terminate_turn_sessions(&self) {
let entries: Vec<SessionEntry> = {
let mut sessions = self.session_store.lock().await;
let mut keep = HashMap::new();
let mut to_terminate = Vec::new();
let drained: Vec<(String, SessionEntry)> = sessions.sessions.drain().collect();
for (process_id, entry) in drained {
if entry.long_running {
keep.insert(process_id, entry);
} else {
sessions.reserved_sessions_id.remove(&process_id);
to_terminate.push(entry);
}
}
sessions.sessions = keep;
to_terminate
};

for entry in entries {
entry.session.terminate();
}
}

async fn active_long_running_session_count(&self) -> usize {
let store = self.session_store.lock().await;
store
.sessions
.values()
.filter(|entry| entry.long_running && !entry.session.has_exited())
.count()
}
}

enum SessionStatus {
Expand Down
Loading