Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 150 additions & 3 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ impl AgentControl {
Self { manager }
}

#[allow(dead_code)] // Used by upcoming multi-agent tooling.
/// Spawn a new agent thread and submit the initial prompt.
///
/// If `headless` is true, a background drain task is spawned to prevent unbounded event growth
Expand All @@ -50,7 +49,6 @@ impl AgentControl {
Ok(new_thread.thread_id)
}

#[allow(dead_code)] // Used by upcoming multi-agent tooling.
/// Send a `user` prompt to an existing agent thread.
pub(crate) async fn send_prompt(
&self,
Expand All @@ -69,7 +67,7 @@ impl AgentControl {
.await
}

#[allow(dead_code)] // Used by upcoming multi-agent tooling.
#[allow(dead_code)] // Will be used for collab tools.
/// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable.
pub(crate) async fn get_status(&self, agent_id: ThreadId) -> AgentStatus {
let Ok(state) = self.upgrade() else {
Expand Down Expand Up @@ -114,13 +112,63 @@ fn spawn_headless_drain(thread: Arc<CodexThread>) {
#[cfg(test)]
mod tests {
use super::*;
use crate::CodexAuth;
use crate::ThreadManager;
use crate::agent::agent_status_from_event;
use crate::config::Config;
use crate::config::ConfigBuilder;
use assert_matches::assert_matches;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::TaskCompleteEvent;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use pretty_assertions::assert_eq;
use tempfile::TempDir;

async fn test_config() -> (TempDir, Config) {
let home = TempDir::new().expect("create temp dir");
let config = ConfigBuilder::default()
.codex_home(home.path().to_path_buf())
.build()
.await
.expect("load default test config");
(home, config)
}

struct AgentControlHarness {
_home: TempDir,
config: Config,
manager: ThreadManager,
control: AgentControl,
}

impl AgentControlHarness {
async fn new() -> Self {
let (home, config) = test_config().await;
let manager = ThreadManager::with_models_provider_and_home(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let control = manager.agent_control();
Self {
_home: home,
config,
manager,
control,
}
}

async fn start_thread(&self) -> (ThreadId, Arc<CodexThread>) {
let new_thread = self
.manager
.start_thread(self.config.clone())
.await
.expect("start thread");
(new_thread.thread_id, new_thread.thread)
}
}

#[tokio::test]
async fn send_prompt_errors_when_manager_dropped() {
Expand Down Expand Up @@ -185,4 +233,103 @@ mod tests {
let status = agent_status_from_event(&EventMsg::ShutdownComplete);
assert_eq!(status, Some(AgentStatus::Shutdown));
}

#[tokio::test]
async fn spawn_agent_errors_when_manager_dropped() {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.spawn_agent(config, "hello".to_string(), false)
.await
.expect_err("spawn_agent should fail without a manager");
assert_eq!(
err.to_string(),
"unsupported operation: thread manager dropped"
);
}

#[tokio::test]
async fn send_prompt_errors_when_thread_missing() {
let harness = AgentControlHarness::new().await;
let thread_id = ThreadId::new();
let err = harness
.control
.send_prompt(thread_id, "hello".to_string())
.await
.expect_err("send_prompt should fail for missing thread");
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
}

#[tokio::test]
async fn get_status_returns_not_found_for_missing_thread() {
let harness = AgentControlHarness::new().await;
let status = harness.control.get_status(ThreadId::new()).await;
assert_eq!(status, AgentStatus::NotFound);
}

#[tokio::test]
async fn get_status_returns_pending_init_for_new_thread() {
let harness = AgentControlHarness::new().await;
let (thread_id, _) = harness.start_thread().await;
let status = harness.control.get_status(thread_id).await;
assert_eq!(status, AgentStatus::PendingInit);
}

#[tokio::test]
async fn send_prompt_submits_user_message() {
let harness = AgentControlHarness::new().await;
let (thread_id, _thread) = harness.start_thread().await;

let submission_id = harness
.control
.send_prompt(thread_id, "hello from tests".to_string())
.await
.expect("send_prompt should succeed");
assert!(!submission_id.is_empty());
let expected = (
thread_id,
Op::UserInput {
items: vec![UserInput::Text {
text: "hello from tests".to_string(),
}],
final_output_json_schema: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
}

#[tokio::test]
async fn spawn_agent_creates_thread_and_sends_prompt() {
let harness = AgentControlHarness::new().await;
let thread_id = harness
.control
.spawn_agent(harness.config.clone(), "spawned".to_string(), false)
.await
.expect("spawn_agent should succeed");
let _thread = harness
.manager
.get_thread(thread_id)
.await
.expect("thread should be registered");
let expected = (
thread_id,
Op::UserInput {
items: vec![UserInput::Text {
text: "spawned".to_string(),
}],
final_output_json_schema: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
}
}
29 changes: 27 additions & 2 deletions codex-rs/core/src/thread_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub(crate) struct ThreadManagerState {
models_manager: Arc<ModelsManager>,
skills_manager: Arc<SkillsManager>,
session_source: SessionSource,
#[cfg(any(test, feature = "test-support"))]
#[allow(dead_code)]
// Captures submitted ops for testing purpose.
ops_log: Arc<std::sync::Mutex<Vec<(ThreadId, Op)>>>,
}

impl ThreadManager {
Expand All @@ -74,6 +78,8 @@ impl ThreadManager {
skills_manager: Arc::new(SkillsManager::new(codex_home)),
auth_manager,
session_source,
#[cfg(any(test, feature = "test-support"))]
ops_log: Arc::new(std::sync::Mutex::new(Vec::new())),
}),
#[cfg(any(test, feature = "test-support"))]
_test_codex_home_guard: None,
Expand Down Expand Up @@ -111,6 +117,8 @@ impl ThreadManager {
skills_manager: Arc::new(SkillsManager::new(codex_home)),
auth_manager,
session_source: SessionSource::Exec,
#[cfg(any(test, feature = "test-support"))]
ops_log: Arc::new(std::sync::Mutex::new(Vec::new())),
}),
_test_codex_home_guard: None,
}
Expand Down Expand Up @@ -202,9 +210,19 @@ impl ThreadManager {
.await
}

fn agent_control(&self) -> AgentControl {
pub(crate) fn agent_control(&self) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state))
}

#[cfg(any(test, feature = "test-support"))]
#[allow(dead_code)]
pub(crate) fn captured_ops(&self) -> Vec<(ThreadId, Op)> {
self.state
.ops_log
.lock()
.map(|log| log.clone())
.unwrap_or_default()
}
}

impl ThreadManagerState {
Expand All @@ -217,7 +235,14 @@ impl ThreadManagerState {
}

pub(crate) async fn send_op(&self, thread_id: ThreadId, op: Op) -> CodexResult<String> {
self.get_thread(thread_id).await?.submit(op).await
let thread = self.get_thread(thread_id).await?;
#[cfg(any(test, feature = "test-support"))]
{
if let Ok(mut log) = self.ops_log.lock() {
log.push((thread_id, op.clone()));
}
}
thread.submit(op).await
}

#[allow(dead_code)] // Used by upcoming multi-agent tooling.
Expand Down
Loading
Loading