Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit 40cc7ac

Browse files
committed
fix(core): update websocket to use event listeners
1 parent ec58033 commit 40cc7ac

File tree

4 files changed

+81
-115
lines changed

4 files changed

+81
-115
lines changed

packages/core/src/client/actor-conn.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -255,18 +255,18 @@ enc
255255
signal ? { signal } : undefined,
256256
);
257257
this.#transport = { websocket: ws };
258-
ws.onopen = () => {
258+
ws.addEventListener("open", () => {
259259
logger().debug("websocket open");
260-
};
261-
ws.onmessage = async (ev) => {
260+
});
261+
ws.addEventListener("message", async (ev) => {
262262
this.#handleOnMessage(ev.data);
263-
};
264-
ws.onclose = (ev) => {
263+
});
264+
ws.addEventListener("close", (ev) => {
265265
this.#handleOnClose(ev);
266-
};
267-
ws.onerror = (ev) => {
266+
});
267+
ws.addEventListener("error", (ev) => {
268268
this.#handleOnError();
269-
};
269+
});
270270
}
271271

272272
async #connectSse({ signal }: { signal?: AbortSignal } = {}) {

packages/core/src/drivers/rivet/ws-proxy.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export async function createWebSocketProxy(
2626
targetWs = new WebSocket(targetUrl, { headers });
2727

2828
// Set up target websocket handlers
29-
targetWs.onopen = () => {
29+
targetWs.addEventListener("open", () => {
3030
invariant(targetWs, "targetWs does not exist");
3131

3232
// Process any queued messages once connected
@@ -37,13 +37,13 @@ export async function createWebSocketProxy(
3737
// Clear the queue after sending
3838
messageQueue.length = 0;
3939
}
40-
};
40+
});
4141

42-
targetWs.onmessage = (event: any) => {
42+
targetWs.addEventListener("message", (event: any) => {
4343
wsContext.send(event.data as any);
44-
};
44+
});
4545

46-
targetWs.onclose = (event: any) => {
46+
targetWs.addEventListener("close", (event: any) => {
4747
logger().debug("target websocket closed", {
4848
code: event.code,
4949
reason: event.reason,
@@ -53,17 +53,17 @@ export async function createWebSocketProxy(
5353
// Forward the close code and reason from target to client
5454
wsContext.close(event.code, event.reason);
5555
}
56-
};
56+
});
5757

58-
targetWs.onerror = () => {
58+
targetWs.addEventListener("error", () => {
5959
logger().warn("target websocket error");
6060

6161
if (wsContext.readyState === WebSocket.OPEN) {
6262
// Use standard WebSocket error code: 1006 - Abnormal Closure
6363
// The connection was closed abnormally, e.g., without sending or receiving a Close control frame
6464
wsContext.close(1006, "Error in target connection");
6565
}
66-
};
66+
});
6767
},
6868

6969
// Handle messages from client to target

packages/core/src/inline-client-driver/fake-websocket.ts

Lines changed: 50 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -22,50 +22,15 @@ export class FakeWebSocket {
2222
#wsContext: WSContext;
2323
#readyState: 0 | 1 | 2 | 3 = 0; // Start in CONNECTING state
2424
#queuedMessages: Array<string | ArrayBuffer | Uint8Array> = [];
25-
// Event buffering is needed since onopen/onmessage events can be fired
26-
// before JavaScript has a chance to assign handlers (e.g. within the same tick)
25+
// Event buffering is needed since events can be fired
26+
// before JavaScript has a chance to add event listeners (e.g. within the same tick)
2727
#bufferedEvents: Array<{
28-
type: "open" | "close" | "error" | "message";
28+
type: string;
2929
event: any;
3030
}> = [];
3131

32-
// Event handlers with buffering
33-
#onopen: ((ev: any) => void) | null = null;
34-
#onclose: ((ev: any) => void) | null = null;
35-
#onerror: ((ev: any) => void) | null = null;
36-
#onmessage: ((ev: any) => void) | null = null;
37-
38-
get onopen() {
39-
return this.#onopen;
40-
}
41-
set onopen(handler: ((ev: any) => void) | null) {
42-
this.#onopen = handler;
43-
if (handler) this.#flushBufferedEvents("open");
44-
}
45-
46-
get onclose() {
47-
return this.#onclose;
48-
}
49-
set onclose(handler: ((ev: any) => void) | null) {
50-
this.#onclose = handler;
51-
if (handler) this.#flushBufferedEvents("close");
52-
}
53-
54-
get onerror() {
55-
return this.#onerror;
56-
}
57-
set onerror(handler: ((ev: any) => void) | null) {
58-
this.#onerror = handler;
59-
if (handler) this.#flushBufferedEvents("error");
60-
}
61-
62-
get onmessage() {
63-
return this.#onmessage;
64-
}
65-
set onmessage(handler: ((ev: any) => void) | null) {
66-
this.#onmessage = handler;
67-
if (handler) this.#flushBufferedEvents("message");
68-
}
32+
// Event listeners with buffering
33+
#eventListeners: Map<string, ((ev: any) => void)[]> = new Map();
6934

7035
constructor(handler: ConnectWebSocketOutput) {
7136
this.#handler = handler;
@@ -303,13 +268,7 @@ export class FakeWebSocket {
303268
} as unknown as MessageEvent;
304269

305270
// Dispatch the event
306-
if (this.onmessage) {
307-
logger().debug("dispatching message to onmessage handler");
308-
this.onmessage(event);
309-
} else {
310-
logger().debug("no onmessage handler registered, buffering message");
311-
this.#bufferedEvents.push({ type: "message", event });
312-
}
271+
this.#dispatchEvent("message", event);
313272
}
314273

315274
#handleClose(code: number, reason: string): void {
@@ -328,14 +287,47 @@ export class FakeWebSocket {
328287
} as unknown as CloseEvent;
329288

330289
// Dispatch the event
331-
if (this.onclose) {
332-
this.onclose(event);
290+
this.#dispatchEvent("close", event);
291+
}
292+
293+
addEventListener(type: string, listener: (ev: any) => void): void {
294+
if (!this.#eventListeners.has(type)) {
295+
this.#eventListeners.set(type, []);
296+
}
297+
this.#eventListeners.get(type)!.push(listener);
298+
299+
// Flush any buffered events for this type
300+
this.#flushBufferedEvents(type);
301+
}
302+
303+
removeEventListener(type: string, listener: (ev: any) => void): void {
304+
const listeners = this.#eventListeners.get(type);
305+
if (listeners) {
306+
const index = listeners.indexOf(listener);
307+
if (index !== -1) {
308+
listeners.splice(index, 1);
309+
}
310+
}
311+
}
312+
313+
#dispatchEvent(type: string, event: any): void {
314+
const listeners = this.#eventListeners.get(type);
315+
if (listeners && listeners.length > 0) {
316+
logger().debug(`dispatching ${type} event to ${listeners.length} listeners`);
317+
for (const listener of listeners) {
318+
try {
319+
listener(event);
320+
} catch (err) {
321+
logger().error(`error in ${type} event listener`, { error: err });
322+
}
323+
}
333324
} else {
334-
this.#bufferedEvents.push({ type: "close", event });
325+
logger().debug(`no ${type} listeners registered, buffering event`);
326+
this.#bufferedEvents.push({ type, event });
335327
}
336328
}
337329

338-
#flushBufferedEvents(type: "open" | "close" | "error" | "message"): void {
330+
#flushBufferedEvents(type: string): void {
339331
const eventsToFlush = this.#bufferedEvents.filter(
340332
(buffered) => buffered.type === type,
341333
);
@@ -344,24 +336,7 @@ export class FakeWebSocket {
344336
);
345337

346338
for (const { event } of eventsToFlush) {
347-
try {
348-
switch (type) {
349-
case "open":
350-
this.#onopen?.(event);
351-
break;
352-
case "close":
353-
this.#onclose?.(event);
354-
break;
355-
case "error":
356-
this.#onerror?.(event);
357-
break;
358-
case "message":
359-
this.#onmessage?.(event);
360-
break;
361-
}
362-
} catch (err) {
363-
logger().error(`error in buffered ${type} handler`, { error: err });
364-
}
339+
this.#dispatchEvent(type, event);
365340
}
366341
}
367342

@@ -374,25 +349,17 @@ export class FakeWebSocket {
374349
currentTarget: this,
375350
} as unknown as Event;
376351

377-
if (this.onopen) {
378-
this.onopen(event);
379-
} else {
380-
this.#bufferedEvents.push({ type: "open", event });
381-
}
352+
this.#dispatchEvent("open", event);
382353
} catch (err) {
383-
logger().error("error in onopen handler", { error: err });
354+
logger().error("error in open event", { error: err });
384355
}
385356
}
386357

387358
#fireClose(event: CloseEvent): void {
388359
try {
389-
if (this.onclose) {
390-
this.onclose(event);
391-
} else {
392-
this.#bufferedEvents.push({ type: "close", event });
393-
}
360+
this.#dispatchEvent("close", event);
394361
} catch (err) {
395-
logger().error("error in onclose handler", { error: err });
362+
logger().error("error in close event", { error: err });
396363
}
397364
}
398365

@@ -407,13 +374,9 @@ export class FakeWebSocket {
407374
message: error instanceof Error ? error.message : String(error),
408375
} as unknown as Event;
409376

410-
if (this.onerror) {
411-
this.onerror(event);
412-
} else {
413-
this.#bufferedEvents.push({ type: "error", event });
414-
}
377+
this.#dispatchEvent("error", event);
415378
} catch (err) {
416-
logger().error("error in onerror handler", { error: err });
379+
logger().error("error in error event", { error: err });
417380
}
418381

419382
// Log the error

packages/core/src/manager/router.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -442,34 +442,37 @@ export function createManagerRouter(
442442
clientWs = await clientWsPromise;
443443

444444
// Add message handler to forward messages from client to server
445-
clientWs.onmessage = (clientEvt: MessageEvent) => {
446-
logger().debug("test websocket connection message");
447-
448-
if (serverWs.readyState === 1) {
449-
// OPEN
450-
serverWs.send(clientEvt.data as any);
451-
}
452-
};
445+
clientWs.addEventListener(
446+
"message",
447+
(clientEvt: MessageEvent) => {
448+
logger().debug("test websocket connection message");
449+
450+
if (serverWs.readyState === 1) {
451+
// OPEN
452+
serverWs.send(clientEvt.data as any);
453+
}
454+
},
455+
);
453456

454457
// Add close handler to close server when client closes
455-
clientWs.onclose = (clientEvt: CloseEvent) => {
458+
clientWs.addEventListener("close", (clientEvt: CloseEvent) => {
456459
logger().debug("test websocket connection closed");
457460

458461
if (serverWs.readyState !== 3) {
459462
// Not CLOSED
460463
serverWs.close(clientEvt.code, clientEvt.reason);
461464
}
462-
};
465+
});
463466

464467
// Add error handler
465-
clientWs.onerror = () => {
468+
clientWs.addEventListener("error", () => {
466469
logger().debug("test websocket connection error");
467470

468471
if (serverWs.readyState !== 3) {
469472
// Not CLOSED
470473
serverWs.close(1011, "Error in client websocket");
471474
}
472-
};
475+
});
473476
} catch (error) {
474477
logger().error(
475478
"failed to establish client websocket connection",

0 commit comments

Comments
 (0)