From 811754c93bf791caf5c09d2a2d0d5e71548afef4 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 8 Jan 2026 12:10:46 +0000 Subject: [PATCH 1/2] chore: add small debug client --- codex-rs/Cargo.lock | 12 + codex-rs/Cargo.toml | 1 + codex-rs/debug-client/Cargo.toml | 15 + codex-rs/debug-client/README.md | 57 ++++ codex-rs/debug-client/src/client.rs | 385 ++++++++++++++++++++++++++ codex-rs/debug-client/src/commands.rs | 156 +++++++++++ codex-rs/debug-client/src/main.rs | 293 ++++++++++++++++++++ codex-rs/debug-client/src/output.rs | 121 ++++++++ codex-rs/debug-client/src/reader.rs | 316 +++++++++++++++++++++ codex-rs/debug-client/src/state.rs | 28 ++ 10 files changed, 1384 insertions(+) create mode 100644 codex-rs/debug-client/Cargo.toml create mode 100644 codex-rs/debug-client/README.md create mode 100644 codex-rs/debug-client/src/client.rs create mode 100644 codex-rs/debug-client/src/commands.rs create mode 100644 codex-rs/debug-client/src/main.rs create mode 100644 codex-rs/debug-client/src/output.rs create mode 100644 codex-rs/debug-client/src/reader.rs create mode 100644 codex-rs/debug-client/src/state.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 92ecc7e3b0b..d027df83ad4 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1355,6 +1355,18 @@ dependencies = [ "zstd", ] +[[package]] +name = "codex-debug-client" +version = "0.0.0" +dependencies = [ + "anyhow", + "clap", + "codex-app-server-protocol", + "pretty_assertions", + "serde", + "serde_json", +] + [[package]] name = "codex-exec" version = "0.0.0" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 1dc043a4fcc..feb498cb0be 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -6,6 +6,7 @@ members = [ "app-server", "app-server-protocol", "app-server-test-client", + "debug-client", "apply-patch", "arg0", "feedback", diff --git a/codex-rs/debug-client/Cargo.toml b/codex-rs/debug-client/Cargo.toml new file mode 100644 index 00000000000..b220ebd367a --- /dev/null +++ b/codex-rs/debug-client/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "codex-debug-client" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +clap = { workspace = true, features = ["derive"] } +codex-app-server-protocol.workspace = true +serde.workspace = true +serde_json.workspace = true + +[dev-dependencies] +pretty_assertions.workspace = true diff --git a/codex-rs/debug-client/README.md b/codex-rs/debug-client/README.md new file mode 100644 index 00000000000..091fc1e6aad --- /dev/null +++ b/codex-rs/debug-client/README.md @@ -0,0 +1,57 @@ +WARNING: this code is mainly generated by Codex and should not be used in production + +# codex-debug-client + +A tiny interactive client for `codex app-server` (protocol v2 only). It prints +all JSON-RPC lines from the server and lets you send new turns as you type. + +## Usage + +Start the app-server client (it will spawn `codex app-server` itself): + +``` +cargo run -p codex-debug-client -- \ + --codex-bin codex \ + --approval-policy on-request +``` + +You can resume a specific thread: + +``` +cargo run -p codex-debug-client -- --thread-id thr_123 +``` + +### CLI flags + +- `--codex-bin `: path to the `codex` binary (default: `codex`). +- `-c, --config key=value`: pass through `--config` overrides to `codex`. +- `--thread-id `: resume a thread instead of starting a new one. +- `--approval-policy `: `untrusted`, `on-failure`, `on-request`, `never`. +- `--auto-approve`: auto-approve command/file-change approvals (default: decline). +- `--final-only`: only show completed assistant messages and tool items. +- `--model `: optional model override for thread start/resume. +- `--model-provider `: optional provider override. +- `--cwd `: optional working directory override. + +## Interactive commands + +Type a line to send it as a new turn. Commands are prefixed with `:`: + +- `:help` show help +- `:new` start a new thread +- `:resume ` resume a thread +- `:use ` switch active thread without resuming +- `:refresh-thread` list available threads +- `:quit` exit + +The prompt shows the active thread id. Client messages (help, errors, approvals) +print to stderr; raw server JSON prints to stdout so you can pipe/record it +unless `--final-only` is set. + +## Notes + +- The client performs the required initialize/initialized handshake. +- It prints every server notification and response line as it arrives. +- Approvals for `item/commandExecution/requestApproval` and + `item/fileChange/requestApproval` are auto-responded to with decline unless + `--auto-approve` is set. diff --git a/codex-rs/debug-client/src/client.rs b/codex-rs/debug-client/src/client.rs new file mode 100644 index 00000000000..f18ce9f717a --- /dev/null +++ b/codex-rs/debug-client/src/client.rs @@ -0,0 +1,385 @@ +use std::io::BufRead; +use std::io::BufReader; +use std::io::Write; +use std::process::Child; +use std::process::ChildStdin; +use std::process::ChildStdout; +use std::process::Command; +use std::process::Stdio; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::atomic::AtomicI64; +use std::sync::atomic::Ordering; +use std::sync::mpsc::Sender; + +use anyhow::Context; +use anyhow::Result; +use codex_app_server_protocol::AskForApproval; +use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientNotification; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadListParams; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::UserInput; +use serde::Serialize; + +use crate::output::Output; +use crate::reader::start_reader; +use crate::state::PendingRequest; +use crate::state::ReaderEvent; +use crate::state::State; + +pub struct AppServerClient { + child: Child, + stdin: Arc>, + stdout: Option>, + next_request_id: AtomicI64, + state: Arc>, + output: Output, + filtered_output: bool, +} + +impl AppServerClient { + pub fn spawn( + codex_bin: &str, + config_overrides: &[String], + output: Output, + filtered_output: bool, + ) -> Result { + let mut cmd = Command::new(codex_bin); + for override_kv in config_overrides { + cmd.arg("--config").arg(override_kv); + } + + let mut child = cmd + .arg("app-server") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .with_context(|| format!("failed to start `{codex_bin}` app-server"))?; + + let stdin = child + .stdin + .take() + .context("codex app-server stdin unavailable")?; + let stdout = child + .stdout + .take() + .context("codex app-server stdout unavailable")?; + + Ok(Self { + child, + stdin: Arc::new(Mutex::new(stdin)), + stdout: Some(BufReader::new(stdout)), + next_request_id: AtomicI64::new(1), + state: Arc::new(Mutex::new(State::default())), + output, + filtered_output, + }) + } + + pub fn initialize(&mut self) -> Result<()> { + let request_id = self.next_request_id(); + let request = ClientRequest::Initialize { + request_id: request_id.clone(), + params: codex_app_server_protocol::InitializeParams { + client_info: ClientInfo { + name: "debug-client".to_string(), + title: Some("Debug Client".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + }, + }; + + self.send(&request)?; + let response = self.read_until_response(&request_id)?; + let _parsed: codex_app_server_protocol::InitializeResponse = + serde_json::from_value(response.result).context("decode initialize response")?; + let initialized = ClientNotification::Initialized; + self.send(&initialized)?; + Ok(()) + } + + pub fn start_thread(&mut self, params: ThreadStartParams) -> Result { + let request_id = self.next_request_id(); + let request = ClientRequest::ThreadStart { + request_id: request_id.clone(), + params, + }; + self.send(&request)?; + let response = self.read_until_response(&request_id)?; + let parsed: ThreadStartResponse = + serde_json::from_value(response.result).context("decode thread/start response")?; + let thread_id = parsed.thread.id; + self.set_thread_id(thread_id.clone()); + Ok(thread_id) + } + + pub fn resume_thread(&mut self, params: ThreadResumeParams) -> Result { + let request_id = self.next_request_id(); + let request = ClientRequest::ThreadResume { + request_id: request_id.clone(), + params, + }; + self.send(&request)?; + let response = self.read_until_response(&request_id)?; + let parsed: ThreadResumeResponse = + serde_json::from_value(response.result).context("decode thread/resume response")?; + let thread_id = parsed.thread.id; + self.set_thread_id(thread_id.clone()); + Ok(thread_id) + } + + pub fn request_thread_start(&self, params: ThreadStartParams) -> Result { + let request_id = self.next_request_id(); + self.track_pending(request_id.clone(), PendingRequest::ThreadStart); + let request = ClientRequest::ThreadStart { + request_id: request_id.clone(), + params, + }; + self.send(&request)?; + Ok(request_id) + } + + pub fn request_thread_resume(&self, params: ThreadResumeParams) -> Result { + let request_id = self.next_request_id(); + self.track_pending(request_id.clone(), PendingRequest::ThreadResume); + let request = ClientRequest::ThreadResume { + request_id: request_id.clone(), + params, + }; + self.send(&request)?; + Ok(request_id) + } + + pub fn request_thread_list(&self, cursor: Option) -> Result { + let request_id = self.next_request_id(); + self.track_pending(request_id.clone(), PendingRequest::ThreadList); + let request = ClientRequest::ThreadList { + request_id: request_id.clone(), + params: ThreadListParams { + cursor, + limit: None, + model_providers: None, + }, + }; + self.send(&request)?; + Ok(request_id) + } + + pub fn send_turn(&self, thread_id: &str, text: String) -> Result { + let request_id = self.next_request_id(); + let request = ClientRequest::TurnStart { + request_id: request_id.clone(), + params: TurnStartParams { + thread_id: thread_id.to_string(), + input: vec![UserInput::Text { text }], + ..Default::default() + }, + }; + self.send(&request)?; + Ok(request_id) + } + + pub fn start_reader( + &mut self, + events: Sender, + auto_approve: bool, + filtered_output: bool, + ) -> Result<()> { + let stdout = self.stdout.take().context("reader already started")?; + start_reader( + stdout, + Arc::clone(&self.stdin), + Arc::clone(&self.state), + events, + self.output.clone(), + auto_approve, + filtered_output, + ); + Ok(()) + } + + pub fn thread_id(&self) -> Option { + let state = self.state.lock().expect("state lock poisoned"); + state.thread_id.clone() + } + + pub fn set_thread_id(&self, thread_id: String) { + let mut state = self.state.lock().expect("state lock poisoned"); + state.thread_id = Some(thread_id); + self.remember_thread_locked(&mut state); + } + + pub fn use_thread(&self, thread_id: String) -> bool { + let mut state = self.state.lock().expect("state lock poisoned"); + let known = state.known_threads.iter().any(|id| id == &thread_id); + state.thread_id = Some(thread_id); + self.remember_thread_locked(&mut state); + known + } + + pub fn shutdown(&mut self) { + drop(self.stdin.lock().ok()); + let _ = self.child.wait(); + } + + fn track_pending(&self, request_id: RequestId, kind: PendingRequest) { + let mut state = self.state.lock().expect("state lock poisoned"); + state.pending.insert(request_id, kind); + } + + fn remember_thread_locked(&self, state: &mut State) { + if let Some(thread_id) = state.thread_id.as_ref() { + if !state.known_threads.iter().any(|id| id == thread_id) { + state.known_threads.push(thread_id.clone()); + } + } + } + + fn next_request_id(&self) -> RequestId { + let id = self.next_request_id.fetch_add(1, Ordering::SeqCst); + RequestId::Integer(id) + } + + fn send(&self, value: &T) -> Result<()> { + let json = serde_json::to_string(value).context("serialize message")?; + let mut line = json; + line.push('\n'); + let mut stdin = self.stdin.lock().expect("stdin lock poisoned"); + stdin.write_all(line.as_bytes()).context("write message")?; + stdin.flush().context("flush message")?; + Ok(()) + } + + fn read_until_response(&mut self, request_id: &RequestId) -> Result { + let stdin = Arc::clone(&self.stdin); + let output = self.output.clone(); + let reader = self.stdout.as_mut().context("stdout missing")?; + let mut buffer = String::new(); + + loop { + buffer.clear(); + let bytes = reader + .read_line(&mut buffer) + .context("read server output")?; + if bytes == 0 { + anyhow::bail!("server closed stdout while awaiting response {request_id:?}"); + } + + let line = buffer.trim_end_matches(['\n', '\r']); + if !line.is_empty() && !self.filtered_output { + let _ = output.server_line(line); + } + + let message = match serde_json::from_str::(line) { + Ok(message) => message, + Err(_) => continue, + }; + + match message { + JSONRPCMessage::Response(response) => { + if &response.id == request_id { + return Ok(response); + } + } + JSONRPCMessage::Request(request) => { + let _ = handle_server_request(request, &stdin); + } + _ => {} + } + } + } +} + +fn handle_server_request(request: JSONRPCRequest, stdin: &Arc>) -> Result<()> { + let Ok(server_request) = codex_app_server_protocol::ServerRequest::try_from(request) else { + return Ok(()); + }; + + match server_request { + codex_app_server_protocol::ServerRequest::CommandExecutionRequestApproval { + request_id, + .. + } => { + let response = codex_app_server_protocol::CommandExecutionRequestApprovalResponse { + decision: codex_app_server_protocol::ApprovalDecision::Decline, + }; + send_jsonrpc_response(stdin, request_id, response) + } + codex_app_server_protocol::ServerRequest::FileChangeRequestApproval { + request_id, .. + } => { + let response = codex_app_server_protocol::FileChangeRequestApprovalResponse { + decision: codex_app_server_protocol::ApprovalDecision::Decline, + }; + send_jsonrpc_response(stdin, request_id, response) + } + _ => Ok(()), + } +} + +fn send_jsonrpc_response( + stdin: &Arc>, + request_id: RequestId, + response: T, +) -> Result<()> { + let result = serde_json::to_value(response).context("serialize response")?; + let message = JSONRPCMessage::Response(JSONRPCResponse { + id: request_id, + result, + }); + send_with_stdin(stdin, &message) +} + +fn send_with_stdin(stdin: &Arc>, value: &T) -> Result<()> { + let json = serde_json::to_string(value).context("serialize message")?; + let mut line = json; + line.push('\n'); + let mut stdin = stdin.lock().expect("stdin lock poisoned"); + stdin.write_all(line.as_bytes()).context("write message")?; + stdin.flush().context("flush message")?; + Ok(()) +} + +pub fn build_thread_start_params( + approval_policy: AskForApproval, + model: Option, + model_provider: Option, + cwd: Option, +) -> ThreadStartParams { + ThreadStartParams { + model, + model_provider, + cwd, + approval_policy: Some(approval_policy), + experimental_raw_events: false, + ..Default::default() + } +} + +pub fn build_thread_resume_params( + thread_id: String, + approval_policy: AskForApproval, + model: Option, + model_provider: Option, + cwd: Option, +) -> ThreadResumeParams { + ThreadResumeParams { + thread_id, + model, + model_provider, + cwd, + approval_policy: Some(approval_policy), + ..Default::default() + } +} diff --git a/codex-rs/debug-client/src/commands.rs b/codex-rs/debug-client/src/commands.rs new file mode 100644 index 00000000000..447f09a54de --- /dev/null +++ b/codex-rs/debug-client/src/commands.rs @@ -0,0 +1,156 @@ +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum InputAction { + Message(String), + Command(UserCommand), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum UserCommand { + Help, + Quit, + NewThread, + Resume(String), + Use(String), + RefreshThread, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ParseError { + EmptyCommand, + MissingArgument { name: &'static str }, + UnknownCommand { command: String }, +} + +impl ParseError { + pub fn message(&self) -> String { + match self { + Self::EmptyCommand => "empty command after ':'".to_string(), + Self::MissingArgument { name } => { + format!("missing required argument: {name}") + } + Self::UnknownCommand { command } => format!("unknown command: {command}"), + } + } +} + +pub fn parse_input(line: &str) -> Result, ParseError> { + let trimmed = line.trim(); + if trimmed.is_empty() { + return Ok(None); + } + + let Some(command_line) = trimmed.strip_prefix(':') else { + return Ok(Some(InputAction::Message(trimmed.to_string()))); + }; + + let mut parts = command_line.split_whitespace(); + let Some(command) = parts.next() else { + return Err(ParseError::EmptyCommand); + }; + + match command { + "help" | "h" => Ok(Some(InputAction::Command(UserCommand::Help))), + "quit" | "q" | "exit" => Ok(Some(InputAction::Command(UserCommand::Quit))), + "new" => Ok(Some(InputAction::Command(UserCommand::NewThread))), + "resume" => { + let thread_id = parts + .next() + .ok_or(ParseError::MissingArgument { name: "thread-id" })?; + Ok(Some(InputAction::Command(UserCommand::Resume( + thread_id.to_string(), + )))) + } + "use" => { + let thread_id = parts + .next() + .ok_or(ParseError::MissingArgument { name: "thread-id" })?; + Ok(Some(InputAction::Command(UserCommand::Use( + thread_id.to_string(), + )))) + } + "refresh-thread" => Ok(Some(InputAction::Command(UserCommand::RefreshThread))), + _ => Err(ParseError::UnknownCommand { + command: command.to_string(), + }), + } +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::InputAction; + use super::ParseError; + use super::UserCommand; + use super::parse_input; + + #[test] + fn parses_message() { + let result = parse_input("hello there").unwrap(); + assert_eq!( + result, + Some(InputAction::Message("hello there".to_string())) + ); + } + + #[test] + fn parses_help_command() { + let result = parse_input(":help").unwrap(); + assert_eq!(result, Some(InputAction::Command(UserCommand::Help))); + } + + #[test] + fn parses_new_thread() { + let result = parse_input(":new").unwrap(); + assert_eq!(result, Some(InputAction::Command(UserCommand::NewThread))); + } + + #[test] + fn parses_resume() { + let result = parse_input(":resume thr_123").unwrap(); + assert_eq!( + result, + Some(InputAction::Command(UserCommand::Resume( + "thr_123".to_string() + ))) + ); + } + + #[test] + fn parses_use() { + let result = parse_input(":use thr_456").unwrap(); + assert_eq!( + result, + Some(InputAction::Command(UserCommand::Use( + "thr_456".to_string() + ))) + ); + } + + #[test] + fn parses_refresh_thread() { + let result = parse_input(":refresh-thread").unwrap(); + assert_eq!( + result, + Some(InputAction::Command(UserCommand::RefreshThread)) + ); + } + + #[test] + fn rejects_missing_resume_arg() { + let result = parse_input(":resume"); + assert_eq!( + result, + Err(ParseError::MissingArgument { name: "thread-id" }) + ); + } + + #[test] + fn rejects_missing_use_arg() { + let result = parse_input(":use"); + assert_eq!( + result, + Err(ParseError::MissingArgument { name: "thread-id" }) + ); + } +} diff --git a/codex-rs/debug-client/src/main.rs b/codex-rs/debug-client/src/main.rs new file mode 100644 index 00000000000..e51376fb516 --- /dev/null +++ b/codex-rs/debug-client/src/main.rs @@ -0,0 +1,293 @@ +mod client; +mod commands; +mod output; +mod reader; +mod state; + +use std::io; +use std::io::BufRead; +use std::sync::mpsc; + +use anyhow::Context; +use anyhow::Result; +use clap::ArgAction; +use clap::Parser; +use codex_app_server_protocol::AskForApproval; + +use crate::client::AppServerClient; +use crate::client::build_thread_resume_params; +use crate::client::build_thread_start_params; +use crate::commands::InputAction; +use crate::commands::UserCommand; +use crate::commands::parse_input; +use crate::output::Output; +use crate::state::ReaderEvent; + +#[derive(Parser)] +#[command(author = "Codex", version, about = "Minimal app-server client")] +struct Cli { + /// Path to the `codex` CLI binary. + #[arg(long, default_value = "codex")] + codex_bin: String, + + /// Forwarded to the `codex` CLI as `--config key=value`. Repeatable. + #[arg(short = 'c', long = "config", value_name = "key=value", action = ArgAction::Append)] + config_overrides: Vec, + + /// Resume an existing thread instead of starting a new one. + #[arg(long)] + thread_id: Option, + + /// Set the approval policy for the thread. + #[arg(long, default_value = "on-request")] + approval_policy: String, + + /// Auto-approve command/file-change approvals. + #[arg(long, default_value_t = false)] + auto_approve: bool, + + /// Only show final assistant messages and tool calls. + #[arg(long, default_value_t = false)] + final_only: bool, + + /// Optional model override when starting/resuming a thread. + #[arg(long)] + model: Option, + + /// Optional model provider override when starting/resuming a thread. + #[arg(long)] + model_provider: Option, + + /// Optional working directory override when starting/resuming a thread. + #[arg(long)] + cwd: Option, +} + +fn main() -> Result<()> { + let cli = Cli::parse(); + let output = Output::new(); + let approval_policy = parse_approval_policy(&cli.approval_policy)?; + + let mut client = AppServerClient::spawn( + &cli.codex_bin, + &cli.config_overrides, + output.clone(), + cli.final_only, + )?; + client.initialize()?; + + let thread_id = if let Some(thread_id) = cli.thread_id.as_ref() { + client.resume_thread(build_thread_resume_params( + thread_id.clone(), + approval_policy, + cli.model.clone(), + cli.model_provider.clone(), + cli.cwd.clone(), + ))? + } else { + client.start_thread(build_thread_start_params( + approval_policy, + cli.model.clone(), + cli.model_provider.clone(), + cli.cwd.clone(), + ))? + }; + + output + .client_line(&format!("connected to thread {thread_id}")) + .ok(); + output.set_prompt(&thread_id); + + let (event_tx, event_rx) = mpsc::channel(); + client.start_reader(event_tx, cli.auto_approve, cli.final_only)?; + + print_help(&output); + + let stdin = io::stdin(); + let mut lines = stdin.lock().lines(); + + loop { + drain_events(&event_rx, &output); + let prompt_thread = client + .thread_id() + .unwrap_or_else(|| "no-thread".to_string()); + output.prompt(&prompt_thread).ok(); + + let Some(line) = lines.next() else { + break; + }; + let line = line.context("read stdin")?; + + match parse_input(&line) { + Ok(None) => continue, + Ok(Some(InputAction::Message(message))) => { + let Some(active_thread) = client.thread_id() else { + output + .client_line("no active thread; use :new or :resume ") + .ok(); + continue; + }; + if let Err(err) = client.send_turn(&active_thread, message) { + output + .client_line(&format!("failed to send turn: {err}")) + .ok(); + } + } + Ok(Some(InputAction::Command(command))) => { + if !handle_command(command, &client, &output, approval_policy, &cli) { + break; + } + } + Err(err) => { + output.client_line(&err.message()).ok(); + } + } + } + + client.shutdown(); + Ok(()) +} + +fn handle_command( + command: UserCommand, + client: &AppServerClient, + output: &Output, + approval_policy: AskForApproval, + cli: &Cli, +) -> bool { + match command { + UserCommand::Help => { + print_help(output); + true + } + UserCommand::Quit => false, + UserCommand::NewThread => { + match client.request_thread_start(build_thread_start_params( + approval_policy, + cli.model.clone(), + cli.model_provider.clone(), + cli.cwd.clone(), + )) { + Ok(request_id) => { + output + .client_line(&format!("requested new thread ({request_id:?})")) + .ok(); + } + Err(err) => { + output + .client_line(&format!("failed to start thread: {err}")) + .ok(); + } + } + true + } + UserCommand::Resume(thread_id) => { + match client.request_thread_resume(build_thread_resume_params( + thread_id, + approval_policy, + cli.model.clone(), + cli.model_provider.clone(), + cli.cwd.clone(), + )) { + Ok(request_id) => { + output + .client_line(&format!("requested thread resume ({request_id:?})")) + .ok(); + } + Err(err) => { + output + .client_line(&format!("failed to resume thread: {err}")) + .ok(); + } + } + true + } + UserCommand::Use(thread_id) => { + let known = client.use_thread(thread_id.clone()); + output.set_prompt(&thread_id); + if known { + output + .client_line(&format!("switched active thread to {thread_id}")) + .ok(); + } else { + output + .client_line(&format!( + "switched active thread to {thread_id} (unknown; use :resume to load)" + )) + .ok(); + } + true + } + UserCommand::RefreshThread => { + match client.request_thread_list(None) { + Ok(request_id) => { + output + .client_line(&format!("requested thread list ({request_id:?})")) + .ok(); + } + Err(err) => { + output + .client_line(&format!("failed to list threads: {err}")) + .ok(); + } + } + true + } + } +} + +fn parse_approval_policy(value: &str) -> Result { + match value { + "untrusted" | "unless-trusted" | "unlessTrusted" => Ok(AskForApproval::UnlessTrusted), + "on-failure" | "onFailure" => Ok(AskForApproval::OnFailure), + "on-request" | "onRequest" => Ok(AskForApproval::OnRequest), + "never" => Ok(AskForApproval::Never), + _ => anyhow::bail!( + "unknown approval policy: {value}. Expected one of: untrusted, on-failure, on-request, never" + ), + } +} + +fn drain_events(event_rx: &mpsc::Receiver, output: &Output) { + while let Ok(event) = event_rx.try_recv() { + match event { + ReaderEvent::ThreadReady { thread_id } => { + output + .client_line(&format!("active thread is now {thread_id}")) + .ok(); + output.set_prompt(&thread_id); + } + ReaderEvent::ThreadList { + thread_ids, + next_cursor, + } => { + if thread_ids.is_empty() { + output.client_line("threads: (none)").ok(); + } else { + output.client_line("threads:").ok(); + for thread_id in thread_ids { + output.client_line(&format!(" {thread_id}")).ok(); + } + } + if let Some(next_cursor) = next_cursor { + output + .client_line(&format!( + "more threads available, next cursor: {next_cursor}" + )) + .ok(); + } + } + } + } +} + +fn print_help(output: &Output) { + let _ = output.client_line("commands:"); + let _ = output.client_line(" :help show this help"); + let _ = output.client_line(" :new start a new thread"); + let _ = output.client_line(" :resume resume an existing thread"); + let _ = output.client_line(" :use switch the active thread"); + let _ = output.client_line(" :refresh-thread list available threads"); + let _ = output.client_line(" :quit exit"); + let _ = output.client_line("type a message to send it as a new turn"); +} diff --git a/codex-rs/debug-client/src/output.rs b/codex-rs/debug-client/src/output.rs new file mode 100644 index 00000000000..ec71cf2b245 --- /dev/null +++ b/codex-rs/debug-client/src/output.rs @@ -0,0 +1,121 @@ +use std::io; +use std::io::IsTerminal; +use std::io::Write; +use std::sync::Arc; +use std::sync::Mutex; + +#[derive(Clone, Copy, Debug)] +pub enum LabelColor { + Assistant, + Tool, + ToolMeta, + Thread, +} + +#[derive(Debug, Default)] +struct PromptState { + thread_id: Option, + visible: bool, +} + +#[derive(Clone, Debug)] +pub struct Output { + lock: Arc>, + prompt: Arc>, + color: bool, +} + +impl Output { + pub fn new() -> Self { + let no_color = std::env::var_os("NO_COLOR").is_some(); + let color = !no_color && io::stdout().is_terminal() && io::stderr().is_terminal(); + Self { + lock: Arc::new(Mutex::new(())), + prompt: Arc::new(Mutex::new(PromptState::default())), + color, + } + } + + pub fn server_line(&self, line: &str) -> io::Result<()> { + let _guard = self.lock.lock().expect("output lock poisoned"); + self.clear_prompt_line_locked()?; + let mut stdout = io::stdout(); + writeln!(stdout, "{line}")?; + stdout.flush()?; + self.redraw_prompt_locked() + } + + pub fn client_line(&self, line: &str) -> io::Result<()> { + let _guard = self.lock.lock().expect("output lock poisoned"); + self.clear_prompt_line_locked()?; + let mut stderr = io::stderr(); + writeln!(stderr, "{line}")?; + stderr.flush() + } + + pub fn prompt(&self, thread_id: &str) -> io::Result<()> { + let _guard = self.lock.lock().expect("output lock poisoned"); + self.set_prompt_locked(thread_id); + self.write_prompt_locked() + } + + pub fn set_prompt(&self, thread_id: &str) { + let _guard = self.lock.lock().expect("output lock poisoned"); + self.set_prompt_locked(thread_id); + } + + pub fn format_label(&self, label: &str, color: LabelColor) -> String { + if !self.color { + return label.to_string(); + } + + let code = match color { + LabelColor::Assistant => "32", + LabelColor::Tool => "36", + LabelColor::ToolMeta => "33", + LabelColor::Thread => "34", + }; + format!("\x1b[{code}m{label}\x1b[0m") + } + + fn clear_prompt_line_locked(&self) -> io::Result<()> { + let mut prompt = self.prompt.lock().expect("prompt lock poisoned"); + if prompt.visible { + let mut stderr = io::stderr(); + writeln!(stderr)?; + stderr.flush()?; + prompt.visible = false; + } + Ok(()) + } + + fn redraw_prompt_locked(&self) -> io::Result<()> { + if self + .prompt + .lock() + .expect("prompt lock poisoned") + .thread_id + .is_some() + { + self.write_prompt_locked()?; + } + Ok(()) + } + + fn set_prompt_locked(&self, thread_id: &str) { + let mut prompt = self.prompt.lock().expect("prompt lock poisoned"); + prompt.thread_id = Some(thread_id.to_string()); + } + + fn write_prompt_locked(&self) -> io::Result<()> { + let mut prompt = self.prompt.lock().expect("prompt lock poisoned"); + let Some(thread_id) = prompt.thread_id.as_ref() else { + return Ok(()); + }; + let mut stderr = io::stderr(); + write!(stderr, "({thread_id})> ")?; + stderr.flush()?; + prompt.visible = true; + Ok(()) + } +} diff --git a/codex-rs/debug-client/src/reader.rs b/codex-rs/debug-client/src/reader.rs new file mode 100644 index 00000000000..7c027b7649f --- /dev/null +++ b/codex-rs/debug-client/src/reader.rs @@ -0,0 +1,316 @@ +use std::io::BufRead; +use std::io::BufReader; +use std::process::ChildStdout; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::mpsc::Sender; +use std::thread; +use std::thread::JoinHandle; + +use anyhow::Context; +use codex_app_server_protocol::ApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::FileChangeRequestApprovalResponse; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadListResponse; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartResponse; +use serde::Serialize; +use std::io::Write; + +use crate::output::LabelColor; +use crate::output::Output; +use crate::state::PendingRequest; +use crate::state::ReaderEvent; +use crate::state::State; + +pub fn start_reader( + mut stdout: BufReader, + stdin: Arc>, + state: Arc>, + events: Sender, + output: Output, + auto_approve: bool, + filtered_output: bool, +) -> JoinHandle<()> { + thread::spawn(move || { + let decision = if auto_approve { + ApprovalDecision::Accept + } else { + ApprovalDecision::Decline + }; + + let mut buffer = String::new(); + + loop { + buffer.clear(); + match stdout.read_line(&mut buffer) { + Ok(0) => break, + Ok(_) => {} + Err(err) => { + let _ = output.client_line(&format!("failed to read from server: {err}")); + break; + } + } + + let line = buffer.trim_end_matches(['\n', '\r']); + if !line.is_empty() && !filtered_output { + let _ = output.server_line(line); + } + + let Ok(message) = serde_json::from_str::(line) else { + continue; + }; + + match message { + JSONRPCMessage::Request(request) => { + if let Err(err) = handle_server_request(request, &decision, &stdin, &output) { + let _ = + output.client_line(&format!("failed to handle server request: {err}")); + } + } + JSONRPCMessage::Response(response) => { + if let Err(err) = handle_response(response, &state, &events) { + let _ = output.client_line(&format!("failed to handle response: {err}")); + } + } + JSONRPCMessage::Notification(notification) => { + if filtered_output + && let Err(err) = handle_filtered_notification(notification, &output) + { + let _ = + output.client_line(&format!("failed to filter notification: {err}")); + } + } + _ => {} + } + } + }) +} + +fn handle_server_request( + request: JSONRPCRequest, + decision: &ApprovalDecision, + stdin: &Arc>, + output: &Output, +) -> anyhow::Result<()> { + let server_request = match ServerRequest::try_from(request.clone()) { + Ok(server_request) => server_request, + Err(_) => return Ok(()), + }; + + match server_request { + ServerRequest::CommandExecutionRequestApproval { request_id, params } => { + let response = CommandExecutionRequestApprovalResponse { + decision: decision.clone(), + }; + output.client_line(&format!( + "auto-response for command approval {request_id:?}: {decision:?} ({params:?})" + ))?; + send_response(stdin, request_id, response) + } + ServerRequest::FileChangeRequestApproval { request_id, params } => { + let response = FileChangeRequestApprovalResponse { + decision: decision.clone(), + }; + output.client_line(&format!( + "auto-response for file change approval {request_id:?}: {decision:?} ({params:?})" + ))?; + send_response(stdin, request_id, response) + } + _ => Ok(()), + } +} + +fn handle_response( + response: JSONRPCResponse, + state: &Arc>, + events: &Sender, +) -> anyhow::Result<()> { + let pending = { + let mut state = state.lock().expect("state lock poisoned"); + state.pending.remove(&response.id) + }; + + let Some(pending) = pending else { + return Ok(()); + }; + + match pending { + PendingRequest::ThreadStart => { + let parsed = serde_json::from_value::(response.result) + .context("decode thread/start response")?; + let thread_id = parsed.thread.id; + { + let mut state = state.lock().expect("state lock poisoned"); + state.thread_id = Some(thread_id.clone()); + if !state.known_threads.iter().any(|id| id == &thread_id) { + state.known_threads.push(thread_id.clone()); + } + } + events.send(ReaderEvent::ThreadReady { thread_id }).ok(); + } + PendingRequest::ThreadResume => { + let parsed = serde_json::from_value::(response.result) + .context("decode thread/resume response")?; + let thread_id = parsed.thread.id; + { + let mut state = state.lock().expect("state lock poisoned"); + state.thread_id = Some(thread_id.clone()); + if !state.known_threads.iter().any(|id| id == &thread_id) { + state.known_threads.push(thread_id.clone()); + } + } + events.send(ReaderEvent::ThreadReady { thread_id }).ok(); + } + PendingRequest::ThreadList => { + let parsed = serde_json::from_value::(response.result) + .context("decode thread/list response")?; + let thread_ids: Vec = parsed.data.into_iter().map(|thread| thread.id).collect(); + { + let mut state = state.lock().expect("state lock poisoned"); + for thread_id in &thread_ids { + if !state.known_threads.iter().any(|id| id == thread_id) { + state.known_threads.push(thread_id.clone()); + } + } + } + events + .send(ReaderEvent::ThreadList { + thread_ids, + next_cursor: parsed.next_cursor, + }) + .ok(); + } + } + + Ok(()) +} + +fn handle_filtered_notification( + notification: JSONRPCNotification, + output: &Output, +) -> anyhow::Result<()> { + let Ok(server_notification) = ServerNotification::try_from(notification) else { + return Ok(()); + }; + + match server_notification { + ServerNotification::ItemCompleted(payload) => { + emit_filtered_item(payload.item, &payload.thread_id, output) + } + _ => Ok(()), + } +} + +fn emit_filtered_item(item: ThreadItem, thread_id: &str, output: &Output) -> anyhow::Result<()> { + let thread_label = output.format_label(thread_id, LabelColor::Thread); + match item { + ThreadItem::AgentMessage { text, .. } => { + let label = output.format_label("assistant", LabelColor::Assistant); + output.server_line(&format!("{thread_label} {label}: {text}"))?; + } + ThreadItem::CommandExecution { + command, + status, + exit_code, + aggregated_output, + .. + } => { + let label = output.format_label("tool", LabelColor::Tool); + output.server_line(&format!( + "{thread_label} {label}: command {command} ({status:?})" + ))?; + if let Some(exit_code) = exit_code { + let label = output.format_label("tool exit", LabelColor::ToolMeta); + output.server_line(&format!("{thread_label} {label}: {exit_code}"))?; + } + if let Some(aggregated_output) = aggregated_output { + let label = output.format_label("tool output", LabelColor::ToolMeta); + write_multiline( + output, + &thread_label, + &format!("{label}:"), + &aggregated_output, + )?; + } + } + ThreadItem::FileChange { + changes, status, .. + } => { + let label = output.format_label("tool", LabelColor::Tool); + output.server_line(&format!( + "{thread_label} {label}: file change ({status:?}, {} files)", + changes.len() + ))?; + } + ThreadItem::McpToolCall { + server, + tool, + status, + arguments, + result, + error, + .. + } => { + let label = output.format_label("tool", LabelColor::Tool); + output.server_line(&format!( + "{thread_label} {label}: {server}.{tool} ({status:?})" + ))?; + if !arguments.is_null() { + let label = output.format_label("tool args", LabelColor::ToolMeta); + output.server_line(&format!("{thread_label} {label}: {arguments}"))?; + } + if let Some(result) = result { + let label = output.format_label("tool result", LabelColor::ToolMeta); + output.server_line(&format!("{thread_label} {label}: {result:?}"))?; + } + if let Some(error) = error { + let label = output.format_label("tool error", LabelColor::ToolMeta); + output.server_line(&format!("{thread_label} {label}: {error:?}"))?; + } + } + _ => {} + } + + Ok(()) +} + +fn write_multiline( + output: &Output, + thread_label: &str, + header: &str, + text: &str, +) -> anyhow::Result<()> { + output.server_line(&format!("{thread_label} {header}"))?; + for line in text.lines() { + output.server_line(&format!("{thread_label} {line}"))?; + } + Ok(()) +} + +fn send_response( + stdin: &Arc>, + request_id: codex_app_server_protocol::RequestId, + response: T, +) -> anyhow::Result<()> { + let result = serde_json::to_value(response).context("serialize response")?; + let message = JSONRPCResponse { + id: request_id, + result, + }; + let json = serde_json::to_string(&message).context("serialize response message")?; + let mut line = json; + line.push('\n'); + + let mut stdin = stdin.lock().expect("stdin lock poisoned"); + stdin.write_all(line.as_bytes()).context("write response")?; + stdin.flush().context("flush response")?; + Ok(()) +} diff --git a/codex-rs/debug-client/src/state.rs b/codex-rs/debug-client/src/state.rs new file mode 100644 index 00000000000..60efe496174 --- /dev/null +++ b/codex-rs/debug-client/src/state.rs @@ -0,0 +1,28 @@ +use std::collections::HashMap; + +use codex_app_server_protocol::RequestId; + +#[derive(Debug, Default)] +pub struct State { + pub pending: HashMap, + pub thread_id: Option, + pub known_threads: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PendingRequest { + ThreadStart, + ThreadResume, + ThreadList, +} + +#[derive(Debug, Clone)] +pub enum ReaderEvent { + ThreadReady { + thread_id: String, + }, + ThreadList { + thread_ids: Vec, + next_cursor: Option, + }, +} From 3ac56f063da7214a7cc507a880d6c9a579e33811 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 8 Jan 2026 13:12:55 +0000 Subject: [PATCH 2/2] Fix the client --- codex-rs/debug-client/src/client.rs | 43 +++++++++++++++++--------- codex-rs/debug-client/src/reader.rs | 48 +++++++++++++++++++---------- codex-rs/debug-client/src/state.rs | 6 ++-- 3 files changed, 63 insertions(+), 34 deletions(-) diff --git a/codex-rs/debug-client/src/client.rs b/codex-rs/debug-client/src/client.rs index f18ce9f717a..344dd2020be 100644 --- a/codex-rs/debug-client/src/client.rs +++ b/codex-rs/debug-client/src/client.rs @@ -18,6 +18,8 @@ use codex_app_server_protocol::AskForApproval; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; @@ -39,7 +41,7 @@ use crate::state::State; pub struct AppServerClient { child: Child, - stdin: Arc>, + stdin: Arc>>, stdout: Option>, next_request_id: AtomicI64, state: Arc>, @@ -78,7 +80,7 @@ impl AppServerClient { Ok(Self { child, - stdin: Arc::new(Mutex::new(stdin)), + stdin: Arc::new(Mutex::new(Some(stdin))), stdout: Some(BufReader::new(stdout)), next_request_id: AtomicI64::new(1), state: Arc::new(Mutex::new(State::default())), @@ -141,7 +143,7 @@ impl AppServerClient { pub fn request_thread_start(&self, params: ThreadStartParams) -> Result { let request_id = self.next_request_id(); - self.track_pending(request_id.clone(), PendingRequest::ThreadStart); + self.track_pending(request_id.clone(), PendingRequest::Start); let request = ClientRequest::ThreadStart { request_id: request_id.clone(), params, @@ -152,7 +154,7 @@ impl AppServerClient { pub fn request_thread_resume(&self, params: ThreadResumeParams) -> Result { let request_id = self.next_request_id(); - self.track_pending(request_id.clone(), PendingRequest::ThreadResume); + self.track_pending(request_id.clone(), PendingRequest::Resume); let request = ClientRequest::ThreadResume { request_id: request_id.clone(), params, @@ -163,7 +165,7 @@ impl AppServerClient { pub fn request_thread_list(&self, cursor: Option) -> Result { let request_id = self.next_request_id(); - self.track_pending(request_id.clone(), PendingRequest::ThreadList); + self.track_pending(request_id.clone(), PendingRequest::List); let request = ClientRequest::ThreadList { request_id: request_id.clone(), params: ThreadListParams { @@ -229,7 +231,9 @@ impl AppServerClient { } pub fn shutdown(&mut self) { - drop(self.stdin.lock().ok()); + if let Ok(mut stdin) = self.stdin.lock() { + let _ = stdin.take(); + } let _ = self.child.wait(); } @@ -239,10 +243,10 @@ impl AppServerClient { } fn remember_thread_locked(&self, state: &mut State) { - if let Some(thread_id) = state.thread_id.as_ref() { - if !state.known_threads.iter().any(|id| id == thread_id) { - state.known_threads.push(thread_id.clone()); - } + if let Some(thread_id) = state.thread_id.as_ref() + && !state.known_threads.iter().any(|id| id == thread_id) + { + state.known_threads.push(thread_id.clone()); } } @@ -256,6 +260,9 @@ impl AppServerClient { let mut line = json; line.push('\n'); let mut stdin = self.stdin.lock().expect("stdin lock poisoned"); + let Some(stdin) = stdin.as_mut() else { + anyhow::bail!("stdin already closed"); + }; stdin.write_all(line.as_bytes()).context("write message")?; stdin.flush().context("flush message")?; Ok(()) @@ -301,7 +308,10 @@ impl AppServerClient { } } -fn handle_server_request(request: JSONRPCRequest, stdin: &Arc>) -> Result<()> { +fn handle_server_request( + request: JSONRPCRequest, + stdin: &Arc>>, +) -> Result<()> { let Ok(server_request) = codex_app_server_protocol::ServerRequest::try_from(request) else { return Ok(()); }; @@ -312,7 +322,7 @@ fn handle_server_request(request: JSONRPCRequest, stdin: &Arc> .. } => { let response = codex_app_server_protocol::CommandExecutionRequestApprovalResponse { - decision: codex_app_server_protocol::ApprovalDecision::Decline, + decision: CommandExecutionApprovalDecision::Decline, }; send_jsonrpc_response(stdin, request_id, response) } @@ -320,7 +330,7 @@ fn handle_server_request(request: JSONRPCRequest, stdin: &Arc> request_id, .. } => { let response = codex_app_server_protocol::FileChangeRequestApprovalResponse { - decision: codex_app_server_protocol::ApprovalDecision::Decline, + decision: FileChangeApprovalDecision::Decline, }; send_jsonrpc_response(stdin, request_id, response) } @@ -329,7 +339,7 @@ fn handle_server_request(request: JSONRPCRequest, stdin: &Arc> } fn send_jsonrpc_response( - stdin: &Arc>, + stdin: &Arc>>, request_id: RequestId, response: T, ) -> Result<()> { @@ -341,11 +351,14 @@ fn send_jsonrpc_response( send_with_stdin(stdin, &message) } -fn send_with_stdin(stdin: &Arc>, value: &T) -> Result<()> { +fn send_with_stdin(stdin: &Arc>>, value: &T) -> Result<()> { let json = serde_json::to_string(value).context("serialize message")?; let mut line = json; line.push('\n'); let mut stdin = stdin.lock().expect("stdin lock poisoned"); + let Some(stdin) = stdin.as_mut() else { + anyhow::bail!("stdin already closed"); + }; stdin.write_all(line.as_bytes()).context("write message")?; stdin.flush().context("flush message")?; Ok(()) diff --git a/codex-rs/debug-client/src/reader.rs b/codex-rs/debug-client/src/reader.rs index 7c027b7649f..92161638ffa 100644 --- a/codex-rs/debug-client/src/reader.rs +++ b/codex-rs/debug-client/src/reader.rs @@ -8,8 +8,9 @@ use std::thread; use std::thread::JoinHandle; use anyhow::Context; -use codex_app_server_protocol::ApprovalDecision; +use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; @@ -32,7 +33,7 @@ use crate::state::State; pub fn start_reader( mut stdout: BufReader, - stdin: Arc>, + stdin: Arc>>, state: Arc>, events: Sender, output: Output, @@ -40,10 +41,15 @@ pub fn start_reader( filtered_output: bool, ) -> JoinHandle<()> { thread::spawn(move || { - let decision = if auto_approve { - ApprovalDecision::Accept + let command_decision = if auto_approve { + CommandExecutionApprovalDecision::Accept } else { - ApprovalDecision::Decline + CommandExecutionApprovalDecision::Decline + }; + let file_decision = if auto_approve { + FileChangeApprovalDecision::Accept + } else { + FileChangeApprovalDecision::Decline }; let mut buffer = String::new(); @@ -70,7 +76,13 @@ pub fn start_reader( match message { JSONRPCMessage::Request(request) => { - if let Err(err) = handle_server_request(request, &decision, &stdin, &output) { + if let Err(err) = handle_server_request( + request, + &command_decision, + &file_decision, + &stdin, + &output, + ) { let _ = output.client_line(&format!("failed to handle server request: {err}")); } @@ -96,8 +108,9 @@ pub fn start_reader( fn handle_server_request( request: JSONRPCRequest, - decision: &ApprovalDecision, - stdin: &Arc>, + command_decision: &CommandExecutionApprovalDecision, + file_decision: &FileChangeApprovalDecision, + stdin: &Arc>>, output: &Output, ) -> anyhow::Result<()> { let server_request = match ServerRequest::try_from(request.clone()) { @@ -108,19 +121,19 @@ fn handle_server_request( match server_request { ServerRequest::CommandExecutionRequestApproval { request_id, params } => { let response = CommandExecutionRequestApprovalResponse { - decision: decision.clone(), + decision: command_decision.clone(), }; output.client_line(&format!( - "auto-response for command approval {request_id:?}: {decision:?} ({params:?})" + "auto-response for command approval {request_id:?}: {command_decision:?} ({params:?})" ))?; send_response(stdin, request_id, response) } ServerRequest::FileChangeRequestApproval { request_id, params } => { let response = FileChangeRequestApprovalResponse { - decision: decision.clone(), + decision: file_decision.clone(), }; output.client_line(&format!( - "auto-response for file change approval {request_id:?}: {decision:?} ({params:?})" + "auto-response for file change approval {request_id:?}: {file_decision:?} ({params:?})" ))?; send_response(stdin, request_id, response) } @@ -143,7 +156,7 @@ fn handle_response( }; match pending { - PendingRequest::ThreadStart => { + PendingRequest::Start => { let parsed = serde_json::from_value::(response.result) .context("decode thread/start response")?; let thread_id = parsed.thread.id; @@ -156,7 +169,7 @@ fn handle_response( } events.send(ReaderEvent::ThreadReady { thread_id }).ok(); } - PendingRequest::ThreadResume => { + PendingRequest::Resume => { let parsed = serde_json::from_value::(response.result) .context("decode thread/resume response")?; let thread_id = parsed.thread.id; @@ -169,7 +182,7 @@ fn handle_response( } events.send(ReaderEvent::ThreadReady { thread_id }).ok(); } - PendingRequest::ThreadList => { + PendingRequest::List => { let parsed = serde_json::from_value::(response.result) .context("decode thread/list response")?; let thread_ids: Vec = parsed.data.into_iter().map(|thread| thread.id).collect(); @@ -296,7 +309,7 @@ fn write_multiline( } fn send_response( - stdin: &Arc>, + stdin: &Arc>>, request_id: codex_app_server_protocol::RequestId, response: T, ) -> anyhow::Result<()> { @@ -310,6 +323,9 @@ fn send_response( line.push('\n'); let mut stdin = stdin.lock().expect("stdin lock poisoned"); + let Some(stdin) = stdin.as_mut() else { + anyhow::bail!("stdin already closed"); + }; stdin.write_all(line.as_bytes()).context("write response")?; stdin.flush().context("flush response")?; Ok(()) diff --git a/codex-rs/debug-client/src/state.rs b/codex-rs/debug-client/src/state.rs index 60efe496174..44e2c5f355b 100644 --- a/codex-rs/debug-client/src/state.rs +++ b/codex-rs/debug-client/src/state.rs @@ -11,9 +11,9 @@ pub struct State { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PendingRequest { - ThreadStart, - ThreadResume, - ThreadList, + Start, + Resume, + List, } #[derive(Debug, Clone)]