diff --git a/codex-rs/codex-api/src/sse/chat.rs b/codex-rs/codex-api/src/sse/chat.rs index dec35890b78..91c0b6f8a8e 100644 --- a/codex-rs/codex-api/src/sse/chat.rs +++ b/codex-rs/codex-api/src/sse/chat.rs @@ -36,15 +36,22 @@ pub(crate) fn spawn_chat_stream( /// (`data: [DONE]`). Historically, some of our test stubs have emitted `data: DONE` /// (without brackets) instead. /// -/// `eventsource_stream` delivers these sentinels as regular events rather than signaling -/// end-of-stream. If we try to parse them as JSON, we log and skip them, then keep -/// polling for more events. +/// In addition to the sentinel, the stream also encodes completion semantics in JSON via +/// `finish_reason` on each choice: +/// - `finish_reason == "tool_calls"`: tool calls are complete for this response and the client +/// should proceed (typically by executing tools and issuing a follow-up request). +/// - `finish_reason == "stop"`: the assistant message is complete. /// -/// On servers that keep the HTTP connection open after emitting the sentinel (notably -/// wiremock on Windows), skipping the sentinel means we never emit `ResponseEvent::Completed`. -/// Higher-level workflows/tests that wait for completion before issuing subsequent model -/// calls will then stall, which shows up as "expected N requests, got 1" verification -/// failures in the mock server. +/// Some servers/tests (notably WireMock on Windows) may keep the HTTP connection open after +/// emitting a JSON message with `finish_reason` and may not reliably deliver a `DONE`/`[DONE]` +/// sentinel promptly. If the client waits for the sentinel/connection close to mark completion, +/// higher-level workflows can stall while waiting for `ResponseEvent::Completed`, leading to +/// follow-up requests never being issued and WireMock verification failures like +/// "expected N requests, got 1". 
+/// +/// To avoid this class of stalls, we treat `finish_reason` as authoritative for completion and +/// emit `ResponseEvent::Completed` as soon as we observe it. We still accept `DONE`/`[DONE]` as +/// a terminal completion signal for compatibility, but correctness must not rely on it. pub async fn process_chat_sse( stream: S, tx_event: mpsc::Sender>, @@ -70,8 +77,21 @@ pub async fn process_chat_sse( let mut last_tool_call_index: Option = None; let mut assistant_item: Option = None; let mut reasoning_item: Option = None; - let mut completed_sent = false; + /// Flush any accumulated output items (reasoning/assistant) and then emit a terminal + /// `ResponseEvent::Completed`. + /// + /// `process_chat_sse` can complete due to: + /// - a `finish_reason` in the JSON payload, + /// - a `DONE`/`[DONE]` sentinel, or + /// - the underlying stream ending. + /// + /// Regardless of the trigger, we want the same ordering: item-done events first, then a + /// single `Completed` event. + /// + /// We intentionally do not track a separate `completed_sent` flag: all call sites return + /// immediately after calling `flush_and_complete`, making it impossible to emit multiple + /// `Completed` events from a single invocation of `process_chat_sse`. 
async fn flush_and_complete( tx_event: &mpsc::Sender>, reasoning_item: &mut Option, @@ -110,9 +130,7 @@ pub async fn process_chat_sse( return; } Ok(None) => { - if !completed_sent { - flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; - } + flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; return; } Err(_) => { @@ -132,9 +150,7 @@ pub async fn process_chat_sse( } if data == "[DONE]" || data == "DONE" { - if !completed_sent { - flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; - } + flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; return; } @@ -153,6 +169,12 @@ pub async fn process_chat_sse( continue; }; + // Track completion across all choices in a single SSE event so we can process every + // choice before emitting `Completed` (e.g. multiple tool calls delivered as multiple + // choices in one chunk). + let mut saw_stop = false; + let mut saw_tool_calls = false; + for choice in choices { if let Some(delta) = choice.get("delta") { if let Some(reasoning) = delta.get("reasoning") { @@ -255,35 +277,13 @@ pub async fn process_chat_sse( let finish_reason = choice.get("finish_reason").and_then(|r| r.as_str()); if finish_reason == Some("stop") { - if let Some(reasoning) = reasoning_item.take() { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(reasoning))) - .await; - } - - if let Some(assistant) = assistant_item.take() { - let _ = tx_event - .send(Ok(ResponseEvent::OutputItemDone(assistant))) - .await; - } - if !completed_sent { - let _ = tx_event - .send(Ok(ResponseEvent::Completed { - response_id: String::new(), - token_usage: None, - })) - .await; - completed_sent = true; - } - continue; - } - - if finish_reason == Some("length") { + saw_stop = true; + } else if finish_reason == Some("length") { let _ = tx_event.send(Err(ApiError::ContextWindowExceeded)).await; return; - } + } else if finish_reason == Some("tool_calls") { + saw_tool_calls = 
true; - if finish_reason == Some("tool_calls") { if let Some(reasoning) = reasoning_item.take() { let _ = tx_event .send(Ok(ResponseEvent::OutputItemDone(reasoning))) @@ -314,6 +314,11 @@ pub async fn process_chat_sse( } } } + + if saw_stop || saw_tool_calls { + flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await; + return; + } } } @@ -381,9 +386,13 @@ async fn append_reasoning_text( mod tests { use super::*; use assert_matches::assert_matches; + use bytes::Bytes; use codex_protocol::models::ResponseItem; use futures::TryStreamExt; use serde_json::json; + use std::pin::Pin; + use std::task::Context; + use std::task::Poll; use tokio::sync::mpsc; use tokio_util::io::ReaderStream; @@ -405,12 +414,57 @@ mod tests { assert_matches!(&events[..], [ResponseEvent::Completed { .. }]); } + /// A stream that yields one chunk, then never produces another item and never terminates. + /// + /// This models cases where a server keeps the HTTP connection open after emitting a JSON + /// event that already carries completion semantics (e.g. a choice with + /// `finish_reason == "tool_calls"`), and no `DONE`/`[DONE]` is observed. + /// + /// The bug we are guarding against: if we wait for a transport sentinel/close instead of + /// treating `finish_reason` as authoritative, the SSE parser hangs until `idle_timeout` + /// and surfaces an error, stalling higher-level workflows. + struct OneChunkThenHang { + yielded: bool, + chunk: Bytes, + } + + impl futures::Stream for OneChunkThenHang { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if this.yielded { + return Poll::Pending; + } + + this.yielded = true; + Poll::Ready(Some(Ok(this.chunk.clone()))) + } + } + + /// Render a single Chat Completions SSE "message" event. 
+ fn sse_message_event(payload: serde_json::Value) -> Bytes { + Bytes::from(format!("event: message\ndata: {payload}\n\n")) + } + async fn collect_events(body: &str) -> Vec { let reader = ReaderStream::new(std::io::Cursor::new(body.to_string())) .map_err(|err| codex_client::TransportError::Network(err.to_string())); + collect_events_from_stream(reader).await + } + + /// Collect all output events from `process_chat_sse` until it terminates. + async fn collect_events_from_stream(stream: S) -> Vec + where + S: futures::Stream> + + Unpin + + Send + + 'static, + { let (tx, mut rx) = mpsc::channel::>(16); + // Use a short timeout so a regression ("wait for DONE/close") fails fast. tokio::spawn(process_chat_sse( - reader, + stream, tx, Duration::from_millis(1000), None, @@ -423,6 +477,114 @@ mod tests { out } + #[tokio::test] + async fn completes_on_tool_calls_finish_reason_even_if_stream_never_closes() { + // The server indicates completion via finish_reason="tool_calls", but does not send a + // DONE/[DONE] sentinel and does not close the underlying stream. We should still emit a + // terminal Completed event and return. + let payload = json!({ + "choices": [{ + "delta": { + "tool_calls": [{ + "id": "call_a", + "index": 0, + "function": { "name": "do_a", "arguments": "{}" } + }] + }, + "finish_reason": "tool_calls" + }] + }); + + let stream = OneChunkThenHang { + yielded: false, + chunk: sse_message_event(payload), + }; + + let events = collect_events_from_stream(stream).await; + assert_matches!( + &events[..], + [ + ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id, name, arguments, .. }), + ResponseEvent::Completed { .. } + ] if call_id == "call_a" && name == "do_a" && arguments == "{}" + ); + } + + #[tokio::test] + async fn completes_on_multi_choice_tool_calls_finish_reason_even_if_stream_never_closes() { + // A single SSE event can contain multiple choices; we should process and emit tool calls + // for all of them before completing the response. 
+ let payload = json!({ + "choices": [ + { + "delta": { + "tool_calls": [{ + "id": "call_a", + "index": 0, + "function": { "name": "do_a", "arguments": "{}" } + }] + }, + "finish_reason": "tool_calls" + }, + { + "delta": { + "tool_calls": [{ + "id": "call_b", + "index": 0, + "function": { "name": "do_b", "arguments": "{}" } + }] + }, + "finish_reason": "tool_calls" + } + ] + }); + + let stream = OneChunkThenHang { + yielded: false, + chunk: sse_message_event(payload), + }; + + let events = collect_events_from_stream(stream).await; + assert_matches!( + &events[..], + [ + ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_a, name: name_a, arguments: args_a, .. }), + ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_b, name: name_b, arguments: args_b, .. }), + ResponseEvent::Completed { .. } + ] if call_a == "call_a" && name_a == "do_a" && args_a == "{}" && call_b == "call_b" && name_b == "do_b" && args_b == "{}" + ); + } + + #[tokio::test] + async fn completes_on_stop_finish_reason_even_if_stream_never_closes() { + // Same as the tool-calls case, but for finish_reason="stop" (a complete assistant + // message). + let payload = json!({ + "choices": [{ + "delta": { + "content": "hi" + }, + "finish_reason": "stop" + }] + }); + + let stream = OneChunkThenHang { + yielded: false, + chunk: sse_message_event(payload), + }; + + let events = collect_events_from_stream(stream).await; + assert_matches!( + &events[..], + [ + ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }), + ResponseEvent::OutputTextDelta(delta), + ResponseEvent::OutputItemDone(ResponseItem::Message { .. }), + ResponseEvent::Completed { .. 
} + ] if delta == "hi" + ); + } + #[tokio::test] async fn concatenates_tool_call_arguments_across_deltas() { let delta_name = json!({ diff --git a/docs/ci/windows-wiremock-chat-sse-flake.md b/docs/ci/windows-wiremock-chat-sse-flake.md new file mode 100644 index 00000000000..63de362aa60 --- /dev/null +++ b/docs/ci/windows-wiremock-chat-sse-flake.md @@ -0,0 +1,73 @@ +# Windows CI flake: WireMock + Chat Completions SSE never completes + +## Summary + +Some Windows CI runs intermittently failed `codex-app-server` integration tests that use a WireMock “mock model server” for the legacy Chat Completions streaming API (`/v1/chat/completions`). + +The failure mode looked like: + +- WireMock verification panic on drop (e.g. “expected 2/4 matching requests, got 1”). +- The single observed request was `POST /v1/chat/completions` returning an SSE stream that included tool calls. +- The tests expected multiple model calls across a conversation/turn sequence, but Codex never issued the follow-up request. + +## Root cause + +The mock server responds with an SSE payload that contains: + +- a JSON `data: ...` event whose `finish_reason` is `"tool_calls"` (or `"stop"`), and +- a terminal sentinel event `data: DONE` (historical test fixture) or `data: [DONE]` (upstream convention). + +On Windows (notably under WireMock), the HTTP connection can remain open after emitting a `finish_reason`-bearing JSON event, and the sentinel may not be observed in a way that closes the stream promptly from the client’s perspective. + +Prior to the fix, our Chat Completions SSE parsing treated `finish_reason == "tool_calls"` as “emit tool call items”, but it did **not** unconditionally emit `ResponseEvent::Completed` / terminate the stream at that point. That meant: + +1. Codex received tool calls and began executing/handling them. +2. The core turn runner (`try_run_turn`) continued waiting for `ResponseEvent::Completed` to mark the turn finished. +3. 
Since completion never arrived, the turn stalled, no follow-up model request was issued, and WireMock later panicked because it only saw the first request. This is why the failures were “flaky”: they depended on timing/stream termination behavior on Windows rather than deterministic application logic. ## Fix We changed the Chat Completions SSE parser to treat `finish_reason` as authoritative for end-of-response: - When a choice reports `finish_reason == "tool_calls"`, we: - flush any accumulated reasoning item (if present), - emit the `FunctionCall` response items, - emit `ResponseEvent::Completed`, and - return from the SSE processing loop. - When a choice reports `finish_reason == "stop"`, we flush any accumulated reasoning/assistant items, emit `Completed`, and return. We still accept `data: DONE` / `data: [DONE]` as a terminal sentinel for compatibility, but the parser no longer depends on that sentinel to complete a turn. Implementation: `codex-rs/codex-api/src/sse/chat.rs` ## Protocol correctness and risk This change was motivated by flaky tests, but it is not a test-only workaround; it corrects completion semantics in the Chat Completions streaming handler. In the Chat Completions stream, `finish_reason` in the JSON payload (`"tool_calls"`, `"stop"`, etc.) is the semantic signal that the model is done producing output for the current response. The `DONE`/`[DONE]` line is a transport sentinel that many servers emit, but clients should not rely on it being observed promptly (or at all) to make forward progress—especially in environments where the HTTP connection may remain open. To keep risk low: - We only change behavior when we see an explicit `finish_reason` value that already indicates the stream is complete; in those cases we now emit `ResponseEvent::Completed` immediately instead of waiting for a sentinel/close. - We preserve compatibility by still accepting `DONE`/`[DONE]` as a completion signal. 
+- We validate the behavior with focused unit tests in `codex-api` plus the higher-level `codex-app-server` integration tests that were flaking. + +## Why this fixes the tests + +The integration tests in `codex-app-server` rely on turn completion to progress the conversation and trigger subsequent model calls. By guaranteeing that the SSE parser emits a `Completed` event as soon as the model indicates it is finished (via `finish_reason`), the core state machine can: + +- complete the current turn deterministically, +- proceed to the next turn/model request immediately, and +- satisfy WireMock’s expected request count reliably on Windows. + +## Verification + +Locally, we validated: + +- `cargo test -p codex-api --lib` +- `cargo test -p codex-app-server codex_message_processor_flow` + +The previously failing `codex_message_processor_flow` tests now pass. + +In particular, `codex-api` includes regression tests that simulate a stream that *does not close* after emitting a JSON event with `finish_reason` (to mimic WireMock/Windows behavior). Those tests ensure we emit `Completed` and return without relying on `DONE`/connection close.