Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 90 additions & 173 deletions packages/core/src/inline-client-driver/fake-websocket.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,16 @@
import { WSContext } from "hono/ws";
import { logger } from "@/registry/log";
import type { ConnectWebSocketOutput } from "@/actor/router-endpoints";
import type * as messageToServer from "@/actor/protocol/message/to-server";
import { parseMessage } from "@/actor/protocol/message/mod";
import type { InputData } from "@/actor/protocol/serde";
import type {
Event,
CloseEvent,
MessageEvent,
} from "ws";
import type { ConnectWebSocketOutput } from "@/actor/router-endpoints";
import type * as messageToServer from "@/actor/protocol/message/to-server";
import { parseMessage } from "@/actor/protocol/message/mod";
import type { InputData } from "@/actor/protocol/serde";
import type { Event, CloseEvent, MessageEvent } from "ws";

/**
* FakeWebSocket implements a WebSocket-like interface
* that connects to a ConnectWebSocketOutput handler
*/
export class FakeWebSocket {
// WebSocket interface properties
bufferedAmount = 0;
extensions = "";
protocol = "";
url = "";

// Event handlers
onclose: ((ev: any) => void) | null = null;
onerror: ((ev: any) => void) | null = null;
onmessage: ((ev: any) => void) | null = null;
onopen: ((ev: any) => void) | null = null;

// WebSocket readyState values
readonly CONNECTING = 0 as const;
readonly OPEN = 1 as const;
Expand All @@ -37,37 +21,59 @@ export class FakeWebSocket {
#handler: ConnectWebSocketOutput;
#wsContext: WSContext;
#readyState: 0 | 1 | 2 | 3 = 0; // Start in CONNECTING state
#initPromise: Promise<void>;
#initResolve: (value: void) => void;
#initReject: (reason: any) => void;
#queuedMessages: Array<string | ArrayBuffer | Uint8Array> = [];
// Event buffering is needed since onopen/onmessage events can be fired
// before JavaScript has a chance to assign handlers (e.g. within the same tick)
#bufferedEvents: Array<{
type: "open" | "close" | "error" | "message";
event: any;
}> = [];

// Event handlers with buffering
#onopen: ((ev: any) => void) | null = null;
#onclose: ((ev: any) => void) | null = null;
#onerror: ((ev: any) => void) | null = null;
#onmessage: ((ev: any) => void) | null = null;

get onopen() {
return this.#onopen;
}
set onopen(handler: ((ev: any) => void) | null) {
this.#onopen = handler;
if (handler) this.#flushBufferedEvents("open");
}

get onclose() {
return this.#onclose;
}
set onclose(handler: ((ev: any) => void) | null) {
this.#onclose = handler;
if (handler) this.#flushBufferedEvents("close");
}

get onerror() {
return this.#onerror;
}
set onerror(handler: ((ev: any) => void) | null) {
this.#onerror = handler;
if (handler) this.#flushBufferedEvents("error");
}

get onmessage() {
return this.#onmessage;
}
set onmessage(handler: ((ev: any) => void) | null) {
this.#onmessage = handler;
if (handler) this.#flushBufferedEvents("message");
}

/**
* Creates a new FakeWebSocket connected to a ConnectWebSocketOutput handler
*/
constructor(handler: ConnectWebSocketOutput) {
this.#handler = handler;

// Create promise resolvers for initialization
const initPromise = Promise.withResolvers<void>();
this.#initPromise = initPromise.promise;
this.#initResolve = initPromise.resolve;
this.#initReject = initPromise.reject;

// Create a fake WSContext to pass to the handler
this.#wsContext = new WSContext({
send: (data: string | ArrayBuffer | Uint8Array) => {
logger().debug("WSContext.send called", {
dataType: typeof data,
dataLength:
typeof data === "string"
? data.length
: data instanceof ArrayBuffer
? data.byteLength
: data instanceof Uint8Array
? data.byteLength
: "unknown",
});
logger().debug("WSContext.send called");
this.#handleMessage(data);
},
close: (code?: number, reason?: string) => {
Expand All @@ -76,24 +82,16 @@ export class FakeWebSocket {
},
// Set readyState to 1 (OPEN) since handlers expect an open connection
readyState: 1,
url: "ws://fake-websocket/",
protocol: "",
});

// Initialize the connection
this.#initialize();
}

/**
* Returns the current ready state of the connection
*/
get readyState(): 0 | 1 | 2 | 3 {
return this.#readyState;
}

/**
* Sends data through the connection
*/
send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void {
logger().debug("send called", { readyState: this.readyState });

Expand All @@ -118,16 +116,6 @@ export class FakeWebSocket {
});
const message = JSON.parse(data) as messageToServer.ToServer;

logger().debug("fake websocket sending message", {
messageType:
message.b &&
("ar" in message.b
? "action"
: "sr" in message.b
? "subscription"
: "unknown"),
});

this.#handler.onMessage(message).catch((err) => {
logger().error("error handling websocket message", {
error: err,
Expand Down Expand Up @@ -219,83 +207,6 @@ export class FakeWebSocket {
});
}

/**
* Implementation of EventTarget methods (minimal implementation)
*/
addEventListener(type: string, listener: any): void {
// Map to the onXXX properties
switch (type) {
case "open":
this.onopen =
typeof listener === "function"
? listener
: (ev) => listener.handleEvent(ev);
break;
case "message":
this.onmessage =
typeof listener === "function"
? listener
: (ev) => listener.handleEvent(ev);
break;
case "close":
this.onclose =
typeof listener === "function"
? listener
: (ev) => listener.handleEvent(ev);
break;
case "error":
this.onerror =
typeof listener === "function"
? listener
: (ev) => listener.handleEvent(ev);
break;
}
}

removeEventListener(type: string): void {
// Simple implementation that just nullifies the corresponding handler
switch (type) {
case "open":
this.onopen = null;
break;
case "message":
this.onmessage = null;
break;
case "close":
this.onclose = null;
break;
case "error":
this.onerror = null;
break;
}
}

dispatchEvent(event: any): boolean {
// Dispatch to the corresponding handler
switch (event.type) {
case "open":
if (this.onopen) this.onopen(event);
break;
case "message":
if (this.onmessage) this.onmessage(event as MessageEvent);
break;
case "close":
if (this.onclose) this.onclose(event as CloseEvent);
break;
case "error":
if (this.onerror) this.onerror(event);
break;
}
return !event.defaultPrevented;
}

/**
* Wait for the WebSocket to be initialized and ready
*/
waitForReady(): Promise<void> {
return this.#initPromise;
}

/**
* Initialize the connection with the handler
*/
Expand All @@ -314,11 +225,6 @@ export class FakeWebSocket {
// Fire the open event
this.#fireOpen();

// Resolve the initialization promise - do this BEFORE processing queued messages
// This allows clients to set up their event handlers before messages are processed
logger().info("resolving initialization promise");
this.#initResolve();

// Delay processing queued messages slightly to allow event handlers to be set up
if (this.#queuedMessages.length > 0) {
if (this.readyState !== this.OPEN) {
Expand Down Expand Up @@ -348,7 +254,6 @@ export class FakeWebSocket {
});
this.#fireError(err);
this.close(1011, "Internal error during initialization");
this.#initReject(err);
}
}

Expand Down Expand Up @@ -402,13 +307,11 @@ export class FakeWebSocket {
logger().debug("dispatching message to onmessage handler");
this.onmessage(event);
} else {
logger().warn("no onmessage handler registered, message dropped");
logger().debug("no onmessage handler registered, buffering message");
this.#bufferedEvents.push({ type: "message", event });
}
}

/**
* Handle connection close from the server side
*/
#handleClose(code: number, reason: string): void {
if (this.readyState === this.CLOSED) return;

Expand All @@ -427,12 +330,41 @@ export class FakeWebSocket {
// Dispatch the event
if (this.onclose) {
this.onclose(event);
} else {
this.#bufferedEvents.push({ type: "close", event });
}
}

#flushBufferedEvents(type: "open" | "close" | "error" | "message"): void {
const eventsToFlush = this.#bufferedEvents.filter(
(buffered) => buffered.type === type,
);
this.#bufferedEvents = this.#bufferedEvents.filter(
(buffered) => buffered.type !== type,
);

for (const { event } of eventsToFlush) {
try {
switch (type) {
case "open":
this.#onopen?.(event);
break;
case "close":
this.#onclose?.(event);
break;
case "error":
this.#onerror?.(event);
break;
case "message":
this.#onmessage?.(event);
break;
}
} catch (err) {
logger().error(`error in buffered ${type} handler`, { error: err });
}
}
}

/**
* Fire the open event
*/
#fireOpen(): void {
try {
// Create an Event-like object since Event constructor may not be available
Expand All @@ -444,28 +376,26 @@ export class FakeWebSocket {

if (this.onopen) {
this.onopen(event);
} else {
this.#bufferedEvents.push({ type: "open", event });
}
} catch (err) {
logger().error("error in onopen handler", { error: err });
}
}

/**
* Fire the close event
*/
#fireClose(event: CloseEvent): void {
try {
if (this.onclose) {
this.onclose(event);
} else {
this.#bufferedEvents.push({ type: "close", event });
}
} catch (err) {
logger().error("error in onclose handler", { error: err });
}
}

/**
* Fire the error event
*/
#fireError(error: unknown): void {
try {
// Create an Event-like object for error
Expand All @@ -479,6 +409,8 @@ export class FakeWebSocket {

if (this.onerror) {
this.onerror(event);
} else {
this.#bufferedEvents.push({ type: "error", event });
}
} catch (err) {
logger().error("error in onerror handler", { error: err });
Expand All @@ -488,9 +420,6 @@ export class FakeWebSocket {
logger().error("websocket error", { error });
}

/**
* Parse binary message and forward to handler
*/
async #parseBinaryMessage(data: Uint8Array): Promise<void> {
try {
logger().debug("parsing binary message", { dataLength: data.byteLength });
Expand All @@ -501,18 +430,6 @@ export class FakeWebSocket {
maxIncomingMessageSize: 1024 * 1024, // 1MB default limit
});

logger().debug("successfully parsed binary message", {
messageType:
message.b &&
("i" in message.b
? "init"
: "ar" in message.b
? "action"
: "sr" in message.b
? "subscription"
: "unknown"),
});

// Forward the parsed message to the handler
await this.#handler.onMessage(message);
logger().debug("handler processed binary message");
Expand Down
Loading