diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 39347714096..458d04335f6 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -70,6 +70,10 @@ impl ResponsesRequest { self.0.body_json().unwrap() } + pub fn body_bytes(&self) -> Vec { + self.0.body.clone() + } + /// Returns all `input_text` spans from `message` inputs for the provided role. pub fn message_input_texts(&self, role: &str) -> Vec { self.inputs_of_type("message") @@ -701,33 +705,6 @@ pub async fn start_mock_server() -> MockServer { server } -// todo(aibrahim): remove this and use our search matching patterns directly -/// Get all POST requests to `/responses` endpoints from the mock server. -/// Filters out GET requests (e.g., `/models`) . -pub async fn get_responses_requests(server: &MockServer) -> Vec { - server - .received_requests() - .await - .expect("mock server should not fail") - .into_iter() - .filter(|req| req.method == "POST" && req.url.path().ends_with("/responses")) - .collect() -} - -// todo(aibrahim): remove this and use our search matching patterns directly -/// Get request bodies as JSON values from POST requests to `/responses` endpoints. -/// Filters out GET requests (e.g., `/models`) . -pub async fn get_responses_request_bodies(server: &MockServer) -> Vec { - get_responses_requests(server) - .await - .into_iter() - .map(|req| { - req.body_json::() - .expect("request body to be valid JSON") - }) - .collect() -} - #[derive(Clone)] pub struct FunctionCallResponseMocks { pub function_call: ResponseMock, diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index 24d5dd8bc7a..d0b37f2d615 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -23,10 +23,11 @@ use tempfile::TempDir; use wiremock::MockServer; use crate::load_default_config_for_test; -use crate::responses::get_responses_request_bodies; use crate::responses::start_mock_server; use crate::streaming_sse::StreamingSseServer; use crate::wait_for_event; +use wiremock::Match; +use wiremock::matchers::path_regex; type ConfigMutator = dyn FnOnce(&mut Config) + Send; type PreBuildHook = dyn FnOnce(&Path) + Send + 'static; @@ -322,7 +323,18 @@ impl TestCodexHarness { } pub async fn request_bodies(&self) -> Vec { - get_responses_request_bodies(&self.server).await + let path_matcher = path_regex(".*/responses$"); + self.server + .received_requests() + .await + .expect("mock server should not fail") + .into_iter() + .filter(|req| path_matcher.matches(req)) + .map(|req| { + req.body_json::() + .expect("request body to be valid JSON") + }) + .collect() } pub async fn function_call_output_value(&self, call_id: &str) -> Value { diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 2c08083ba5a..e37583161d6 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -23,6 +23,7 @@ use codex_otel::otel_manager::OtelManager; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::config_types::Verbosity; +use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::WebSearchAction; @@ -31,9 +32,9 @@ use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; use core_test_support::responses::ev_completed_with_tokens; -use core_test_support::responses::get_responses_requests; use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_once_match; +use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::responses::sse_failed; use core_test_support::skip_if_no_network; @@ -324,17 +325,7 @@ async fn includes_conversation_id_and_model_headers_in_request() { // Mock server let server = MockServer::start().await; - // First request – must NOT include `previous_response_id`. - let first = ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw(sse_completed("resp1"), "text/event-stream"); - - Mock::given(method("POST")) - .and(path("/v1/responses")) - .respond_with(first) - .expect(1) - .mount(&server) - .await; + let resp_mock = mount_sse_once(&server, sse_completed("resp1")).await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/v1", server.uri())), @@ -373,24 +364,19 @@ async fn includes_conversation_id_and_model_headers_in_request() { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - // get request from the server - let requests = get_responses_requests(&server).await; - let request = requests - .first() - .expect("expected POST request to /responses"); - let request_conversation_id = request.headers.get("conversation_id").unwrap(); - let request_authorization = request.headers.get("authorization").unwrap(); - let request_originator = request.headers.get("originator").unwrap(); - - assert_eq!( - request_conversation_id.to_str().unwrap(), - conversation_id.to_string() - ); - assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs"); - assert_eq!( - request_authorization.to_str().unwrap(), - "Bearer Test API Key" - ); + let request = resp_mock.single_request(); + assert_eq!(request.path(), "/v1/responses"); + let request_conversation_id = request + .header("conversation_id") + .expect("conversation_id header"); + let request_authorization = request + .header("authorization") + .expect("authorization header"); + let request_originator = request.header("originator").expect("originator header"); + + assert_eq!(request_conversation_id, conversation_id.to_string()); + assert_eq!(request_originator, "codex_cli_rs"); + assert_eq!(request_authorization, "Bearer Test API Key"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -451,17 +437,7 @@ async fn chatgpt_auth_sends_correct_request() { // Mock server let server = MockServer::start().await; - // First request – must NOT include `previous_response_id`. - let first = ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw(sse_completed("resp1"), "text/event-stream"); - - Mock::given(method("POST")) - .and(path("/api/codex/responses")) - .respond_with(first) - .expect(1) - .mount(&server) - .await; + let resp_mock = mount_sse_once(&server, sse_completed("resp1")).await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/api/codex", server.uri())), @@ -499,27 +475,24 @@ async fn chatgpt_auth_sends_correct_request() { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - // get request from the server - let requests = get_responses_requests(&server).await; - let request = requests - .first() - .expect("expected POST request to /responses"); - let request_conversation_id = request.headers.get("conversation_id").unwrap(); - let request_authorization = request.headers.get("authorization").unwrap(); - let request_originator = request.headers.get("originator").unwrap(); - let request_chatgpt_account_id = request.headers.get("chatgpt-account-id").unwrap(); - let request_body = request.body_json::().unwrap(); + let request = resp_mock.single_request(); + assert_eq!(request.path(), "/api/codex/responses"); + let request_conversation_id = request + .header("conversation_id") + .expect("conversation_id header"); + let request_authorization = request + .header("authorization") + .expect("authorization header"); + let request_originator = request.header("originator").expect("originator header"); + let request_chatgpt_account_id = request + .header("chatgpt-account-id") + .expect("chatgpt-account-id header"); + let request_body = request.body_json(); - assert_eq!( - request_conversation_id.to_str().unwrap(), - conversation_id.to_string() - ); - assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs"); - assert_eq!( - request_authorization.to_str().unwrap(), - "Bearer Access Token" - ); - assert_eq!(request_chatgpt_account_id.to_str().unwrap(), "account_id"); + assert_eq!(request_conversation_id, conversation_id.to_string()); + assert_eq!(request_originator, "codex_cli_rs"); + assert_eq!(request_authorization, "Bearer Access Token"); + assert_eq!(request_chatgpt_account_id, "account_id"); assert!(request_body["stream"].as_bool().unwrap()); assert_eq!( request_body["include"][0].as_str().unwrap(), @@ -1107,17 +1080,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { "data: {\"type\":\"response.created\",\"response\":{}}\n\n", "data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_1\"}}\n\n", ); - - let template = ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw(sse_body, "text/event-stream"); - - Mock::given(method("POST")) - .and(path("/openai/responses")) - .respond_with(template) - .expect(1) - .mount(&server) - .await; + let resp_mock = mount_sse_once(&server, sse_body.to_string()).await; let provider = ModelProviderInfo { name: "azure".into(), @@ -1202,6 +1165,13 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { arguments: "{}".into(), call_id: "function-call-id".into(), }); + prompt.input.push(ResponseItem::FunctionCallOutput { + call_id: "function-call-id".into(), + output: FunctionCallOutputPayload { + content: "ok".into(), + ..Default::default() + }, + }); prompt.input.push(ResponseItem::LocalShellCall { id: Some("local-shell-id".into()), call_id: Some("local-shell-call-id".into()), @@ -1221,6 +1191,10 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { name: "custom_tool".into(), input: "{}".into(), }); + prompt.input.push(ResponseItem::CustomToolCallOutput { + call_id: "custom-tool-call-id".into(), + output: "ok".into(), + }); let mut stream = client .stream(&prompt) @@ -1233,21 +1207,27 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { } } - let requests = get_responses_requests(&server).await; - assert_eq!(requests.len(), 1, "expected a single POST request"); - let body: serde_json::Value = requests[0] - .body_json() - .expect("request body to be valid JSON"); + let request = resp_mock.single_request(); + assert_eq!(request.path(), "/openai/responses"); + let body = request.body_json(); assert_eq!(body["store"], serde_json::Value::Bool(true)); assert_eq!(body["stream"], serde_json::Value::Bool(true)); - assert_eq!(body["input"].as_array().map(Vec::len), Some(6)); + assert_eq!(body["input"].as_array().map(Vec::len), Some(8)); assert_eq!(body["input"][0]["id"].as_str(), Some("reasoning-id")); assert_eq!(body["input"][1]["id"].as_str(), Some("message-id")); assert_eq!(body["input"][2]["id"].as_str(), Some("web-search-id")); assert_eq!(body["input"][3]["id"].as_str(), Some("function-id")); - assert_eq!(body["input"][4]["id"].as_str(), Some("local-shell-id")); - assert_eq!(body["input"][5]["id"].as_str(), Some("custom-tool-id")); + assert_eq!( + body["input"][4]["call_id"].as_str(), + Some("function-call-id") + ); + assert_eq!(body["input"][5]["id"].as_str(), Some("local-shell-id")); + assert_eq!(body["input"][6]["id"].as_str(), Some("custom-tool-id")); + assert_eq!( + body["input"][7]["call_id"].as_str(), + Some("custom-tool-call-id") + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1784,16 +1764,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { ]"##; let sse1 = core_test_support::load_sse_fixture_with_id_from_str(sse_raw, "resp1"); - Mock::given(method("POST")) - .and(path("/v1/responses")) - .respond_with( - ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw(sse1.clone(), "text/event-stream"), - ) - .expect(3) // respond identically to the three sequential turns - .mount(&server) - .await; + let request_log = mount_sse_sequence(&server, vec![sse1.clone(), sse1.clone(), sse1]).await; // Configure provider to point to mock server (Responses API) and use API key auth. let model_provider = ModelProviderInfo { @@ -1847,8 +1818,11 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // Inspect the three captured requests. - let requests = get_responses_requests(&server).await; + let requests = request_log.requests(); assert_eq!(requests.len(), 3, "expected 3 requests (one per turn)"); + for request in &requests { + assert_eq!(request.path(), "/v1/responses"); + } // Replace full-array compare with tail-only raw JSON compare using a single hard-coded value. let r3_tail_expected = json!([ @@ -1880,8 +1854,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { ]); let r3_input_array = requests[2] - .body_json::() - .unwrap() + .body_json() .get("input") .and_then(|v| v.as_array()) .cloned() diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index e0845ef72b4..9e678a4c3b6 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -31,7 +31,6 @@ use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_completed_with_tokens; use core_test_support::responses::ev_function_call; -use core_test_support::responses::get_responses_requests; use core_test_support::responses::mount_compact_json_once; use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_once_match; @@ -330,7 +329,7 @@ async fn manual_compact_uses_custom_prompt() { let server = start_mock_server().await; let sse_stream = sse(vec![ev_completed("r1")]); - mount_sse_once(&server, sse_stream).await; + let response_mock = mount_sse_once(&server, sse_stream).await; let custom_prompt = "Use this compact prompt instead"; @@ -358,11 +357,7 @@ async fn manual_compact_uses_custom_prompt() { assert_eq!(message, COMPACT_WARNING_MESSAGE); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - let requests = get_responses_requests(&server).await; - let body = requests - .iter() - .find_map(|req| req.body_json::().ok()) - .expect("summary request body"); + let body = response_mock.single_request().body_json(); let input = body .get("input") @@ -571,7 +566,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { model_compact_response_3_sse, model_final_response_sse, ]; - mount_sse_sequence(&server, bodies).await; + let request_log = mount_sse_sequence(&server, bodies).await; // Start the conversation with the user message codex @@ -586,11 +581,8 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // collect the requests payloads from the model - let requests_payloads = get_responses_requests(&server).await; - - let body = requests_payloads[0] - .body_json::() - .unwrap(); + let requests_payloads = request_log.requests(); + let body = requests_payloads[0].body_json(); let input = body.get("input").and_then(|v| v.as_array()).unwrap(); fn normalize_inputs(values: &[serde_json::Value]) -> Vec { @@ -631,9 +623,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { prefixed_third_summary.as_str(), ]; for (i, expected_summary) in compaction_indices.into_iter().zip(expected_summaries) { - let body = requests_payloads.clone()[i] - .body_json::() - .unwrap(); + let body = requests_payloads.clone()[i].body_json(); let input = body.get("input").and_then(|v| v.as_array()).unwrap(); let input = normalize_inputs(input); assert_eq!(input.len(), 3); @@ -996,7 +986,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { ]); for (i, request) in requests_payloads.iter().enumerate() { - let body = request.body_json::().unwrap(); + let body = request.body_json(); let input = body.get("input").and_then(|v| v.as_array()).unwrap(); let expected_input = expected_requests_inputs[i].as_array().unwrap(); assert_eq!(normalize_inputs(input), normalize_inputs(expected_input)); @@ -1034,33 +1024,7 @@ async fn auto_compact_runs_after_token_limit_hit() { ]); let prefixed_auto_summary = AUTO_SUMMARY_TEXT; - let first_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains(FIRST_AUTO_MSG) - && !body.contains(SECOND_AUTO_MSG) - && !body_contains_text(body, SUMMARIZATION_PROMPT) - }; - mount_sse_once_match(&server, first_matcher, sse1).await; - - let second_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains(SECOND_AUTO_MSG) - && body.contains(FIRST_AUTO_MSG) - && !body_contains_text(body, SUMMARIZATION_PROMPT) - }; - mount_sse_once_match(&server, second_matcher, sse2).await; - - let third_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body_contains_text(body, SUMMARIZATION_PROMPT) - }; - mount_sse_once_match(&server, third_matcher, sse3).await; - - let fourth_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT) - }; - mount_sse_once_match(&server, fourth_matcher, sse4).await; + let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await; let model_provider = non_openai_model_provider(&server); @@ -1111,53 +1075,49 @@ async fn auto_compact_runs_after_token_limit_hit() { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - let requests = get_responses_requests(&server).await; + let requests = request_log.requests(); + let request_bodies: Vec = requests + .iter() + .map(|request| request.body_json().to_string()) + .collect(); assert_eq!( - requests.len(), + request_bodies.len(), 4, "expected user turns, a compaction request, and the follow-up turn; got {}", - requests.len() + request_bodies.len() ); - let is_auto_compact = |req: &wiremock::Request| { - body_contains_text( - std::str::from_utf8(&req.body).unwrap_or(""), - SUMMARIZATION_PROMPT, - ) - }; - let auto_compact_count = requests.iter().filter(|req| is_auto_compact(req)).count(); + let auto_compact_count = request_bodies + .iter() + .filter(|body| body_contains_text(body, SUMMARIZATION_PROMPT)) + .count(); assert_eq!( auto_compact_count, 1, "expected exactly one auto compact request" ); - let auto_compact_index = requests + let auto_compact_index = request_bodies .iter() .enumerate() - .find_map(|(idx, req)| is_auto_compact(req).then_some(idx)) + .find_map(|(idx, body)| body_contains_text(body, SUMMARIZATION_PROMPT).then_some(idx)) .expect("auto compact request missing"); assert_eq!( auto_compact_index, 2, "auto compact should add a third request" ); - let follow_up_index = requests + let follow_up_index = request_bodies .iter() .enumerate() .rev() - .find_map(|(idx, req)| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); + .find_map(|(idx, body)| { (body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT)) .then_some(idx) }) .expect("follow-up request missing"); assert_eq!(follow_up_index, 3, "follow-up request should be last"); - let body_first = requests[0].body_json::().unwrap(); - let body_auto = requests[auto_compact_index] - .body_json::() - .unwrap(); - let body_follow_up = requests[follow_up_index] - .body_json::() - .unwrap(); + let body_first = requests[0].body_json(); + let body_auto = requests[auto_compact_index].body_json(); + let body_follow_up = requests[follow_up_index].body_json(); let instructions = body_auto .get("instructions") .and_then(|v| v.as_str()) @@ -1848,7 +1808,7 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_ let follow_up_user = "FOLLOW_UP_AUTO_COMPACT"; let final_user = "FINAL_AUTO_COMPACT"; - mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4, sse5, sse6]).await; + let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4, sse5, sse6]).await; let model_provider = non_openai_model_provider(&server); @@ -1897,10 +1857,10 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_ "auto compact should not emit task lifecycle events" ); - let requests = get_responses_requests(&server).await; - let request_bodies: Vec = requests + let request_bodies: Vec = request_log + .requests() .into_iter() - .map(|request| String::from_utf8(request.body).unwrap_or_default()) + .map(|request| request.body_json().to_string()) .collect(); assert_eq!( request_bodies.len(), diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 4ad829f07ab..51b3eb94246 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -24,9 +24,9 @@ use codex_core::protocol::WarningEvent; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; +use core_test_support::responses::ResponseMock; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; -use core_test_support::responses::get_responses_request_bodies; use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; use core_test_support::wait_for_event; @@ -148,7 +148,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() { // 1. Arrange mocked SSE responses for the initial compact/resume/fork flow. let server = MockServer::start().await; - mount_initial_flow(&server).await; + let request_log = mount_initial_flow(&server).await; let expected_model = "gpt-5.1-codex"; // 2. Start a new conversation and drive it through the compact/resume/fork steps. let (_home, config, manager, base) = @@ -175,7 +175,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() { user_turn(&forked, "AFTER_FORK").await; // 3. Capture the requests to the model and validate the history slices. - let mut requests = gather_request_bodies(&server).await; + let mut requests = gather_request_bodies(&request_log); normalize_compact_prompts(&mut requests); // input after compact is a prefix of input after resume/fork @@ -600,8 +600,8 @@ async fn compact_resume_after_second_compaction_preserves_history() { // 1. Arrange mocked SSE responses for the initial flow plus the second compact. let server = MockServer::start().await; - mount_initial_flow(&server).await; - mount_second_compact_flow(&server).await; + let mut request_log = mount_initial_flow(&server).await; + request_log.extend(mount_second_compact_flow(&server).await); // 2. Drive the conversation through compact -> resume -> fork -> compact -> resume. let (_home, config, manager, base) = start_test_conversation(&server, None).await; @@ -637,7 +637,7 @@ async fn compact_resume_after_second_compaction_preserves_history() { let resumed_again = resume_conversation(&manager, &config, forked_path).await; user_turn(&resumed_again, AFTER_SECOND_RESUME).await; - let mut requests = gather_request_bodies(&server).await; + let mut requests = gather_request_bodies(&request_log); normalize_compact_prompts(&mut requests); let input_after_compact = json!(requests[requests.len() - 2]["input"]); let input_after_resume = json!(requests[requests.len() - 1]["input"]); @@ -771,15 +771,19 @@ fn normalize_line_endings(value: &mut Value) { } } -async fn gather_request_bodies(server: &MockServer) -> Vec { - let mut bodies = get_responses_request_bodies(server).await; +fn gather_request_bodies(request_log: &[ResponseMock]) -> Vec { + let mut bodies = request_log + .iter() + .flat_map(ResponseMock::requests) + .map(|request| request.body_json()) + .collect::>(); for body in &mut bodies { normalize_line_endings(body); } bodies } -async fn mount_initial_flow(server: &MockServer) { +async fn mount_initial_flow(server: &MockServer) -> Vec { let sse1 = sse(vec![ ev_assistant_message("m1", FIRST_REPLY), ev_completed("r1"), @@ -803,13 +807,13 @@ async fn mount_initial_flow(server: &MockServer) { && !body.contains("\"text\":\"AFTER_RESUME\"") && !body.contains("\"text\":\"AFTER_FORK\"") }; - mount_sse_once_match(server, match_first, sse1).await; + let first = mount_sse_once_match(server, match_first, sse1).await; let match_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body_contains_text(body, SUMMARIZATION_PROMPT) || body.contains(&json_fragment(FIRST_REPLY)) }; - mount_sse_once_match(server, match_compact, sse2).await; + let compact = mount_sse_once_match(server, match_compact, sse2).await; let match_after_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); @@ -817,22 +821,24 @@ async fn mount_initial_flow(server: &MockServer) { && !body.contains("\"text\":\"AFTER_RESUME\"") && !body.contains("\"text\":\"AFTER_FORK\"") }; - mount_sse_once_match(server, match_after_compact, sse3).await; + let after_compact = mount_sse_once_match(server, match_after_compact, sse3).await; let match_after_resume = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("\"text\":\"AFTER_RESUME\"") }; - mount_sse_once_match(server, match_after_resume, sse4).await; + let after_resume = mount_sse_once_match(server, match_after_resume, sse4).await; let match_after_fork = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("\"text\":\"AFTER_FORK\"") }; - mount_sse_once_match(server, match_after_fork, sse5).await; + let after_fork = mount_sse_once_match(server, match_after_fork, sse5).await; + + vec![first, compact, after_compact, after_resume, after_fork] } -async fn mount_second_compact_flow(server: &MockServer) { +async fn mount_second_compact_flow(server: &MockServer) -> Vec { let sse6 = sse(vec![ ev_assistant_message("m4", SUMMARY_TEXT), ev_completed("r6"), @@ -843,13 +849,15 @@ async fn mount_second_compact_flow(server: &MockServer) { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("AFTER_FORK") }; - mount_sse_once_match(server, match_second_compact, sse6).await; + let second_compact = mount_sse_once_match(server, match_second_compact, sse6).await; let match_after_second_resume = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains(&format!("\"text\":\"{AFTER_SECOND_RESUME}\"")) }; - mount_sse_once_match(server, match_after_second_resume, sse7).await; + let after_second_resume = mount_sse_once_match(server, match_after_second_resume, sse7).await; + + vec![second_compact, after_second_resume] } async fn start_test_conversation( diff --git a/codex-rs/core/tests/suite/request_compression.rs b/codex-rs/core/tests/suite/request_compression.rs index 271f67e1aff..36f7c7dd86d 100644 --- a/codex-rs/core/tests/suite/request_compression.rs +++ b/codex-rs/core/tests/suite/request_compression.rs @@ -7,7 +7,6 @@ use codex_core::protocol::Op; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_response_created; -use core_test_support::responses::get_responses_requests; use core_test_support::responses::mount_sse_once; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; @@ -21,7 +20,7 @@ async fn request_body_is_zstd_compressed_for_codex_backend_when_enabled() -> any skip_if_no_network!(Ok(())); let server = start_mock_server().await; - mount_sse_once( + let request_log = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) @@ -48,17 +47,10 @@ async fn request_body_is_zstd_compressed_for_codex_backend_when_enabled() -> any // Wait until the task completes so the request definitely hit the server. wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - let requests = get_responses_requests(&server).await; - assert_eq!(requests.len(), 1); + let request = request_log.single_request(); + assert_eq!(request.header("content-encoding").as_deref(), Some("zstd")); - let request = &requests[0]; - let content_encoding = request - .headers - .get("content-encoding") - .and_then(|v| v.to_str().ok()); - assert_eq!(content_encoding, Some("zstd")); - - let decompressed = zstd::stream::decode_all(std::io::Cursor::new(request.body.clone()))?; + let decompressed = zstd::stream::decode_all(std::io::Cursor::new(request.body_bytes()))?; let json: serde_json::Value = serde_json::from_slice(&decompressed)?; assert!( json.get("input").is_some(), @@ -73,7 +65,7 @@ async fn request_body_is_not_compressed_for_api_key_auth_even_when_enabled() -> skip_if_no_network!(Ok(())); let server = start_mock_server().await; - mount_sse_once( + let request_log = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) @@ -97,16 +89,13 @@ async fn request_body_is_not_compressed_for_api_key_auth_even_when_enabled() -> wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - let requests = get_responses_requests(&server).await; - assert_eq!(requests.len(), 1); - - let request = &requests[0]; + let request = request_log.single_request(); assert!( - request.headers.get("content-encoding").is_none(), + request.header("content-encoding").is_none(), "did not expect request compression for API-key auth" ); - let json: serde_json::Value = serde_json::from_slice(&request.body)?; + let json: serde_json::Value = serde_json::from_slice(&request.body_bytes())?; assert!( json.get("input").is_some(), "expected request body to be plain Responses API JSON" diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index 763b6109df4..3c52d4c5a44 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -23,7 +23,8 @@ use codex_core::review_format::render_review_output_text; use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id_from_str; -use core_test_support::responses::get_responses_requests; +use core_test_support::responses::ResponseMock; +use core_test_support::responses::mount_sse_sequence; use core_test_support::skip_if_no_network; use core_test_support::wait_for_event; use pretty_assertions::assert_eq; @@ -32,11 +33,7 @@ use std::sync::Arc; use tempfile::TempDir; use tokio::io::AsyncWriteExt as _; use uuid::Uuid; -use wiremock::Mock; use wiremock::MockServer; -use wiremock::ResponseTemplate; -use wiremock::matchers::method; -use wiremock::matchers::path; /// Verify that submitting `Op::Review` spawns a child task and emits /// EnteredReviewMode -> ExitedReviewMode(None) -> TaskComplete @@ -75,7 +72,7 @@ async fn review_op_emits_lifecycle_and_review_output() { ]"#; let review_json_escaped = serde_json::to_string(&review_json).unwrap(); let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped); - let server = start_responses_server_with_sse(&sse_raw, 1).await; + let (server, _request_log) = start_responses_server_with_sse(&sse_raw, 1).await; let codex_home = TempDir::new().unwrap(); let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await; @@ -196,7 +193,7 @@ async fn review_op_with_plain_text_emits_review_fallback() { }}, {"type":"response.completed", "response": {"id": "__ID__"}} ]"#; - let server = start_responses_server_with_sse(sse_raw, 1).await; + let (server, _request_log) = start_responses_server_with_sse(sse_raw, 1).await; let codex_home = TempDir::new().unwrap(); let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await; @@ -256,7 +253,7 @@ async fn review_filters_agent_message_related_events() { }}, {"type":"response.completed", "response": {"id": "__ID__"}} ]"#; - let server = start_responses_server_with_sse(sse_raw, 1).await; + let (server, _request_log) = start_responses_server_with_sse(sse_raw, 1).await; let codex_home = TempDir::new().unwrap(); let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await; @@ -337,7 +334,7 @@ async fn review_does_not_emit_agent_message_on_structured_output() { ]"#; let review_json_escaped = serde_json::to_string(&review_json).unwrap(); let sse_raw = sse_template.replace("__REVIEW__", &review_json_escaped); - let server = start_responses_server_with_sse(&sse_raw, 1).await; + let (server, _request_log) = start_responses_server_with_sse(&sse_raw, 1).await; let codex_home = TempDir::new().unwrap(); let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await; @@ -391,7 +388,7 @@ async fn review_uses_custom_review_model_from_config() { let sse_raw = r#"[ {"type":"response.completed", "response": {"id": "__ID__"}} ]"#; - let server = start_responses_server_with_sse(sse_raw, 1).await; + let (server, request_log) = start_responses_server_with_sse(sse_raw, 1).await; let codex_home = TempDir::new().unwrap(); // Choose a review model different from the main model; ensure it is used. let codex = new_conversation_for_server(&server, &codex_home, |cfg| { @@ -426,11 +423,9 @@ async fn review_uses_custom_review_model_from_config() { let _complete = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // Assert the request body model equals the configured review model - let requests = get_responses_requests(&server).await; - let request = requests - .first() - .expect("expected POST request to /responses"); - let body = request.body_json::().unwrap(); + let request = request_log.single_request(); + assert_eq!(request.path(), "/v1/responses"); + let body = request.body_json(); assert_eq!(body["model"].as_str().unwrap(), "gpt-5.1"); server.verify().await; @@ -449,7 +444,7 @@ async fn review_input_isolated_from_parent_history() { let sse_raw = r#"[ {"type":"response.completed", "response": {"id": "__ID__"}} ]"#; - let server = start_responses_server_with_sse(sse_raw, 1).await; + let (server, request_log) = start_responses_server_with_sse(sse_raw, 1).await; // Seed a parent session history via resume file with both user + assistant items. let codex_home = TempDir::new().unwrap(); @@ -547,11 +542,9 @@ async fn review_input_isolated_from_parent_history() { let _complete = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // Assert the request `input` contains the environment context followed by the user review prompt. - let requests = get_responses_requests(&server).await; - let request = requests - .first() - .expect("expected POST request to /responses"); - let body = request.body_json::().unwrap(); + let request = request_log.single_request(); + assert_eq!(request.path(), "/v1/responses"); + let body = request.body_json(); let input = body["input"].as_array().expect("input array"); assert!( input.len() >= 2, @@ -630,7 +623,7 @@ async fn review_history_surfaces_in_parent_session() { }}, {"type":"response.completed", "response": {"id": "__ID__"}} ]"#; - let server = start_responses_server_with_sse(sse_raw, 2).await; + let (server, request_log) = start_responses_server_with_sse(sse_raw, 2).await; let codex_home = TempDir::new().unwrap(); let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await; @@ -674,9 +667,12 @@ async fn review_history_surfaces_in_parent_session() { // Inspect the second request (parent turn) input contents. // Parent turns include session initial messages (user_instructions, environment_context). // Critically, no messages from the review thread should appear. - let requests = get_responses_requests(&server).await; + let requests = request_log.requests(); assert_eq!(requests.len(), 2); - let body = requests[1].body_json::().unwrap(); + for request in &requests { + assert_eq!(request.path(), "/v1/responses"); + } + let body = requests[1].body_json(); let input = body["input"].as_array().expect("input array"); // Must include the followup as the last item for this turn @@ -717,7 +713,7 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() { skip_if_no_network!(); let sse_raw = r#"[{"type":"response.completed", "response": {"id": "__ID__"}}]"#; - let server = start_responses_server_with_sse(sse_raw, 1).await; + let (server, request_log) = start_responses_server_with_sse(sse_raw, 1).await; let initial_cwd = TempDir::new().unwrap(); @@ -792,9 +788,12 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() { let _entered = wait_for_event(&codex, |ev| matches!(ev, EventMsg::EnteredReviewMode(_))).await; let _complete = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - let requests = get_responses_requests(&server).await; + let requests = request_log.requests(); assert_eq!(requests.len(), 1); - let body = requests[0].body_json::().unwrap(); + for request in &requests { + assert_eq!(request.path(), "/v1/responses"); + } + let body = requests[0].body_json(); let input = body["input"].as_array().expect("input array"); let saw_merge_base_sha = input @@ -810,20 +809,15 @@ async fn review_uses_overridden_cwd_for_base_branch_merge_base() { } /// Start a mock Responses API server and mount the given SSE stream body. -async fn start_responses_server_with_sse(sse_raw: &str, expected_requests: usize) -> MockServer { +async fn start_responses_server_with_sse( + sse_raw: &str, + expected_requests: usize, +) -> (MockServer, ResponseMock) { let server = MockServer::start().await; let sse = load_sse_fixture_with_id_from_str(sse_raw, &Uuid::new_v4().to_string()); - Mock::given(method("POST")) - .and(path("/v1/responses")) - .respond_with( - ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_raw(sse.clone(), "text/event-stream"), - ) - .expect(expected_requests as u64) - .mount(&server) - .await; - server + let responses = vec![sse; expected_requests]; + let request_log = mount_sse_sequence(&server, responses).await; + (server, request_log) } /// Create a conversation configured to talk to the provided mock server. diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 66319e21c20..a640daacc88 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -20,7 +20,6 @@ use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; use core_test_support::responses::ev_response_created; -use core_test_support::responses::get_responses_request_bodies; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; @@ -471,7 +470,7 @@ async fn unified_exec_respects_workdir_override() -> Result<()> { ev_completed("resp-2"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -503,7 +502,7 @@ async fn unified_exec_respects_workdir_override() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); Ok(()) @@ -1217,7 +1216,7 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> { ev_completed("resp-2"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -1238,10 +1237,12 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; let metadata = outputs @@ -1321,7 +1322,7 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> { ev_completed("resp-2"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -1342,10 +1343,12 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; let output = outputs @@ -1446,7 +1449,7 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { ev_completed("resp-4"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -1467,10 +1470,12 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; @@ -1800,7 +1805,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { ev_completed("resp-3"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -1821,10 +1826,12 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; @@ -1929,7 +1936,7 @@ PY ev_completed("resp-3"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -1955,10 +1962,12 @@ PY ) .await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; @@ -2038,7 +2047,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { ev_completed("resp-3"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -2064,10 +2073,12 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { } } - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; @@ -2129,7 +2140,7 @@ PY ev_completed("resp-2"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -2150,10 +2161,12 @@ PY wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; let large_output = outputs.get(call_id).expect("missing large output summary"); @@ -2205,7 +2218,7 @@ async fn unified_exec_runs_under_sandbox() -> Result<()> { ev_completed("resp-2"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -2227,10 +2240,12 @@ async fn unified_exec_runs_under_sandbox() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; let output = outputs.get(call_id).expect("missing output"); @@ -2304,7 +2319,7 @@ async fn unified_exec_python_prompt_under_seatbelt() -> Result<()> { ev_completed("resp-3"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -2325,10 +2340,12 @@ async fn unified_exec_python_prompt_under_seatbelt() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; let startup_output = outputs @@ -2394,7 +2411,7 @@ async fn unified_exec_runs_on_all_platforms() -> Result<()> { ev_completed("resp-2"), ]), ]; - mount_sse_sequence(&server, responses).await; + let request_log = mount_sse_sequence(&server, responses).await; let session_model = session_configured.model.clone(); @@ -2415,10 +2432,12 @@ async fn unified_exec_runs_on_all_platforms() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; - let requests = server.received_requests().await.expect("recorded requests"); + let requests = request_log.requests(); assert!(!requests.is_empty(), "expected at least one POST request"); - - let bodies = get_responses_request_bodies(&server).await; + let bodies = requests + .into_iter() + .map(|request| request.body_json()) + .collect::>(); let outputs = collect_tool_outputs(&bodies)?; let output = outputs.get(call_id).expect("missing output");