diff --git a/src/bft/index.ts b/src/bft/index.ts index 5fbdd8d..0d06211 100644 --- a/src/bft/index.ts +++ b/src/bft/index.ts @@ -2,9 +2,17 @@ // // All subscriptions go through `eth_subscribe`, even the Sentrix-native // channels (sentrix_finalized, sentrix_validatorSet, sentrix_tokenOps, -// sentrix_stakingOps, sentrix_jail). The chain dispatches them by channel -// name — there is no separate `sentrix_subscribe` method, common -// confusion source. +// sentrix_stakingOps, sentrix_jail). The chain dispatches them by +// channel name — there is no separate `sentrix_subscribe` method, +// common confusion source. +// +// Recommended usage: instantiate `SubscriptionManager` once per process +// + call `.subscribe()` repeatedly. The manager multiplexes every +// subscription over one socket, sends keepalive pings every 30 s, and +// transparently re-subscribes after reconnect. The single-shot +// `subscribe(network, channel, opts)` helper is convenient for one-off +// scripts but opens its own socket — avoid in production code that +// listens to multiple channels. import WebSocket from "ws"; import { getSpec, type SentrixNetwork } from "../network.js"; @@ -22,6 +30,44 @@ export type Channel = | "sentrix_stakingOps" | "sentrix_jail"; +// Discriminated payload type per channel. Consumers that opt into +// `subscribeTyped()` get a precise payload type; the original +// untyped `subscribe()` path stays for back-compat with existing apps. +export interface NewHeadsPayload { + number: `0x${string}`; + hash: `0x${string}`; + parentHash: `0x${string}`; + timestamp: `0x${string}`; + miner: `0x${string}`; +} +export interface LogsPayload { + address: `0x${string}`; + topics: `0x${string}`[]; + data: `0x${string}`; + blockNumber: `0x${string}`; + transactionHash: `0x${string}`; + logIndex: `0x${string}`; + removed: boolean; +} +export type SentrixFinalizedPayload = { height: number; hash: `0x${string}` }; +export type SentrixValidatorSetPayload = { epoch: number; validators: `0x${string}`[] }; +// Native ops payloads stay loose for now — chain shape is still +// stabilising and a precise type would lag the source. Apps cast at +// the use site if they need a stricter shape. +export type SentrixOpsPayload = Record; + +export interface ChannelPayloadMap { + newHeads: NewHeadsPayload; + logs: LogsPayload; + newPendingTransactions: `0x${string}`; + syncing: boolean | { startingBlock: `0x${string}`; currentBlock: `0x${string}`; highestBlock: `0x${string}` }; + sentrix_finalized: SentrixFinalizedPayload; + sentrix_validatorSet: SentrixValidatorSetPayload; + sentrix_tokenOps: SentrixOpsPayload; + sentrix_stakingOps: SentrixOpsPayload; + sentrix_jail: SentrixOpsPayload; +} + export interface SubscribeOptions { /** Override the WS URL. */ wsUrl?: string; @@ -38,10 +84,11 @@ export interface Subscription { close(): Promise; } -/** Open a single subscription on a fresh WS connection. Use this when - * you want one channel and don't mind a dedicated socket. For - * multi-channel usage, instantiate `SubscriptionManager` once and - * call `subscribe` repeatedly to share a single connection. */ +/** Open a single subscription on a fresh WS connection. Convenience for + * one-off scripts. For multi-channel usage, instantiate + * `SubscriptionManager` once and call `subscribe` repeatedly to share + * a single connection — opening N sockets for N channels burns server + * file descriptors and breaks the per-IP connection cap. */ export function subscribe( network: SentrixNetwork, channel: Channel, @@ -60,19 +107,38 @@ interface InternalSub { channel: Channel; serverId: string; onMessage: (payload: unknown, channel: Channel) => void; + onError?: (err: Error) => void; + filter?: Record; +} + +interface PendingSubscribe { + channel: Channel; + resolve: (serverId: string) => void; + reject: (err: Error) => void; } /** Multiplexes many subscriptions over one WebSocket. Reconnects with - * exponential backoff on close (1s → 2s → 4s → 8s → 16s, capped 30s); - * re-subscribes registered channels after each reconnect. */ + * exponential backoff on close (1s → 2s → 4s → 8s → 16s → 30s capped); + * re-subscribes registered channels after each reconnect. Sends a + * WebSocket ping frame every 30 s to keep middleboxes (NAT, ALB, Caddy) + * from killing the connection during quiet periods. */ export class SubscriptionManager { private readonly wsUrl: string; private ws: WebSocket | null = null; private nextId = 1; private subs = new Map(); // serverId → sub - private pending = new Map void>(); // jsonrpc id → resolver + private pending = new Map(); // jsonrpc id → callback pair private backoffMs = 1000; private closed = false; + private pingTimer: ReturnType | null = null; + /** Last time a frame (subscribe-response, event, or pong) arrived. + * Used by the keepalive interval to detect a half-open connection. */ + private lastFrameAt = 0; + /** How long to wait between pings + how long without a frame before + * we consider the socket dead and force a reconnect. Tunable per + * environment via constructor. */ + private static readonly KEEPALIVE_INTERVAL_MS = 30_000; + private static readonly STALE_TIMEOUT_MS = 90_000; constructor(network: SentrixNetwork, wsUrl?: string) { this.wsUrl = wsUrl ?? getSpec(network).wsUrl; @@ -88,20 +154,26 @@ export class SubscriptionManager { if (channel === "logs" && opts.filter) params.push(opts.filter); const serverId = await new Promise((resolve, reject) => { - this.pending.set(id, resolve); + this.pending.set(id, { channel, resolve, reject }); const payload = { jsonrpc: "2.0", id, method: "eth_subscribe", params }; try { this.ws!.send(JSON.stringify(payload)); } catch (err) { this.pending.delete(id); - reject(err); + reject(err as Error); } setTimeout(() => { if (this.pending.delete(id)) reject(new Error(`subscribe ${channel} timed out`)); }, 10_000); }); - this.subs.set(serverId, { channel, serverId, onMessage: opts.onMessage }); + this.subs.set(serverId, { + channel, + serverId, + onMessage: opts.onMessage, + onError: opts.onError, + filter: opts.filter, + }); return { close: async () => { @@ -115,13 +187,51 @@ export class SubscriptionManager { }; } + /** Typed alternative to `subscribe`. The payload type is derived + * from the channel via `ChannelPayloadMap` — `subscribeTyped("newHeads", ...)` + * gives `payload: NewHeadsPayload` instead of `unknown`. */ + subscribeTyped( + channel: C, + opts: { + filter?: Record; + onMessage: (payload: ChannelPayloadMap[C]) => void; + onError?: (err: Error) => void; + }, + ): Promise { + return this.subscribe(channel, { + filter: opts.filter, + onMessage: (p) => opts.onMessage(p as ChannelPayloadMap[C]), + onError: opts.onError, + }); + } + /** Close the underlying socket and stop reconnecting. */ close(): void { this.closed = true; + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = null; + } this.ws?.close(); this.ws = null; } + /** Diagnostic snapshot — useful for ops dashboards / debug pages. */ + status(): { + socketState: "open" | "connecting" | "closed"; + subs: number; + secondsSinceLastFrame: number | null; + } { + const sec = + this.lastFrameAt === 0 ? null : Math.floor((Date.now() - this.lastFrameAt) / 1000); + let state: "open" | "connecting" | "closed" = "closed"; + if (this.ws) { + if (this.ws.readyState === WebSocket.OPEN) state = "open"; + else if (this.ws.readyState === WebSocket.CONNECTING) state = "connecting"; + } + return { socketState: state, subs: this.subs.size, secondsSinceLastFrame: sec }; + } + private ensureSocket(onError?: (err: Error) => void): Promise { if (this.ws && this.ws.readyState === WebSocket.OPEN) return Promise.resolve(); return new Promise((resolve, reject) => { @@ -130,38 +240,77 @@ export class SubscriptionManager { ws.on("open", () => { this.backoffMs = 1000; + this.lastFrameAt = Date.now(); + this.startKeepalive(); resolve(); }); ws.on("message", (raw: WebSocket.RawData) => { - const msg = JSON.parse(raw.toString()); - // Subscribe-response (one-time): { id, result: "0x..." } - if (typeof msg.id === "number" && typeof msg.result === "string") { + this.lastFrameAt = Date.now(); + let msg: { id?: number; result?: string; method?: string; params?: { subscription?: string; result?: unknown }; error?: { message: string } }; + try { + msg = JSON.parse(raw.toString()); + } catch { + // Malformed frame — drop. Can happen on edge buffer fragmentation. + return; + } + // Subscribe-response (one-time): { id, result: "0x..." } OR { id, error } + if (typeof msg.id === "number" && (typeof msg.result === "string" || msg.error)) { const cb = this.pending.get(msg.id); - if (cb) { - this.pending.delete(msg.id); - cb(msg.result); + if (!cb) return; + this.pending.delete(msg.id); + if (msg.error) { + cb.reject(new Error(`eth_subscribe ${cb.channel}: ${msg.error.message}`)); + } else if (typeof msg.result === "string") { + cb.resolve(msg.result); } return; } // Stream event: { method: "eth_subscription", params: { subscription, result } } if (msg.method === "eth_subscription") { - const sid = msg.params?.subscription as string | undefined; + const sid = msg.params?.subscription; if (!sid) return; const sub = this.subs.get(sid); - sub?.onMessage(msg.params.result, sub.channel); + sub?.onMessage(msg.params!.result, sub.channel); } }); + // Pong frame from server keepalive ping. Resets the + // last-frame timestamp so a long-quiet subscription (eg + // sentrix_jail with no events for hours) doesn't trip the + // stale-connection guard. + ws.on("pong", () => { + this.lastFrameAt = Date.now(); + }); + ws.on("error", (err) => { if (onError) onError(err); - if (this.pending.size > 0) { - for (const r of this.pending.keys()) this.pending.delete(r); - reject(err); + // Reject every pending subscribe — pre-fix only the first + // pending caller saw the rejection, the rest hung until their + // 10 s timeout fired one-by-one. Now they all get the same + // surfaced error immediately. + for (const [id, p] of this.pending) { + this.pending.delete(id); + p.reject(err); } + // Surface to per-sub error handlers too. + for (const sub of this.subs.values()) sub.onError?.(err); + reject(err); }); ws.on("close", () => { + if (this.pingTimer) { + clearInterval(this.pingTimer); + this.pingTimer = null; + } + // Reject any pending subscribes — same race fix as the error + // path. Without this a subscribe in flight when the close + // lands resolves never (the stream-event path can't fire on + // a closed socket). + for (const [id, p] of this.pending) { + this.pending.delete(id); + p.reject(new Error("websocket closed before subscribe response")); + } if (this.closed) return; // Reconnect with exponential backoff, then re-subscribe. const wait = Math.min(this.backoffMs, 30_000); @@ -169,21 +318,55 @@ export class SubscriptionManager { setTimeout(() => { this.ensureSocket(onError) .then(() => this.resubscribeAll(onError)) - .catch(() => {}); + .catch(() => { + /* will retry via the next close event */ + }); }, wait); }); }); } + /** Send a WebSocket ping frame every KEEPALIVE_INTERVAL_MS. If the + * socket has gone STALE_TIMEOUT_MS without any frame, force a close + * (which triggers the reconnect path). Middleboxes — Caddy + * reverse_proxy idle_timeout, NAT, AWS ALB — drop quiet + * connections at 60–120 s; the ping keeps them open. */ + private startKeepalive(): void { + if (this.pingTimer) clearInterval(this.pingTimer); + this.pingTimer = setInterval(() => { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + const idle = Date.now() - this.lastFrameAt; + if (idle > SubscriptionManager.STALE_TIMEOUT_MS) { + // Half-open — server stopped pong'ing. Force close so the + // close handler fires and reconnects. + try { + this.ws.terminate(); + } catch { + /* ignore */ + } + return; + } + try { + this.ws.ping(); + } catch { + /* socket may have closed mid-call; close handler will recover */ + } + }, SubscriptionManager.KEEPALIVE_INTERVAL_MS); + } + private async resubscribeAll(onError?: (err: Error) => void): Promise { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; const stale = Array.from(this.subs.values()); this.subs.clear(); for (const old of stale) { try { - await this.subscribe(old.channel, { onMessage: old.onMessage, onError }); + await this.subscribe(old.channel, { + filter: old.filter, + onMessage: old.onMessage, + onError: old.onError ?? onError, + }); } catch (err) { - onError?.(err as Error); + (old.onError ?? onError)?.(err as Error); } } } diff --git a/src/network.ts b/src/network.ts index 0a61638..05d2caf 100644 --- a/src/network.ts +++ b/src/network.ts @@ -26,7 +26,7 @@ export interface SentrixChainSpec { } export const sentrixMainnet: SentrixChainSpec = { - name: "Sentrix Mainnet", + name: "Sentrix Chain", chainId: 7119, rpcUrl: "https://rpc.sentrixchain.com", wsUrl: "wss://rpc.sentrixchain.com/ws", @@ -42,7 +42,7 @@ export const sentrixTestnet: SentrixChainSpec = { rpcUrl: "https://testnet-rpc.sentrixchain.com", wsUrl: "wss://testnet-rpc.sentrixchain.com/ws", restUrl: "https://testnet-rpc.sentrixchain.com", - explorerUrl: "https://scan.sentrixchain.com", + explorerUrl: "https://scan-testnet.sentrixchain.com", verifierUrl: "https://verify.sentrixchain.com", faucetUrl: "https://faucet.sentrixchain.com", };