Skip to content

Commit d54d197

Browse files
Debounce reconnect disconnect logging (#1862)
1 parent de7734c commit d54d197

File tree

4 files changed

+123
-9
lines changed

4 files changed

+123
-9
lines changed

apps/web/src/environments/runtime/connection.test.ts

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import type { WsRpcClient } from "~/rpc/wsRpcClient";
66

77
function createTestClient(options?: {
88
readonly getSnapshot?: () => Promise<{ readonly snapshotSequence: number }>;
9+
readonly replayEvents?: () => Promise<ReadonlyArray<any>>;
910
}) {
1011
const lifecycleListeners = new Set<(event: any) => void>();
1112
const configListeners = new Set<(event: any) => void>();
1213
const terminalListeners = new Set<(event: any) => void>();
14+
let domainResubscribe: (() => void) | undefined;
1315

1416
const getSnapshot = vi.fn(
1517
options?.getSnapshot ??
@@ -20,6 +22,7 @@ function createTestClient(options?: {
2022
threads: [],
2123
}) as any),
2224
);
25+
const replayEvents = vi.fn(options?.replayEvents ?? (async () => []));
2326

2427
const client = {
2528
dispose: vi.fn(async () => undefined),
@@ -49,8 +52,15 @@ function createTestClient(options?: {
4952
dispatchCommand: vi.fn(async () => undefined),
5053
getTurnDiff: vi.fn(async () => undefined),
5154
getFullThreadDiff: vi.fn(async () => undefined),
52-
replayEvents: vi.fn(async () => []),
53-
onDomainEvent: () => () => undefined,
55+
replayEvents,
56+
onDomainEvent: vi.fn((_: (event: any) => void, options?: { onResubscribe?: () => void }) => {
57+
domainResubscribe = options?.onResubscribe;
58+
return () => {
59+
if (domainResubscribe === options?.onResubscribe) {
60+
domainResubscribe = undefined;
61+
}
62+
};
63+
}),
5464
},
5565
terminal: {
5666
open: vi.fn(async () => undefined),
@@ -114,6 +124,9 @@ function createTestClient(options?: {
114124
});
115125
}
116126
},
127+
triggerDomainResubscribe: () => {
128+
domainResubscribe?.();
129+
},
117130
};
118131
}
119132

@@ -213,4 +226,63 @@ describe("createEnvironmentConnection", () => {
213226

214227
await connection.dispose();
215228
});
229+
230+
it("swallows replay recovery failures triggered by resubscribe", async () => {
231+
const environmentId = EnvironmentId.makeUnsafe("env-1");
232+
const snapshotError = new Error("snapshot failed");
233+
let snapshotCalls = 0;
234+
const { client, triggerDomainResubscribe } = createTestClient({
235+
getSnapshot: async () => {
236+
snapshotCalls += 1;
237+
if (snapshotCalls === 1) {
238+
return {
239+
snapshotSequence: 1,
240+
projects: [],
241+
threads: [],
242+
} as any;
243+
}
244+
245+
throw snapshotError;
246+
},
247+
replayEvents: async () => {
248+
throw new Error("SocketCloseError: 1006");
249+
},
250+
});
251+
252+
const connection = createEnvironmentConnection({
253+
kind: "saved",
254+
knownEnvironment: {
255+
id: "env-1",
256+
label: "Remote env",
257+
source: "manual",
258+
target: {
259+
httpBaseUrl: "http://example.test",
260+
wsBaseUrl: "ws://example.test",
261+
},
262+
environmentId,
263+
},
264+
client,
265+
applyEventBatch: vi.fn(),
266+
syncSnapshot: vi.fn(),
267+
applyTerminalEvent: vi.fn(),
268+
});
269+
270+
await Promise.resolve();
271+
await Promise.resolve();
272+
273+
const onUnhandledRejection = vi.fn();
274+
process.on("unhandledRejection", onUnhandledRejection);
275+
276+
try {
277+
triggerDomainResubscribe();
278+
await new Promise((resolve) => setTimeout(resolve, 0));
279+
await new Promise((resolve) => setTimeout(resolve, 0));
280+
} finally {
281+
process.off("unhandledRejection", onUnhandledRejection);
282+
}
283+
284+
expect(onUnhandledRejection).not.toHaveBeenCalled();
285+
286+
await connection.dispose();
287+
});
216288
});

apps/web/src/environments/runtime/connection.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ export function createEnvironmentConnection(
128128
queueMicrotask(flushPendingDomainEvents);
129129
};
130130

131+
const scheduleReplayRecovery = (reason: "sequence-gap" | "resubscribe") => {
132+
void runReplayRecovery(reason).catch(() => undefined);
133+
};
134+
131135
const runReplayRecovery = async (reason: "sequence-gap" | "resubscribe"): Promise<void> => {
132136
if (!recovery.beginReplayRecovery(reason)) {
133137
return;
@@ -172,7 +176,7 @@ export function createEnvironmentConnection(
172176
return;
173177
}
174178
}
175-
void runReplayRecovery(reason);
179+
scheduleReplayRecovery(reason);
176180
} else if (replayCompletion.shouldReplay && import.meta.env.MODE !== "test") {
177181
console.warn(
178182
"[orchestration-recovery]",
@@ -198,7 +202,7 @@ export function createEnvironmentConnection(
198202
if (!disposed) {
199203
input.syncSnapshot(snapshot, environmentId);
200204
if (recovery.completeSnapshotRecovery(snapshot.snapshotSequence)) {
201-
void runReplayRecovery("sequence-gap");
205+
scheduleReplayRecovery("sequence-gap");
202206
}
203207
}
204208
} catch (error) {
@@ -245,7 +249,7 @@ export function createEnvironmentConnection(
245249
}
246250
if (action === "recover") {
247251
flushPendingDomainEvents();
248-
void runReplayRecovery("sequence-gap");
252+
scheduleReplayRecovery("sequence-gap");
249253
}
250254
},
251255
{
@@ -254,7 +258,7 @@ export function createEnvironmentConnection(
254258
return;
255259
}
256260
flushPendingDomainEvents();
257-
void runReplayRecovery("resubscribe");
261+
scheduleReplayRecovery("resubscribe");
258262
},
259263
},
260264
);

apps/web/src/rpc/wsTransport.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,39 @@ describe("WsTransport", () => {
707707
await transport.dispose();
708708
});
709709

710+
it("logs a transport disconnect once even when multiple subscriptions fail together", async () => {
711+
const transport = new WsTransport("ws://localhost:3020");
712+
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => undefined);
713+
714+
const unsubscribeA = transport.subscribe(
715+
() => Stream.fail(new Error("SocketCloseError: 1006")),
716+
vi.fn(),
717+
{ retryDelay: 10 },
718+
);
719+
const unsubscribeB = transport.subscribe(
720+
() => Stream.fail(new Error("SocketCloseError: 1006")),
721+
vi.fn(),
722+
{ retryDelay: 10 },
723+
);
724+
725+
await waitFor(() => {
726+
expect(sockets).toHaveLength(1);
727+
});
728+
729+
getSocket().open();
730+
731+
await waitFor(() => {
732+
expect(warnSpy).toHaveBeenCalledTimes(1);
733+
});
734+
expect(warnSpy).toHaveBeenCalledWith("WebSocket RPC subscription disconnected", {
735+
error: "SocketCloseError: 1006",
736+
});
737+
738+
unsubscribeA();
739+
unsubscribeB();
740+
await transport.dispose();
741+
});
742+
710743
it("streams finite request events without re-subscribing", async () => {
711744
const transport = new WsTransport("ws://localhost:3020");
712745
const listener = vi.fn();

apps/web/src/rpc/wsTransport.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ export class WsTransport {
5050
private readonly url: WsRpcProtocolSocketUrlProvider;
5151
private readonly lifecycleHandlers: WsProtocolLifecycleHandlers | undefined;
5252
private disposed = false;
53+
private hasReportedTransportDisconnect = false;
5354
private reconnectChain: Promise<void> = Promise.resolve();
5455
private session: TransportSession;
5556

@@ -136,6 +137,7 @@ export class WsTransport {
136137
listener,
137138
() => active,
138139
() => {
140+
this.hasReportedTransportDisconnect = false;
139141
hasReceivedValue = true;
140142
},
141143
);
@@ -156,9 +158,12 @@ export class WsTransport {
156158
return;
157159
}
158160

159-
console.warn("WebSocket RPC subscription disconnected", {
160-
error: formattedError,
161-
});
161+
if (!this.hasReportedTransportDisconnect) {
162+
console.warn("WebSocket RPC subscription disconnected", {
163+
error: formattedError,
164+
});
165+
}
166+
this.hasReportedTransportDisconnect = true;
162167
await sleep(retryDelayMs);
163168
}
164169
}

0 commit comments

Comments
 (0)