diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs
index a841e29205d..f9863a23675 100644
--- a/codex-rs/app-server-protocol/src/protocol/common.rs
+++ b/codex-rs/app-server-protocol/src/protocol/common.rs
@@ -121,6 +121,10 @@ client_request_definitions! {
         params: v2::ThreadListParams,
         response: v2::ThreadListResponse,
     },
+    ThreadLoadedList => "thread/loaded/list" {
+        params: v2::ThreadLoadedListParams,
+        response: v2::ThreadLoadedListResponse,
+    },
     SkillsList => "skills/list" {
         params: v2::SkillsListParams,
         response: v2::SkillsListResponse,
diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs
index 2d8a9113f1f..e6fe7633c0f 100644
--- a/codex-rs/app-server-protocol/src/protocol/v2.rs
+++ b/codex-rs/app-server-protocol/src/protocol/v2.rs
@@ -1139,6 +1139,27 @@ pub struct ThreadListResponse {
     pub next_cursor: Option<String>,
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
+#[serde(rename_all = "camelCase")]
+#[ts(export_to = "v2/")]
+pub struct ThreadLoadedListParams {
+    /// Opaque pagination cursor returned by a previous call.
+    pub cursor: Option<String>,
+    /// Optional page size; defaults to no limit.
+    pub limit: Option<u32>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
+#[serde(rename_all = "camelCase")]
+#[ts(export_to = "v2/")]
+pub struct ThreadLoadedListResponse {
+    /// Thread ids for sessions currently loaded in memory.
+    pub data: Vec<String>,
+    /// Opaque cursor to pass to the next call to continue after the last item.
+    /// If `None`, there are no more items to return.
+    pub next_cursor: Option<String>,
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
 #[serde(rename_all = "camelCase")]
 #[ts(export_to = "v2/")]
diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md
index 0d8fafad25b..84608b5b6a5 100644
--- a/codex-rs/app-server/README.md
+++ b/codex-rs/app-server/README.md
@@ -73,6 +73,7 @@ Example (from OpenAI's official VSCode extension):
 - `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
 - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
 - `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders` filtering.
+- `thread/loaded/list` — list the thread ids currently loaded in memory.
 - `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success.
 - `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
 - `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
@@ -147,6 +148,17 @@ Example:
 
 When `nextCursor` is `null`, you’ve reached the final page.
 
+### Example: List loaded threads
+
+`thread/loaded/list` returns thread ids currently loaded in memory. This is useful when you want to check which sessions are active without scanning rollouts on disk.
+
+```json
+{ "method": "thread/loaded/list", "id": 21 }
+{ "id": 21, "result": {
+  "data": ["thr_123", "thr_456"]
+} }
+```
+
 ### Example: Archive a thread
 
 Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs
index d17dc76b4b2..177410f16d4 100644
--- a/codex-rs/app-server/src/codex_message_processor.rs
+++ b/codex-rs/app-server/src/codex_message_processor.rs
@@ -89,6 +89,8 @@ use codex_app_server_protocol::ThreadArchiveResponse;
 use codex_app_server_protocol::ThreadItem;
 use codex_app_server_protocol::ThreadListParams;
 use codex_app_server_protocol::ThreadListResponse;
+use codex_app_server_protocol::ThreadLoadedListParams;
+use codex_app_server_protocol::ThreadLoadedListResponse;
 use codex_app_server_protocol::ThreadResumeParams;
 use codex_app_server_protocol::ThreadResumeResponse;
 use codex_app_server_protocol::ThreadRollbackParams;
@@ -376,6 +378,9 @@ impl CodexMessageProcessor {
             ClientRequest::ThreadList { request_id, params } => {
                 self.thread_list(request_id, params).await;
             }
+            ClientRequest::ThreadLoadedList { request_id, params } => {
+                self.thread_loaded_list(request_id, params).await;
+            }
             ClientRequest::SkillsList { request_id, params } => {
                 self.skills_list(request_id, params).await;
             }
@@ -1591,6 +1596,61 @@ impl CodexMessageProcessor {
         self.outgoing.send_response(request_id, response).await;
     }
 
+    async fn thread_loaded_list(&self, request_id: RequestId, params: ThreadLoadedListParams) {
+        let ThreadLoadedListParams { cursor, limit } = params;
+        let mut data = self
+            .thread_manager
+            .list_thread_ids()
+            .await
+            .into_iter()
+            .map(|thread_id| thread_id.to_string())
+            .collect::<Vec<String>>();
+
+        if data.is_empty() {
+            let response = ThreadLoadedListResponse {
+                data,
+                next_cursor: None,
+            };
+            self.outgoing.send_response(request_id, response).await;
+            return;
+        }
+
+        data.sort();
+        let total = data.len();
+        let start = match cursor {
+            Some(cursor) => {
+                let cursor = match ThreadId::from_string(&cursor) {
+                    Ok(id) => id.to_string(),
+                    Err(_) => {
+                        let error = JSONRPCErrorError {
+                            code: INVALID_REQUEST_ERROR_CODE,
+                            message: format!("invalid cursor: {cursor}"),
+                            data: None,
+                        };
+                        self.outgoing.send_error(request_id, error).await;
+                        return;
+                    }
+                };
+                match data.binary_search(&cursor) {
+                    Ok(idx) => idx + 1,
+                    Err(idx) => idx,
+                }
+            }
+            None => 0,
+        };
+
+        let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
+        let end = start.saturating_add(effective_limit).min(total);
+        let page = data[start..end].to_vec();
+        let next_cursor = page.last().filter(|_| end < total).cloned();
+
+        let response = ThreadLoadedListResponse {
+            data: page,
+            next_cursor,
+        };
+        self.outgoing.send_response(request_id, response).await;
+    }
+
     async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) {
         let ThreadResumeParams {
             thread_id,
diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs
index f3ec682fb21..c60e1dce022 100644
--- a/codex-rs/app-server/tests/common/mcp_process.rs
+++ b/codex-rs/app-server/tests/common/mcp_process.rs
@@ -44,6 +44,7 @@ use codex_app_server_protocol::ServerRequest;
 use codex_app_server_protocol::SetDefaultModelParams;
 use codex_app_server_protocol::ThreadArchiveParams;
 use codex_app_server_protocol::ThreadListParams;
+use codex_app_server_protocol::ThreadLoadedListParams;
 use codex_app_server_protocol::ThreadResumeParams;
 use codex_app_server_protocol::ThreadRollbackParams;
 use codex_app_server_protocol::ThreadStartParams;
@@ -335,6 +336,15 @@ impl McpProcess {
         self.send_request("thread/list", params).await
     }
 
+    /// Send a `thread/loaded/list` JSON-RPC request.
+    pub async fn send_thread_loaded_list_request(
+        &mut self,
+        params: ThreadLoadedListParams,
+    ) -> anyhow::Result<i64> {
+        let params = Some(serde_json::to_value(params)?);
+        self.send_request("thread/loaded/list", params).await
+    }
+
     /// Send a `model/list` JSON-RPC request.
     pub async fn send_list_models_request(
         &mut self,
diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs
index 1ef00c6939d..865321bd345 100644
--- a/codex-rs/app-server/tests/suite/v2/mod.rs
+++ b/codex-rs/app-server/tests/suite/v2/mod.rs
@@ -6,6 +6,7 @@ mod rate_limits;
 mod review;
 mod thread_archive;
 mod thread_list;
+mod thread_loaded_list;
 mod thread_resume;
 mod thread_rollback;
 mod thread_start;
diff --git a/codex-rs/app-server/tests/suite/v2/thread_loaded_list.rs b/codex-rs/app-server/tests/suite/v2/thread_loaded_list.rs
new file mode 100644
index 00000000000..f8f9ec15774
--- /dev/null
+++ b/codex-rs/app-server/tests/suite/v2/thread_loaded_list.rs
@@ -0,0 +1,139 @@
+use anyhow::Result;
+use app_test_support::McpProcess;
+use app_test_support::create_mock_chat_completions_server;
+use app_test_support::to_response;
+use codex_app_server_protocol::JSONRPCResponse;
+use codex_app_server_protocol::RequestId;
+use codex_app_server_protocol::ThreadLoadedListParams;
+use codex_app_server_protocol::ThreadLoadedListResponse;
+use codex_app_server_protocol::ThreadStartParams;
+use codex_app_server_protocol::ThreadStartResponse;
+use pretty_assertions::assert_eq;
+use std::path::Path;
+use tempfile::TempDir;
+use tokio::time::timeout;
+
+const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+
+#[tokio::test]
+async fn thread_loaded_list_returns_loaded_thread_ids() -> Result<()> {
+    let server = create_mock_chat_completions_server(vec![]).await;
+    let codex_home = TempDir::new()?;
+    create_config_toml(codex_home.path(), &server.uri())?;
+
+    let mut mcp = McpProcess::new(codex_home.path()).await?;
+    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
+
+    let thread_id = start_thread(&mut mcp).await?;
+
+    let list_id = mcp
+        .send_thread_loaded_list_request(ThreadLoadedListParams::default())
+        .await?;
+    let resp: JSONRPCResponse = timeout(
+        DEFAULT_READ_TIMEOUT,
+        mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
+    )
+    .await??;
+    let ThreadLoadedListResponse {
+        mut data,
+        next_cursor,
+    } = to_response::<ThreadLoadedListResponse>(resp)?;
+    data.sort();
+    assert_eq!(data, vec![thread_id]);
+    assert_eq!(next_cursor, None);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn thread_loaded_list_paginates() -> Result<()> {
+    let server = create_mock_chat_completions_server(vec![]).await;
+    let codex_home = TempDir::new()?;
+    create_config_toml(codex_home.path(), &server.uri())?;
+
+    let mut mcp = McpProcess::new(codex_home.path()).await?;
+    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
+
+    let first = start_thread(&mut mcp).await?;
+    let second = start_thread(&mut mcp).await?;
+
+    let mut expected = [first, second];
+    expected.sort();
+
+    let list_id = mcp
+        .send_thread_loaded_list_request(ThreadLoadedListParams {
+            cursor: None,
+            limit: Some(1),
+        })
+        .await?;
+    let resp: JSONRPCResponse = timeout(
+        DEFAULT_READ_TIMEOUT,
+        mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
+    )
+    .await??;
+    let ThreadLoadedListResponse {
+        data: first_page,
+        next_cursor,
+    } = to_response::<ThreadLoadedListResponse>(resp)?;
+    assert_eq!(first_page, vec![expected[0].clone()]);
+    assert_eq!(next_cursor, Some(expected[0].clone()));
+
+    let list_id = mcp
+        .send_thread_loaded_list_request(ThreadLoadedListParams {
+            cursor: next_cursor,
+            limit: Some(1),
+        })
+        .await?;
+    let resp: JSONRPCResponse = timeout(
+        DEFAULT_READ_TIMEOUT,
+        mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
+    )
+    .await??;
+    let ThreadLoadedListResponse {
+        data: second_page,
+        next_cursor,
+    } = to_response::<ThreadLoadedListResponse>(resp)?;
+    assert_eq!(second_page, vec![expected[1].clone()]);
+    assert_eq!(next_cursor, None);
+
+    Ok(())
+}
+
+fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
+    let config_toml = codex_home.join("config.toml");
+    std::fs::write(
+        config_toml,
+        format!(
+            r#"
+model = "mock-model"
+approval_policy = "never"
+sandbox_mode = "read-only"
+
+model_provider = "mock_provider"
+
+[model_providers.mock_provider]
+name = "Mock provider for test"
+base_url = "{server_uri}/v1"
+wire_api = "chat"
+request_max_retries = 0
+stream_max_retries = 0
+"#
+        ),
+    )
+}
+
+async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
+    let req_id = mcp
+        .send_thread_start_request(ThreadStartParams {
+            model: Some("gpt-5.1".to_string()),
+            ..Default::default()
+        })
+        .await?;
+    let resp: JSONRPCResponse = timeout(
+        DEFAULT_READ_TIMEOUT,
+        mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
+    )
+    .await??;
+    let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
+    Ok(thread.id)
+}