From b400ffd00021acf624aef484d6097ed197e73a39 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 4 Apr 2026 17:12:49 -0700 Subject: [PATCH 1/3] Add WebSocket disconnect recovery and slow RPC toast UX (#1730) Co-authored-by: Hwanseo Choi Co-authored-by: codex --- apps/web/src/components/ChatView.tsx | 16 +- .../WebSocketConnectionSurface.logic.test.ts | 83 +++ .../components/WebSocketConnectionSurface.tsx | 544 ++++++++++++++++++ apps/web/src/components/ui/toast.tsx | 23 +- apps/web/src/orchestrationRecovery.ts | 6 +- apps/web/src/routes/__root.tsx | 58 +- apps/web/src/rpc/protocol.ts | 87 ++- apps/web/src/rpc/requestLatencyState.test.ts | 65 +++ apps/web/src/rpc/requestLatencyState.ts | 127 ++++ apps/web/src/rpc/transportError.test.ts | 24 + apps/web/src/rpc/transportError.ts | 23 + apps/web/src/rpc/wsConnectionState.test.ts | 112 ++++ apps/web/src/rpc/wsConnectionState.ts | 219 +++++++ apps/web/src/store.ts | 5 +- apps/web/src/wsNativeApi.test.ts | 14 + apps/web/src/wsNativeApi.ts | 7 +- apps/web/src/wsRpcClient.ts | 39 +- apps/web/src/wsTransport.test.ts | 271 ++++++++- apps/web/src/wsTransport.ts | 216 +++++-- packages/contracts/src/ipc.ts | 7 +- 20 files changed, 1847 insertions(+), 99 deletions(-) create mode 100644 apps/web/src/components/WebSocketConnectionSurface.logic.test.ts create mode 100644 apps/web/src/components/WebSocketConnectionSurface.tsx create mode 100644 apps/web/src/rpc/requestLatencyState.test.ts create mode 100644 apps/web/src/rpc/requestLatencyState.ts create mode 100644 apps/web/src/rpc/transportError.test.ts create mode 100644 apps/web/src/rpc/transportError.ts create mode 100644 apps/web/src/rpc/wsConnectionState.test.ts create mode 100644 apps/web/src/rpc/wsConnectionState.ts diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 8641b7897f..5c29cdfa00 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -197,6 +197,7 @@ import { useServerConfig, useServerKeybindings, } from "~/rpc/serverState"; +import { sanitizeThreadErrorMessage } from "~/rpc/transportError"; const ATTACHMENT_PREVIEW_HANDOFF_TTL_MS = 5000; const IMAGE_SIZE_LIMIT_LABEL = `${Math.round(PROVIDER_SEND_TURN_MAX_IMAGE_BYTES / (1024 * 1024))}MB`; @@ -1620,17 +1621,18 @@ export default function ChatView({ threadId }: ChatViewProps) { const setThreadError = useCallback( (targetThreadId: ThreadId | null, error: string | null) => { if (!targetThreadId) return; + const nextError = sanitizeThreadErrorMessage(error); if (useStore.getState().threads.some((thread) => thread.id === targetThreadId)) { - setStoreThreadError(targetThreadId, error); + setStoreThreadError(targetThreadId, nextError); return; } setLocalDraftErrorsByThreadId((existing) => { - if ((existing[targetThreadId] ?? null) === error) { + if ((existing[targetThreadId] ?? null) === nextError) { return existing; } return { ...existing, - [targetThreadId]: error, + [targetThreadId]: nextError, }; }); }, @@ -3173,14 +3175,14 @@ export default function ChatView({ threadId }: ChatViewProps) { createdAt: new Date().toISOString(), }) .catch((err: unknown) => { - setStoreThreadError( + setThreadError( activeThreadId, err instanceof Error ? err.message : "Failed to submit approval decision.", ); }); setRespondingRequestIds((existing) => existing.filter((id) => id !== requestId)); }, - [activeThreadId, setStoreThreadError], + [activeThreadId, setThreadError], ); const onRespondToUserInput = useCallback( @@ -3201,14 +3203,14 @@ export default function ChatView({ threadId }: ChatViewProps) { createdAt: new Date().toISOString(), }) .catch((err: unknown) => { - setStoreThreadError( + setThreadError( activeThreadId, err instanceof Error ? err.message : "Failed to submit user input.", ); }); setRespondingUserInputRequestIds((existing) => existing.filter((id) => id !== requestId)); }, - [activeThreadId, setStoreThreadError], + [activeThreadId, setThreadError], ); const setActivePendingUserInputQuestionIndex = useCallback( diff --git a/apps/web/src/components/WebSocketConnectionSurface.logic.test.ts b/apps/web/src/components/WebSocketConnectionSurface.logic.test.ts new file mode 100644 index 0000000000..2e371b7d76 --- /dev/null +++ b/apps/web/src/components/WebSocketConnectionSurface.logic.test.ts @@ -0,0 +1,83 @@ +import { describe, expect, it } from "vitest"; + +import type { WsConnectionStatus } from "../rpc/wsConnectionState"; +import { shouldAutoReconnect } from "./WebSocketConnectionSurface"; + +function makeStatus(overrides: Partial = {}): WsConnectionStatus { + return { + attemptCount: 0, + closeCode: null, + closeReason: null, + connectedAt: null, + disconnectedAt: null, + hasConnected: false, + lastError: null, + lastErrorAt: null, + nextRetryAt: null, + online: true, + phase: "idle", + reconnectAttemptCount: 0, + reconnectMaxAttempts: 8, + reconnectPhase: "idle", + socketUrl: null, + ...overrides, + }; +} + +describe("WebSocketConnectionSurface.logic", () => { + it("forces reconnect on online when the app was offline", () => { + expect( + shouldAutoReconnect( + makeStatus({ + disconnectedAt: "2026-04-03T20:00:00.000Z", + online: false, + phase: "disconnected", + }), + "online", + ), + ).toBe(true); + }); + + it("forces reconnect on focus only for previously connected disconnected states", () => { + expect( + shouldAutoReconnect( + makeStatus({ + hasConnected: true, + online: true, + phase: "disconnected", + reconnectAttemptCount: 3, + reconnectPhase: "waiting", + }), + "focus", + ), + ).toBe(true); + + expect( + shouldAutoReconnect( + makeStatus({ + hasConnected: false, + online: true, + phase: "disconnected", + reconnectAttemptCount: 1, + reconnectPhase: "waiting", + }), + "focus", + ), + ).toBe(false); + }); + + it("forces reconnect on focus for exhausted reconnect loops", () => { + expect( + shouldAutoReconnect( + makeStatus({ + hasConnected: true, + online: true, + phase: "disconnected", + reconnectAttemptCount: 8, + reconnectPhase: "exhausted", + }), + "focus", + ), + ).toBe(true); + }); +}); diff --git a/apps/web/src/components/WebSocketConnectionSurface.tsx b/apps/web/src/components/WebSocketConnectionSurface.tsx new file mode 100644 index 0000000000..1855046a62 --- /dev/null +++ b/apps/web/src/components/WebSocketConnectionSurface.tsx @@ -0,0 +1,544 @@ +import { AlertTriangle, CloudOff, LoaderCircle, RotateCw } from "lucide-react"; +import { type ReactNode, useEffect, useEffectEvent, useRef, useState } from "react"; + +import { APP_DISPLAY_NAME } from "../branding"; +import { type SlowRpcAckRequest, useSlowRpcAckRequests } from "../rpc/requestLatencyState"; +import { useServerConfig } from "../rpc/serverState"; +import { + exhaustWsReconnectIfStillWaiting, + getWsConnectionStatus, + getWsConnectionUiState, + setBrowserOnlineStatus, + type WsConnectionStatus, + type WsConnectionUiState, + useWsConnectionStatus, + WS_RECONNECT_MAX_ATTEMPTS, +} from "../rpc/wsConnectionState"; +import { Button } from "./ui/button"; +import { toastManager } from "./ui/toast"; +import { getWsRpcClient } from "~/wsRpcClient"; + +const FORCED_WS_RECONNECT_DEBOUNCE_MS = 5_000; +type WsAutoReconnectTrigger = "focus" | "online"; + +const connectionTimeFormatter = new Intl.DateTimeFormat(undefined, { + day: "numeric", + hour: "numeric", + minute: "2-digit", + month: "short", + second: "2-digit", +}); + +function formatConnectionMoment(isoDate: string | null): string | null { + if (!isoDate) { + return null; + } + + return connectionTimeFormatter.format(new Date(isoDate)); +} + +function formatRetryCountdown(nextRetryAt: string, nowMs: number): string { + const remainingMs = Math.max(0, new Date(nextRetryAt).getTime() - nowMs); + return `${Math.max(1, Math.ceil(remainingMs / 1000))}s`; +} + +function describeOfflineToast(): string { + return "WebSocket disconnected. Waiting for network."; +} + +function formatReconnectAttemptLabel(status: WsConnectionStatus): string { + const reconnectAttempt = Math.max( + 1, + Math.min(status.reconnectAttemptCount, WS_RECONNECT_MAX_ATTEMPTS), + ); + return `Attempt ${reconnectAttempt}/${status.reconnectMaxAttempts}`; +} + +function describeExhaustedToast(): string { + return "Retries exhausted trying to reconnect"; +} + +function buildReconnectTitle(status: WsConnectionStatus): string { + if (status.nextRetryAt === null) { + return "Disconnected from T3 Server"; + } + + return "Disconnected from T3 Server"; +} + +function describeRecoveredToast( + previousDisconnectedAt: string | null, + connectedAt: string | null, +): string { + const reconnectedAtLabel = formatConnectionMoment(connectedAt); + const disconnectedAtLabel = formatConnectionMoment(previousDisconnectedAt); + + if (disconnectedAtLabel && reconnectedAtLabel) { + return `Disconnected at ${disconnectedAtLabel} and reconnected at ${reconnectedAtLabel}.`; + } + + if (reconnectedAtLabel) { + return `Connection restored at ${reconnectedAtLabel}.`; + } + + return "Connection restored."; +} + +function describeSlowRpcAckToast(requests: ReadonlyArray): ReactNode { + const count = requests.length; + const thresholdSeconds = Math.round((requests[0]?.thresholdMs ?? 0) / 1000); + + return `${count} request${count === 1 ? "" : "s"} waiting longer than ${thresholdSeconds}s.`; +} + +export function shouldAutoReconnect( + status: WsConnectionStatus, + trigger: WsAutoReconnectTrigger, +): boolean { + const uiState = getWsConnectionUiState(status); + + if (trigger === "online") { + return ( + uiState === "offline" || + uiState === "reconnecting" || + uiState === "error" || + status.reconnectPhase === "exhausted" + ); + } + + return ( + status.online && + status.hasConnected && + (uiState === "reconnecting" || status.reconnectPhase === "exhausted") + ); +} + +function buildBlockingCopy( + uiState: WsConnectionUiState, + status: WsConnectionStatus, +): { + readonly description: string; + readonly eyebrow: string; + readonly title: string; +} { + if (uiState === "connecting") { + return { + description: `Opening the WebSocket connection to the ${APP_DISPLAY_NAME} server and waiting for the initial config snapshot.`, + eyebrow: "Starting Session", + title: `Connecting to ${APP_DISPLAY_NAME}`, + }; + } + + if (uiState === "offline") { + return { + description: + "Your browser is offline, so the web client cannot reach the T3 server. Reconnect to the network and the app will retry automatically.", + eyebrow: "Offline", + title: "WebSocket connection unavailable", + }; + } + + if (status.lastError?.trim()) { + return { + description: `${status.lastError} Verify that the T3 server is running and reachable, then reload the app if needed.`, + eyebrow: "Connection Error", + title: "Cannot reach the T3 server", + }; + } + + return { + description: + "The web client could not complete its initial WebSocket connection to the T3 server. It will keep retrying in the background.", + eyebrow: "Connection Error", + title: "Cannot reach the T3 server", + }; +} + +function buildConnectionDetails(status: WsConnectionStatus, uiState: WsConnectionUiState): string { + const details = [ + `state: ${uiState}`, + `online: ${status.online ? "yes" : "no"}`, + `attempts: ${status.attemptCount}`, + ]; + + if (status.socketUrl) { + details.push(`socket: ${status.socketUrl}`); + } + if (status.connectedAt) { + details.push(`connectedAt: ${status.connectedAt}`); + } + if (status.disconnectedAt) { + details.push(`disconnectedAt: ${status.disconnectedAt}`); + } + if (status.lastErrorAt) { + details.push(`lastErrorAt: ${status.lastErrorAt}`); + } + if (status.lastError) { + details.push(`lastError: ${status.lastError}`); + } + if (status.closeCode !== null) { + details.push(`closeCode: ${status.closeCode}`); + } + if (status.closeReason) { + details.push(`closeReason: ${status.closeReason}`); + } + + return details.join("\n"); +} + +function WebSocketBlockingState({ + status, + uiState, +}: { + readonly status: WsConnectionStatus; + readonly uiState: WsConnectionUiState; +}) { + const copy = buildBlockingCopy(uiState, status); + const disconnectedAt = formatConnectionMoment(status.disconnectedAt ?? status.lastErrorAt); + const Icon = + uiState === "connecting" ? LoaderCircle : uiState === "offline" ? CloudOff : AlertTriangle; + + return ( +
+
+
+
+
+ +
+
+
+

+ {copy.eyebrow} +

+

{copy.title}

+
+
+ +
+
+ +

{copy.description}

+ +
+
+

+ Connection +

+

+ {uiState === "connecting" + ? "Opening WebSocket" + : uiState === "offline" + ? "Waiting for network" + : "Retrying server connection"} +

+
+
+

+ Latest Event +

+

{disconnectedAt ?? "Pending"}

+
+
+ +
+ +
+ +
+ + Show connection details + Hide connection details + +
+            {buildConnectionDetails(status, uiState)}
+          
+
+
+
+ ); +} + +export function WebSocketConnectionCoordinator() { + const status = useWsConnectionStatus(); + const [nowMs, setNowMs] = useState(() => Date.now()); + const lastForcedReconnectAtRef = useRef(0); + const toastIdRef = useRef | null>(null); + const toastResetTimerRef = useRef(null); + const previousUiStateRef = useRef(getWsConnectionUiState(status)); + const previousDisconnectedAtRef = useRef(status.disconnectedAt); + + const runReconnect = useEffectEvent((showFailureToast: boolean) => { + if (toastResetTimerRef.current !== null) { + window.clearTimeout(toastResetTimerRef.current); + toastResetTimerRef.current = null; + } + lastForcedReconnectAtRef.current = Date.now(); + void getWsRpcClient() + .reconnect() + .catch((error) => { + if (!showFailureToast) { + console.warn("Automatic WebSocket reconnect failed", { error }); + return; + } + toastManager.add({ + type: "error", + title: "Reconnect failed", + description: error instanceof Error ? error.message : "Unable to restart the WebSocket.", + data: { + dismissAfterVisibleMs: 8_000, + hideCopyButton: true, + }, + }); + }); + }); + const syncBrowserOnlineStatus = useEffectEvent(() => { + setBrowserOnlineStatus(navigator.onLine !== false); + }); + const triggerManualReconnect = useEffectEvent(() => { + runReconnect(true); + }); + const triggerAutoReconnect = useEffectEvent((trigger: WsAutoReconnectTrigger) => { + const currentStatus = + trigger === "online" ? setBrowserOnlineStatus(true) : getWsConnectionStatus(); + + if (!shouldAutoReconnect(currentStatus, trigger)) { + return; + } + if (Date.now() - lastForcedReconnectAtRef.current < FORCED_WS_RECONNECT_DEBOUNCE_MS) { + return; + } + + runReconnect(false); + }); + + useEffect(() => { + const handleOnline = () => { + triggerAutoReconnect("online"); + }; + const handleFocus = () => { + triggerAutoReconnect("focus"); + }; + + syncBrowserOnlineStatus(); + window.addEventListener("online", handleOnline); + window.addEventListener("offline", syncBrowserOnlineStatus); + window.addEventListener("focus", handleFocus); + return () => { + window.removeEventListener("online", handleOnline); + window.removeEventListener("offline", syncBrowserOnlineStatus); + window.removeEventListener("focus", handleFocus); + }; + }, []); + + useEffect(() => { + if (status.reconnectPhase !== "waiting" || status.nextRetryAt === null) { + return; + } + + setNowMs(Date.now()); + const intervalId = window.setInterval(() => { + setNowMs(Date.now()); + }, 1_000); + + return () => { + window.clearInterval(intervalId); + }; + }, [status.nextRetryAt, status.reconnectPhase]); + + useEffect(() => { + if ( + status.reconnectPhase !== "waiting" || + status.nextRetryAt === null || + !status.online || + !status.hasConnected + ) { + return; + } + + const nextRetryAt = status.nextRetryAt; + const timeoutMs = Math.max(0, new Date(nextRetryAt).getTime() - Date.now()) + 1_500; + const timeoutId = window.setTimeout(() => { + exhaustWsReconnectIfStillWaiting(nextRetryAt); + }, timeoutMs); + + return () => { + window.clearTimeout(timeoutId); + }; + }, [ + status.hasConnected, + status.nextRetryAt, + status.online, + status.reconnectAttemptCount, + status.reconnectPhase, + ]); + + useEffect(() => { + const uiState = getWsConnectionUiState(status); + const previousUiState = previousUiStateRef.current; + const previousDisconnectedAt = previousDisconnectedAtRef.current; + const shouldShowReconnectToast = status.hasConnected && uiState === "reconnecting"; + const shouldShowOfflineToast = uiState === "offline" && status.disconnectedAt !== null; + const shouldShowExhaustedToast = status.hasConnected && status.reconnectPhase === "exhausted"; + + if ( + toastResetTimerRef.current !== null && + (shouldShowReconnectToast || shouldShowOfflineToast || shouldShowExhaustedToast) + ) { + window.clearTimeout(toastResetTimerRef.current); + toastResetTimerRef.current = null; + } + + if (shouldShowReconnectToast || shouldShowOfflineToast || shouldShowExhaustedToast) { + const toastPayload = shouldShowOfflineToast + ? { + description: describeOfflineToast(), + timeout: 0, + title: "Offline", + type: "warning" as const, + data: { + hideCopyButton: true, + }, + } + : shouldShowExhaustedToast + ? { + actionProps: { + children: "Retry", + onClick: triggerManualReconnect, + }, + description: describeExhaustedToast(), + timeout: 0, + title: "Disconnected from T3 Server", + type: "error" as const, + data: { + hideCopyButton: true, + }, + } + : { + actionProps: { + children: "Retry now", + onClick: triggerManualReconnect, + }, + description: + status.nextRetryAt === null + ? `Reconnecting... ${formatReconnectAttemptLabel(status)}` + : `Reconnecting in ${formatRetryCountdown(status.nextRetryAt, nowMs)}... ${formatReconnectAttemptLabel(status)}`, + timeout: 0, + title: buildReconnectTitle(status), + type: "loading" as const, + data: { + hideCopyButton: true, + }, + }; + + if (toastIdRef.current) { + toastManager.update(toastIdRef.current, toastPayload); + } else { + toastIdRef.current = toastManager.add(toastPayload); + } + } else if (toastIdRef.current) { + toastManager.close(toastIdRef.current); + toastIdRef.current = null; + } + + if ( + uiState === "connected" && + (previousUiState === "offline" || previousUiState === "reconnecting") && + previousDisconnectedAt !== null + ) { + const successToast = { + description: describeRecoveredToast(previousDisconnectedAt, status.connectedAt), + title: "Reconnected to T3 Server", + type: "success" as const, + timeout: 0, + data: { + dismissAfterVisibleMs: 8_000, + hideCopyButton: true, + }, + }; + + if (toastIdRef.current) { + toastManager.update(toastIdRef.current, successToast); + } else { + toastIdRef.current = toastManager.add(successToast); + } + + toastResetTimerRef.current = window.setTimeout(() => { + toastIdRef.current = null; + toastResetTimerRef.current = null; + }, 8_250); + } + + previousUiStateRef.current = uiState; + previousDisconnectedAtRef.current = status.disconnectedAt; + }, [nowMs, status]); + + useEffect(() => { + return () => { + if (toastResetTimerRef.current !== null) { + window.clearTimeout(toastResetTimerRef.current); + } + }; + }, []); + + return null; +} + +export function SlowRpcAckToastCoordinator() { + const slowRequests = useSlowRpcAckRequests(); + const status = useWsConnectionStatus(); + const toastIdRef = useRef | null>(null); + + useEffect(() => { + if (getWsConnectionUiState(status) !== "connected") { + if (toastIdRef.current) { + toastManager.close(toastIdRef.current); + toastIdRef.current = null; + } + return; + } + + if (slowRequests.length === 0) { + if (toastIdRef.current) { + toastManager.close(toastIdRef.current); + toastIdRef.current = null; + } + return; + } + + const nextToast = { + description: describeSlowRpcAckToast(slowRequests), + timeout: 0, + title: "Some requests are slow", + type: "warning" as const, + }; + + if (toastIdRef.current) { + toastManager.update(toastIdRef.current, nextToast); + } else { + toastIdRef.current = toastManager.add(nextToast); + } + }, [slowRequests, status]); + + return null; +} + +export function WebSocketConnectionSurface({ children }: { readonly children: ReactNode }) { + const serverConfig = useServerConfig(); + const status = useWsConnectionStatus(); + + if (serverConfig === null) { + const uiState = getWsConnectionUiState(status); + return ( + + ); + } + + return children; +} diff --git a/apps/web/src/components/ui/toast.tsx b/apps/web/src/components/ui/toast.tsx index 387967d34a..b90e07ecc9 100644 --- a/apps/web/src/components/ui/toast.tsx +++ b/apps/web/src/components/ui/toast.tsx @@ -23,6 +23,7 @@ export type ThreadToastData = { threadId?: ThreadId | null; tooltipStyle?: boolean; dismissAfterVisibleMs?: number; + hideCopyButton?: boolean; }; const toastManager = Toast.createToastManager(); @@ -309,15 +310,15 @@ function Toasts({ position = "top-right" }: { position: ToastPosition }) {
- {toast.type === "error" && typeof toast.description === "string" && ( - - )} + {toast.type === "error" && + typeof toast.description === "string" && + !toast.data?.hideCopyButton && }
@@ -403,15 +404,17 @@ function AnchoredToasts() {
- {toast.type === "error" && typeof toast.description === "string" && ( - - )} + {toast.type === "error" && + typeof toast.description === "string" && + !toast.data?.hideCopyButton && ( + + )}
diff --git a/apps/web/src/orchestrationRecovery.ts b/apps/web/src/orchestrationRecovery.ts index 7f8fbb61c9..853535f3bb 100644 --- a/apps/web/src/orchestrationRecovery.ts +++ b/apps/web/src/orchestrationRecovery.ts @@ -1,4 +1,8 @@ -export type OrchestrationRecoveryReason = "bootstrap" | "sequence-gap" | "replay-failed"; +export type OrchestrationRecoveryReason = + | "bootstrap" + | "sequence-gap" + | "resubscribe" + | "replay-failed"; export interface OrchestrationRecoveryPhase { kind: "snapshot" | "replay"; diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index 2b36cc7e49..11f5a0d14e 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -16,6 +16,11 @@ import { Throttler } from "@tanstack/react-pacer"; import { APP_DISPLAY_NAME } from "../branding"; import { AppSidebarLayout } from "../components/AppSidebarLayout"; +import { + SlowRpcAckToastCoordinator, + WebSocketConnectionCoordinator, + WebSocketConnectionSurface, +} from "../components/WebSocketConnectionSurface"; import { Button } from "../components/ui/button"; import { AnchoredToastProvider, ToastProvider, toastManager } from "../components/ui/toast"; import { resolveAndPersistPreferredEditor } from "../editorPreferences"; @@ -73,9 +78,13 @@ function RootRouteView() { - - - + + + + + + + ); @@ -428,8 +437,8 @@ function EventRouter() { queueMicrotask(flushPendingDomainEvents); }; - const recoverFromSequenceGap = async (): Promise => { - if (!recovery.beginReplayRecovery("sequence-gap")) { + const runReplayRecovery = async (reason: "sequence-gap" | "resubscribe"): Promise => { + if (!recovery.beginReplayRecovery(reason)) { return; } @@ -466,7 +475,7 @@ function EventRouter() { return; } } - void recoverFromSequenceGap(); + void runReplayRecovery(reason); } else if (replayCompletion.shouldReplay && import.meta.env.MODE !== "test") { console.warn( "[orchestration-recovery]", @@ -505,7 +514,7 @@ function EventRouter() { syncServerReadModel(snapshot); reconcileSnapshotDerivedState(); if (recovery.completeSnapshotRecovery(snapshot.snapshotSequence)) { - void recoverFromSequenceGap(); + void runReplayRecovery("sequence-gap"); } } } catch { @@ -522,18 +531,29 @@ function EventRouter() { const fallbackToSnapshotRecovery = async (): Promise => { await runSnapshotRecovery("replay-failed"); }; - const unsubDomainEvent = api.orchestration.onDomainEvent((event) => { - const action = recovery.classifyDomainEvent(event.sequence); - if (action === "apply") { - pendingDomainEvents.push(event); - schedulePendingDomainEventFlush(); - return; - } - if (action === "recover") { - flushPendingDomainEvents(); - void recoverFromSequenceGap(); - } - }); + const unsubDomainEvent = api.orchestration.onDomainEvent( + (event) => { + const action = recovery.classifyDomainEvent(event.sequence); + if (action === "apply") { + pendingDomainEvents.push(event); + schedulePendingDomainEventFlush(); + return; + } + if (action === "recover") { + flushPendingDomainEvents(); + void runReplayRecovery("sequence-gap"); + } + }, + { + onResubscribe: () => { + if (disposed) { + return; + } + flushPendingDomainEvents(); + void runReplayRecovery("resubscribe"); + }, + }, + ); const unsubTerminalEvent = api.terminal.onEvent((event) => { const thread = useStore.getState().threads.find((entry) => entry.id === event.threadId); if (!thread || thread.archivedAt !== null) { diff --git a/apps/web/src/rpc/protocol.ts b/apps/web/src/rpc/protocol.ts index 38012e507d..fc5ab9097f 100644 --- a/apps/web/src/rpc/protocol.ts +++ b/apps/web/src/rpc/protocol.ts @@ -1,9 +1,22 @@ import { WsRpcGroup } from "@t3tools/contracts"; -import { Effect, Layer } from "effect"; +import { Duration, Effect, Layer, Schedule } from "effect"; import { RpcClient, RpcSerialization } from "effect/unstable/rpc"; import * as Socket from "effect/unstable/socket/Socket"; import { resolveServerUrl } from "../lib/utils"; +import { + acknowledgeRpcRequest, + clearAllTrackedRpcRequests, + trackRpcRequestSent, +} from "./requestLatencyState"; +import { + getWsReconnectDelayMsForRetry, + recordWsConnectionAttempt, + recordWsConnectionClosed, + recordWsConnectionErrored, + recordWsConnectionOpened, + WS_RECONNECT_MAX_RETRIES, +} from "./wsConnectionState"; export const makeWsRpcProtocolClient = RpcClient.make(WsRpcGroup); @@ -17,11 +30,75 @@ export function createWsRpcProtocolLayer(url?: string) { protocol: window.location.protocol === "https:" ? "wss" : "ws", pathname: "/ws", }); + const trackingWebSocketConstructorLayer = Layer.succeed( + Socket.WebSocketConstructor, + (socketUrl, protocols) => { + recordWsConnectionAttempt(socketUrl); + const socket = new globalThis.WebSocket(socketUrl, protocols); + + socket.addEventListener( + "open", + () => { + recordWsConnectionOpened(); + }, + { once: true }, + ); + socket.addEventListener( + "error", + () => { + clearAllTrackedRpcRequests(); + recordWsConnectionErrored("Unable to connect to the T3 server WebSocket."); + }, + { once: true }, + ); + socket.addEventListener( + "close", + (event) => { + clearAllTrackedRpcRequests(); + recordWsConnectionClosed({ + code: event.code, + reason: event.reason, + }); + }, + { once: true }, + ); + + return socket; + }, + ); const socketLayer = Socket.layerWebSocket(resolvedUrl).pipe( - Layer.provide(Socket.layerWebSocketConstructorGlobal), + Layer.provide(trackingWebSocketConstructorLayer), ); - - return RpcClient.layerProtocolSocket({ retryTransientErrors: true }).pipe( - Layer.provide(Layer.mergeAll(socketLayer, RpcSerialization.layerJson)), + const retryPolicy = Schedule.addDelay(Schedule.recurs(WS_RECONNECT_MAX_RETRIES), (retryCount) => + Effect.succeed(Duration.millis(getWsReconnectDelayMsForRetry(retryCount) ?? 0)), + ); + const protocolLayer = Layer.effect( + RpcClient.Protocol, + Effect.map( + RpcClient.makeProtocolSocket({ + retryPolicy, + retryTransientErrors: true, + }), + (protocol) => ({ + ...protocol, + run: (writeResponse) => + protocol.run((response) => { + if (response._tag === "Chunk" || response._tag === "Exit") { + acknowledgeRpcRequest(response.requestId); + } else if (response._tag === "ClientProtocolError" || response._tag === "Defect") { + clearAllTrackedRpcRequests(); + } + return writeResponse(response); + }), + send: (request, transferables) => { + if (request._tag === "Request") { + trackRpcRequestSent(request.id, request.tag); + } + return protocol.send(request, transferables); + }, + }), + ), ); + + return protocolLayer.pipe(Layer.provide(Layer.mergeAll(socketLayer, RpcSerialization.layerJson))); } diff --git a/apps/web/src/rpc/requestLatencyState.test.ts b/apps/web/src/rpc/requestLatencyState.test.ts new file mode 100644 index 0000000000..ac83c3b265 --- /dev/null +++ b/apps/web/src/rpc/requestLatencyState.test.ts @@ -0,0 +1,65 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { + acknowledgeRpcRequest, + getSlowRpcAckRequests, + resetRequestLatencyStateForTests, + trackRpcRequestSent, + SLOW_RPC_ACK_THRESHOLD_MS, + MAX_TRACKED_RPC_ACK_REQUESTS, +} from "./requestLatencyState"; + +describe("requestLatencyState", () => { + beforeEach(() => { + vi.useFakeTimers(); + resetRequestLatencyStateForTests(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("marks unary requests as slow when the ack threshold is exceeded", () => { + trackRpcRequestSent("1", "server.getConfig"); + vi.advanceTimersByTime(SLOW_RPC_ACK_THRESHOLD_MS - 1); + expect(getSlowRpcAckRequests()).toEqual([]); + + vi.advanceTimersByTime(1); + expect(getSlowRpcAckRequests()).toMatchObject([ + { + requestId: "1", + tag: "server.getConfig", + thresholdMs: SLOW_RPC_ACK_THRESHOLD_MS, + }, + ]); + }); + + it("clears the slow request once the server acknowledges it", () => { + trackRpcRequestSent("1", "git.status"); + vi.advanceTimersByTime(SLOW_RPC_ACK_THRESHOLD_MS); + expect(getSlowRpcAckRequests()).toHaveLength(1); + + acknowledgeRpcRequest("1"); + expect(getSlowRpcAckRequests()).toEqual([]); + }); + + it("ignores long-lived subscribe requests", () => { + trackRpcRequestSent("1", "subscribeServerConfig"); + vi.advanceTimersByTime(SLOW_RPC_ACK_THRESHOLD_MS * 2); + + expect(getSlowRpcAckRequests()).toEqual([]); + }); + + it("evicts the oldest pending requests once the tracker reaches capacity", () => { + for (let index = 0; index < MAX_TRACKED_RPC_ACK_REQUESTS + 1; index += 1) { + trackRpcRequestSent(String(index), "server.getConfig"); + } + + vi.advanceTimersByTime(SLOW_RPC_ACK_THRESHOLD_MS); + + const slowRequests = getSlowRpcAckRequests(); + expect(slowRequests).toHaveLength(MAX_TRACKED_RPC_ACK_REQUESTS); + expect(slowRequests[0]?.requestId).toBe("1"); + expect(slowRequests.at(-1)?.requestId).toBe(String(MAX_TRACKED_RPC_ACK_REQUESTS)); + }); +}); diff --git a/apps/web/src/rpc/requestLatencyState.ts b/apps/web/src/rpc/requestLatencyState.ts new file mode 100644 index 0000000000..01c59ea0d5 --- /dev/null +++ b/apps/web/src/rpc/requestLatencyState.ts @@ -0,0 +1,127 @@ +import { useAtomValue } from "@effect/atom-react"; +import { Atom } from "effect/unstable/reactivity"; + +import { appAtomRegistry } from "./atomRegistry"; + +export const SLOW_RPC_ACK_THRESHOLD_MS = 2_500; +export const MAX_TRACKED_RPC_ACK_REQUESTS = 256; + +export interface SlowRpcAckRequest { + readonly requestId: string; + readonly startedAt: string; + readonly startedAtMs: number; + readonly tag: string; + readonly thresholdMs: number; +} + +interface PendingRpcAckRequest { + readonly request: SlowRpcAckRequest; + readonly timeoutId: ReturnType; +} + +const pendingRpcAckRequests = new Map(); + +const slowRpcAckRequestsAtom = Atom.make>([]).pipe( + Atom.keepAlive, + Atom.withLabel("slow-rpc-ack-requests"), +); + +function setSlowRpcAckRequests(requests: ReadonlyArray) { + appAtomRegistry.set(slowRpcAckRequestsAtom, [...requests]); +} + +function getSlowRpcAckRequestsValue(): ReadonlyArray { + return appAtomRegistry.get(slowRpcAckRequestsAtom); +} + +function shouldTrackRpcAck(tag: string): boolean { + return !tag.startsWith("subscribe"); +} + +export function getSlowRpcAckRequests(): ReadonlyArray { + return getSlowRpcAckRequestsValue(); +} + +export function trackRpcRequestSent(requestId: string, tag: string): void { + if (!shouldTrackRpcAck(tag)) { + return; + } + + clearTrackedRpcRequest(requestId); + evictOldestPendingRpcRequestIfNeeded(); + + const startedAtMs = Date.now(); + const request: SlowRpcAckRequest = { + requestId, + startedAt: new Date(startedAtMs).toISOString(), + startedAtMs, + tag, + thresholdMs: SLOW_RPC_ACK_THRESHOLD_MS, + }; + const timeoutId = setTimeout(() => { + pendingRpcAckRequests.delete(requestId); + appendSlowRpcAckRequest(request); + }, SLOW_RPC_ACK_THRESHOLD_MS); + + pendingRpcAckRequests.set(requestId, { + request, + timeoutId, + }); +} + +export function acknowledgeRpcRequest(requestId: string): void { + clearTrackedRpcRequest(requestId); + const slowRequests = getSlowRpcAckRequestsValue(); + if (!slowRequests.some((request) => request.requestId === requestId)) { + return; + } + + setSlowRpcAckRequests(slowRequests.filter((request) => request.requestId !== requestId)); +} + +export function clearAllTrackedRpcRequests(): void { + for (const pending of pendingRpcAckRequests.values()) { + clearTimeout(pending.timeoutId); + } + pendingRpcAckRequests.clear(); + setSlowRpcAckRequests([]); +} + +function clearTrackedRpcRequest(requestId: string): void { + const pending = pendingRpcAckRequests.get(requestId); + if (!pending) { + return; + } + + clearTimeout(pending.timeoutId); + pendingRpcAckRequests.delete(requestId); +} + +function appendSlowRpcAckRequest(request: SlowRpcAckRequest): void { + const requests = [...getSlowRpcAckRequestsValue(), request]; + if (requests.length <= MAX_TRACKED_RPC_ACK_REQUESTS) { + setSlowRpcAckRequests(requests); + return; + } + + setSlowRpcAckRequests(requests.slice(-MAX_TRACKED_RPC_ACK_REQUESTS)); +} + +function evictOldestPendingRpcRequestIfNeeded(): void { + while (pendingRpcAckRequests.size >= MAX_TRACKED_RPC_ACK_REQUESTS) { + const oldestRequestId = pendingRpcAckRequests.keys().next().value; + if (oldestRequestId === undefined) { + return; + } + + clearTrackedRpcRequest(oldestRequestId); + } +} + +export function resetRequestLatencyStateForTests(): void { + clearAllTrackedRpcRequests(); +} + +export function useSlowRpcAckRequests(): ReadonlyArray { + return useAtomValue(slowRpcAckRequestsAtom); +} diff --git a/apps/web/src/rpc/transportError.test.ts b/apps/web/src/rpc/transportError.test.ts new file mode 100644 index 0000000000..f9da05abbc --- /dev/null +++ b/apps/web/src/rpc/transportError.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it } from "vitest"; + +import { isTransportConnectionErrorMessage, sanitizeThreadErrorMessage } from "./transportError"; + +describe("transportError", () => { + it("detects websocket transport failures", () => { + expect(isTransportConnectionErrorMessage("SocketCloseError: 1006")).toBe(true); + expect(isTransportConnectionErrorMessage("Unable to connect to the T3 server WebSocket.")).toBe( + true, + ); + expect(isTransportConnectionErrorMessage("SocketOpenError: Timeout")).toBe(true); + }); + + it("preserves non-transport thread errors", () => { + expect(sanitizeThreadErrorMessage("Turn failed")).toBe("Turn failed"); + expect(sanitizeThreadErrorMessage("Select a base branch before sending.")).toBe( + "Select a base branch before sending.", + ); + }); + + it("drops transport failures from thread surfaces", () => { + expect(sanitizeThreadErrorMessage("SocketCloseError: 1006")).toBeNull(); + }); +}); diff --git a/apps/web/src/rpc/transportError.ts b/apps/web/src/rpc/transportError.ts new file mode 100644 index 0000000000..edc90a5a3c --- /dev/null +++ b/apps/web/src/rpc/transportError.ts @@ -0,0 +1,23 @@ +const TRANSPORT_ERROR_PATTERNS = [ + /\bSocketCloseError\b/i, + /\bSocketOpenError\b/i, + /Unable to connect to the T3 server WebSocket\./i, + /\bping timeout\b/i, +] as const; + +export function isTransportConnectionErrorMessage(message: string | null | undefined): boolean { + if (typeof message !== "string") { + return false; + } + + const normalizedMessage = message.trim(); + if (normalizedMessage.length === 0) { + return false; + } + + return TRANSPORT_ERROR_PATTERNS.some((pattern) => pattern.test(normalizedMessage)); +} + +export function sanitizeThreadErrorMessage(message: string | null | undefined): string | null { + return isTransportConnectionErrorMessage(message) ? null : (message ?? null); +} diff --git a/apps/web/src/rpc/wsConnectionState.test.ts b/apps/web/src/rpc/wsConnectionState.test.ts new file mode 100644 index 0000000000..f4e8689a58 --- /dev/null +++ b/apps/web/src/rpc/wsConnectionState.test.ts @@ -0,0 +1,112 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { + exhaustWsReconnectIfStillWaiting, + getWsConnectionStatus, + getWsReconnectDelayMsForRetry, + getWsConnectionUiState, + recordWsConnectionAttempt, + recordWsConnectionClosed, + recordWsConnectionErrored, + recordWsConnectionOpened, + resetWsConnectionStateForTests, + setBrowserOnlineStatus, + WS_RECONNECT_MAX_ATTEMPTS, +} from "./wsConnectionState"; + +describe("wsConnectionState", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-04-03T20:30:00.000Z")); + resetWsConnectionStateForTests(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("treats a disconnected browser as offline once the websocket drops", () => { + recordWsConnectionAttempt("ws://localhost:3020/ws"); + recordWsConnectionOpened(); + recordWsConnectionClosed({ code: 1006, reason: "offline" }); + setBrowserOnlineStatus(false); + + expect(getWsConnectionUiState(getWsConnectionStatus())).toBe("offline"); + }); + + it("stays in the initial connecting state until the first disconnect", () => { + recordWsConnectionAttempt("ws://localhost:3020/ws"); + + expect(getWsConnectionStatus()).toMatchObject({ + attemptCount: 1, + hasConnected: false, + phase: "connecting", + }); + expect(getWsConnectionUiState(getWsConnectionStatus())).toBe("connecting"); + }); + + it("schedules the next retry after a failed websocket attempt", () => { + recordWsConnectionAttempt("ws://localhost:3020/ws"); + recordWsConnectionErrored("Unable to connect to the T3 server WebSocket."); + + const firstRetryDelayMs = getWsReconnectDelayMsForRetry(0); + if (firstRetryDelayMs === null) { + throw new Error("Expected an initial retry delay."); + } + + expect(getWsConnectionStatus()).toMatchObject({ + nextRetryAt: new Date(Date.now() + firstRetryDelayMs).toISOString(), + reconnectAttemptCount: 1, + reconnectPhase: "waiting", + }); + }); + + it("marks the reconnect cycle as exhausted after the final attempt fails", () => { + for (let attempt = 0; attempt < WS_RECONNECT_MAX_ATTEMPTS; attempt += 1) { + recordWsConnectionAttempt("ws://localhost:3020/ws"); + recordWsConnectionErrored("Unable to connect to the T3 server WebSocket."); + } + + expect(getWsConnectionStatus()).toMatchObject({ + nextRetryAt: null, + reconnectAttemptCount: WS_RECONNECT_MAX_ATTEMPTS, + reconnectPhase: "exhausted", + }); + }); + + it("can exhaust a stalled final retry window when no new attempt starts", () => { + recordWsConnectionAttempt("ws://localhost:3020/ws"); + recordWsConnectionOpened(); + + for (let attempt = 0; attempt < WS_RECONNECT_MAX_ATTEMPTS - 1; attempt += 1) { + recordWsConnectionAttempt("ws://localhost:3020/ws"); + recordWsConnectionErrored("Unable to connect to the T3 server WebSocket."); + } + + const finalRetryDelayMs = getWsReconnectDelayMsForRetry(WS_RECONNECT_MAX_ATTEMPTS - 2); + if (finalRetryDelayMs === null) { + throw new Error("Expected a final retry delay."); + } + + const statusBeforeExhaust = getWsConnectionStatus(); + expect(statusBeforeExhaust).toMatchObject({ + nextRetryAt: new Date(Date.now() + finalRetryDelayMs).toISOString(), + reconnectAttemptCount: 7, + reconnectPhase: "waiting", + }); + + const nextRetryAt = statusBeforeExhaust.nextRetryAt; + if (!nextRetryAt) { + throw new Error("Expected a scheduled retry."); + } + + vi.setSystemTime(new Date(Date.now() + finalRetryDelayMs + 1_000)); + exhaustWsReconnectIfStillWaiting(nextRetryAt); + + expect(getWsConnectionStatus()).toMatchObject({ + nextRetryAt: null, + reconnectAttemptCount: WS_RECONNECT_MAX_ATTEMPTS, + reconnectPhase: "exhausted", + }); + }); +}); diff --git a/apps/web/src/rpc/wsConnectionState.ts b/apps/web/src/rpc/wsConnectionState.ts new file mode 100644 index 0000000000..5020a1c453 --- /dev/null +++ b/apps/web/src/rpc/wsConnectionState.ts @@ -0,0 +1,219 @@ +import { useAtomValue } from "@effect/atom-react"; +import { Atom } from "effect/unstable/reactivity"; + +import { appAtomRegistry } from "./atomRegistry"; + +export type WsConnectionUiState = "connected" | "connecting" | "error" | "offline" | "reconnecting"; +export type WsReconnectPhase = "attempting" | "exhausted" | "idle" | "waiting"; + +export const WS_RECONNECT_INITIAL_DELAY_MS = 1_000; +export const WS_RECONNECT_BACKOFF_FACTOR = 2; +export const WS_RECONNECT_MAX_DELAY_MS = 64_000; +export const WS_RECONNECT_MAX_RETRIES = 7; +export const WS_RECONNECT_MAX_ATTEMPTS = WS_RECONNECT_MAX_RETRIES + 1; + +export interface WsConnectionStatus { + readonly attemptCount: number; + readonly closeCode: number | null; + readonly closeReason: string | null; + readonly connectedAt: string | null; + readonly disconnectedAt: string | null; + readonly hasConnected: boolean; + readonly lastError: string | null; + readonly lastErrorAt: string | null; + readonly nextRetryAt: string | null; + readonly online: boolean; + readonly phase: "idle" | "connecting" | "connected" | "disconnected"; + readonly reconnectAttemptCount: number; + readonly reconnectMaxAttempts: number; + readonly reconnectPhase: WsReconnectPhase; + readonly socketUrl: string | null; +} + +const INITIAL_WS_CONNECTION_STATUS = Object.freeze({ + attemptCount: 0, + closeCode: null, + closeReason: null, + connectedAt: null, + disconnectedAt: null, + hasConnected: false, + lastError: null, + lastErrorAt: null, + nextRetryAt: null, + online: typeof navigator === "undefined" ? true : navigator.onLine !== false, + phase: "idle", + reconnectAttemptCount: 0, + reconnectMaxAttempts: WS_RECONNECT_MAX_ATTEMPTS, + reconnectPhase: "idle", + socketUrl: null, +}); + +export const wsConnectionStatusAtom = Atom.make(INITIAL_WS_CONNECTION_STATUS).pipe( + Atom.keepAlive, + Atom.withLabel("ws-connection-status"), +); + +function isoNow() { + return new Date().toISOString(); +} + +function updateWsConnectionStatus( + updater: (current: WsConnectionStatus) => WsConnectionStatus, +): WsConnectionStatus { + const nextStatus = updater(getWsConnectionStatus()); + appAtomRegistry.set(wsConnectionStatusAtom, nextStatus); + return nextStatus; +} + +export function getWsConnectionStatus(): WsConnectionStatus { + return appAtomRegistry.get(wsConnectionStatusAtom); +} + +export function getWsConnectionUiState(status: WsConnectionStatus): WsConnectionUiState { + if (status.phase === "connected") { + return "connected"; + } + + if (!status.online && (status.disconnectedAt !== null || status.phase === "disconnected")) { + return "offline"; + } + + if (!status.hasConnected) { + return status.phase === "disconnected" ? "error" : "connecting"; + } + + return "reconnecting"; +} + +export function recordWsConnectionAttempt(socketUrl: string): WsConnectionStatus { + return updateWsConnectionStatus((current) => ({ + ...current, + attemptCount: current.attemptCount + 1, + nextRetryAt: null, + phase: "connecting", + reconnectAttemptCount: current.phase === "connected" ? 1 : current.reconnectAttemptCount + 1, + reconnectPhase: "attempting", + socketUrl, + })); +} + +export function recordWsConnectionOpened(): WsConnectionStatus { + return updateWsConnectionStatus((current) => ({ + ...current, + closeCode: null, + closeReason: null, + connectedAt: isoNow(), + disconnectedAt: null, + hasConnected: true, + nextRetryAt: null, + phase: "connected", + reconnectAttemptCount: 0, + reconnectPhase: "idle", + })); +} + +export function recordWsConnectionErrored(message?: string | null): WsConnectionStatus { + return updateWsConnectionStatus((current) => + applyDisconnectState(current, { + lastError: message?.trim() ? message : current.lastError, + lastErrorAt: isoNow(), + }), + ); +} + +export function recordWsConnectionClosed(details?: { + readonly code?: number; + readonly reason?: string; +}): WsConnectionStatus { + return updateWsConnectionStatus((current) => + applyDisconnectState(current, { + closeCode: details?.code ?? current.closeCode, + closeReason: details?.reason?.trim() ? details.reason : current.closeReason, + }), + ); +} + +export function setBrowserOnlineStatus(online: boolean): WsConnectionStatus { + return updateWsConnectionStatus((current) => ({ + ...current, + online, + })); +} + +export function resetWsReconnectBackoff(): WsConnectionStatus { + return updateWsConnectionStatus((current) => ({ + ...current, + nextRetryAt: null, + reconnectAttemptCount: 0, + reconnectPhase: "idle", + })); +} + +export function exhaustWsReconnectIfStillWaiting(expectedNextRetryAt: string): WsConnectionStatus { + return updateWsConnectionStatus((current) => { + if ( + current.reconnectPhase !== "waiting" || + current.nextRetryAt !== expectedNextRetryAt || + !current.online || + !current.hasConnected + ) { + return current; + } + + return { + ...current, + nextRetryAt: null, + reconnectAttemptCount: current.reconnectMaxAttempts, + reconnectPhase: "exhausted", + }; + }); +} + +export function resetWsConnectionStateForTests(): void { + appAtomRegistry.set(wsConnectionStatusAtom, INITIAL_WS_CONNECTION_STATUS); +} + +export function useWsConnectionStatus(): WsConnectionStatus { + return useAtomValue(wsConnectionStatusAtom); +} + +export function getWsReconnectDelayMsForRetry(retryIndex: number): number | null { + if (!Number.isInteger(retryIndex) || retryIndex < 0 || retryIndex >= WS_RECONNECT_MAX_RETRIES) { + return null; + } + + return Math.min( + Math.round(WS_RECONNECT_INITIAL_DELAY_MS * WS_RECONNECT_BACKOFF_FACTOR ** retryIndex), + WS_RECONNECT_MAX_DELAY_MS, + ); +} + +function applyDisconnectState( + current: WsConnectionStatus, + updates: Partial< + Pick + >, +): WsConnectionStatus { + const disconnectedAt = current.disconnectedAt ?? isoNow(); + const nextRetryDelayMs = + current.nextRetryAt !== null || current.reconnectPhase === "exhausted" + ? null + : getWsReconnectDelayMsForRetry(Math.max(0, current.reconnectAttemptCount - 1)); + + return { + ...current, + ...updates, + disconnectedAt, + nextRetryAt: + nextRetryDelayMs === null + ? current.nextRetryAt + : new Date(Date.now() + nextRetryDelayMs).toISOString(), + phase: "disconnected", + reconnectPhase: + current.reconnectPhase === "waiting" || current.reconnectPhase === "exhausted" + ? current.reconnectPhase + : nextRetryDelayMs === null + ? "exhausted" + : "waiting", + }; +} diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index 7a227fbb76..bc5045ad1b 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -19,6 +19,7 @@ import { derivePendingApprovals, derivePendingUserInputs, } from "./session-logic"; +import { sanitizeThreadErrorMessage } from "./rpc/transportError"; import { type ChatMessage, type Project, type SidebarThreadSummary, type Thread } from "./types"; // ── State ──────────────────────────────────────────────────────────── @@ -163,7 +164,7 @@ function mapThread(thread: OrchestrationThread): Thread { session: thread.session ? mapSession(thread.session) : null, messages: thread.messages.map(mapMessage), proposedPlans: thread.proposedPlans.map(mapProposedPlan), - error: thread.session?.lastError ?? null, + error: sanitizeThreadErrorMessage(thread.session?.lastError), createdAt: thread.createdAt, archivedAt: thread.archivedAt, updatedAt: thread.updatedAt, @@ -902,7 +903,7 @@ export function applyOrchestrationEvent(state: AppState, event: OrchestrationEve return updateThreadState(state, event.payload.threadId, (thread) => ({ ...thread, session: mapSession(event.payload.session), - error: event.payload.session.lastError ?? null, + error: sanitizeThreadErrorMessage(event.payload.session.lastError), latestTurn: event.payload.session.status === "running" && event.payload.session.activeTurnId !== null ? buildLatestTurn({ diff --git a/apps/web/src/wsNativeApi.test.ts b/apps/web/src/wsNativeApi.test.ts index 0b3165345a..219d87c56f 100644 --- a/apps/web/src/wsNativeApi.test.ts +++ b/apps/web/src/wsNativeApi.test.ts @@ -247,6 +247,20 @@ describe("wsNativeApi", () => { expect(onDomainEvent).toHaveBeenCalledWith(orchestrationEvent); }); + it("forwards orchestration stream subscription options to the RPC client", async () => { + const { createWsNativeApi } = await import("./wsNativeApi"); + + const api = createWsNativeApi(); + const onDomainEvent = vi.fn(); + const onResubscribe = vi.fn(); + + api.orchestration.onDomainEvent(onDomainEvent, { onResubscribe }); + + expect(rpcClientMock.orchestration.onDomainEvent).toHaveBeenCalledWith(onDomainEvent, { + onResubscribe, + }); + }); + it("sends orchestration dispatch commands as the direct RPC payload", async () => { rpcClientMock.orchestration.dispatchCommand.mockResolvedValue({ sequence: 1 }); const { createWsNativeApi } = await import("./wsNativeApi"); diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index a2e2d7ff09..5fe9d10d6e 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -1,7 +1,9 @@ import { type ContextMenuItem, type NativeApi } from "@t3tools/contracts"; import { showContextMenuFallback } from "./contextMenuFallback"; +import { resetRequestLatencyStateForTests } from "./rpc/requestLatencyState"; import { resetServerStateForTests } from "./rpc/serverState"; +import { resetWsConnectionStateForTests } from "./rpc/wsConnectionState"; import { __resetWsRpcClientForTests, getWsRpcClient } from "./wsRpcClient"; let instance: { api: NativeApi } | null = null; @@ -9,7 +11,9 @@ let instance: { api: NativeApi } | null = null; export function __resetWsNativeApiForTests() { instance = null; __resetWsRpcClientForTests(); + resetRequestLatencyStateForTests(); resetServerStateForTests(); + resetWsConnectionStateForTests(); } export function createWsNativeApi(): NativeApi { @@ -116,7 +120,8 @@ export function createWsNativeApi(): NativeApi { rpcClient.orchestration .replayEvents({ fromSequenceExclusive }) .then((events) => [...events]), - onDomainEvent: (callback) => rpcClient.orchestration.onDomainEvent(callback), + onDomainEvent: (callback, options) => + rpcClient.orchestration.onDomainEvent(callback, options), }, }; diff --git a/apps/web/src/wsRpcClient.ts b/apps/web/src/wsRpcClient.ts index 60f51ba707..1d411aa1b9 100644 --- a/apps/web/src/wsRpcClient.ts +++ b/apps/web/src/wsRpcClient.ts @@ -10,12 +10,17 @@ import { import { Effect, Stream } from "effect"; import { type WsRpcProtocolClient } from "./rpc/protocol"; +import { resetWsReconnectBackoff } from "./rpc/wsConnectionState"; import { WsTransport } from "./wsTransport"; type RpcTag = keyof WsRpcProtocolClient & string; type RpcMethod = WsRpcProtocolClient[TTag]; type RpcInput = Parameters>[0]; +interface StreamSubscriptionOptions { + readonly onResubscribe?: () => void; +} + type RpcUnaryMethod = RpcMethod extends (input: any, options?: any) => Effect.Effect ? (input: RpcInput) => Promise @@ -28,7 +33,7 @@ type RpcUnaryNoArgMethod = type RpcStreamMethod = RpcMethod extends (input: any, options?: any) => Stream.Stream - ? (listener: (event: TEvent) => void) => () => void + ? (listener: (event: TEvent) => void, options?: StreamSubscriptionOptions) => () => void : never; interface GitRunStackedActionOptions { @@ -37,6 +42,7 @@ interface GitRunStackedActionOptions { export interface WsRpcClient { readonly dispose: () => Promise; + readonly reconnect: () => Promise; readonly terminal: { readonly open: RpcUnaryMethod; readonly write: RpcUnaryMethod; @@ -113,6 +119,10 @@ export async function __resetWsRpcClientForTests() { export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { return { dispose: () => transport.dispose(), + reconnect: async () => { + resetWsReconnectBackoff(); + await transport.reconnect(); + }, terminal: { open: (input) => transport.request((client) => client[WS_METHODS.terminalOpen](input)), write: (input) => transport.request((client) => client[WS_METHODS.terminalWrite](input)), @@ -120,8 +130,12 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { clear: (input) => transport.request((client) => client[WS_METHODS.terminalClear](input)), restart: (input) => transport.request((client) => client[WS_METHODS.terminalRestart](input)), close: (input) => transport.request((client) => client[WS_METHODS.terminalClose](input)), - onEvent: (listener) => - transport.subscribe((client) => client[WS_METHODS.subscribeTerminalEvents]({}), listener), + onEvent: (listener, options) => + transport.subscribe( + (client) => client[WS_METHODS.subscribeTerminalEvents]({}), + listener, + options, + ), }, projects: { searchEntries: (input) => @@ -179,10 +193,18 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { getSettings: () => transport.request((client) => client[WS_METHODS.serverGetSettings]({})), updateSettings: (patch) => transport.request((client) => client[WS_METHODS.serverUpdateSettings]({ patch })), - subscribeConfig: (listener) => - transport.subscribe((client) => client[WS_METHODS.subscribeServerConfig]({}), listener), - subscribeLifecycle: (listener) => - transport.subscribe((client) => client[WS_METHODS.subscribeServerLifecycle]({}), listener), + subscribeConfig: (listener, options) => + transport.subscribe( + (client) => client[WS_METHODS.subscribeServerConfig]({}), + listener, + options, + ), + subscribeLifecycle: (listener, options) => + transport.subscribe( + (client) => client[WS_METHODS.subscribeServerLifecycle]({}), + listener, + options, + ), }, orchestration: { getSnapshot: () => @@ -197,10 +219,11 @@ export function createWsRpcClient(transport = new WsTransport()): WsRpcClient { transport .request((client) => client[ORCHESTRATION_WS_METHODS.replayEvents](input)) .then((events) => [...events]), - onDomainEvent: (listener) => + onDomainEvent: (listener, options) => transport.subscribe( (client) => client[WS_METHODS.subscribeOrchestrationDomainEvents]({}), listener, + options, ), }, }; diff --git a/apps/web/src/wsTransport.test.ts b/apps/web/src/wsTransport.test.ts index 2f64f2a692..c0e7803e5c 100644 --- a/apps/web/src/wsTransport.test.ts +++ b/apps/web/src/wsTransport.test.ts @@ -1,6 +1,16 @@ import { WS_METHODS } from "@t3tools/contracts"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + __resetClientTracingForTests, + configureClientTracing, +} from "./observability/clientTracing"; +import { getSlowRpcAckRequests, resetRequestLatencyStateForTests } from "./rpc/requestLatencyState"; +import { + getWsConnectionStatus, + getWsConnectionUiState, + resetWsConnectionStateForTests, +} from "./rpc/wsConnectionState"; import { WsTransport } from "./wsTransport"; type WsEventType = "open" | "message" | "close" | "error"; @@ -53,6 +63,10 @@ class MockWebSocket { this.emit("message", { data, type: "message" }); } + error() { + this.emit("error", { type: "error" }); + } + private emit(type: WsEventType, event?: WsEvent) { const listeners = this.listeners.get(type); if (!listeners) return; @@ -63,6 +77,7 @@ class MockWebSocket { } const originalWebSocket = globalThis.WebSocket; +const originalFetch = globalThis.fetch; function getSocket(): MockWebSocket { const socket = sockets.at(-1); @@ -88,7 +103,10 @@ async function waitFor(assertion: () => void, timeoutMs = 1_000): Promise } beforeEach(() => { + vi.useRealTimers(); sockets.length = 0; + resetRequestLatencyStateForTests(); + resetWsConnectionStateForTests(); Object.defineProperty(globalThis, "window", { configurable: true, @@ -102,12 +120,20 @@ beforeEach(() => { desktopBridge: undefined, }, }); + Object.defineProperty(globalThis, "navigator", { + configurable: true, + value: { onLine: true }, + }); globalThis.WebSocket = MockWebSocket as unknown as typeof WebSocket; }); -afterEach(() => { +afterEach(async () => { globalThis.WebSocket = originalWebSocket; + globalThis.fetch = originalFetch; + resetRequestLatencyStateForTests(); + resetWsConnectionStateForTests(); + await __resetClientTracingForTests(); vi.restoreAllMocks(); }); @@ -141,6 +167,184 @@ describe("WsTransport", () => { await transport.dispose(); }); + it("tracks initial connection failures for the app error state", async () => { + const transport = new WsTransport("ws://localhost:3020"); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + expect(getWsConnectionStatus()).toMatchObject({ + attemptCount: 1, + phase: "connecting", + socketUrl: "ws://localhost:3020/ws", + }); + + socket.error(); + socket.close(1006, "server unavailable"); + + await waitFor(() => { + expect(getWsConnectionStatus()).toMatchObject({ + closeCode: 1006, + closeReason: "server unavailable", + hasConnected: false, + lastError: "Unable to connect to the T3 server WebSocket.", + phase: "disconnected", + }); + }); + expect(getWsConnectionUiState(getWsConnectionStatus())).toBe("error"); + + await transport.dispose(); + }); + + it("surfaces reconnecting state after a live socket disconnects", async () => { + const transport = new WsTransport("ws://localhost:3020"); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(getWsConnectionStatus()).toMatchObject({ + hasConnected: true, + phase: "connected", + }); + }); + + socket.close(1013, "try again later"); + + await waitFor(() => { + expect(getWsConnectionStatus()).toMatchObject({ + closeReason: "try again later", + hasConnected: true, + }); + }); + expect(getWsConnectionUiState(getWsConnectionStatus())).toBe("reconnecting"); + + await transport.dispose(); + }); + + it("reconnects the websocket session without disposing the transport", async () => { + const transport = new WsTransport("ws://localhost:3020"); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const firstSocket = getSocket(); + firstSocket.open(); + + await waitFor(() => { + expect(getWsConnectionStatus()).toMatchObject({ + hasConnected: true, + phase: "connected", + }); + }); + + await transport.reconnect(); + + await waitFor(() => { + expect(sockets).toHaveLength(2); + }); + + const secondSocket = getSocket(); + expect(secondSocket).not.toBe(firstSocket); + expect(firstSocket.readyState).toBe(MockWebSocket.CLOSED); + + const requestPromise = transport.request((client) => + client[WS_METHODS.serverUpsertKeybinding]({ + command: "terminal.toggle", + key: "ctrl+k", + }), + ); + + secondSocket.open(); + + await waitFor(() => { + expect(secondSocket.sent).toHaveLength(1); + }); + + const requestMessage = JSON.parse(secondSocket.sent[0] ?? "{}") as { id: string }; + secondSocket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: requestMessage.id, + exit: { + _tag: "Success", + value: { + keybindings: [], + issues: [], + }, + }, + }), + ); + + await expect(requestPromise).resolves.toEqual({ + keybindings: [], + issues: [], + }); + + await transport.dispose(); + }); + + it("marks unary requests as slow until the first server ack arrives", async () => { + const transport = new WsTransport("ws://localhost:3020"); + + const requestPromise = transport.request((client) => + client[WS_METHODS.serverUpsertKeybinding]({ + command: "terminal.toggle", + key: "ctrl+k", + }), + ); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(socket.sent).toHaveLength(1); + }); + + const requestMessage = JSON.parse(socket.sent[0] ?? "{}") as { id: string }; + await waitFor(() => { + expect(getSlowRpcAckRequests()).toMatchObject([ + { + requestId: requestMessage.id, + tag: WS_METHODS.serverUpsertKeybinding, + }, + ]); + }, 5_000); + + socket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: requestMessage.id, + exit: { + _tag: "Success", + value: { + keybindings: [], + issues: [], + }, + }, + }), + ); + + await expect(requestPromise).resolves.toEqual({ + keybindings: [], + issues: [], + }); + expect(getSlowRpcAckRequests()).toEqual([]); + + await transport.dispose(); + }, 10_000); + it("sends unary RPC requests and resolves successful exits", async () => { const transport = new WsTransport("ws://localhost:3020"); @@ -250,10 +454,12 @@ describe("WsTransport", () => { it("re-subscribes stream listeners after the stream exits", async () => { const transport = new WsTransport("ws://localhost:3020"); const listener = vi.fn(); + const onResubscribe = vi.fn(); const unsubscribe = transport.subscribe( (client) => client[WS_METHODS.subscribeServerLifecycle]({}), listener, + { onResubscribe }, ); await waitFor(() => { expect(sockets).toHaveLength(1); @@ -301,6 +507,7 @@ describe("WsTransport", () => { .find((message) => message._tag === "Request" && message.id !== firstRequest.id); expect(nextRequest).toBeDefined(); }); + expect(onResubscribe).toHaveBeenCalledOnce(); const secondRequest = socket.sent .map((message) => JSON.parse(message) as { _tag?: string; id?: string; tag?: string }) @@ -339,6 +546,52 @@ describe("WsTransport", () => { await transport.dispose(); }); + it("does not fire onResubscribe when the first stream attempt exits before any value", async () => { + const transport = new WsTransport("ws://localhost:3020"); + const listener = vi.fn(); + const onResubscribe = vi.fn(); + + const unsubscribe = transport.subscribe( + (client) => client[WS_METHODS.subscribeServerLifecycle]({}), + listener, + { onResubscribe }, + ); + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + const socket = getSocket(); + socket.open(); + + await waitFor(() => { + expect(socket.sent).toHaveLength(1); + }); + + const firstRequest = JSON.parse(socket.sent[0] ?? "{}") as { id: string }; + socket.serverMessage( + JSON.stringify({ + _tag: "Exit", + requestId: firstRequest.id, + exit: { + _tag: "Success", + value: null, + }, + }), + ); + + await waitFor(() => { + const nextRequest = socket.sent + .map((message) => JSON.parse(message) as { _tag?: string; id?: string }) + .find((message) => message._tag === "Request" && message.id !== firstRequest.id); + expect(nextRequest).toBeDefined(); + }); + expect(onResubscribe).not.toHaveBeenCalled(); + expect(listener).not.toHaveBeenCalled(); + + unsubscribe(); + await transport.dispose(); + }); + it("streams finite request events without re-subscribing", async () => { const transport = new WsTransport("ws://localhost:3020"); const listener = vi.fn(); @@ -422,11 +675,21 @@ describe("WsTransport", () => { }; const transport = { disposed: false, - clientScope: {} as never, - runtime, + session: { + clientScope: {} as never, + runtime, + }, + closeSession: ( + WsTransport.prototype as unknown as { + closeSession: (session: { + clientScope: unknown; + runtime: { dispose: () => Promise; runPromise: () => Promise }; + }) => Promise; + } + ).closeSession, } as unknown as WsTransport; - WsTransport.prototype.dispose.call(transport); + void WsTransport.prototype.dispose.call(transport); expect(runtime.runPromise).toHaveBeenCalledTimes(1); expect(runtime.dispose).not.toHaveBeenCalled(); diff --git a/apps/web/src/wsTransport.ts b/apps/web/src/wsTransport.ts index 70042261d5..3e435ee167 100644 --- a/apps/web/src/wsTransport.ts +++ b/apps/web/src/wsTransport.ts @@ -1,14 +1,26 @@ -import { Duration, Effect, Exit, ManagedRuntime, Option, Scope, Stream } from "effect"; +import { + Cause, + Duration, + Effect, + Exit, + Layer, + ManagedRuntime, + Option, + Scope, + Stream, +} from "effect"; +import { RpcClient } from "effect/unstable/rpc"; +import { ClientTracingLive, configureClientTracing } from "./observability/clientTracing"; import { createWsRpcProtocolLayer, makeWsRpcProtocolClient, type WsRpcProtocolClient, } from "./rpc/protocol"; -import { RpcClient } from "effect/unstable/rpc"; interface SubscribeOptions { readonly retryDelay?: Duration.Input; + readonly onResubscribe?: () => void; } interface RequestOptions { @@ -16,6 +28,13 @@ interface RequestOptions { } const DEFAULT_SUBSCRIPTION_RETRY_DELAY_MS = Duration.millis(250); +const NOOP: () => void = () => undefined; + +interface TransportSession { + readonly clientPromise: Promise; + readonly clientScope: Scope.Closeable; + readonly runtime: ManagedRuntime.ManagedRuntime; +} function formatErrorMessage(error: unknown): string { if (error instanceof Error && error.message.trim().length > 0) { @@ -25,17 +44,16 @@ function formatErrorMessage(error: unknown): string { } export class WsTransport { - private readonly runtime: ManagedRuntime.ManagedRuntime; - private readonly clientScope: Scope.Closeable; - private readonly clientPromise: Promise; + private readonly tracingReady: Promise; + private readonly url: string | undefined; private disposed = false; + private reconnectChain: Promise = Promise.resolve(); + private session: TransportSession; constructor(url?: string) { - this.runtime = ManagedRuntime.make(createWsRpcProtocolLayer(url)); - this.clientScope = this.runtime.runSync(Scope.make()); - this.clientPromise = this.runtime.runPromise( - Scope.provide(this.clientScope)(makeWsRpcProtocolClient), - ); + this.url = url; + this.tracingReady = configureClientTracing(); + this.session = this.createSession(); } async request( @@ -46,8 +64,10 @@ export class WsTransport { throw new Error("Transport disposed"); } - const client = await this.clientPromise; - return await this.runtime.runPromise(Effect.suspend(() => execute(client))); + await this.tracingReady; + const session = this.session; + const client = await session.clientPromise; + return await session.runtime.runPromise(Effect.suspend(() => execute(client))); } async requestStream( @@ -58,8 +78,10 @@ export class WsTransport { throw new Error("Transport disposed"); } - const client = await this.clientPromise; - await this.runtime.runPromise( + await this.tracingReady; + const session = this.session; + const client = await session.clientPromise; + await session.runtime.runPromise( Stream.runForEach(connect(client), (value) => Effect.sync(() => { try { @@ -82,15 +104,132 @@ export class WsTransport { } let active = true; - const retryDelayMs = options?.retryDelay ?? DEFAULT_SUBSCRIPTION_RETRY_DELAY_MS; - const cancel = this.runtime.runCallback( - Effect.promise(() => this.clientPromise).pipe( + let hasReceivedValue = false; + const retryDelayMs = Duration.toMillis( + Duration.fromInputUnsafe(options?.retryDelay ?? DEFAULT_SUBSCRIPTION_RETRY_DELAY_MS), + ); + let cancelCurrentStream: () => void = NOOP; + + void (async () => { + for (;;) { + if (!active || this.disposed) { + return; + } + + try { + if (hasReceivedValue) { + try { + options?.onResubscribe?.(); + } catch { + // Swallow reconnect hook errors so the stream can recover. + } + } + + const session = this.session; + const runningStream = this.runStreamOnSession( + session, + connect, + listener, + () => active, + () => { + hasReceivedValue = true; + }, + ); + cancelCurrentStream = runningStream.cancel; + await runningStream.completed; + cancelCurrentStream = NOOP; + } catch (error) { + cancelCurrentStream = NOOP; + if (!active || this.disposed) { + return; + } + + console.warn("WebSocket RPC subscription disconnected", { + error: formatErrorMessage(error), + }); + await sleep(retryDelayMs); + } + } + })(); + + return () => { + active = false; + cancelCurrentStream(); + }; + } + + async reconnect() { + if (this.disposed) { + throw new Error("Transport disposed"); + } + + const reconnectOperation = this.reconnectChain.then(async () => { + if (this.disposed) { + throw new Error("Transport disposed"); + } + + const previousSession = this.session; + this.session = this.createSession(); + await this.closeSession(previousSession); + }); + + this.reconnectChain = reconnectOperation.catch(() => undefined); + await reconnectOperation; + } + + async dispose() { + if (this.disposed) { + return; + } + this.disposed = true; + await this.closeSession(this.session); + } + + private closeSession(session: TransportSession) { + return session.runtime.runPromise(Scope.close(session.clientScope, Exit.void)).finally(() => { + session.runtime.dispose(); + }); + } + + private createSession(): TransportSession { + const runtime = ManagedRuntime.make( + Layer.mergeAll(createWsRpcProtocolLayer(this.url), ClientTracingLive), + ); + const clientScope = runtime.runSync(Scope.make()); + return { + runtime, + clientScope, + clientPromise: runtime.runPromise(Scope.provide(clientScope)(makeWsRpcProtocolClient)), + }; + } + + private runStreamOnSession( + session: TransportSession, + connect: (client: WsRpcProtocolClient) => Stream.Stream, + listener: (value: TValue) => void, + isActive: () => boolean, + markValueReceived: () => void, + ): { + readonly cancel: () => void; + readonly completed: Promise; + } { + let resolveCompleted!: () => void; + let rejectCompleted!: (error: unknown) => void; + const completed = new Promise((resolve, reject) => { + resolveCompleted = resolve; + rejectCompleted = reject; + }); + const cancel = session.runtime.runCallback( + Effect.promise(() => this.tracingReady).pipe( + Effect.flatMap(() => Effect.promise(() => session.clientPromise)), Effect.flatMap((client) => Stream.runForEach(connect(client), (value) => Effect.sync(() => { - if (!active) { + if (!isActive()) { return; } + + markValueReceived(); try { listener(value); } catch { @@ -99,33 +238,28 @@ export class WsTransport { }), ), ), - Effect.catch((error) => { - if (!active || this.disposed) { - return Effect.interrupt; - } - return Effect.sync(() => { - console.warn("WebSocket RPC subscription disconnected", { - error: formatErrorMessage(error), - }); - }).pipe(Effect.andThen(Effect.sleep(retryDelayMs))); - }), - Effect.forever, ), + { + onExit: (exit) => { + if (Exit.isSuccess(exit)) { + resolveCompleted(); + return; + } + + rejectCompleted(Cause.squash(exit.cause)); + }, + }, ); - return () => { - active = false; - cancel(); + return { + cancel, + completed, }; } +} - async dispose() { - if (this.disposed) { - return; - } - this.disposed = true; - await this.runtime.runPromise(Scope.close(this.clientScope, Exit.void)).finally(() => { - this.runtime.dispose(); - }); - } +function sleep(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); } diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 2de3087938..33e45a2fb4 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -194,6 +194,11 @@ export interface NativeApi { input: OrchestrationGetFullThreadDiffInput, ) => Promise; replayEvents: (fromSequenceExclusive: number) => Promise; - onDomainEvent: (callback: (event: OrchestrationEvent) => void) => () => void; + onDomainEvent: ( + callback: (event: OrchestrationEvent) => void, + options?: { + onResubscribe?: () => void; + }, + ) => () => void; }; } From e55ab4967c46c6eaf2c899e736ac2fadf4daa89f Mon Sep 17 00:00:00 2001 From: sherlock Date: Sun, 5 Apr 2026 10:09:45 +0530 Subject: [PATCH 2/3] Add no-op clientTracing stub for standalone compilation This commit depends on clientTracing from upstream #1739 (OTLP trace proxy), which is in a separate PR (#48). This stub provides the required exports so this PR compiles independently. The real implementation from PR #48 will replace this stub when merged. --- apps/web/src/observability/clientTracing.ts | 30 +++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 apps/web/src/observability/clientTracing.ts diff --git a/apps/web/src/observability/clientTracing.ts b/apps/web/src/observability/clientTracing.ts new file mode 100644 index 0000000000..02890935d7 --- /dev/null +++ b/apps/web/src/observability/clientTracing.ts @@ -0,0 +1,30 @@ +/** + * Stub for client-side tracing. + * + * The real implementation is introduced by upstream PR #1739 (OTLP trace proxy, + * commit 04a1ae77) which will land via PR #48. Until that merge, this no-op + * shim keeps wsTransport compiling standalone. + */ +import { Layer } from "effect"; + +/** + * Initialise client-side tracing. No-op until the full OTLP implementation + * lands. + */ +export function configureClientTracing(): Promise { + return Promise.resolve(); +} + +/** + * A passthrough Effect layer — provides nothing, requires nothing. + * The real `ClientTracingLive` will supply an OTLP exporter layer. + */ +export const ClientTracingLive = Layer.empty; + +/** + * Reset helper used by tests to tear down tracing state between runs. + * No-op while the stub is in place. + */ +export function __resetClientTracingForTests(): Promise { + return Promise.resolve(); +} From 7594027d680e863cd4ec5e1c2b758cf1ce6544e6 Mon Sep 17 00:00:00 2001 From: sherlock Date: Sun, 5 Apr 2026 11:17:38 +0530 Subject: [PATCH 3/3] Address CodeRabbit review: await async reset helper in test teardown __resetWsRpcClientForTests() is async but was called without await, risking stale client leaking into subsequent tests. Propagate async through the call chain: wsNativeApi, nativeApi, and all browser test beforeEach/afterEach hooks. --- apps/web/src/components/ChatView.browser.tsx | 2 +- apps/web/src/components/KeybindingsToast.browser.tsx | 2 +- .../src/components/settings/SettingsPanels.browser.tsx | 8 ++++---- apps/web/src/nativeApi.ts | 4 ++-- apps/web/src/wsNativeApi.ts | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index fe917251a3..b264c0d859 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -1138,7 +1138,7 @@ describe("ChatView timeline estimator parity (full app)", () => { return []; }, }); - __resetNativeApiForTests(); + await __resetNativeApiForTests(); await setViewport(DEFAULT_VIEWPORT); localStorage.clear(); document.body.innerHTML = ""; diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index f0b2ea89fa..6a9c0d8399 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -328,7 +328,7 @@ describe("Keybindings update toast", () => { return []; }, }); - __resetNativeApiForTests(); + await __resetNativeApiForTests(); localStorage.clear(); document.body.innerHTML = ""; useComposerDraftStore.setState({ diff --git a/apps/web/src/components/settings/SettingsPanels.browser.tsx b/apps/web/src/components/settings/SettingsPanels.browser.tsx index 090f6f12ad..f0ea32d4be 100644 --- a/apps/web/src/components/settings/SettingsPanels.browser.tsx +++ b/apps/web/src/components/settings/SettingsPanels.browser.tsx @@ -30,16 +30,16 @@ function createBaseServerConfig(): ServerConfig { } describe("GeneralSettingsPanel observability", () => { - beforeEach(() => { + beforeEach(async () => { resetServerStateForTests(); - __resetNativeApiForTests(); + await __resetNativeApiForTests(); localStorage.clear(); document.body.innerHTML = ""; }); - afterEach(() => { + afterEach(async () => { resetServerStateForTests(); - __resetNativeApiForTests(); + await __resetNativeApiForTests(); document.body.innerHTML = ""; }); diff --git a/apps/web/src/nativeApi.ts b/apps/web/src/nativeApi.ts index 9f528b6342..f9b0607347 100644 --- a/apps/web/src/nativeApi.ts +++ b/apps/web/src/nativeApi.ts @@ -25,7 +25,7 @@ export function ensureNativeApi(): NativeApi { return api; } -export function __resetNativeApiForTests() { +export async function __resetNativeApiForTests() { cachedApi = undefined; - __resetWsNativeApiForTests(); + await __resetWsNativeApiForTests(); } diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index 5fe9d10d6e..3fea96559f 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -8,9 +8,9 @@ import { __resetWsRpcClientForTests, getWsRpcClient } from "./wsRpcClient"; let instance: { api: NativeApi } | null = null; -export function __resetWsNativeApiForTests() { +export async function __resetWsNativeApiForTests() { instance = null; - __resetWsRpcClientForTests(); + await __resetWsRpcClientForTests(); resetRequestLatencyStateForTests(); resetServerStateForTests(); resetWsConnectionStateForTests();