diff --git a/packages/core/src/inline-client-driver/fake-websocket.ts b/packages/core/src/inline-client-driver/fake-websocket.ts index b067e4cf4..9d6c98f1e 100644 --- a/packages/core/src/inline-client-driver/fake-websocket.ts +++ b/packages/core/src/inline-client-driver/fake-websocket.ts @@ -1,32 +1,16 @@ import { WSContext } from "hono/ws"; import { logger } from "@/registry/log"; -import type { ConnectWebSocketOutput } from "@/actor/router-endpoints"; -import type * as messageToServer from "@/actor/protocol/message/to-server"; -import { parseMessage } from "@/actor/protocol/message/mod"; -import type { InputData } from "@/actor/protocol/serde"; -import type { - Event, - CloseEvent, - MessageEvent, -} from "ws"; +import type { ConnectWebSocketOutput } from "@/actor/router-endpoints"; +import type * as messageToServer from "@/actor/protocol/message/to-server"; +import { parseMessage } from "@/actor/protocol/message/mod"; +import type { InputData } from "@/actor/protocol/serde"; +import type { Event, CloseEvent, MessageEvent } from "ws"; /** * FakeWebSocket implements a WebSocket-like interface * that connects to a ConnectWebSocketOutput handler */ export class FakeWebSocket { - // WebSocket interface properties - bufferedAmount = 0; - extensions = ""; - protocol = ""; - url = ""; - - // Event handlers - onclose: ((ev: any) => void) | null = null; - onerror: ((ev: any) => void) | null = null; - onmessage: ((ev: any) => void) | null = null; - onopen: ((ev: any) => void) | null = null; - // WebSocket readyState values readonly CONNECTING = 0 as const; readonly OPEN = 1 as const; @@ -37,37 +21,59 @@ export class FakeWebSocket { #handler: ConnectWebSocketOutput; #wsContext: WSContext; #readyState: 0 | 1 | 2 | 3 = 0; // Start in CONNECTING state - #initPromise: Promise; - #initResolve: (value: void) => void; - #initReject: (reason: any) => void; #queuedMessages: Array = []; + // Event buffering is needed since onopen/onmessage events can be fired + // before JavaScript has a chance to assign handlers (e.g. within the same tick) + #bufferedEvents: Array<{ + type: "open" | "close" | "error" | "message"; + event: any; + }> = []; + + // Event handlers with buffering + #onopen: ((ev: any) => void) | null = null; + #onclose: ((ev: any) => void) | null = null; + #onerror: ((ev: any) => void) | null = null; + #onmessage: ((ev: any) => void) | null = null; + + get onopen() { + return this.#onopen; + } + set onopen(handler: ((ev: any) => void) | null) { + this.#onopen = handler; + if (handler) this.#flushBufferedEvents("open"); + } + + get onclose() { + return this.#onclose; + } + set onclose(handler: ((ev: any) => void) | null) { + this.#onclose = handler; + if (handler) this.#flushBufferedEvents("close"); + } + + get onerror() { + return this.#onerror; + } + set onerror(handler: ((ev: any) => void) | null) { + this.#onerror = handler; + if (handler) this.#flushBufferedEvents("error"); + } + + get onmessage() { + return this.#onmessage; + } + set onmessage(handler: ((ev: any) => void) | null) { + this.#onmessage = handler; + if (handler) this.#flushBufferedEvents("message"); + } - /** - * Creates a new FakeWebSocket connected to a ConnectWebSocketOutput handler - */ constructor(handler: ConnectWebSocketOutput) { this.#handler = handler; - // Create promise resolvers for initialization - const initPromise = Promise.withResolvers(); - this.#initPromise = initPromise.promise; - this.#initResolve = initPromise.resolve; - this.#initReject = initPromise.reject; - // Create a fake WSContext to pass to the handler this.#wsContext = new WSContext({ send: (data: string | ArrayBuffer | Uint8Array) => { - logger().debug("WSContext.send called", { - dataType: typeof data, - dataLength: - typeof data === "string" - ? data.length - : data instanceof ArrayBuffer - ? data.byteLength - : data instanceof Uint8Array - ? data.byteLength - : "unknown", - }); + logger().debug("WSContext.send called"); this.#handleMessage(data); }, close: (code?: number, reason?: string) => { @@ -76,24 +82,16 @@ export class FakeWebSocket { }, // Set readyState to 1 (OPEN) since handlers expect an open connection readyState: 1, - url: "ws://fake-websocket/", - protocol: "", }); // Initialize the connection this.#initialize(); } - /** - * Returns the current ready state of the connection - */ get readyState(): 0 | 1 | 2 | 3 { return this.#readyState; } - /** - * Sends data through the connection - */ send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { logger().debug("send called", { readyState: this.readyState }); @@ -118,16 +116,6 @@ export class FakeWebSocket { }); const message = JSON.parse(data) as messageToServer.ToServer; - logger().debug("fake websocket sending message", { - messageType: - message.b && - ("ar" in message.b - ? "action" - : "sr" in message.b - ? "subscription" - : "unknown"), - }); - this.#handler.onMessage(message).catch((err) => { logger().error("error handling websocket message", { error: err, @@ -219,83 +207,6 @@ export class FakeWebSocket { }); } - /** - * Implementation of EventTarget methods (minimal implementation) - */ - addEventListener(type: string, listener: any): void { - // Map to the onXXX properties - switch (type) { - case "open": - this.onopen = - typeof listener === "function" - ? listener - : (ev) => listener.handleEvent(ev); - break; - case "message": - this.onmessage = - typeof listener === "function" - ? listener - : (ev) => listener.handleEvent(ev); - break; - case "close": - this.onclose = - typeof listener === "function" - ? listener - : (ev) => listener.handleEvent(ev); - break; - case "error": - this.onerror = - typeof listener === "function" - ? listener - : (ev) => listener.handleEvent(ev); - break; - } - } - - removeEventListener(type: string): void { - // Simple implementation that just nullifies the corresponding handler - switch (type) { - case "open": - this.onopen = null; - break; - case "message": - this.onmessage = null; - break; - case "close": - this.onclose = null; - break; - case "error": - this.onerror = null; - break; - } - } - - dispatchEvent(event: any): boolean { - // Dispatch to the corresponding handler - switch (event.type) { - case "open": - if (this.onopen) this.onopen(event); - break; - case "message": - if (this.onmessage) this.onmessage(event as MessageEvent); - break; - case "close": - if (this.onclose) this.onclose(event as CloseEvent); - break; - case "error": - if (this.onerror) this.onerror(event); - break; - } - return !event.defaultPrevented; - } - - /** - * Wait for the WebSocket to be initialized and ready - */ - waitForReady(): Promise { - return this.#initPromise; - } - /** * Initialize the connection with the handler */ @@ -314,11 +225,6 @@ export class FakeWebSocket { // Fire the open event this.#fireOpen(); - // Resolve the initialization promise - do this BEFORE processing queued messages - // This allows clients to set up their event handlers before messages are processed - logger().info("resolving initialization promise"); - this.#initResolve(); - // Delay processing queued messages slightly to allow event handlers to be set up if (this.#queuedMessages.length > 0) { if (this.readyState !== this.OPEN) { @@ -348,7 +254,6 @@ export class FakeWebSocket { }); this.#fireError(err); this.close(1011, "Internal error during initialization"); - this.#initReject(err); } } @@ -402,13 +307,11 @@ export class FakeWebSocket { logger().debug("dispatching message to onmessage handler"); this.onmessage(event); } else { - logger().warn("no onmessage handler registered, message dropped"); + logger().debug("no onmessage handler registered, buffering message"); + this.#bufferedEvents.push({ type: "message", event }); } } - /** - * Handle connection close from the server side - */ #handleClose(code: number, reason: string): void { if (this.readyState === this.CLOSED) return; @@ -427,12 +330,41 @@ export class FakeWebSocket { // Dispatch the event if (this.onclose) { this.onclose(event); + } else { + this.#bufferedEvents.push({ type: "close", event }); + } + } + + #flushBufferedEvents(type: "open" | "close" | "error" | "message"): void { + const eventsToFlush = this.#bufferedEvents.filter( + (buffered) => buffered.type === type, + ); + this.#bufferedEvents = this.#bufferedEvents.filter( + (buffered) => buffered.type !== type, + ); + + for (const { event } of eventsToFlush) { + try { + switch (type) { + case "open": + this.#onopen?.(event); + break; + case "close": + this.#onclose?.(event); + break; + case "error": + this.#onerror?.(event); + break; + case "message": + this.#onmessage?.(event); + break; + } + } catch (err) { + logger().error(`error in buffered ${type} handler`, { error: err }); + } } } - /** - * Fire the open event - */ #fireOpen(): void { try { // Create an Event-like object since Event constructor may not be available @@ -444,28 +376,26 @@ export class FakeWebSocket { if (this.onopen) { this.onopen(event); + } else { + this.#bufferedEvents.push({ type: "open", event }); } } catch (err) { logger().error("error in onopen handler", { error: err }); } } - /** - * Fire the close event - */ #fireClose(event: CloseEvent): void { try { if (this.onclose) { this.onclose(event); + } else { + this.#bufferedEvents.push({ type: "close", event }); } } catch (err) { logger().error("error in onclose handler", { error: err }); } } - /** - * Fire the error event - */ #fireError(error: unknown): void { try { // Create an Event-like object for error @@ -479,6 +409,8 @@ export class FakeWebSocket { if (this.onerror) { this.onerror(event); + } else { + this.#bufferedEvents.push({ type: "error", event }); } } catch (err) { logger().error("error in onerror handler", { error: err }); @@ -488,9 +420,6 @@ export class FakeWebSocket { logger().error("websocket error", { error }); } - /** - * Parse binary message and forward to handler - */ async #parseBinaryMessage(data: Uint8Array): Promise { try { logger().debug("parsing binary message", { dataLength: data.byteLength }); @@ -501,18 +430,6 @@ export class FakeWebSocket { maxIncomingMessageSize: 1024 * 1024, // 1MB default limit }); - logger().debug("successfully parsed binary message", { - messageType: - message.b && - ("i" in message.b - ? "init" - : "ar" in message.b - ? "action" - : "sr" in message.b - ? "subscription" - : "unknown"), - }); - // Forward the parsed message to the handler await this.#handler.onMessage(message); logger().debug("handler processed binary message"); diff --git a/packages/core/tests/rivet/deployment.test.ts b/packages/core/tests/rivet/deployment.test.ts.old similarity index 100% rename from packages/core/tests/rivet/deployment.test.ts rename to packages/core/tests/rivet/deployment.test.ts.old diff --git a/packages/core/tests/rivet/driver-tests.test.ts b/packages/core/tests/rivet/driver-tests.test.ts.old similarity index 100% rename from packages/core/tests/rivet/driver-tests.test.ts rename to packages/core/tests/rivet/driver-tests.test.ts.old diff --git a/packages/core/tests/rivet/key-serialization.test.ts b/packages/core/tests/rivet/key-serialization.test.ts.old similarity index 100% rename from packages/core/tests/rivet/key-serialization.test.ts rename to packages/core/tests/rivet/key-serialization.test.ts.old diff --git a/packages/core/tests/rivet/rivet-deploy.ts b/packages/core/tests/rivet/rivet-deploy.ts.old similarity index 100% rename from packages/core/tests/rivet/rivet-deploy.ts rename to packages/core/tests/rivet/rivet-deploy.ts.old