diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 16ee3b98242..59d073ea803 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -156,6 +156,11 @@ client_request_definitions! { response: v2::McpServerOauthLoginResponse, }, + McpServerRefresh => "config/mcpServer/reload" { + params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>, + response: v2::McpServerRefreshResponse, + }, + McpServerStatusList => "mcpServerStatus/list" { params: v2::ListMcpServerStatusParams, response: v2::ListMcpServerStatusResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 348df069fc0..30505cf0673 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -940,6 +940,16 @@ pub struct ListMcpServerStatusResponse { pub next_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct McpServerRefreshParams {} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct McpServerRefreshResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 597f002c0d7..26c2dee9cf8 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -88,6 +88,7 @@ Example (from OpenAI's official VSCode extension): - `model/list` — list available models (with reasoning effort options). - `skills/list` — list skills for one or more `cwd` values (optional `forceReload`). - `mcpServer/oauth/login` — start an OAuth login for a configured MCP server; returns an `authorization_url` and later emits `mcpServer/oauthLogin/completed` once the browser flow finishes. +- `config/mcpServer/reload` — reload MCP server config from disk and queue a refresh for loaded threads (applied on each thread's next active turn); returns `{}`. Use this after editing `config.toml` without restarting the server. - `mcpServerStatus/list` — enumerate configured MCP servers with their tools, resources, resource templates, and auth status; supports cursor+limit pagination. - `feedback/upload` — submit a feedback report (classification + optional reason/logs and conversation_id); returns the tracking thread id. - `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation). diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index da6cecc6bcd..2d981427246 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -60,6 +60,7 @@ use codex_app_server_protocol::LogoutChatGptResponse; use codex_app_server_protocol::McpServerOauthLoginCompletedNotification; use codex_app_server_protocol::McpServerOauthLoginParams; use codex_app_server_protocol::McpServerOauthLoginResponse; +use codex_app_server_protocol::McpServerRefreshResponse; use codex_app_server_protocol::McpServerStatus; use codex_app_server_protocol::ModelListParams; use codex_app_server_protocol::ModelListResponse; @@ -157,6 +158,7 @@ use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::GitInfo as CoreGitInfo; use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus; +use codex_protocol::protocol::McpServerRefreshConfig; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionMetaLine; @@ -425,6 +427,9 @@ impl CodexMessageProcessor { ClientRequest::McpServerOauthLogin { request_id, params } => { self.mcp_server_oauth_login(request_id, params).await; } + ClientRequest::McpServerRefresh { request_id, params } => { + self.mcp_server_refresh(request_id, params).await; + } ClientRequest::McpServerStatusList { request_id, params } => { self.list_mcp_server_status(request_id, params).await; } @@ -2302,6 +2307,57 @@ impl CodexMessageProcessor { outgoing.send_response(request_id, response).await; } + async fn mcp_server_refresh(&self, request_id: RequestId, _params: Option<()>) { + let config = match self.load_latest_config().await { + Ok(config) => config, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let mcp_servers = match serde_json::to_value(&config.mcp_servers) { + Ok(value) => value, + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to serialize MCP servers: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let mcp_oauth_credentials_store_mode = + match serde_json::to_value(config.mcp_oauth_credentials_store_mode) { + Ok(value) => value, + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!( + "failed to serialize MCP OAuth credentials store mode: {err}" + ), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let refresh_config = McpServerRefreshConfig { + mcp_servers, + mcp_oauth_credentials_store_mode, + }; + + // Refresh requests are queued per thread; each thread rebuilds MCP connections on its next + // active turn to avoid work for threads that never resume. + let thread_manager = Arc::clone(&self.thread_manager); + thread_manager.refresh_mcp_servers(refresh_config).await; + let response = McpServerRefreshResponse {}; + self.outgoing.send_response(request_id, response).await; + } + async fn mcp_server_oauth_login( &self, request_id: RequestId, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 00ed4fba0d6..1ceb71b7406 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -48,6 +48,7 @@ use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::TurnStartedEvent; use codex_rmcp_client::ElicitationResponse; +use codex_rmcp_client::OAuthCredentialsStoreMode; use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::FuturesOrdered; @@ -84,6 +85,7 @@ use crate::config::Config; use crate::config::Constrained; use crate::config::ConstraintResult; use crate::config::GhostSnapshotConfig; +use crate::config::types::McpServerConfig; use crate::config::types::ShellEnvironmentPolicy; use crate::context_manager::ContextManager; use crate::environment_context::EnvironmentContext; @@ -107,6 +109,7 @@ use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecApprovalRequestEvent; +use crate::protocol::McpServerRefreshConfig; use crate::protocol::Op; use crate::protocol::RateLimitSnapshot; use crate::protocol::ReasoningContentDeltaEvent; @@ -361,6 +364,7 @@ pub(crate) struct Session { /// The set of enabled features should be invariant for the lifetime of the /// session. features: Features, + pending_mcp_server_refresh_config: Mutex>, pub(crate) active_turn: Mutex>, pub(crate) services: SessionServices, next_internal_sub_id: AtomicU64, @@ -685,7 +689,7 @@ impl Session { let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), - mcp_startup_cancellation_token: CancellationToken::new(), + mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), notifier: UserNotifier::new(config.notify.clone()), rollout: Mutex::new(Some(rollout_recorder)), @@ -706,6 +710,7 @@ impl Session { agent_status: Arc::clone(&agent_status), state: Mutex::new(state), features: config.features.clone(), + pending_mcp_server_refresh_config: Mutex::new(None), active_turn: Mutex::new(None), services, next_internal_sub_id: AtomicU64::new(0), @@ -742,6 +747,8 @@ impl Session { codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), sandbox_cwd: session_configuration.cwd.clone(), }; + let cancel_token = sess.mcp_startup_cancellation_token().await; + sess.services .mcp_connection_manager .write() @@ -751,7 +758,7 @@ impl Session { config.mcp_oauth_credentials_store_mode, auth_statuses.clone(), tx_event.clone(), - sess.services.mcp_startup_cancellation_token.clone(), + cancel_token, sandbox_state, ) .await; @@ -1649,12 +1656,85 @@ impl Session { Arc::clone(&self.services.user_shell) } + async fn refresh_mcp_servers_if_requested(&self, turn_context: &TurnContext) { + let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() }; + let Some(refresh_config) = refresh_config else { + return; + }; + + let McpServerRefreshConfig { + mcp_servers, + mcp_oauth_credentials_store_mode, + } = refresh_config; + + let mcp_servers = + match serde_json::from_value::>(mcp_servers) { + Ok(servers) => servers, + Err(err) => { + warn!("failed to parse MCP server refresh config: {err}"); + return; + } + }; + let store_mode = match serde_json::from_value::( + mcp_oauth_credentials_store_mode, + ) { + Ok(mode) => mode, + Err(err) => { + warn!("failed to parse MCP OAuth refresh config: {err}"); + return; + } + }; + + let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await; + let sandbox_state = SandboxState { + sandbox_policy: turn_context.sandbox_policy.clone(), + codex_linux_sandbox_exe: turn_context.codex_linux_sandbox_exe.clone(), + sandbox_cwd: turn_context.cwd.clone(), + }; + let cancel_token = self.reset_mcp_startup_cancellation_token().await; + + let mut refreshed_manager = McpConnectionManager::default(); + refreshed_manager + .initialize( + mcp_servers, + store_mode, + auth_statuses, + self.get_tx_event(), + cancel_token, + sandbox_state, + ) + .await; + + let mut manager = self.services.mcp_connection_manager.write().await; + *manager = refreshed_manager; + } + + async fn mcp_startup_cancellation_token(&self) -> CancellationToken { + self.services + .mcp_startup_cancellation_token + .lock() + .await + .clone() + } + + async fn reset_mcp_startup_cancellation_token(&self) -> CancellationToken { + let mut guard = self.services.mcp_startup_cancellation_token.lock().await; + guard.cancel(); + let cancel_token = CancellationToken::new(); + *guard = cancel_token.clone(); + cancel_token + } + fn show_raw_agent_reasoning(&self) -> bool { self.services.show_raw_agent_reasoning } async fn cancel_mcp_startup(&self) { - self.services.mcp_startup_cancellation_token.cancel(); + self.services + .mcp_startup_cancellation_token + .lock() + .await + .cancel(); } } @@ -1712,6 +1792,9 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv Op::ListMcpTools => { handlers::list_mcp_tools(&sess, &config, sub.id.clone()).await; } + Op::RefreshMcpServers { config } => { + handlers::refresh_mcp_servers(&sess, config).await; + } Op::ListCustomPrompts => { handlers::list_custom_prompts(&sess, sub.id.clone()).await; } @@ -1780,6 +1863,7 @@ mod handlers { use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ListCustomPromptsResponseEvent; use codex_protocol::protocol::ListSkillsResponseEvent; + use codex_protocol::protocol::McpServerRefreshConfig; use codex_protocol::protocol::Op; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::ReviewRequest; @@ -1878,6 +1962,8 @@ mod handlers { .await; } + sess.refresh_mcp_servers_if_requested(¤t_context) + .await; sess.spawn_task(Arc::clone(¤t_context), items, RegularTask) .await; *previous_context = Some(current_context); @@ -2009,6 +2095,11 @@ mod handlers { }); } + pub async fn refresh_mcp_servers(sess: &Arc, refresh_config: McpServerRefreshConfig) { + let mut guard = sess.pending_mcp_server_refresh_config.lock().await; + *guard = Some(refresh_config); + } + pub async fn list_mcp_tools(sess: &Session, config: &Arc, sub_id: String) { let mcp_connection_manager = sess.services.mcp_connection_manager.read().await; let snapshot = collect_mcp_snapshot_from_manager( @@ -2193,6 +2284,7 @@ mod handlers { review_request: ReviewRequest, ) { let turn_context = sess.new_default_turn_with_sub_id(sub_id.clone()).await; + sess.refresh_mcp_servers_if_requested(&turn_context).await; match resolve_review_request(review_request, turn_context.cwd.as_path()) { Ok(resolved) => { spawn_review_thread( @@ -3528,7 +3620,7 @@ mod tests { let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), - mcp_startup_cancellation_token: CancellationToken::new(), + mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), notifier: UserNotifier::new(None), rollout: Mutex::new(None), @@ -3560,6 +3652,7 @@ mod tests { agent_status: Arc::clone(&agent_status), state: Mutex::new(state), features: config.features.clone(), + pending_mcp_server_refresh_config: Mutex::new(None), active_turn: Mutex::new(None), services, next_internal_sub_id: AtomicU64::new(0), @@ -3622,7 +3715,7 @@ mod tests { let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), - mcp_startup_cancellation_token: CancellationToken::new(), + mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), notifier: UserNotifier::new(None), rollout: Mutex::new(None), @@ -3654,6 +3747,7 @@ mod tests { agent_status: Arc::clone(&agent_status), state: Mutex::new(state), features: config.features.clone(), + pending_mcp_server_refresh_config: Mutex::new(None), active_turn: Mutex::new(None), services, next_internal_sub_id: AtomicU64::new(0), @@ -3662,6 +3756,48 @@ mod tests { (session, turn_context, rx_event) } + #[tokio::test] + async fn refresh_mcp_servers_is_deferred_until_next_turn() { + let (session, turn_context) = make_session_and_context().await; + let old_token = session.mcp_startup_cancellation_token().await; + assert!(!old_token.is_cancelled()); + + let mcp_oauth_credentials_store_mode = + serde_json::to_value(OAuthCredentialsStoreMode::Auto).expect("serialize store mode"); + let refresh_config = McpServerRefreshConfig { + mcp_servers: json!({}), + mcp_oauth_credentials_store_mode, + }; + { + let mut guard = session.pending_mcp_server_refresh_config.lock().await; + *guard = Some(refresh_config); + } + + assert!(!old_token.is_cancelled()); + assert!( + session + .pending_mcp_server_refresh_config + .lock() + .await + .is_some() + ); + + session + .refresh_mcp_servers_if_requested(&turn_context) + .await; + + assert!(old_token.is_cancelled()); + assert!( + session + .pending_mcp_server_refresh_config + .lock() + .await + .is_none() + ); + let new_token = session.mcp_startup_cancellation_token().await; + assert!(!new_token.is_cancelled()); + } + #[tokio::test] async fn record_model_warning_appends_user_message() { let (mut session, turn_context) = make_session_and_context().await; diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 2e4395956a5..cd1f1c04984 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -17,7 +17,7 @@ use tokio_util::sync::CancellationToken; pub(crate) struct SessionServices { pub(crate) mcp_connection_manager: Arc>, - pub(crate) mcp_startup_cancellation_token: CancellationToken, + pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, pub(crate) notifier: UserNotifier, pub(crate) rollout: Mutex>, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index a4e8f9c34cf..f95f08af08e 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -21,6 +21,7 @@ use crate::skills::SkillsManager; use codex_protocol::ThreadId; use codex_protocol::openai_models::ModelPreset; use codex_protocol::protocol::InitialHistory; +use codex_protocol::protocol::McpServerRefreshConfig; use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; @@ -30,6 +31,7 @@ use std::sync::Arc; #[cfg(any(test, feature = "test-support"))] use tempfile::TempDir; use tokio::sync::RwLock; +use tracing::warn; /// Represents a newly created Codex thread (formerly called a conversation), including the first event /// (which is [`EventMsg::SessionConfigured`]). @@ -136,6 +138,27 @@ impl ThreadManager { self.state.threads.read().await.keys().copied().collect() } + pub async fn refresh_mcp_servers(&self, refresh_config: McpServerRefreshConfig) { + let threads = self + .state + .threads + .read() + .await + .values() + .cloned() + .collect::>(); + for thread in threads { + if let Err(err) = thread + .submit(Op::RefreshMcpServers { + config: refresh_config.clone(), + }) + .await + { + warn!("failed to request MCP server refresh: {err}"); + } + } + } + pub async fn get_thread(&self, thread_id: ThreadId) -> CodexResult> { self.state.get_thread(thread_id).await } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index b2f51caea85..cb848e57cf2 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -60,6 +60,13 @@ pub struct Submission { pub op: Op, } +/// Config payload for refreshing MCP servers. +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema)] +pub struct McpServerRefreshConfig { + pub mcp_servers: Value, + pub mcp_oauth_credentials_store_mode: Value, +} + /// Submission operation #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema)] #[serde(tag = "type", rename_all = "snake_case")] @@ -186,6 +193,9 @@ pub enum Op { /// Reply is delivered via `EventMsg::McpListToolsResponse`. ListMcpTools, + /// Request MCP servers to reinitialize and refresh cached tool lists. + RefreshMcpServers { config: McpServerRefreshConfig }, + /// Request the list of available custom prompts. ListCustomPrompts,