diff --git a/src/app/api/chat/route.ts b/src/app/api/chat/route.ts index 6362ebc7b..b5ec6d354 100644 --- a/src/app/api/chat/route.ts +++ b/src/app/api/chat/route.ts @@ -155,57 +155,72 @@ export const POST = async (req: Request) => { const responseStream = new TransformStream(); const writer = responseStream.writable.getWriter(); const encoder = new TextEncoder(); + const keepAliveMs = 15_000; + let streamClosed = false; + let keepAliveInterval: ReturnType | undefined; + + const safeWrite = (payload: Record) => { + if (streamClosed) return; + + try { + writer.write(encoder.encode(JSON.stringify(payload) + '\n')); + } catch (error) { + console.warn('Failed to write chat stream payload:', error); + } + }; + + const safeClose = () => { + if (streamClosed) return; + + streamClosed = true; + + if (keepAliveInterval) { + clearInterval(keepAliveInterval); + } + + try { + writer.close(); + } catch (error) { + console.warn('Failed to close chat stream:', error); + } + }; + + keepAliveInterval = setInterval(() => { + safeWrite({ type: 'keepAlive' }); + }, keepAliveMs); + + safeWrite({ type: 'keepAlive' }); const disconnect = session.subscribe((event: string, data: any) => { if (event === 'data') { if (data.type === 'block') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'block', - block: data.block, - }) + '\n', - ), - ); + safeWrite({ + type: 'block', + block: data.block, + }); } else if (data.type === 'updateBlock') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'updateBlock', - blockId: data.blockId, - patch: data.patch, - }) + '\n', - ), - ); + safeWrite({ + type: 'updateBlock', + blockId: data.blockId, + patch: data.patch, + }); } else if (data.type === 'researchComplete') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'researchComplete', - }) + '\n', - ), - ); + safeWrite({ + type: 'researchComplete', + }); } } else if (event === 'end') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'messageEnd', - }) + '\n', - ), - ); - writer.close(); + safeWrite({ + type: 'messageEnd', + }); + safeClose(); session.removeAllListeners(); } else if (event === 'error') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'error', - data: data.data, - }) + '\n', - ), - ); - writer.close(); + safeWrite({ + type: 'error', + data: data.data, + }); + safeClose(); session.removeAllListeners(); } }); @@ -234,7 +249,7 @@ export const POST = async (req: Request) => { req.signal.addEventListener('abort', () => { disconnect(); - writer.close(); + safeClose(); }); return new Response(responseStream.readable, { diff --git a/src/app/api/reconnect/[id]/route.ts b/src/app/api/reconnect/[id]/route.ts index 08be11b16..ecd5c2c90 100644 --- a/src/app/api/reconnect/[id]/route.ts +++ b/src/app/api/reconnect/[id]/route.ts @@ -16,64 +16,79 @@ export const POST = async ( const responseStream = new TransformStream(); const writer = responseStream.writable.getWriter(); const encoder = new TextEncoder(); + const keepAliveMs = 15_000; + let streamClosed = false; + let keepAliveInterval: ReturnType | undefined; + + const safeWrite = (payload: Record) => { + if (streamClosed) return; + + try { + writer.write(encoder.encode(JSON.stringify(payload) + '\n')); + } catch (error) { + console.warn('Failed to write reconnect stream payload:', error); + } + }; + + const safeClose = () => { + if (streamClosed) return; + + streamClosed = true; + + if (keepAliveInterval) { + clearInterval(keepAliveInterval); + } + + try { + writer.close(); + } catch (error) { + console.warn('Failed to close reconnect stream:', error); + } + }; + + keepAliveInterval = setInterval(() => { + safeWrite({ type: 'keepAlive' }); + }, keepAliveMs); + + safeWrite({ type: 'keepAlive' }); const disconnect = session.subscribe((event, data) => { if (event === 'data') { if (data.type === 'block') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'block', - block: data.block, - }) + '\n', - ), - ); + safeWrite({ + type: 'block', + block: data.block, + }); } else if (data.type === 'updateBlock') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'updateBlock', - blockId: data.blockId, - patch: data.patch, - }) + '\n', - ), - ); + safeWrite({ + type: 'updateBlock', + blockId: data.blockId, + patch: data.patch, + }); } else if (data.type === 'researchComplete') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'researchComplete', - }) + '\n', - ), - ); + safeWrite({ + type: 'researchComplete', + }); } } else if (event === 'end') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'messageEnd', - }) + '\n', - ), - ); - writer.close(); + safeWrite({ + type: 'messageEnd', + }); + safeClose(); disconnect(); } else if (event === 'error') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'error', - data: data.data, - }) + '\n', - ), - ); - writer.close(); + safeWrite({ + type: 'error', + data: data.data, + }); + safeClose(); disconnect(); } }); req.signal.addEventListener('abort', () => { disconnect(); - writer.close(); + safeClose(); }); return new Response(responseStream.readable, { diff --git a/src/lib/hooks/useChat.tsx b/src/lib/hooks/useChat.tsx index 5ee6d9fbe..26205b102 100644 --- a/src/lib/hooks/useChat.tsx +++ b/src/lib/hooks/useChat.tsx @@ -551,6 +551,10 @@ export const ChatProvider = ({ children }: { children: React.ReactNode }) => { const messageId = message.messageId; return async (data: any) => { + if (data.type === 'keepAlive') { + return; + } + if (data.type === 'error') { toast.error(data.data); setLoading(false);