Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 58 additions & 25 deletions src/hooks/useMultiExchangeWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ const RECONNECT_MAX_DELAY = 60000;
const MAX_MESSAGE_SIZE = 1024 * 1024; // 1MB guard against oversized payloads
// Close codes that indicate we should not reconnect
const NO_RECONNECT_CODES = new Set([1008, 1011, 4000, 4001, 4003]);
const PING_INTERVAL = 30000; // 30s keepalive for exchanges that need it

interface WebSocketConfig {
exchange: Exchange;
url: string;
subscribe?: object;
ping?: object; // Keepalive message to send periodically
parse: (data: unknown, threshold: number) => Liquidation[];
}

Expand Down Expand Up @@ -142,46 +144,53 @@ const EXCHANGES: WebSocketConfig[] = [
method: 'subscribe',
subscription: { type: 'trades', coin: 'BTC' },
},
ping: { method: 'ping' },
parse: (data: unknown, threshold: number) => {
const msg = data as {
channel?: string;
data?: {
data?: Array<{
coin?: string;
side?: string;
px?: string;
sz?: string;
time?: number;
tid?: number;
liquidation?: boolean;
startPosition?: boolean;
dir?: string;
closedPnl?: string;
};
}>;
};

// Check if it's a fill with liquidation
if (!msg.data || !msg.data.liquidation) return [];
if (!msg.data.coin?.toUpperCase().includes('BTC')) return [];
// Hyperliquid trades channel sends data as an array
if (!Array.isArray(msg.data)) return [];

const quantity = parseFloat(msg.data.sz || '0');
const price = parseFloat(msg.data.px || '0');
if (!isFinite(quantity) || !isFinite(price)) return [];
const valueUsd = quantity * price;
const results: Liquidation[] = [];
for (const trade of msg.data) {
if (!trade.liquidation) continue;
if (!trade.coin?.toUpperCase().includes('BTC')) continue;

if (valueUsd < threshold) return [];
const quantity = parseFloat(trade.sz || '0');
const price = parseFloat(trade.px || '0');
if (!isFinite(quantity) || !isFinite(price)) continue;
const valueUsd = quantity * price;

// Determine side based on direction
const isLong = msg.data.side === 'A' || msg.data.dir?.includes('Long');
if (valueUsd < threshold) continue;

return [{
id: `hl-${msg.data.time}-${msg.data.side}`,
exchange: 'Hyperliquid',
symbol: msg.data.coin || 'BTC',
side: isLong ? 'Long' : 'Short',
quantity,
price,
valueUsd,
timestamp: new Date(msg.data.time || Date.now()),
}];
const isLong = trade.side === 'A' || trade.dir?.includes('Long');

results.push({
id: `hl-${trade.tid ?? trade.time}-${trade.side}-${Math.random().toString(36).slice(2, 6)}`,
exchange: 'Hyperliquid',
symbol: trade.coin || 'BTC',
side: isLong ? 'Long' : 'Short',
quantity,
price,
valueUsd,
timestamp: new Date(trade.time || Date.now()),
});
}
return results;
},
},
// Aevo - Subscribe to trades and filter liquidations
Expand All @@ -190,7 +199,7 @@ const EXCHANGES: WebSocketConfig[] = [
url: 'wss://ws.aevo.xyz',
subscribe: {
op: 'subscribe',
data: ['orderbook:BTC-PERP'],
data: ['trades:BTC-PERP'],
},
parse: (data: unknown, threshold: number) => {
const msg = data as {
Expand Down Expand Up @@ -247,6 +256,7 @@ export function useMultiExchangeWebSocket(threshold: number = 10000) {
const wsRefs = useRef<Map<Exchange, WebSocket>>(new Map());
const reconnectTimeouts = useRef<Map<Exchange, NodeJS.Timeout>>(new Map());
const reconnectAttempts = useRef<Map<Exchange, number>>(new Map());
const pingIntervals = useRef<Map<Exchange, NodeJS.Timeout>>(new Map());
const thresholdRef = useRef(threshold);

// Keep threshold ref updated
Expand Down Expand Up @@ -276,6 +286,19 @@ export function useMultiExchangeWebSocket(threshold: number = 10000) {
if (config.subscribe) {
ws.send(JSON.stringify(config.subscribe));
}

// Start keepalive ping if configured
if (config.ping) {
const existingPing = pingIntervals.current.get(config.exchange);
if (existingPing) clearInterval(existingPing);

const interval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(config.ping));
}
}, PING_INTERVAL);
pingIntervals.current.set(config.exchange, interval);
}
};

ws.onmessage = (event) => {
Expand All @@ -292,8 +315,8 @@ export function useMultiExchangeWebSocket(threshold: number = 10000) {
return [...deduped, ...prev].slice(0, MAX_LIQUIDATIONS);
});
}
} catch {
// Silent parse errors
} catch (e) {
console.debug(`[${config.exchange}] parse error:`, e);
}
};

Expand All @@ -304,6 +327,13 @@ export function useMultiExchangeWebSocket(threshold: number = 10000) {
ws.onclose = (event) => {
updateConnection(config.exchange, { isConnected: false });

// Clear ping interval on close
const pingInterval = pingIntervals.current.get(config.exchange);
if (pingInterval) {
clearInterval(pingInterval);
pingIntervals.current.delete(config.exchange);
}

if (NO_RECONNECT_CODES.has(event.code)) {
console.log(`${config.exchange} closed with code ${event.code}, not reconnecting`);
updateConnection(config.exchange, { error: `Rejected (code ${event.code})` });
Expand Down Expand Up @@ -340,6 +370,9 @@ export function useMultiExchangeWebSocket(threshold: number = 10000) {
reconnectTimeouts.current.clear();
reconnectAttempts.current.clear();

pingIntervals.current.forEach((interval) => clearInterval(interval));
pingIntervals.current.clear();

wsRefs.current.forEach((ws) => ws.close());
wsRefs.current.clear();
}, []);
Expand Down
Loading