Skip to content

Commit

Permalink
Fix parallel message handling
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed May 31, 2024
1 parent 31b1324 commit 215d151
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
25 changes: 15 additions & 10 deletions client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,23 @@ export const connect = async (opts: ConnectOptions): Promise<Connected> => {
wsSockets,
ch,
};
for await (const message of ch.in.recv()) {
try {
await handleServerMessage(state, message);
if (state.live) {
registered.resolve();
}
} catch (err) {
console.error(new Date(), "error handling message", err);
break;
const ctrl = new AbortController();
try {
for await (const message of ch.in.recv(ctrl.signal)) {
Promise.resolve(handleServerMessage(state, message)).then(() => {
if (state.live) {
registered.resolve();
}
}).catch((err) => {
console.error(new Date(), "error handling message", err);
!ctrl.signal.aborted && ctrl.abort();
});
}
} catch (_err) {
// ignore
} finally {
closed.resolve();
}
closed.resolve();
})();
return { closed: closed.promise, registered: registered.promise };
};
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@deco/warp",
"version": "0.2.0",
"version": "0.2.1",
"exports": "./mod.ts",
"tasks": {
"check": "deno fmt && deno lint && deno check mod.ts"
Expand Down
12 changes: 9 additions & 3 deletions server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { makeChan, makeChanStream, makeWebSocket } from "./channel.ts";
import { link, makeChan, makeChanStream, makeWebSocket } from "./channel.ts";
import { handleClientMessage } from "./handlers.server.ts";
import type {
ClientMessage,
Expand Down Expand Up @@ -96,9 +96,15 @@ export const serveHandler = (
apiKeys,
};
serverStates[state.clientId] = state;
const ctrl = new AbortController();
const linked = link(req.signal, ctrl.signal);
try {
for await (const message of ch.in.recv(req.signal)) {
await handleClientMessage(state, message);
for await (const message of ch.in.recv(linked)) {
Promise.resolve(handleClientMessage(state, message)).catch(
(_err) => {
!ctrl.signal.aborted && ctrl.abort();
},
);
}
} catch (_err) {
// ignore
Expand Down

0 comments on commit 215d151

Please sign in to comment.