Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit b257e0a

Browse files
committed
fix(core): update FakeWebSocket to handle onopen assigned after open event fired
1 parent 43e35aa commit b257e0a

File tree

5 files changed

+90
-173
lines changed

5 files changed

+90
-173
lines changed

packages/core/src/inline-client-driver/fake-websocket.ts

Lines changed: 90 additions & 173 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,16 @@
11
import { WSContext } from "hono/ws";
22
import { logger } from "@/registry/log";
3-
import type { ConnectWebSocketOutput } from "@/actor/router-endpoints";
4-
import type * as messageToServer from "@/actor/protocol/message/to-server";
5-
import { parseMessage } from "@/actor/protocol/message/mod";
6-
import type { InputData } from "@/actor/protocol/serde";
7-
import type {
8-
Event,
9-
CloseEvent,
10-
MessageEvent,
11-
} from "ws";
3+
import type { ConnectWebSocketOutput } from "@/actor/router-endpoints";
4+
import type * as messageToServer from "@/actor/protocol/message/to-server";
5+
import { parseMessage } from "@/actor/protocol/message/mod";
6+
import type { InputData } from "@/actor/protocol/serde";
7+
import type { Event, CloseEvent, MessageEvent } from "ws";
128

139
/**
1410
* FakeWebSocket implements a WebSocket-like interface
1511
* that connects to a ConnectWebSocketOutput handler
1612
*/
1713
export class FakeWebSocket {
18-
// WebSocket interface properties
19-
bufferedAmount = 0;
20-
extensions = "";
21-
protocol = "";
22-
url = "";
23-
24-
// Event handlers
25-
onclose: ((ev: any) => void) | null = null;
26-
onerror: ((ev: any) => void) | null = null;
27-
onmessage: ((ev: any) => void) | null = null;
28-
onopen: ((ev: any) => void) | null = null;
29-
3014
// WebSocket readyState values
3115
readonly CONNECTING = 0 as const;
3216
readonly OPEN = 1 as const;
@@ -37,37 +21,59 @@ export class FakeWebSocket {
3721
#handler: ConnectWebSocketOutput;
3822
#wsContext: WSContext;
3923
#readyState: 0 | 1 | 2 | 3 = 0; // Start in CONNECTING state
40-
#initPromise: Promise<void>;
41-
#initResolve: (value: void) => void;
42-
#initReject: (reason: any) => void;
4324
#queuedMessages: Array<string | ArrayBuffer | Uint8Array> = [];
25+
// Event buffering is needed since onopen/onmessage events can be fired
26+
// before JavaScript has a chance to assign handlers (e.g. within the same tick)
27+
#bufferedEvents: Array<{
28+
type: "open" | "close" | "error" | "message";
29+
event: any;
30+
}> = [];
31+
32+
// Event handlers with buffering
33+
#onopen: ((ev: any) => void) | null = null;
34+
#onclose: ((ev: any) => void) | null = null;
35+
#onerror: ((ev: any) => void) | null = null;
36+
#onmessage: ((ev: any) => void) | null = null;
37+
38+
get onopen() {
39+
return this.#onopen;
40+
}
41+
set onopen(handler: ((ev: any) => void) | null) {
42+
this.#onopen = handler;
43+
if (handler) this.#flushBufferedEvents("open");
44+
}
45+
46+
get onclose() {
47+
return this.#onclose;
48+
}
49+
set onclose(handler: ((ev: any) => void) | null) {
50+
this.#onclose = handler;
51+
if (handler) this.#flushBufferedEvents("close");
52+
}
53+
54+
get onerror() {
55+
return this.#onerror;
56+
}
57+
set onerror(handler: ((ev: any) => void) | null) {
58+
this.#onerror = handler;
59+
if (handler) this.#flushBufferedEvents("error");
60+
}
61+
62+
get onmessage() {
63+
return this.#onmessage;
64+
}
65+
set onmessage(handler: ((ev: any) => void) | null) {
66+
this.#onmessage = handler;
67+
if (handler) this.#flushBufferedEvents("message");
68+
}
4469

45-
/**
46-
* Creates a new FakeWebSocket connected to a ConnectWebSocketOutput handler
47-
*/
4870
constructor(handler: ConnectWebSocketOutput) {
4971
this.#handler = handler;
5072

51-
// Create promise resolvers for initialization
52-
const initPromise = Promise.withResolvers<void>();
53-
this.#initPromise = initPromise.promise;
54-
this.#initResolve = initPromise.resolve;
55-
this.#initReject = initPromise.reject;
56-
5773
// Create a fake WSContext to pass to the handler
5874
this.#wsContext = new WSContext({
5975
send: (data: string | ArrayBuffer | Uint8Array) => {
60-
logger().debug("WSContext.send called", {
61-
dataType: typeof data,
62-
dataLength:
63-
typeof data === "string"
64-
? data.length
65-
: data instanceof ArrayBuffer
66-
? data.byteLength
67-
: data instanceof Uint8Array
68-
? data.byteLength
69-
: "unknown",
70-
});
76+
logger().debug("WSContext.send called");
7177
this.#handleMessage(data);
7278
},
7379
close: (code?: number, reason?: string) => {
@@ -76,24 +82,16 @@ export class FakeWebSocket {
7682
},
7783
// Set readyState to 1 (OPEN) since handlers expect an open connection
7884
readyState: 1,
79-
url: "ws://fake-websocket/",
80-
protocol: "",
8185
});
8286

8387
// Initialize the connection
8488
this.#initialize();
8589
}
8690

87-
/**
88-
* Returns the current ready state of the connection
89-
*/
9091
get readyState(): 0 | 1 | 2 | 3 {
9192
return this.#readyState;
9293
}
9394

94-
/**
95-
* Sends data through the connection
96-
*/
9795
send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void {
9896
logger().debug("send called", { readyState: this.readyState });
9997

@@ -118,16 +116,6 @@ export class FakeWebSocket {
118116
});
119117
const message = JSON.parse(data) as messageToServer.ToServer;
120118

121-
logger().debug("fake websocket sending message", {
122-
messageType:
123-
message.b &&
124-
("ar" in message.b
125-
? "action"
126-
: "sr" in message.b
127-
? "subscription"
128-
: "unknown"),
129-
});
130-
131119
this.#handler.onMessage(message).catch((err) => {
132120
logger().error("error handling websocket message", {
133121
error: err,
@@ -219,83 +207,6 @@ export class FakeWebSocket {
219207
});
220208
}
221209

222-
/**
223-
* Implementation of EventTarget methods (minimal implementation)
224-
*/
225-
addEventListener(type: string, listener: any): void {
226-
// Map to the onXXX properties
227-
switch (type) {
228-
case "open":
229-
this.onopen =
230-
typeof listener === "function"
231-
? listener
232-
: (ev) => listener.handleEvent(ev);
233-
break;
234-
case "message":
235-
this.onmessage =
236-
typeof listener === "function"
237-
? listener
238-
: (ev) => listener.handleEvent(ev);
239-
break;
240-
case "close":
241-
this.onclose =
242-
typeof listener === "function"
243-
? listener
244-
: (ev) => listener.handleEvent(ev);
245-
break;
246-
case "error":
247-
this.onerror =
248-
typeof listener === "function"
249-
? listener
250-
: (ev) => listener.handleEvent(ev);
251-
break;
252-
}
253-
}
254-
255-
removeEventListener(type: string): void {
256-
// Simple implementation that just nullifies the corresponding handler
257-
switch (type) {
258-
case "open":
259-
this.onopen = null;
260-
break;
261-
case "message":
262-
this.onmessage = null;
263-
break;
264-
case "close":
265-
this.onclose = null;
266-
break;
267-
case "error":
268-
this.onerror = null;
269-
break;
270-
}
271-
}
272-
273-
dispatchEvent(event: any): boolean {
274-
// Dispatch to the corresponding handler
275-
switch (event.type) {
276-
case "open":
277-
if (this.onopen) this.onopen(event);
278-
break;
279-
case "message":
280-
if (this.onmessage) this.onmessage(event as MessageEvent);
281-
break;
282-
case "close":
283-
if (this.onclose) this.onclose(event as CloseEvent);
284-
break;
285-
case "error":
286-
if (this.onerror) this.onerror(event);
287-
break;
288-
}
289-
return !event.defaultPrevented;
290-
}
291-
292-
/**
293-
* Wait for the WebSocket to be initialized and ready
294-
*/
295-
waitForReady(): Promise<void> {
296-
return this.#initPromise;
297-
}
298-
299210
/**
300211
* Initialize the connection with the handler
301212
*/
@@ -314,11 +225,6 @@ export class FakeWebSocket {
314225
// Fire the open event
315226
this.#fireOpen();
316227

317-
// Resolve the initialization promise - do this BEFORE processing queued messages
318-
// This allows clients to set up their event handlers before messages are processed
319-
logger().info("resolving initialization promise");
320-
this.#initResolve();
321-
322228
// Delay processing queued messages slightly to allow event handlers to be set up
323229
if (this.#queuedMessages.length > 0) {
324230
if (this.readyState !== this.OPEN) {
@@ -348,7 +254,6 @@ export class FakeWebSocket {
348254
});
349255
this.#fireError(err);
350256
this.close(1011, "Internal error during initialization");
351-
this.#initReject(err);
352257
}
353258
}
354259

@@ -402,13 +307,11 @@ export class FakeWebSocket {
402307
logger().debug("dispatching message to onmessage handler");
403308
this.onmessage(event);
404309
} else {
405-
logger().warn("no onmessage handler registered, message dropped");
310+
logger().debug("no onmessage handler registered, buffering message");
311+
this.#bufferedEvents.push({ type: "message", event });
406312
}
407313
}
408314

409-
/**
410-
* Handle connection close from the server side
411-
*/
412315
#handleClose(code: number, reason: string): void {
413316
if (this.readyState === this.CLOSED) return;
414317

@@ -427,12 +330,41 @@ export class FakeWebSocket {
427330
// Dispatch the event
428331
if (this.onclose) {
429332
this.onclose(event);
333+
} else {
334+
this.#bufferedEvents.push({ type: "close", event });
335+
}
336+
}
337+
338+
#flushBufferedEvents(type: "open" | "close" | "error" | "message"): void {
339+
const eventsToFlush = this.#bufferedEvents.filter(
340+
(buffered) => buffered.type === type,
341+
);
342+
this.#bufferedEvents = this.#bufferedEvents.filter(
343+
(buffered) => buffered.type !== type,
344+
);
345+
346+
for (const { event } of eventsToFlush) {
347+
try {
348+
switch (type) {
349+
case "open":
350+
this.#onopen?.(event);
351+
break;
352+
case "close":
353+
this.#onclose?.(event);
354+
break;
355+
case "error":
356+
this.#onerror?.(event);
357+
break;
358+
case "message":
359+
this.#onmessage?.(event);
360+
break;
361+
}
362+
} catch (err) {
363+
logger().error(`error in buffered ${type} handler`, { error: err });
364+
}
430365
}
431366
}
432367

433-
/**
434-
* Fire the open event
435-
*/
436368
#fireOpen(): void {
437369
try {
438370
// Create an Event-like object since Event constructor may not be available
@@ -444,28 +376,26 @@ export class FakeWebSocket {
444376

445377
if (this.onopen) {
446378
this.onopen(event);
379+
} else {
380+
this.#bufferedEvents.push({ type: "open", event });
447381
}
448382
} catch (err) {
449383
logger().error("error in onopen handler", { error: err });
450384
}
451385
}
452386

453-
/**
454-
* Fire the close event
455-
*/
456387
#fireClose(event: CloseEvent): void {
457388
try {
458389
if (this.onclose) {
459390
this.onclose(event);
391+
} else {
392+
this.#bufferedEvents.push({ type: "close", event });
460393
}
461394
} catch (err) {
462395
logger().error("error in onclose handler", { error: err });
463396
}
464397
}
465398

466-
/**
467-
* Fire the error event
468-
*/
469399
#fireError(error: unknown): void {
470400
try {
471401
// Create an Event-like object for error
@@ -479,6 +409,8 @@ export class FakeWebSocket {
479409

480410
if (this.onerror) {
481411
this.onerror(event);
412+
} else {
413+
this.#bufferedEvents.push({ type: "error", event });
482414
}
483415
} catch (err) {
484416
logger().error("error in onerror handler", { error: err });
@@ -488,9 +420,6 @@ export class FakeWebSocket {
488420
logger().error("websocket error", { error });
489421
}
490422

491-
/**
492-
* Parse binary message and forward to handler
493-
*/
494423
async #parseBinaryMessage(data: Uint8Array): Promise<void> {
495424
try {
496425
logger().debug("parsing binary message", { dataLength: data.byteLength });
@@ -501,18 +430,6 @@ export class FakeWebSocket {
501430
maxIncomingMessageSize: 1024 * 1024, // 1MB default limit
502431
});
503432

504-
logger().debug("successfully parsed binary message", {
505-
messageType:
506-
message.b &&
507-
("i" in message.b
508-
? "init"
509-
: "ar" in message.b
510-
? "action"
511-
: "sr" in message.b
512-
? "subscription"
513-
: "unknown"),
514-
});
515-
516433
// Forward the parsed message to the handler
517434
await this.#handler.onMessage(message);
518435
logger().debug("handler processed binary message");

0 commit comments

Comments
 (0)