Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 210 additions & 27 deletions src/bft/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@
//
// All subscriptions go through `eth_subscribe`, even the Sentrix-native
// channels (sentrix_finalized, sentrix_validatorSet, sentrix_tokenOps,
// sentrix_stakingOps, sentrix_jail). The chain dispatches them by channel
// name — there is no separate `sentrix_subscribe` method, common
// confusion source.
// sentrix_stakingOps, sentrix_jail). The chain dispatches them by
// channel name — there is no separate `sentrix_subscribe` method,
// common confusion source.
//
// Recommended usage: instantiate `SubscriptionManager` once per process
// + call `.subscribe()` repeatedly. The manager multiplexes every
// subscription over one socket, sends keepalive pings every 30 s, and
// transparently re-subscribes after reconnect. The single-shot
// `subscribe(network, channel, opts)` helper is convenient for one-off
// scripts but opens its own socket — avoid in production code that
// listens to multiple channels.

import WebSocket from "ws";
import { getSpec, type SentrixNetwork } from "../network.js";
Expand All @@ -22,6 +30,44 @@ export type Channel =
| "sentrix_stakingOps"
| "sentrix_jail";

// Discriminated payload type per channel. Consumers that opt into
// `subscribeTyped<C>()` get a precise payload type; the original
// untyped `subscribe()` path stays for back-compat with existing apps.
export interface NewHeadsPayload {
number: `0x${string}`;
hash: `0x${string}`;
parentHash: `0x${string}`;
timestamp: `0x${string}`;
miner: `0x${string}`;
}
export interface LogsPayload {
address: `0x${string}`;
topics: `0x${string}`[];
data: `0x${string}`;
blockNumber: `0x${string}`;
transactionHash: `0x${string}`;
logIndex: `0x${string}`;
removed: boolean;
}
export type SentrixFinalizedPayload = { height: number; hash: `0x${string}` };
export type SentrixValidatorSetPayload = { epoch: number; validators: `0x${string}`[] };
// Native ops payloads stay loose for now — chain shape is still
// stabilising and a precise type would lag the source. Apps cast at
// the use site if they need a stricter shape.
export type SentrixOpsPayload = Record<string, unknown>;

export interface ChannelPayloadMap {
newHeads: NewHeadsPayload;
logs: LogsPayload;
newPendingTransactions: `0x${string}`;
syncing: boolean | { startingBlock: `0x${string}`; currentBlock: `0x${string}`; highestBlock: `0x${string}` };
sentrix_finalized: SentrixFinalizedPayload;
sentrix_validatorSet: SentrixValidatorSetPayload;
sentrix_tokenOps: SentrixOpsPayload;
sentrix_stakingOps: SentrixOpsPayload;
sentrix_jail: SentrixOpsPayload;
}

export interface SubscribeOptions {
/** Override the WS URL. */
wsUrl?: string;
Expand All @@ -38,10 +84,11 @@ export interface Subscription {
close(): Promise<void>;
}

/** Open a single subscription on a fresh WS connection. Use this when
* you want one channel and don't mind a dedicated socket. For
* multi-channel usage, instantiate `SubscriptionManager` once and
* call `subscribe` repeatedly to share a single connection. */
/** Open a single subscription on a fresh WS connection. Convenience for
* one-off scripts. For multi-channel usage, instantiate
* `SubscriptionManager` once and call `subscribe` repeatedly to share
* a single connection — opening N sockets for N channels burns server
* file descriptors and breaks the per-IP connection cap. */
export function subscribe(
network: SentrixNetwork,
channel: Channel,
Expand All @@ -60,19 +107,38 @@ interface InternalSub {
channel: Channel;
serverId: string;
onMessage: (payload: unknown, channel: Channel) => void;
onError?: (err: Error) => void;
filter?: Record<string, unknown>;
}

interface PendingSubscribe {
channel: Channel;
resolve: (serverId: string) => void;
reject: (err: Error) => void;
}

/** Multiplexes many subscriptions over one WebSocket. Reconnects with
* exponential backoff on close (1s → 2s → 4s → 8s → 16s, capped 30s);
* re-subscribes registered channels after each reconnect. */
* exponential backoff on close (1s → 2s → 4s → 8s → 16s → 30s capped);
* re-subscribes registered channels after each reconnect. Sends a
* WebSocket ping frame every 30 s to keep middleboxes (NAT, ALB, Caddy)
* from killing the connection during quiet periods. */
export class SubscriptionManager {
private readonly wsUrl: string;
private ws: WebSocket | null = null;
private nextId = 1;
private subs = new Map<string, InternalSub>(); // serverId → sub
private pending = new Map<number, (id: string) => void>(); // jsonrpc id → resolver
private pending = new Map<number, PendingSubscribe>(); // jsonrpc id → callback pair
private backoffMs = 1000;
private closed = false;
private pingTimer: ReturnType<typeof setInterval> | null = null;
/** Last time a frame (subscribe-response, event, or pong) arrived.
* Used by the keepalive interval to detect a half-open connection. */
private lastFrameAt = 0;
/** How long to wait between pings + how long without a frame before
* we consider the socket dead and force a reconnect. Tunable per
* environment via constructor. */
private static readonly KEEPALIVE_INTERVAL_MS = 30_000;
private static readonly STALE_TIMEOUT_MS = 90_000;

constructor(network: SentrixNetwork, wsUrl?: string) {
this.wsUrl = wsUrl ?? getSpec(network).wsUrl;
Expand All @@ -88,20 +154,26 @@ export class SubscriptionManager {
if (channel === "logs" && opts.filter) params.push(opts.filter);

const serverId = await new Promise<string>((resolve, reject) => {
this.pending.set(id, resolve);
this.pending.set(id, { channel, resolve, reject });
const payload = { jsonrpc: "2.0", id, method: "eth_subscribe", params };
try {
this.ws!.send(JSON.stringify(payload));
} catch (err) {
this.pending.delete(id);
reject(err);
reject(err as Error);
}
setTimeout(() => {
if (this.pending.delete(id)) reject(new Error(`subscribe ${channel} timed out`));
}, 10_000);
});

this.subs.set(serverId, { channel, serverId, onMessage: opts.onMessage });
this.subs.set(serverId, {
channel,
serverId,
onMessage: opts.onMessage,
onError: opts.onError,
filter: opts.filter,
});

return {
close: async () => {
Expand All @@ -115,13 +187,51 @@ export class SubscriptionManager {
};
}

/** Typed alternative to `subscribe`. The payload type is derived
* from the channel via `ChannelPayloadMap` — `subscribeTyped("newHeads", ...)`
* gives `payload: NewHeadsPayload` instead of `unknown`. */
subscribeTyped<C extends Channel>(
channel: C,
opts: {
filter?: Record<string, unknown>;
onMessage: (payload: ChannelPayloadMap[C]) => void;
onError?: (err: Error) => void;
},
): Promise<Subscription> {
return this.subscribe(channel, {
filter: opts.filter,
onMessage: (p) => opts.onMessage(p as ChannelPayloadMap[C]),
onError: opts.onError,
});
}

/** Close the underlying socket and stop reconnecting. */
close(): void {
this.closed = true;
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
this.ws?.close();
this.ws = null;
}

/** Diagnostic snapshot — useful for ops dashboards / debug pages. */
status(): {
socketState: "open" | "connecting" | "closed";
subs: number;
secondsSinceLastFrame: number | null;
} {
const sec =
this.lastFrameAt === 0 ? null : Math.floor((Date.now() - this.lastFrameAt) / 1000);
let state: "open" | "connecting" | "closed" = "closed";
if (this.ws) {
if (this.ws.readyState === WebSocket.OPEN) state = "open";
else if (this.ws.readyState === WebSocket.CONNECTING) state = "connecting";
}
return { socketState: state, subs: this.subs.size, secondsSinceLastFrame: sec };
}

private ensureSocket(onError?: (err: Error) => void): Promise<void> {
if (this.ws && this.ws.readyState === WebSocket.OPEN) return Promise.resolve();
return new Promise((resolve, reject) => {
Expand All @@ -130,60 +240,133 @@ export class SubscriptionManager {

ws.on("open", () => {
this.backoffMs = 1000;
this.lastFrameAt = Date.now();
this.startKeepalive();
resolve();
});

ws.on("message", (raw: WebSocket.RawData) => {
const msg = JSON.parse(raw.toString());
// Subscribe-response (one-time): { id, result: "0x..." }
if (typeof msg.id === "number" && typeof msg.result === "string") {
this.lastFrameAt = Date.now();
let msg: { id?: number; result?: string; method?: string; params?: { subscription?: string; result?: unknown }; error?: { message: string } };
try {
msg = JSON.parse(raw.toString());
} catch {
// Malformed frame — drop. Can happen on edge buffer fragmentation.
return;
}
// Subscribe-response (one-time): { id, result: "0x..." } OR { id, error }
if (typeof msg.id === "number" && (typeof msg.result === "string" || msg.error)) {
const cb = this.pending.get(msg.id);
if (cb) {
this.pending.delete(msg.id);
cb(msg.result);
if (!cb) return;
this.pending.delete(msg.id);
if (msg.error) {
cb.reject(new Error(`eth_subscribe ${cb.channel}: ${msg.error.message}`));
} else if (typeof msg.result === "string") {
cb.resolve(msg.result);
}
return;
}
// Stream event: { method: "eth_subscription", params: { subscription, result } }
if (msg.method === "eth_subscription") {
const sid = msg.params?.subscription as string | undefined;
const sid = msg.params?.subscription;
if (!sid) return;
const sub = this.subs.get(sid);
sub?.onMessage(msg.params.result, sub.channel);
sub?.onMessage(msg.params!.result, sub.channel);
}
});

// Pong frame from server keepalive ping. Resets the
// last-frame timestamp so a long-quiet subscription (eg
// sentrix_jail with no events for hours) doesn't trip the
// stale-connection guard.
ws.on("pong", () => {
this.lastFrameAt = Date.now();
});

ws.on("error", (err) => {
if (onError) onError(err);
if (this.pending.size > 0) {
for (const r of this.pending.keys()) this.pending.delete(r);
reject(err);
// Reject every pending subscribe — pre-fix only the first
// pending caller saw the rejection, the rest hung until their
// 10 s timeout fired one-by-one. Now they all get the same
// surfaced error immediately.
for (const [id, p] of this.pending) {
this.pending.delete(id);
p.reject(err);
}
// Surface to per-sub error handlers too.
for (const sub of this.subs.values()) sub.onError?.(err);
reject(err);
});

ws.on("close", () => {
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
// Reject any pending subscribes — same race fix as the error
// path. Without this a subscribe in flight when the close
// lands resolves never (the stream-event path can't fire on
// a closed socket).
for (const [id, p] of this.pending) {
this.pending.delete(id);
p.reject(new Error("websocket closed before subscribe response"));
}
if (this.closed) return;
// Reconnect with exponential backoff, then re-subscribe.
const wait = Math.min(this.backoffMs, 30_000);
this.backoffMs = Math.min(this.backoffMs * 2, 30_000);
setTimeout(() => {
this.ensureSocket(onError)
.then(() => this.resubscribeAll(onError))
.catch(() => {});
.catch(() => {
/* will retry via the next close event */
});
}, wait);
});
});
}

/** Send a WebSocket ping frame every KEEPALIVE_INTERVAL_MS. If the
* socket has gone STALE_TIMEOUT_MS without any frame, force a close
* (which triggers the reconnect path). Middleboxes — Caddy
* reverse_proxy idle_timeout, NAT, AWS ALB — drop quiet
* connections at 60–120 s; the ping keeps them open. */
private startKeepalive(): void {
if (this.pingTimer) clearInterval(this.pingTimer);
this.pingTimer = setInterval(() => {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
const idle = Date.now() - this.lastFrameAt;
if (idle > SubscriptionManager.STALE_TIMEOUT_MS) {
// Half-open — server stopped pong'ing. Force close so the
// close handler fires and reconnects.
try {
this.ws.terminate();
} catch {
/* ignore */
}
return;
}
try {
this.ws.ping();
} catch {
/* socket may have closed mid-call; close handler will recover */
}
}, SubscriptionManager.KEEPALIVE_INTERVAL_MS);
}

private async resubscribeAll(onError?: (err: Error) => void): Promise<void> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
const stale = Array.from(this.subs.values());
this.subs.clear();
for (const old of stale) {
try {
await this.subscribe(old.channel, { onMessage: old.onMessage, onError });
await this.subscribe(old.channel, {
filter: old.filter,
onMessage: old.onMessage,
onError: old.onError ?? onError,
});
} catch (err) {
onError?.(err as Error);
(old.onError ?? onError)?.(err as Error);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface SentrixChainSpec {
}

export const sentrixMainnet: SentrixChainSpec = {
name: "Sentrix Mainnet",
name: "Sentrix Chain",
chainId: 7119,
rpcUrl: "https://rpc.sentrixchain.com",
wsUrl: "wss://rpc.sentrixchain.com/ws",
Expand All @@ -42,7 +42,7 @@ export const sentrixTestnet: SentrixChainSpec = {
rpcUrl: "https://testnet-rpc.sentrixchain.com",
wsUrl: "wss://testnet-rpc.sentrixchain.com/ws",
restUrl: "https://testnet-rpc.sentrixchain.com",
explorerUrl: "https://scan.sentrixchain.com",
explorerUrl: "https://scan-testnet.sentrixchain.com",
verifierUrl: "https://verify.sentrixchain.com",
faucetUrl: "https://faucet.sentrixchain.com",
};
Expand Down
Loading