Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/gateway/call.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ let lastClientOptions: {
tlsFingerprint?: string;
scopes?: string[];
onHelloOk?: (hello: { features?: { methods?: string[] } }) => void | Promise<void>;
onConnectError?: (err: Error) => void;
onClose?: (code: number, reason: string) => void;
} | null = null;
type StartMode = "hello" | "close" | "silent";
Expand All @@ -40,6 +41,7 @@ vi.mock("./client.js", () => ({
password?: string;
scopes?: string[];
onHelloOk?: (hello: { features?: { methods?: string[] } }) => void | Promise<void>;
onConnectError?: (err: Error) => void;
onClose?: (code: number, reason: string) => void;
}) {
lastClientOptions = opts;
Expand Down Expand Up @@ -1064,3 +1066,35 @@ describe("callGateway password resolution", () => {
expect(lastClientOptions?.[testCase.authKey]).toBe(testCase.explicitValue);
});
});


describe("callGateway connect error handling", () => {
beforeEach(() => {
resetGatewayCallMocks();
setLocalLoopbackGatewayConfig();
startMode = "silent";
});

it("surfaces connect errors instead of generic close errors", async () => {
const pending = callGateway({ method: "health", timeoutMs: 5_000 }).catch((err) => err as Error);

await new Promise((resolve) => setTimeout(resolve, 0));
lastClientOptions?.onConnectError?.(new Error("auth challenge failed: token expired"));
lastClientOptions?.onClose?.(1008, "connect failed");

const err = await pending;
expect(err).toBeInstanceOf(Error);
expect(err.message).toContain("auth challenge failed: token expired");
expect(err.message).not.toContain("gateway closed (1008");
});

it("ignores late connect errors after a successful settlement", async () => {
startMode = "hello";
const result = await callGateway({ method: "health" });
expect(result).toEqual({ ok: true });

expect(() => {
lastClientOptions?.onConnectError?.(new Error("stale connect error"));
}).not.toThrow();
});
});
18 changes: 16 additions & 2 deletions src/gateway/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
type GatewayClientName,
} from "../utils/message-channel.js";
import { VERSION } from "../version.js";
import { GatewayClient } from "./client.js";
import { GatewayClient, GatewayRequestAcceptedError } from "./client.js";
import {
GatewaySecretRefUnavailableError,
resolveGatewayCredentialsFromConfig,
Expand Down Expand Up @@ -818,6 +818,7 @@ async function executeGatewayRequestWithScopes<T>(params: {
mode: opts.mode ?? GATEWAY_CLIENT_MODES.CLI,
role: "operator",
scopes,
reconnect: false,
deviceIdentity: loadOrCreateDeviceIdentity(),
minProtocol: opts.minProtocol ?? PROTOCOL_VERSION,
maxProtocol: opts.maxProtocol ?? PROTOCOL_VERSION,
Expand All @@ -840,13 +841,26 @@ async function executeGatewayRequestWithScopes<T>(params: {
stop(err as Error);
}
},
onConnectError: (err) => {
if (settled || ignoreClose) {
return;
}
ignoreClose = true;
client.stop();
stop(err);
},
onClose: (code, reason) => {
if (settled || ignoreClose) {
return;
}
ignoreClose = true;
client.stop();
stop(new Error(formatGatewayCloseError(code, reason, params.connectionDetails)));
const closeError = formatGatewayCloseError(code, reason, params.connectionDetails);
stop(
client.hadAcceptedRequest
? new GatewayRequestAcceptedError(closeError)
: new Error(closeError),
);
},
});

Expand Down
52 changes: 52 additions & 0 deletions src/gateway/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,55 @@ describe("GatewayClient connect auth payload", () => {
client.stop();
});
});


describe("GatewayClient async onHelloOk error propagation", () => {
beforeEach(() => {
wsInstances.length = 0;
});

it("reports async onHelloOk failures through onConnectError", async () => {
const onConnectError = vi.fn();
const helloError = new Error("request failed inside onHelloOk");
const client = new GatewayClient({
url: "ws://127.0.0.1:18789",
reconnect: false,
onHelloOk: async () => {
throw helloError;
},
onConnectError,
});

client.start();
const ws = getLatestWs();
ws.emitOpen();
ws.emitMessage(
JSON.stringify({
type: "event",
event: "connect.challenge",
payload: { nonce: "test-nonce" },
}),
);

const connectFrame = ws.sent.find((frame) => frame.includes('"method":"connect"'));
expect(connectFrame).toBeDefined();
const connectId = JSON.parse(connectFrame as string).id as string;
ws.emitMessage(
JSON.stringify({
type: "res",
id: connectId,
ok: true,
payload: {
type: "hello-ok",
protocol: 1,
auth: null,
policy: { tickIntervalMs: 30_000 },
},
}),
);

await new Promise((resolve) => setTimeout(resolve, 0));
expect(onConnectError).toHaveBeenCalledWith(helloError);
client.stop();
});
});
28 changes: 25 additions & 3 deletions src/gateway/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Pending = {
resolve: (value: unknown) => void;
reject: (err: unknown) => void;
expectFinal: boolean;
ackReceived: boolean;
};

export type GatewayClientOptions = {
Expand Down Expand Up @@ -70,6 +71,7 @@ export type GatewayClientOptions = {
onConnectError?: (err: Error) => void;
onClose?: (code: number, reason: string) => void;
onGap?: (info: { expected: number; received: number }) => void;
reconnect?: boolean;
};

export const GATEWAY_CLOSE_CODE_HINTS: Readonly<Record<number, string>> = {
Expand All @@ -83,6 +85,15 @@ export function describeGatewayCloseCode(code: number): string | undefined {
return GATEWAY_CLOSE_CODE_HINTS[code];
}

export class GatewayRequestAcceptedError extends Error {
readonly code = "GATEWAY_REQUEST_ACCEPTED_THEN_CLOSED";

constructor(message: string) {
super(message);
this.name = "GatewayRequestAcceptedError";
}
}

export class GatewayClient {
private ws: WebSocket | null = null;
private opts: GatewayClientOptions;
Expand All @@ -93,6 +104,7 @@ export class GatewayClient {
private connectNonce: string | null = null;
private connectSent = false;
private connectTimer: NodeJS.Timeout | null = null;
private hadAcceptedRequestValue = false;
// Track last tick to detect silent stalls.
private lastTick: number | null = null;
private tickIntervalMs = 30_000;
Expand Down Expand Up @@ -232,6 +244,10 @@ export class GatewayClient {
this.flushPendingErrors(new Error("gateway client stopped"));
}

get hadAcceptedRequest(): boolean {
return this.hadAcceptedRequestValue;
}

private sendConnect() {
if (this.connectSent) {
return;
Expand Down Expand Up @@ -326,7 +342,7 @@ export class GatewayClient {
};

void this.request<HelloOk>("connect", params)
.then((helloOk) => {
.then(async (helloOk) => {
const authInfo = helloOk?.auth;
if (authInfo?.deviceToken && this.opts.deviceIdentity) {
storeDeviceAuthToken({
Expand All @@ -343,7 +359,7 @@ export class GatewayClient {
: 30_000;
this.lastTick = Date.now();
this.startTickWatch();
this.opts.onHelloOk?.(helloOk);
await this.opts.onHelloOk?.(helloOk);
})
.catch((err) => {
this.opts.onConnectError?.(err instanceof Error ? err : new Error(String(err)));
Expand Down Expand Up @@ -396,6 +412,8 @@ export class GatewayClient {
const payload = parsed.payload as { status?: unknown } | undefined;
const status = payload?.status;
if (pending.expectFinal && status === "accepted") {
pending.ackReceived = true;
this.hadAcceptedRequestValue = true;
return;
}
this.pending.delete(parsed.id);
Expand Down Expand Up @@ -431,6 +449,9 @@ export class GatewayClient {
}

private scheduleReconnect() {
if (this.opts.reconnect === false) {
return;
}
if (this.closed) {
return;
}
Expand All @@ -445,7 +466,7 @@ export class GatewayClient {

private flushPendingErrors(err: Error) {
for (const [, p] of this.pending) {
p.reject(err);
p.reject(p.ackReceived ? new GatewayRequestAcceptedError(err.message) : err);
}
this.pending.clear();
}
Expand Down Expand Up @@ -522,6 +543,7 @@ export class GatewayClient {
resolve: (value) => resolve(value as T),
reject,
expectFinal,
ackReceived: false,
});
});
this.ws.send(JSON.stringify(frame));
Expand Down
6 changes: 4 additions & 2 deletions src/gateway/server/ws-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
}
};

const send = (obj: unknown) => {
const send = (obj: unknown): boolean => {
try {
socket.send(JSON.stringify(obj));
return true;
} catch (err) {
logWsControl.warn(
`[ws] failed to serialize outbound frame conn=${connId} type=${lastFrameType ?? "unknown"} method=${lastFrameMethod ?? "unknown"} id=${lastFrameId ?? "unknown"} error=${formatError(err)}`,
`[ws] failed to send outbound frame conn=${connId} type=${lastFrameType ?? "unknown"} method=${lastFrameMethod ?? "unknown"} id=${lastFrameId ?? "unknown"} error=${formatError(err)}`,
);
return false;
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/gateway/server/ws-connection/message-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ export function attachGatewayWsMessageHandler(params: {
events: string[];
extraHandlers: GatewayRequestHandlers;
buildRequestContext: () => GatewayRequestContext;
send: (obj: unknown) => void;
send: (obj: unknown) => boolean;
close: (code?: number, reason?: string) => void;
isClosed: () => boolean;
clearHandshakeTimer: () => void;
Expand Down
Loading