From 215d151fc71ccd6e0b39e771fae0c35c3997fb9a Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Fri, 31 May 2024 19:54:14 -0300 Subject: [PATCH] Fix parallel message handling Signed-off-by: Marcos Candeia --- client.ts | 25 +++++++++++++++---------- deno.json | 2 +- server.ts | 12 +++++++++--- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/client.ts b/client.ts index 47f12ee..afe7eb2 100644 --- a/client.ts +++ b/client.ts @@ -65,18 +65,23 @@ export const connect = async (opts: ConnectOptions): Promise => { wsSockets, ch, }; - for await (const message of ch.in.recv()) { - try { - await handleServerMessage(state, message); - if (state.live) { - registered.resolve(); - } - } catch (err) { - console.error(new Date(), "error handling message", err); - break; + const ctrl = new AbortController(); + try { + for await (const message of ch.in.recv(ctrl.signal)) { + Promise.resolve(handleServerMessage(state, message)).then(() => { + if (state.live) { + registered.resolve(); + } + }).catch((err) => { + console.error(new Date(), "error handling message", err); + !ctrl.signal.aborted && ctrl.abort(); + }); } + } catch (_err) { + // ignore + } finally { + closed.resolve(); } - closed.resolve(); })(); return { closed: closed.promise, registered: registered.promise }; }; diff --git a/deno.json b/deno.json index c5a7ecb..ff7d589 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@deco/warp", - "version": "0.2.0", + "version": "0.2.1", "exports": "./mod.ts", "tasks": { "check": "deno fmt && deno lint && deno check mod.ts" diff --git a/server.ts b/server.ts index 56ed29f..e6ee9d5 100644 --- a/server.ts +++ b/server.ts @@ -1,4 +1,4 @@ -import { makeChan, makeChanStream, makeWebSocket } from "./channel.ts"; +import { link, makeChan, makeChanStream, makeWebSocket } from "./channel.ts"; import { handleClientMessage } from "./handlers.server.ts"; import type { ClientMessage, @@ -96,9 +96,15 @@ export const serveHandler = ( apiKeys, }; serverStates[state.clientId] = state; + const ctrl = new AbortController(); + const linked = link(req.signal, ctrl.signal); try { - for await (const message of ch.in.recv(req.signal)) { - await handleClientMessage(state, message); + for await (const message of ch.in.recv(linked)) { + Promise.resolve(handleClientMessage(state, message)).catch( + (_err) => { + !ctrl.signal.aborted && ctrl.abort(); + }, + ); } } catch (_err) { // ignore