diff --git a/channel.ts b/channel.ts index 28ff12a..7be96c2 100644 --- a/channel.ts +++ b/channel.ts @@ -167,12 +167,17 @@ export const makeWebSocket = < export const makeReadableStream = ( ch: Channel, + signal?: AbortSignal, ): ReadableStream => { return new ReadableStream({ async start(controller) { - for await (const content of ch.recv()) { + for await (const content of ch.recv(signal)) { controller.enqueue(content); } + // Uncomment if necessary. this will send a signal to the controller + // if (signal?.aborted) { + // controller.error(new Error("aborted")); + // } controller.close(); }, cancel() { @@ -195,6 +200,10 @@ export const makeChanStream = ( } chan.close(); }; - processStream().catch(console.error); + processStream().catch((err) => { + if (!err?.target?.aborted) { + console.error("error processing stream", err); + } + }); return chan; }; diff --git a/deno.json b/deno.json index 1b92ee8..dff8e9b 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@deco/warp", - "version": "0.3.5", + "version": "0.3.6", "exports": "./mod.ts", "tasks": { "check": "deno fmt && deno lint && deno check mod.ts" diff --git a/handlers.client.ts b/handlers.client.ts index 8f97b7a..d9de976 100644 --- a/handlers.client.ts +++ b/handlers.client.ts @@ -56,7 +56,9 @@ const onRequestStart: ServerMessageHandler = async ( if (!message.hasBody) { doFetch(message, state, state.ch.out, abortCtrl.signal).catch( ignoreIfClosed, - ); + ).finally(() => { + delete state.requests[message.id]; + }); } else { const bodyData = makeChan(); state.requests[message.id]!.body = bodyData; diff --git a/handlers.server.ts b/handlers.server.ts index 5313e04..dfa98dc 100644 --- a/handlers.server.ts +++ b/handlers.server.ts @@ -48,7 +48,7 @@ const onResponseStart: ClientMessageHandler = ( ); const shouldBeNullBody = NULL_BODIES.includes(message.statusCode); const stream = !shouldBeNullBody && request.responseBodyChan - ? makeReadableStream(request.responseBodyChan) + ? makeReadableStream(request.responseBodyChan, state.ch.out.signal) : undefined; const resp = new Response(stream, { status: message.statusCode, @@ -62,6 +62,7 @@ const onResponseStart: ClientMessageHandler = ( request.responseObject.reject( new DOMException("Connection closed", "AbortError"), ); + request?.responseBodyChan?.close?.(); } }); request.responseObject.resolve(resp); @@ -105,7 +106,7 @@ const onDataEnd: ClientMessageHandler = (state, message) => { try { request.responseBodyChan?.close?.(); } catch (_err) { - console.log(_err); + console.log(`error closing body chan`, _err); } };