Skip to content

Commit 11d256e

Browse files
committed
fix stream event idle for 10s, claude code will retry the request with stream=false
1 parent ed53b38 commit 11d256e

File tree

4 files changed

+95
-62
lines changed

4 files changed

+95
-62
lines changed

src/lib/utils.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,25 @@ export const cacheVSCodeVersion = async () => {
2424

2525
consola.info(`Using VSCode version: ${response}`)
2626
}
27+
28+
export const setupPingInterval = (
29+
stream: {
30+
writeSSE: (data: { event: string; data: string }) => Promise<void>
31+
},
32+
intervalMs: number = 3000,
33+
) => {
34+
const pingInterval = setInterval(async () => {
35+
try {
36+
await stream.writeSSE({
37+
event: "ping",
38+
data: "",
39+
})
40+
consola.debug("Sent ping")
41+
} catch (error) {
42+
consola.warn("Failed to send ping:", error)
43+
clearInterval(pingInterval)
44+
}
45+
}, intervalMs)
46+
47+
return pingInterval
48+
}

src/routes/chat-completions/handler.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { awaitApproval } from "~/lib/approval"
77
import { checkRateLimit } from "~/lib/rate-limit"
88
import { state } from "~/lib/state"
99
import { getTokenCount } from "~/lib/tokenizer"
10+
import { setupPingInterval } from "~/lib/utils"
1011
import { isNullish } from "~/lib/utils"
1112
import {
1213
createChatCompletions,
@@ -56,9 +57,15 @@ export async function handleCompletion(c: Context) {
5657

5758
consola.debug("Streaming response")
5859
return streamSSE(c, async (stream) => {
59-
for await (const chunk of response) {
60-
consola.debug("Streaming chunk:", JSON.stringify(chunk))
61-
await stream.writeSSE(chunk as SSEMessage)
60+
const pingInterval = setupPingInterval(stream)
61+
62+
try {
63+
for await (const chunk of response) {
64+
consola.debug("Streaming chunk:", JSON.stringify(chunk))
65+
await stream.writeSSE(chunk as SSEMessage)
66+
}
67+
} finally {
68+
clearInterval(pingInterval)
6269
}
6370
})
6471
}

src/routes/messages/handler.ts

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { streamSSE } from "hono/streaming"
66
import { awaitApproval } from "~/lib/approval"
77
import { checkRateLimit } from "~/lib/rate-limit"
88
import { state } from "~/lib/state"
9+
import { setupPingInterval } from "~/lib/utils"
910
import {
1011
createResponsesStreamState,
1112
translateResponsesStreamEvent,
@@ -84,33 +85,39 @@ const handleWithChatCompletions = async (
8485

8586
consola.debug("Streaming response from Copilot")
8687
return streamSSE(c, async (stream) => {
88+
const pingInterval = setupPingInterval(stream)
89+
8790
const streamState: AnthropicStreamState = {
8891
messageStartSent: false,
8992
contentBlockIndex: 0,
9093
contentBlockOpen: false,
9194
toolCalls: {},
9295
}
9396

94-
for await (const rawEvent of response) {
95-
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
96-
if (rawEvent.data === "[DONE]") {
97-
break
98-
}
97+
try {
98+
for await (const rawEvent of response) {
99+
consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
100+
if (rawEvent.data === "[DONE]") {
101+
break
102+
}
99103

100-
if (!rawEvent.data) {
101-
continue
102-
}
104+
if (!rawEvent.data) {
105+
continue
106+
}
103107

104-
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
105-
const events = translateChunkToAnthropicEvents(chunk, streamState)
108+
const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
109+
const events = translateChunkToAnthropicEvents(chunk, streamState)
106110

107-
for (const event of events) {
108-
consola.debug("Translated Anthropic event:", JSON.stringify(event))
109-
await stream.writeSSE({
110-
event: event.type,
111-
data: JSON.stringify(event),
112-
})
111+
for (const event of events) {
112+
consola.debug("Translated Anthropic event:", JSON.stringify(event))
113+
await stream.writeSSE({
114+
event: event.type,
115+
data: JSON.stringify(event),
116+
})
117+
}
113118
}
119+
} finally {
120+
clearInterval(pingInterval)
114121
}
115122
})
116123
}
@@ -135,45 +142,51 @@ const handleWithResponsesApi = async (
135142
if (responsesPayload.stream && isAsyncIterable(response)) {
136143
consola.debug("Streaming response from Copilot (Responses API)")
137144
return streamSSE(c, async (stream) => {
138-
const streamState = createResponsesStreamState()
145+
const pingInterval = setupPingInterval(stream)
139146

140-
for await (const chunk of response) {
141-
const eventName = chunk.event
142-
if (eventName === "ping") {
143-
await stream.writeSSE({ event: "ping", data: "" })
144-
continue
145-
}
147+
const streamState = createResponsesStreamState()
146148

147-
const data = chunk.data
148-
if (!data) {
149-
continue
149+
try {
150+
for await (const chunk of response) {
151+
const eventName = chunk.event
152+
if (eventName === "ping") {
153+
await stream.writeSSE({ event: "ping", data: "" })
154+
continue
155+
}
156+
157+
const data = chunk.data
158+
if (!data) {
159+
continue
160+
}
161+
162+
consola.debug("Responses raw stream event:", data)
163+
164+
const events = translateResponsesStreamEvent(
165+
JSON.parse(data) as ResponseStreamEvent,
166+
streamState,
167+
)
168+
for (const event of events) {
169+
const eventData = JSON.stringify(event)
170+
consola.debug("Translated Anthropic event:", eventData)
171+
await stream.writeSSE({
172+
event: event.type,
173+
data: eventData,
174+
})
175+
}
150176
}
151177

152-
consola.debug("Responses raw stream event:", data)
153-
154-
const events = translateResponsesStreamEvent(
155-
JSON.parse(data) as ResponseStreamEvent,
156-
streamState,
157-
)
158-
for (const event of events) {
159-
const eventData = JSON.stringify(event)
160-
consola.debug("Translated Anthropic event:", eventData)
178+
if (!streamState.messageCompleted) {
179+
consola.warn(
180+
"Responses stream ended without completion; sending fallback message_stop",
181+
)
182+
const fallback = { type: "message_stop" as const }
161183
await stream.writeSSE({
162-
event: event.type,
163-
data: eventData,
184+
event: fallback.type,
185+
data: JSON.stringify(fallback),
164186
})
165187
}
166-
}
167-
168-
if (!streamState.messageCompleted) {
169-
consola.warn(
170-
"Responses stream ended without completion; sending fallback message_stop",
171-
)
172-
const fallback = { type: "message_stop" as const }
173-
await stream.writeSSE({
174-
event: fallback.type,
175-
data: JSON.stringify(fallback),
176-
})
188+
} finally {
189+
clearInterval(pingInterval)
177190
}
178191
})
179192
}

src/routes/responses/handler.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { streamSSE } from "hono/streaming"
66
import { awaitApproval } from "~/lib/approval"
77
import { checkRateLimit } from "~/lib/rate-limit"
88
import { state } from "~/lib/state"
9+
import { setupPingInterval } from "~/lib/utils"
910
import {
1011
createResponses,
1112
type ResponsesPayload,
@@ -52,17 +53,7 @@ export const handleResponses = async (c: Context) => {
5253
if (isStreamingRequested(payload) && isAsyncIterable(response)) {
5354
consola.debug("Forwarding native Responses stream")
5455
return streamSSE(c, async (stream) => {
55-
const pingInterval = setInterval(async () => {
56-
try {
57-
await stream.writeSSE({
58-
event: "ping",
59-
data: JSON.stringify({ timestamp: Date.now() }),
60-
})
61-
} catch (error) {
62-
consola.warn("Failed to send ping:", error)
63-
clearInterval(pingInterval)
64-
}
65-
}, 3000)
56+
const pingInterval = setupPingInterval(stream)
6657

6758
try {
6859
for await (const chunk of response) {

0 commit comments

Comments
 (0)