Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 204 additions & 42 deletions codex-rs/codex-api/src/sse/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,22 @@ pub(crate) fn spawn_chat_stream(
/// (`data: [DONE]`). Historically, some of our test stubs have emitted `data: DONE`
/// (without brackets) instead.
///
/// `eventsource_stream` delivers these sentinels as regular events rather than signaling
/// end-of-stream. If we try to parse them as JSON, we log and skip them, then keep
/// polling for more events.
/// In addition to the sentinel, the stream also encodes completion semantics in JSON via
/// `finish_reason` on each choice:
/// - `finish_reason == "tool_calls"`: tool calls are complete for this response and the client
/// should proceed (typically by executing tools and issuing a follow-up request).
/// - `finish_reason == "stop"`: the assistant message is complete.
///
/// On servers that keep the HTTP connection open after emitting the sentinel (notably
/// wiremock on Windows), skipping the sentinel means we never emit `ResponseEvent::Completed`.
/// Higher-level workflows/tests that wait for completion before issuing subsequent model
/// calls will then stall, which shows up as "expected N requests, got 1" verification
/// failures in the mock server.
/// Some servers/tests (notably WireMock on Windows) may keep the HTTP connection open after
/// emitting a JSON message with `finish_reason` and may not reliably deliver a `DONE`/`[DONE]`
/// sentinel promptly. If the client waits for the sentinel/connection close to mark completion,
/// higher-level workflows can stall while waiting for `ResponseEvent::Completed`, leading to
/// follow-up requests never being issued and WireMock verification failures like
/// "expected N requests, got 1".
///
/// To avoid this class of stalls, we treat `finish_reason` as authoritative for completion and
/// emit `ResponseEvent::Completed` as soon as we observe it. We still accept `DONE`/`[DONE]` as
/// a terminal completion signal for compatibility, but correctness must not rely on it.
pub async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent, ApiError>>,
Expand All @@ -70,8 +77,21 @@ pub async fn process_chat_sse<S>(
let mut last_tool_call_index: Option<usize> = None;
let mut assistant_item: Option<ResponseItem> = None;
let mut reasoning_item: Option<ResponseItem> = None;
let mut completed_sent = false;

/// Flush any accumulated output items (reasoning/assistant) and then emit a terminal
/// `ResponseEvent::Completed`.
///
/// `process_chat_sse` can complete due to:
/// - a `finish_reason` in the JSON payload,
/// - a `DONE`/`[DONE]` sentinel, or
/// - the underlying stream ending.
///
/// Regardless of the trigger, we want the same ordering: item-done events first, then a
/// single `Completed` event.
///
/// We intentionally do not track a separate `completed_sent` flag: all call sites return
/// immediately after calling `flush_and_complete`, making it impossible to emit multiple
/// `Completed` events from a single invocation of `process_chat_sse`.
async fn flush_and_complete(
tx_event: &mpsc::Sender<Result<ResponseEvent, ApiError>>,
reasoning_item: &mut Option<ResponseItem>,
Expand Down Expand Up @@ -110,9 +130,7 @@ pub async fn process_chat_sse<S>(
return;
}
Ok(None) => {
if !completed_sent {
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
}
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
return;
}
Err(_) => {
Expand All @@ -132,9 +150,7 @@ pub async fn process_chat_sse<S>(
}

if data == "[DONE]" || data == "DONE" {
if !completed_sent {
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
}
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
return;
}

Expand All @@ -153,6 +169,12 @@ pub async fn process_chat_sse<S>(
continue;
};

// Track completion across all choices in a single SSE event so we can process every
// choice before emitting `Completed` (e.g. multiple tool calls delivered as multiple
// choices in one chunk).
let mut saw_stop = false;
let mut saw_tool_calls = false;

for choice in choices {
if let Some(delta) = choice.get("delta") {
if let Some(reasoning) = delta.get("reasoning") {
Expand Down Expand Up @@ -255,35 +277,13 @@ pub async fn process_chat_sse<S>(

let finish_reason = choice.get("finish_reason").and_then(|r| r.as_str());
if finish_reason == Some("stop") {
if let Some(reasoning) = reasoning_item.take() {
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemDone(reasoning)))
.await;
}

if let Some(assistant) = assistant_item.take() {
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemDone(assistant)))
.await;
}
if !completed_sent {
let _ = tx_event
.send(Ok(ResponseEvent::Completed {
response_id: String::new(),
token_usage: None,
}))
.await;
completed_sent = true;
}
continue;
}

if finish_reason == Some("length") {
saw_stop = true;
} else if finish_reason == Some("length") {
let _ = tx_event.send(Err(ApiError::ContextWindowExceeded)).await;
return;
}
} else if finish_reason == Some("tool_calls") {
saw_tool_calls = true;

if finish_reason == Some("tool_calls") {
if let Some(reasoning) = reasoning_item.take() {
let _ = tx_event
.send(Ok(ResponseEvent::OutputItemDone(reasoning)))
Expand Down Expand Up @@ -314,6 +314,11 @@ pub async fn process_chat_sse<S>(
}
}
}

if saw_stop || saw_tool_calls {
flush_and_complete(&tx_event, &mut reasoning_item, &mut assistant_item).await;
return;
}
}
}

Expand Down Expand Up @@ -381,9 +386,13 @@ async fn append_reasoning_text(
mod tests {
use super::*;
use assert_matches::assert_matches;
use bytes::Bytes;
use codex_protocol::models::ResponseItem;
use futures::TryStreamExt;
use serde_json::json;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
use tokio_util::io::ReaderStream;

Expand All @@ -405,12 +414,57 @@ mod tests {
assert_matches!(&events[..], [ResponseEvent::Completed { .. }]);
}

/// A stream that yields one chunk, then never produces another item and never terminates.
///
/// This models cases where a server keeps the HTTP connection open after emitting a JSON
/// event that already carries completion semantics (e.g. a choice with
/// `finish_reason == "tool_calls"`), and no `DONE`/`[DONE]` is observed.
///
/// The bug we are guarding against: if we wait for a transport sentinel/close instead of
/// treating `finish_reason` as authoritative, the SSE parser hangs until `idle_timeout`
/// and surfaces an error, stalling higher-level workflows.
struct OneChunkThenHang {
yielded: bool,
chunk: Bytes,
}

impl futures::Stream for OneChunkThenHang {
type Item = Result<Bytes, codex_client::TransportError>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.yielded {
return Poll::Pending;
}

this.yielded = true;
Poll::Ready(Some(Ok(this.chunk.clone())))
}
}

/// Render a single Chat Completions SSE "message" event.
fn sse_message_event(payload: serde_json::Value) -> Bytes {
Bytes::from(format!("event: message\ndata: {payload}\n\n"))
}

async fn collect_events(body: &str) -> Vec<ResponseEvent> {
let reader = ReaderStream::new(std::io::Cursor::new(body.to_string()))
.map_err(|err| codex_client::TransportError::Network(err.to_string()));
collect_events_from_stream(reader).await
}

/// Collect all output events from `process_chat_sse` until it terminates.
async fn collect_events_from_stream<S>(stream: S) -> Vec<ResponseEvent>
where
S: futures::Stream<Item = Result<Bytes, codex_client::TransportError>>
+ Unpin
+ Send
+ 'static,
{
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent, ApiError>>(16);
// Use a short timeout so a regression ("wait for DONE/close") fails fast.
tokio::spawn(process_chat_sse(
reader,
stream,
tx,
Duration::from_millis(1000),
None,
Expand All @@ -423,6 +477,114 @@ mod tests {
out
}

#[tokio::test]
async fn completes_on_tool_calls_finish_reason_even_if_stream_never_closes() {
// The server indicates completion via finish_reason="tool_calls", but does not send a
// DONE/[DONE] sentinel and does not close the underlying stream. We should still emit a
// terminal Completed event and return.
let payload = json!({
"choices": [{
"delta": {
"tool_calls": [{
"id": "call_a",
"index": 0,
"function": { "name": "do_a", "arguments": "{}" }
}]
},
"finish_reason": "tool_calls"
}]
});

let stream = OneChunkThenHang {
yielded: false,
chunk: sse_message_event(payload),
};

let events = collect_events_from_stream(stream).await;
assert_matches!(
&events[..],
[
ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id, name, arguments, .. }),
ResponseEvent::Completed { .. }
] if call_id == "call_a" && name == "do_a" && arguments == "{}"
);
}

#[tokio::test]
async fn completes_on_multi_choice_tool_calls_finish_reason_even_if_stream_never_closes() {
// A single SSE event can contain multiple choices; we should process and emit tool calls
// for all of them before completing the response.
let payload = json!({
"choices": [
{
"delta": {
"tool_calls": [{
"id": "call_a",
"index": 0,
"function": { "name": "do_a", "arguments": "{}" }
}]
},
"finish_reason": "tool_calls"
},
{
"delta": {
"tool_calls": [{
"id": "call_b",
"index": 0,
"function": { "name": "do_b", "arguments": "{}" }
}]
},
"finish_reason": "tool_calls"
}
]
});

let stream = OneChunkThenHang {
yielded: false,
chunk: sse_message_event(payload),
};

let events = collect_events_from_stream(stream).await;
assert_matches!(
&events[..],
[
ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_a, name: name_a, arguments: args_a, .. }),
ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { call_id: call_b, name: name_b, arguments: args_b, .. }),
ResponseEvent::Completed { .. }
] if call_a == "call_a" && name_a == "do_a" && args_a == "{}" && call_b == "call_b" && name_b == "do_b" && args_b == "{}"
);
}

#[tokio::test]
async fn completes_on_stop_finish_reason_even_if_stream_never_closes() {
// Same as the tool-calls case, but for finish_reason="stop" (a complete assistant
// message).
let payload = json!({
"choices": [{
"delta": {
"content": "hi"
},
"finish_reason": "stop"
}]
});

let stream = OneChunkThenHang {
yielded: false,
chunk: sse_message_event(payload),
};

let events = collect_events_from_stream(stream).await;
assert_matches!(
&events[..],
[
ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }),
ResponseEvent::OutputTextDelta(delta),
ResponseEvent::OutputItemDone(ResponseItem::Message { .. }),
ResponseEvent::Completed { .. }
] if delta == "hi"
);
}

#[tokio::test]
async fn concatenates_tool_call_arguments_across_deltas() {
let delta_name = json!({
Expand Down
Loading
Loading