Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ client_request_definitions! {
params: v2::ThreadListParams,
response: v2::ThreadListResponse,
},
ThreadLoadedList => "thread/loaded/list" {
params: v2::ThreadLoadedListParams,
response: v2::ThreadLoadedListResponse,
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
response: v2::SkillsListResponse,
Expand Down
21 changes: 21 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,27 @@ pub struct ThreadListResponse {
pub next_cursor: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadLoadedListParams {
/// Opaque pagination cursor returned by a previous call.
pub cursor: Option<String>,
/// Optional page size; defaults to no limit.
pub limit: Option<u32>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadLoadedListResponse {
/// Thread ids for sessions currently loaded in memory.
pub data: Vec<String>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return Vec<Thread>? i.e. would the metadata on Thread be useful?

We don't have to populate all the turns & items since we only do that for specific APIs (we've already documented the behavior on the Thread object)

Copy link
Collaborator Author

@jif-oai jif-oai Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because sometimes we don't have them and don't need them

Copy link
Collaborator

@owenlin0 owenlin0 Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, would be nice to support pagination for consistency with our other list APIs, even though in practice you'd probably want to return everything always. Mainly because adding pagination later is a breaking change

pub struct ThreadLoadedListRequest {
    /// Opaque pagination cursor returned by a previous call.
    pub cursor: Option<String>,
    /// Optional page size; defaults to no limit
    pub limit: Option<u32>,
}

pub struct ThreadLoadedListResponse {
    pub data: Vec<String>,
    /// Opaque cursor to pass to the next call to continue after the last item.
    /// if None, there are no more items to return.
    pub next_cursor: Option<String>,
}

/// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return.
pub next_cursor: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
Expand Down
12 changes: 12 additions & 0 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders` filtering.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success.
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
Expand Down Expand Up @@ -147,6 +148,17 @@ Example:

When `nextCursor` is `null`, you’ve reached the final page.

### Example: List loaded threads

`thread/loaded/list` returns thread ids currently loaded in memory. This is useful when you want to check which sessions are active without scanning rollouts on disk.

```json
{ "method": "thread/loaded/list", "id": 21 }
{ "id": 21, "result": {
"data": ["thr_123", "thr_456"]
} }
```

### Example: Archive a thread

Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
Expand Down
60 changes: 60 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadRollbackParams;
Expand Down Expand Up @@ -376,6 +378,9 @@ impl CodexMessageProcessor {
ClientRequest::ThreadList { request_id, params } => {
self.thread_list(request_id, params).await;
}
ClientRequest::ThreadLoadedList { request_id, params } => {
self.thread_loaded_list(request_id, params).await;
}
ClientRequest::SkillsList { request_id, params } => {
self.skills_list(request_id, params).await;
}
Expand Down Expand Up @@ -1591,6 +1596,61 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}

async fn thread_loaded_list(&self, request_id: RequestId, params: ThreadLoadedListParams) {
let ThreadLoadedListParams { cursor, limit } = params;
let mut data = self
.thread_manager
.list_thread_ids()
.await
.into_iter()
.map(|thread_id| thread_id.to_string())
.collect::<Vec<_>>();

if data.is_empty() {
let response = ThreadLoadedListResponse {
data,
next_cursor: None,
};
self.outgoing.send_response(request_id, response).await;
return;
}

data.sort();
let total = data.len();
let start = match cursor {
Some(cursor) => {
let cursor = match ThreadId::from_string(&cursor) {
Ok(id) => id.to_string(),
Err(_) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match data.binary_search(&cursor) {
Ok(idx) => idx + 1,
Err(idx) => idx,
}
}
None => 0,
};

let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
let end = start.saturating_add(effective_limit).min(total);
let page = data[start..end].to_vec();
let next_cursor = page.last().filter(|_| end < total).cloned();

let response = ThreadLoadedListResponse {
data: page,
next_cursor,
};
self.outgoing.send_response(request_id, response).await;
}

async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) {
let ThreadResumeParams {
thread_id,
Expand Down
10 changes: 10 additions & 0 deletions codex-rs/app-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadStartParams;
Expand Down Expand Up @@ -335,6 +336,15 @@ impl McpProcess {
self.send_request("thread/list", params).await
}

/// Send a `thread/loaded/list` JSON-RPC request.
pub async fn send_thread_loaded_list_request(
&mut self,
params: ThreadLoadedListParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/loaded/list", params).await
}

/// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/suite/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod rate_limits;
mod review;
mod thread_archive;
mod thread_list;
mod thread_loaded_list;
mod thread_resume;
mod thread_rollback;
mod thread_start;
Expand Down
139 changes: 139 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_loaded_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

#[tokio::test]
async fn thread_loaded_list_returns_loaded_thread_ids() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let thread_id = start_thread(&mut mcp).await?;

let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadLoadedListResponse {
mut data,
next_cursor,
} = to_response::<ThreadLoadedListResponse>(resp)?;
data.sort();
assert_eq!(data, vec![thread_id]);
assert_eq!(next_cursor, None);

Ok(())
}

#[tokio::test]
async fn thread_loaded_list_paginates() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let first = start_thread(&mut mcp).await?;
let second = start_thread(&mut mcp).await?;

let mut expected = [first, second];
expected.sort();

let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams {
cursor: None,
limit: Some(1),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadLoadedListResponse {
data: first_page,
next_cursor,
} = to_response::<ThreadLoadedListResponse>(resp)?;
assert_eq!(first_page, vec![expected[0].clone()]);
assert_eq!(next_cursor, Some(expected[0].clone()));

let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams {
cursor: next_cursor,
limit: Some(1),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadLoadedListResponse {
data: second_page,
next_cursor,
} = to_response::<ThreadLoadedListResponse>(resp)?;
assert_eq!(second_page, vec![expected[1].clone()]);
assert_eq!(next_cursor, None);

Ok(())
}

fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"

model_provider = "mock_provider"

[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1".to_string()),
..Default::default()
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
Ok(thread.id)
}
Loading