From ce9f76cc59ad5e4fb5023069cb955d47889d6c62 Mon Sep 17 00:00:00 2001 From: c0st1nus Date: Mon, 23 Mar 2026 21:48:32 +0500 Subject: [PATCH] Bridge RequestUserInput events to ACP permission prompts --- src/thread.rs | 389 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 388 insertions(+), 1 deletion(-) diff --git a/src/thread.rs b/src/thread.rs index 4b074cd..5f3f63f 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -57,6 +57,10 @@ use codex_protocol::{ request_permissions::{ PermissionGrantScope, RequestPermissionsEvent, RequestPermissionsResponse, }, + request_user_input::{ + RequestUserInputAnswer, RequestUserInputEvent, RequestUserInputQuestion, + RequestUserInputQuestionOption, RequestUserInputResponse, + }, user_input::UserInput, }; use codex_shell_command::parse_command::parse_command; @@ -343,6 +347,11 @@ enum PendingPermissionRequest { call_id: String, permissions: PermissionProfile, }, + UserInput { + turn_id: String, + question_id: String, + option_map: HashMap, + }, } struct PendingPermissionInteraction { @@ -362,6 +371,16 @@ fn permissions_request_key(call_id: &str) -> String { format!("permissions:{call_id}") } +fn user_input_request_key(turn_id: &str) -> String { + format!("user-input:{turn_id}") +} + +fn empty_request_user_input_response() -> RequestUserInputResponse { + RequestUserInputResponse { + answers: HashMap::new(), + } +} + enum SubmissionState { /// Loading custom prompts from the project CustomPrompts(CustomPromptsState), @@ -635,6 +654,38 @@ impl PromptState { .await .map_err(|e| Error::from(anyhow::anyhow!(e)))?; } + PendingPermissionRequest::UserInput { + turn_id, + question_id, + option_map, + } => { + let response = match response.outcome { + RequestPermissionOutcome::Selected(SelectedPermissionOutcome { + option_id, + .. + }) => option_map + .get(option_id.0.as_ref()) + .cloned() + .map(|answer| RequestUserInputResponse { + answers: HashMap::from([( + question_id, + RequestUserInputAnswer { + answers: vec![answer], + }, + )]), + }) + .unwrap_or_else(empty_request_user_input_response), + RequestPermissionOutcome::Cancelled | _ => empty_request_user_input_response(), + }; + + self.thread + .submit(Op::UserInputAnswer { + id: turn_id, + response, + }) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + } } Ok(()) @@ -1017,6 +1068,14 @@ impl PromptState { drop(response_tx.send(Err(err))); } } + EventMsg::RequestUserInput(event) => { + info!("Request user input: {} {}", event.call_id, event.turn_id); + if let Err(err) = self.request_user_input(client, event).await + && let Some(response_tx) = self.response_tx.take() + { + drop(response_tx.send(Err(err))); + } + } // Ignore these events EventMsg::ImageGenerationBegin(..) @@ -1058,7 +1117,6 @@ impl PromptState { // Used for returning a single history entry | EventMsg::GetHistoryEntryResponse(..) | EventMsg::DeprecationNotice(..) - | EventMsg::RequestUserInput(..) | EventMsg::ListRemoteSkillsResponse(..) | EventMsg::RemoteSkillDownloaded(..)) => { warn!("Unexpected event: {:?}", e); @@ -1849,6 +1907,103 @@ impl PromptState { Ok(()) } + + async fn request_user_input( + &mut self, + client: &SessionClient, + event: RequestUserInputEvent, + ) -> Result<(), Error> { + let raw_input = serde_json::json!(&event); + let RequestUserInputEvent { + call_id, + turn_id, + questions, + } = event; + + let response_turn_id = if turn_id.is_empty() { + self.submission_id.clone() + } else { + turn_id + }; + + if questions.len() != 1 { + warn!( + "Unsupported request_user_input question count: {}; responding with cancellation", + questions.len() + ); + self.thread + .submit(Op::UserInputAnswer { + id: response_turn_id, + response: empty_request_user_input_response(), + }) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + return Ok(()); + } + + let Some(question) = questions.into_iter().next() else { + self.thread + .submit(Op::UserInputAnswer { + id: response_turn_id, + response: empty_request_user_input_response(), + }) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + return Ok(()); + }; + let permission_options = build_request_user_input_permission_options(&question); + + if permission_options.is_empty() { + warn!( + "Unsupported request_user_input shape for question {:?}; responding with cancellation", + question.id + ); + self.thread + .submit(Op::UserInputAnswer { + id: response_turn_id, + response: empty_request_user_input_response(), + }) + .await + .map_err(|e| Error::from(anyhow::anyhow!(e)))?; + return Ok(()); + } + + let content = Some(vec![request_user_input_tool_call_content(&question).into()]); + let request_key = user_input_request_key(&response_turn_id); + let tool_call_id = if call_id.is_empty() { + generate_fallback_id("request_user_input") + } else { + call_id + }; + + self.spawn_permission_request( + client, + request_key, + PendingPermissionRequest::UserInput { + turn_id: response_turn_id, + question_id: question.id.clone(), + option_map: permission_options + .iter() + .map(|option| (option.option_id.clone(), option.answer.clone())) + .collect(), + }, + ToolCallUpdate::new( + tool_call_id, + ToolCallUpdateFields::new() + .kind(ToolKind::Other) + .status(ToolCallStatus::Pending) + .title(request_user_input_title(&question)) + .raw_input(raw_input) + .content(content), + ), + permission_options + .into_iter() + .map(|option| option.permission_option) + .collect(), + ); + + Ok(()) + } } #[derive(Clone)] @@ -1858,6 +2013,86 @@ struct ExecPermissionOption { decision: ReviewDecision, } +struct UserInputPermissionOption { + option_id: String, + permission_option: PermissionOption, + answer: String, +} + +fn request_user_input_title(question: &RequestUserInputQuestion) -> String { + let header = question.header.trim(); + if header.is_empty() { + "Request input".to_string() + } else { + header.to_string() + } +} + +fn request_user_input_tool_call_content(question: &RequestUserInputQuestion) -> String { + let mut lines = vec![question.question.clone()]; + + if let Some(options) = question.options.as_ref() { + for option in options { + let description = option.description.trim(); + if description.is_empty() { + lines.push(option.label.clone()); + } else { + lines.push(format!("{}: {}", option.label, description)); + } + } + } + + lines.join("\n") +} + +fn build_request_user_input_permission_options( + question: &RequestUserInputQuestion, +) -> Vec { + question + .options + .as_ref() + .into_iter() + .flatten() + .enumerate() + .map(|(idx, option)| { + let option_id = format!("{}:{idx}", question.id); + UserInputPermissionOption { + permission_option: PermissionOption::new( + option_id.clone(), + option.label.clone(), + request_user_input_option_kind(option), + ), + option_id, + answer: option.label.clone(), + } + }) + .collect() +} + +fn request_user_input_option_kind(option: &RequestUserInputQuestionOption) -> PermissionOptionKind { + let label = option.label.to_ascii_lowercase(); + + if label.contains("remember") + || label.contains("always") + || label.contains("don't ask") + || label.contains("dont ask") + || label.contains("for this session") + { + PermissionOptionKind::AllowAlways + } else if label.contains("cancel") + || label.contains("reject") + || label.contains("deny") + || label.contains("decline") + || label == "no" + || label.starts_with("no,") + || label.starts_with("no ") + { + PermissionOptionKind::RejectOnce + } else { + PermissionOptionKind::AllowOnce + } +} + fn build_exec_permission_options( available_decisions: &[ReviewDecision], network_approval_context: Option<&NetworkApprovalContext>, @@ -4307,6 +4542,7 @@ mod tests { Op::ExecApproval { .. } | Op::ResolveElicitation { .. } | Op::RequestPermissionsResponse { .. } + | Op::UserInputAnswer { .. } | Op::PatchApproval { .. } | Op::Interrupt => {} Op::Shutdown => { @@ -4569,6 +4805,157 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_request_user_input_uses_permission_request() -> anyhow::Result<()> { + LocalSet::new() + .run_until(async { + let session_id = SessionId::new("test"); + let client = Arc::new(StubClient::with_permission_responses(vec![ + RequestPermissionResponse::new(RequestPermissionOutcome::Selected( + SelectedPermissionOutcome::new("approval:1"), + )), + ])); + let session_client = + SessionClient::with_client(session_id, client.clone(), Arc::default()); + let thread = Arc::new(StubCodexThread::new()); + let (response_tx, _response_rx) = tokio::sync::oneshot::channel(); + let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut prompt_state = PromptState::new( + "submission-id".to_string(), + thread.clone(), + message_tx, + response_tx, + ); + + prompt_state + .request_user_input( + &session_client, + RequestUserInputEvent { + call_id: "call-id".to_string(), + turn_id: "turn-id".to_string(), + questions: vec![RequestUserInputQuestion { + id: "approval".to_string(), + header: "Approve app tool call?".to_string(), + question: "Allow this action?".to_string(), + is_other: false, + is_secret: false, + options: Some(vec![ + RequestUserInputQuestionOption { + label: "Allow".to_string(), + description: "Run the tool and continue.".to_string(), + }, + RequestUserInputQuestionOption { + label: "Cancel".to_string(), + description: "Cancel this tool call.".to_string(), + }, + ]), + }], + }, + ) + .await?; + + let ThreadMessage::PermissionRequestResolved { + submission_id, + request_key, + response, + } = message_rx.recv().await.unwrap() + else { + panic!("expected permission resolution message"); + }; + assert_eq!(submission_id, "submission-id"); + prompt_state + .handle_permission_request_resolved(&session_client, request_key, response) + .await?; + + let requests = client.permission_requests.lock().unwrap(); + let request = requests.last().unwrap(); + assert_eq!(request.tool_call.tool_call_id.0.as_ref(), "call-id"); + assert_eq!( + request.options[0].name, "Allow", + "first option should mirror the question label" + ); + assert_eq!( + request.options[1].name, "Cancel", + "second option should mirror the question label" + ); + + let ops = thread.ops.lock().unwrap(); + let Some(Op::UserInputAnswer { id, response }) = ops.last() else { + panic!("expected user input answer op"); + }; + assert_eq!(id, "turn-id"); + assert_eq!( + response + .answers + .get("approval") + .map(|answer| answer.answers.clone()), + Some(vec!["Cancel".to_string()]) + ); + + anyhow::Ok(()) + }) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn test_request_user_input_without_options_cancels_without_permission_prompt() + -> anyhow::Result<()> { + LocalSet::new() + .run_until(async { + let session_id = SessionId::new("test"); + let client = Arc::new(StubClient::new()); + let session_client = + SessionClient::with_client(session_id, client.clone(), Arc::default()); + let thread = Arc::new(StubCodexThread::new()); + let (response_tx, _response_rx) = tokio::sync::oneshot::channel(); + let (message_tx, _message_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut prompt_state = PromptState::new( + "submission-id".to_string(), + thread.clone(), + message_tx, + response_tx, + ); + + prompt_state + .request_user_input( + &session_client, + RequestUserInputEvent { + call_id: "call-id".to_string(), + turn_id: "turn-id".to_string(), + questions: vec![RequestUserInputQuestion { + id: "approval".to_string(), + header: "Approve app tool call?".to_string(), + question: "Allow this action?".to_string(), + is_other: false, + is_secret: false, + options: None, + }], + }, + ) + .await?; + + let requests = client.permission_requests.lock().unwrap(); + assert!( + requests.is_empty(), + "unsupported user input should not trigger an ACP permission prompt" + ); + + let ops = thread.ops.lock().unwrap(); + let Some(Op::UserInputAnswer { id, response }) = ops.last() else { + panic!("expected user input answer op"); + }; + assert_eq!(id, "turn-id"); + assert!(response.answers.is_empty()); + + anyhow::Ok(()) + }) + .await?; + + Ok(()) + } + #[tokio::test] async fn test_mcp_elicitation_declines_unsupported_form_requests() -> anyhow::Result<()> { LocalSet::new()