diff --git a/examples/ws-userdata.ts b/examples/ws-userdata.ts index 41834446..b15148ce 100644 --- a/examples/ws-userdata.ts +++ b/examples/ws-userdata.ts @@ -2,8 +2,8 @@ import { DefaultLogger, isWsFormattedFuturesUserDataEvent, isWsFormattedSpotUserDataEvent, - isWsFormattedUserDataEvent, isWsFormattedSpotUserDataExecutionReport, + isWsFormattedUserDataEvent, WsUserDataEvents, } from '../src'; import { WebsocketClient } from '../src/websocket-client'; @@ -12,8 +12,10 @@ import { WebsocketClient } from '../src/websocket-client'; // import { DefaultLogger, WebsocketClient } from 'binance'; (async () => { - const key = process.env.APIKEY || 'APIKEY'; - const secret = process.env.APISECRET || 'APISECRET'; + const key = process.env.API_KEY_COM || 'APIKEY'; + const secret = process.env.API_SECRET_COM || 'APISECRET'; + + console.log({ key, secret }); const ignoredSillyLogMsgs = [ 'Sending ping', @@ -38,7 +40,7 @@ import { WebsocketClient } from '../src/websocket-client'; api_secret: secret, beautify: true, }, - logger + logger, ); wsClient.on('message', (data) => { diff --git a/package-lock.json b/package-lock.json index 02cbc711..ea1f7dc2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "2.15.6", "license": "MIT", "dependencies": { - "axios": "^1.6.2", + "axios": "^1.7.9", "isomorphic-ws": "^4.0.1", "nanoid": "^3.1.30", "ws": "^7.4.0" @@ -23,9 +23,9 @@ "eslint-plugin-prettier": "^5.2.1", "eslint-plugin-require-extensions": "^0.1.3", "eslint-plugin-simple-import-sort": "^12.1.1", - "jest": "^29.1.1", - "ts-jest": "^29.1.1", - "typescript": "^4.7.4" + "jest": "^29.7.0", + "ts-jest": "^29.1.2", + "typescript": "^5.7.3" }, "funding": { "type": "individual", @@ -2196,15 +2196,22 @@ "sprintf-js": "~1.0.2" } }, + "node_modules/async": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", + "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==", + "dev": true, + "license": "MIT" + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "integrity": "sha1-x57Zf380y48robyXkLzDZkdLS3k=" }, "node_modules/axios": { - "version": "1.7.7", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.7.tgz", - "integrity": "sha512-S4kL7XrjgBmvdGut0sN3yJxqYzrDOnivkBiN0OFs6hLiUam3UPvswUo0kqGyhqUZGEOytHyumEdXsAkgCOUf3Q==", + "version": "1.7.9", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.9.tgz", + "integrity": "sha512-LhLcE7Hbiryz8oMDdDptSrWowmB4Bl6RCt6sIJKpRB4XtVf0iEgewX3au/pJqm+Py1kCASkb/FFKjxQaLtxJvw==", "license": "MIT", "dependencies": { "follow-redirects": "^1.15.6", @@ -2803,6 +2810,22 @@ "node": ">=6.0.0" } }, + "node_modules/ejs": { + "version": "3.1.10", + "resolved": "https://registry.npmjs.org/ejs/-/ejs-3.1.10.tgz", + "integrity": "sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "jake": "^10.8.5" + }, + "bin": { + "ejs": "bin/cli.js" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/electron-to-chromium": { "version": "1.5.72", "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.72.tgz", @@ -3416,6 +3439,39 @@ "node": "^10.12.0 || >=12.0.0" } }, + "node_modules/filelist": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/filelist/-/filelist-1.0.4.tgz", + "integrity": "sha512-w1cEuf3S+DrLCQL7ET6kz+gmlJdbq9J7yXCSjK/OZCPA+qEN1WyF4ZAf0YYJa4/shHJra2t/d/r8SV4Ji+x+8Q==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "minimatch": "^5.0.1" + } + }, + "node_modules/filelist/node_modules/brace-expansion": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", + "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "dev": true, + "license": "MIT", + "dependencies": { + "balanced-match": "^1.0.0" + } + }, + "node_modules/filelist/node_modules/minimatch": { + "version": "5.1.6", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", + "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", + "dev": true, + "license": "ISC", + "dependencies": { + "brace-expansion": "^2.0.1" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/fill-range": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", @@ -3984,6 +4040,25 @@ "node": ">=8" } }, + "node_modules/jake": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/jake/-/jake-10.9.2.tgz", + "integrity": "sha512-2P4SQ0HrLQ+fw6llpLnOaGAvN2Zu6778SJMrCUwns4fOoG9ayrTiZk3VV8sCPkVZF8ab0zksVpS8FDY5pRCNBA==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "async": "^3.2.3", + "chalk": "^4.0.2", + "filelist": "^1.0.4", + "minimatch": "^3.1.2" + }, + "bin": { + "jake": "bin/cli.js" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest/-/jest-29.7.0.tgz", @@ -5974,28 +6049,31 @@ } }, "node_modules/ts-jest": { - "version": "29.1.1", - "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.1.1.tgz", - "integrity": "sha512-D6xjnnbP17cC85nliwGiL+tpoKN0StpgE0TeOjXQTU6MVCfsB4v7aW05CgQ/1OywGb0x/oy9hHFnN+sczTiRaA==", + "version": "29.2.5", + "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.2.5.tgz", + "integrity": "sha512-KD8zB2aAZrcKIdGk4OwpJggeLcH1FgrICqDSROWqlnJXGCXK4Mn6FcdK2B6670Xr73lHMG1kHw8R87A0ecZ+vA==", "dev": true, + "license": "MIT", "dependencies": { - "bs-logger": "0.x", - "fast-json-stable-stringify": "2.x", + "bs-logger": "^0.2.6", + "ejs": "^3.1.10", + "fast-json-stable-stringify": "^2.1.0", "jest-util": "^29.0.0", "json5": "^2.2.3", - "lodash.memoize": "4.x", - "make-error": "1.x", - "semver": "^7.5.3", - "yargs-parser": "^21.0.1" + "lodash.memoize": "^4.1.2", + "make-error": "^1.3.6", + "semver": "^7.6.3", + "yargs-parser": "^21.1.1" }, "bin": { "ts-jest": "cli.js" }, "engines": { - "node": "^14.15.0 || ^16.10.0 || >=18.0.0" + "node": "^14.15.0 || ^16.10.0 || ^18.0.0 || >=20.0.0" }, "peerDependencies": { "@babel/core": ">=7.0.0-beta.0 <8", + "@jest/transform": "^29.0.0", "@jest/types": "^29.0.0", "babel-jest": "^29.0.0", "jest": "^29.0.0", @@ -6005,6 +6083,9 @@ "@babel/core": { "optional": true }, + "@jest/transform": { + "optional": true + }, "@jest/types": { "optional": true }, @@ -6146,16 +6227,17 @@ } }, "node_modules/typescript": { - "version": "4.9.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", - "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", + "version": "5.7.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.7.3.tgz", + "integrity": "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw==", "devOptional": true, + "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" }, "engines": { - "node": ">=4.2.0" + "node": ">=14.17" } }, "node_modules/undici-types": { @@ -8191,15 +8273,21 @@ "sprintf-js": "~1.0.2" } }, + "async": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/async/-/async-3.2.6.tgz", + "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==", + "dev": true + }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "integrity": "sha1-x57Zf380y48robyXkLzDZkdLS3k=" }, "axios": { - "version": "1.7.7", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.7.tgz", - "integrity": "sha512-S4kL7XrjgBmvdGut0sN3yJxqYzrDOnivkBiN0OFs6hLiUam3UPvswUo0kqGyhqUZGEOytHyumEdXsAkgCOUf3Q==", + "version": "1.7.9", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.9.tgz", + "integrity": "sha512-LhLcE7Hbiryz8oMDdDptSrWowmB4Bl6RCt6sIJKpRB4XtVf0iEgewX3au/pJqm+Py1kCASkb/FFKjxQaLtxJvw==", "requires": { "follow-redirects": "^1.15.6", "form-data": "^4.0.0", @@ -8631,6 +8719,15 @@ "esutils": "^2.0.2" } }, + "ejs": { + "version": "3.1.10", + "resolved": "https://registry.npmjs.org/ejs/-/ejs-3.1.10.tgz", + "integrity": "sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA==", + "dev": true, + "requires": { + "jake": "^10.8.5" + } + }, "electron-to-chromium": { "version": "1.5.72", "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.5.72.tgz", @@ -9053,6 +9150,35 @@ "flat-cache": "^3.0.4" } }, + "filelist": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/filelist/-/filelist-1.0.4.tgz", + "integrity": "sha512-w1cEuf3S+DrLCQL7ET6kz+gmlJdbq9J7yXCSjK/OZCPA+qEN1WyF4ZAf0YYJa4/shHJra2t/d/r8SV4Ji+x+8Q==", + "dev": true, + "requires": { + "minimatch": "^5.0.1" + }, + "dependencies": { + "brace-expansion": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", + "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "dev": true, + "requires": { + "balanced-match": "^1.0.0" + } + }, + "minimatch": { + "version": "5.1.6", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", + "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", + "dev": true, + "requires": { + "brace-expansion": "^2.0.1" + } + } + } + }, "fill-range": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", @@ -9454,6 +9580,18 @@ "istanbul-lib-report": "^3.0.0" } }, + "jake": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/jake/-/jake-10.9.2.tgz", + "integrity": "sha512-2P4SQ0HrLQ+fw6llpLnOaGAvN2Zu6778SJMrCUwns4fOoG9ayrTiZk3VV8sCPkVZF8ab0zksVpS8FDY5pRCNBA==", + "dev": true, + "requires": { + "async": "^3.2.3", + "chalk": "^4.0.2", + "filelist": "^1.0.4", + "minimatch": "^3.1.2" + } + }, "jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/jest/-/jest-29.7.0.tgz", @@ -10892,19 +11030,20 @@ "requires": {} }, "ts-jest": { - "version": "29.1.1", - "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.1.1.tgz", - "integrity": "sha512-D6xjnnbP17cC85nliwGiL+tpoKN0StpgE0TeOjXQTU6MVCfsB4v7aW05CgQ/1OywGb0x/oy9hHFnN+sczTiRaA==", + "version": "29.2.5", + "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.2.5.tgz", + "integrity": "sha512-KD8zB2aAZrcKIdGk4OwpJggeLcH1FgrICqDSROWqlnJXGCXK4Mn6FcdK2B6670Xr73lHMG1kHw8R87A0ecZ+vA==", "dev": true, "requires": { - "bs-logger": "0.x", - "fast-json-stable-stringify": "2.x", + "bs-logger": "^0.2.6", + "ejs": "^3.1.10", + "fast-json-stable-stringify": "^2.1.0", "jest-util": "^29.0.0", "json5": "^2.2.3", - "lodash.memoize": "4.x", - "make-error": "1.x", - "semver": "^7.5.3", - "yargs-parser": "^21.0.1" + "lodash.memoize": "^4.1.2", + "make-error": "^1.3.6", + "semver": "^7.6.3", + "yargs-parser": "^21.1.1" } }, "ts-loader": { @@ -10990,9 +11129,9 @@ "dev": true }, "typescript": { - "version": "4.9.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz", - "integrity": "sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==", + "version": "5.7.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.7.3.tgz", + "integrity": "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw==", "devOptional": true }, "undici-types": { diff --git a/package.json b/package.json index fc75760c..3553ec04 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,7 @@ "0xSmartCrypto <0xsmartcrypto@gmail.com> (https://twitter.com/0xSmartCrypto)" ], "dependencies": { - "axios": "^1.6.2", + "axios": "^1.7.9", "isomorphic-ws": "^4.0.1", "nanoid": "^3.1.30", "ws": "^7.4.0" @@ -48,9 +48,9 @@ "eslint-plugin-prettier": "^5.2.1", "eslint-plugin-require-extensions": "^0.1.3", "eslint-plugin-simple-import-sort": "^12.1.1", - "jest": "^29.1.1", - "ts-jest": "^29.1.1", - "typescript": "^4.7.4" + "jest": "^29.7.0", + "ts-jest": "^29.1.2", + "typescript": "^5.7.3" }, "optionalDependencies": { "source-map-loader": "^2.0.0", diff --git a/src/index.ts b/src/index.ts index c8168d84..7d816396 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,16 +1,16 @@ -export * from './'; export * from './coinm-client'; -export * from './logger'; export * from './main-client'; export * from './portfolio-client'; export * from './types/coin'; export * from './types/futures'; +export * from './types/portfolio-margin'; export * from './types/shared'; export * from './types/spot'; export * from './types/websockets'; export * from './usdm-client'; +export * from './util/logger'; export * from './util/requestUtils'; export * from './util/typeGuards'; export * from './util/usdm'; -export * from './util/WsStore'; +export * from './util/websockets/WsStore'; export * from './websocket-client'; diff --git a/src/logger.ts b/src/logger.ts deleted file mode 100644 index ec59a7d9..00000000 --- a/src/logger.ts +++ /dev/null @@ -1,21 +0,0 @@ -export type LogParams = null | any; - -export const DefaultLogger = { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - silly: (...params: LogParams): void => {}, - debug: (...params: LogParams): void => { - console.log(new Date(), params); - }, - notice: (...params: LogParams): void => { - console.log(new Date(), params); - }, - info: (...params: LogParams): void => { - console.info(new Date(), params); - }, - warning: (...params: LogParams): void => { - console.error(new Date(), params); - }, - error: (...params: LogParams): void => { - console.error(new Date(), params); - }, -}; diff --git a/src/types/websockets/ws-api.ts b/src/types/websockets/ws-api.ts new file mode 100644 index 00000000..ed3b19db --- /dev/null +++ b/src/types/websockets/ws-api.ts @@ -0,0 +1,94 @@ +import { WS_KEY_MAP } from '../../util/websockets/websocket-util'; +import { WsKey } from './ws-general'; + +export type WSAPIOperation = 'order.create' | 'order.amend' | 'order.cancel'; + +export type WsOperation = + | 'subscribe' + | 'unsubscribe' + | 'auth' + | 'ping' + | 'pong'; + +export const WS_API_Operations: WSAPIOperation[] = [ + 'order.create', + 'order.amend', + 'order.cancel', +]; + +export interface WsRequestOperationBybit { + req_id: string; + op: WsOperation; + args?: (TWSTopic | string | number)[]; +} + +export interface WSAPIRequest< + TRequestParams = undefined, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + TWSOperation extends WSAPIOperation = any, +> { + reqId: string; + op: TWSOperation; + header: { + 'X-BAPI-TIMESTAMP': string; + 'X-BAPI-RECV-WINDOW': string; + // Referer: typeof APIID; + }; + args: [TRequestParams]; +} + +export interface WSAPIResponse< + TResponseData extends object = object, + TOperation extends WSAPIOperation = WSAPIOperation, +> { + wsKey: WsKey; + /** Auto-generated */ + reqId: string; + retCode: 0 | number; + retMsg: 'OK' | string; + op: TOperation; + data: TResponseData; + header?: { + 'X-Bapi-Limit': string; + 'X-Bapi-Limit-Status': string; + 'X-Bapi-Limit-Reset-Timestamp': string; + Traceid: string; + Timenow: string; + }; + connId: string; +} + +export type Exact = { + // This part says: if there's any key that's not in T, it's an error + [K: string]: never; +} & { + [K in keyof T]: T[K]; +}; + +/** + * List of operations supported for this WsKey (connection) + */ +export interface WsAPIWsKeyTopicMap { + [WS_KEY_MAP.v5PrivateTrade]: WSAPIOperation; +} + +/** + * Request parameters expected per operation + */ +export interface WsAPITopicRequestParamMap { + 'order.create': never; +} + +/** + * Response structure expected for each operation + */ +export interface WsAPIOperationResponseMap { + 'order.create': WSAPIResponse; + ping: { + retCode: 0 | number; + retMsg: 'OK' | string; + op: 'pong'; + data: [string]; + connId: string; + }; +} diff --git a/src/types/websockets/ws-general.ts b/src/types/websockets/ws-general.ts new file mode 100644 index 00000000..45c80f37 --- /dev/null +++ b/src/types/websockets/ws-general.ts @@ -0,0 +1,147 @@ +import { RestClientOptions } from '../../util/requestUtils'; +import { WS_KEY_MAP } from '../../util/websockets/websocket-util'; + +/** For spot markets, spotV3 is recommended */ +export type APIMarket = 'v5'; + +// Same as inverse futures +export type WsPublicInverseTopic = + | 'orderBookL2_25' + | 'orderBookL2_200' + | 'trade' + | 'insurance' + | 'instrument_info' + | 'klineV2'; + +export type WsPublicUSDTPerpTopic = + | 'orderBookL2_25' + | 'orderBookL2_200' + | 'trade' + | 'insurance' + | 'instrument_info' + | 'kline'; + +export type WsPublicSpotV1Topic = + | 'trade' + | 'realtimes' + | 'kline' + | 'depth' + | 'mergedDepth' + | 'diffDepth'; + +export type WsPublicSpotV2Topic = + | 'depth' + | 'kline' + | 'trade' + | 'bookTicker' + | 'realtimes'; + +export type WsPublicTopics = + | WsPublicInverseTopic + | WsPublicUSDTPerpTopic + | WsPublicSpotV1Topic + | WsPublicSpotV2Topic + | string; + +// Same as inverse futures +export type WsPrivateInverseTopic = + | 'position' + | 'execution' + | 'order' + | 'stop_order'; + +export type WsPrivateUSDTPerpTopic = + | 'position' + | 'execution' + | 'order' + | 'stop_order' + | 'wallet'; + +export type WsPrivateSpotTopic = + | 'outboundAccountInfo' + | 'executionReport' + | 'ticketInfo'; + +export type WsPrivateTopic = + | WsPrivateInverseTopic + | WsPrivateUSDTPerpTopic + | WsPrivateSpotTopic + | string; + +export type WsTopic = WsPublicTopics | WsPrivateTopic; + +/** This is used to differentiate between each of the available websocket streams (as bybit has multiple websockets) */ +export type WsKey = (typeof WS_KEY_MAP)[keyof typeof WS_KEY_MAP]; +export type WsMarket = 'all'; + +export interface WSClientConfigurableOptions { + /** Your API key */ + key?: string; + + /** Your API secret */ + secret?: string; + + /** + * Set to `true` to connect to Bybit's testnet environment. + * + * Notes: + * + * - If demo trading, `testnet` should be set to false! + * - If testing a strategy, use demo trading instead. Testnet market data is very different from real market conditions. + */ + testnet?: boolean; + + /** + * Set to `true` to connect to Bybit's V5 demo trading: https://bybit-exchange.github.io/docs/v5/demo + * + * Only the "V5" "market" is supported here. + */ + demoTrading?: boolean; + + /** + * The API group this client should connect to. The V5 market is currently used by default. + * + * Only the "V5" "market" is supported here. + */ + market?: APIMarket; + + /** Define a recv window when preparing a private websocket signature. This is in milliseconds, so 5000 == 5 seconds */ + recvWindow?: number; + + /** How often to check if the connection is alive */ + pingInterval?: number; + + /** How long to wait for a pong (heartbeat reply) before assuming the connection is dead */ + pongTimeout?: number; + + /** Delay in milliseconds before respawning the connection */ + reconnectTimeout?: number; + + restOptions?: RestClientOptions; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + requestOptions?: any; + + wsUrl?: string; + + /** + * Allows you to provide a custom "signMessage" function, e.g. to use node's much faster createHmac method + * + * Look in the examples folder for a demonstration on using node's createHmac instead. + */ + customSignMessageFn?: (message: string, secret: string) => Promise; +} + +/** + * WS configuration that's always defined, regardless of user configuration + * (usually comes from defaults if there's no user-provided values) + */ +export interface WebsocketClientOptions extends WSClientConfigurableOptions { + market: APIMarket; + pongTimeout: number; + pingInterval: number; + reconnectTimeout: number; + recvWindow: number; + authPrivateConnectionsOnConnect: boolean; + authPrivateRequests: boolean; +} diff --git a/src/util/BaseWSClient.ts b/src/util/BaseWSClient.ts new file mode 100644 index 00000000..7b36c4a6 --- /dev/null +++ b/src/util/BaseWSClient.ts @@ -0,0 +1,1276 @@ +/* eslint-disable max-len */ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import EventEmitter from 'events'; +import WebSocket from 'isomorphic-ws'; + +import { WsMarket } from '../types/websockets'; +import { WsOperation } from '../types/websockets/ws-api'; +import { + WebsocketClientOptions, + WSClientConfigurableOptions, +} from '../websocket-client'; +import { DefaultLogger } from './logger'; +import { + getNormalisedTopicRequests, + WS_LOGGER_CATEGORY, + WsTopicRequest, + WsTopicRequestOrStringTopic, +} from './websockets/websocket-util'; +import { safeTerminateWs } from './websockets/ws-utils'; +import { WsStore } from './websockets/WsStore'; +import { WsConnectionStateEnum } from './websockets/WsStore.types'; + +interface WSClientEventMap { + /** Connection opened. If this connection was previously opened and reconnected, expect the reconnected event instead */ + open: (evt: { wsKey: WsKey; event: any }) => void; + /** Reconnecting a dropped connection */ + reconnect: (evt: { wsKey: WsKey; event: any }) => void; + /** Successfully reconnected a connection that dropped */ + reconnected: (evt: { wsKey: WsKey; event: any }) => void; + /** Connection closed */ + close: (evt: { wsKey: WsKey; event: any }) => void; + /** Received reply to websocket command (e.g. after subscribing to topics) */ + response: ( + response: any & { wsKey: WsKey; isWSAPIResponse?: boolean }, + ) => void; + /** Received data for topic */ + update: (response: any & { wsKey: WsKey }) => void; + /** Exception from ws client OR custom listeners (e.g. if you throw inside your event handler) */ + error: (response: any & { wsKey: WsKey; isWSAPIResponse?: boolean }) => void; + /** Confirmation that a connection successfully authenticated */ + authenticated: (event: { + wsKey: WsKey; + event: any; + isWSAPIResponse?: boolean; + }) => void; +} + +export interface EmittableEvent { + eventType: 'response' | 'update' | 'error' | 'authenticated'; + event: TEvent; + isWSAPIResponse?: boolean; +} + +// Type safety for on and emit handlers: https://stackoverflow.com/a/61609010/880837 +export interface BaseWebsocketClient< + TWSKey extends string, + // eslint-disable-next-line @typescript-eslint/no-unused-vars, no-unused-vars + TWSRequestEvent extends object, +> { + on>( + event: U, + listener: WSClientEventMap[U], + ): this; + + emit>( + event: U, + ...args: Parameters[U]> + ): boolean; +} + +// interface TopicsPendingSubscriptions { +// wsKey: string; +// failedTopicsSubscriptions: Set; +// pendingTopicsSubscriptions: Set; +// resolver: TopicsPendingSubscriptionsResolver; +// rejector: TopicsPendingSubscriptionsRejector; +// } + +/** + * A midflight WS request event (e.g. subscribe to these topics). + * + * - requestKey: unique identifier for this request, if available. Can be anything as a string. + * - requestEvent: the raw request, as an object, that will be sent on the ws connection. This may contain multiple topics/requests in one object, if the exchange supports it. + */ +export interface MidflightWsRequestEvent { + requestKey: string; + requestEvent: TEvent; +} + +type TopicsPendingSubscriptionsResolver = ( + requests: TWSRequestEvent, +) => void; + +type TopicsPendingSubscriptionsRejector = ( + requests: TWSRequestEvent, + reason: string | object, +) => void; + +interface WsKeyPendingTopicSubscriptions { + requestData: TWSRequestEvent; + resolver: TopicsPendingSubscriptionsResolver; + rejector: TopicsPendingSubscriptionsRejector; +} + +/** + * Base WebSocket abstraction layer. Handles connections, tracking each connection as a unique "WS Key" + */ +// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging +export abstract class BaseWebsocketClient< + /** + * The WS connections supported by the client, each identified by a unique primary key + */ + TWSKey extends string, + TWSRequestEvent extends object, +> extends EventEmitter { + /** + * State store to track a list of topics (topic requests) we are expected to be subscribed to if reconnected + */ + private wsStore: WsStore>; + + protected logger: typeof DefaultLogger; + + protected options: WebsocketClientOptions; + + private wsApiRequestId: number = 0; + + private timeOffsetMs: number = 0; + + /** + * A nested wsKey->request key store. + * pendingTopicSubscriptionRequests[wsKey][requestKey] = WsKeyPendingTopicSubscriptions + */ + private pendingTopicSubscriptionRequests: Record< + string, + Record> + > = {}; + + constructor( + options?: WSClientConfigurableOptions, + logger?: typeof DefaultLogger, + ) { + super(); + + this.logger = logger || DefaultLogger; + this.wsStore = new WsStore(this.logger); + + this.options = { + // Some defaults: + testnet: false, + demoTrading: false, + + // Connect to V5 by default, if not defined by the user + market: 'v5', + + pongTimeout: 1000, + pingInterval: 10000, + reconnectTimeout: 500, + recvWindow: 5000, + + // Automatically send an authentication op/request after a connection opens, for private connections. + authPrivateConnectionsOnConnect: true, + // Individual requests do not require a signature, so this is disabled. + authPrivateRequests: false, + ...options, + }; + + // add default error handling so this doesn't crash node (if the user didn't set a handler) + // eslint-disable-next-line @typescript-eslint/no-empty-function + this.on('error', () => {}); + } + + /** + * Return true if this wsKey connection should automatically authenticate immediately after connecting + */ + protected abstract isAuthOnConnectWsKey(wsKey: TWSKey): boolean; + + protected abstract sendPingEvent(wsKey: TWSKey, ws: WebSocket): void; + + protected abstract sendPongEvent(wsKey: TWSKey, ws: WebSocket): void; + + protected abstract isWsPing(data: any): boolean; + + protected abstract isWsPong(data: any): boolean; + + protected abstract getWsAuthRequestEvent(wsKey: TWSKey): Promise; + + protected abstract isPrivateTopicRequest( + request: WsTopicRequest, + wsKey: TWSKey, + ): boolean; + + protected abstract getPrivateWSKeys(): TWSKey[]; + + protected abstract getWsUrl(wsKey: TWSKey): Promise; + + protected abstract getMaxTopicsPerSubscribeEvent( + wsKey: TWSKey, + ): number | null; + + /** + * @returns one or more correctly structured request events for performing a operations over WS. This can vary per exchange spec. + */ + protected abstract getWsRequestEvents( + market: WsMarket, + operation: WsOperation, + requests: WsTopicRequest[], + wsKey: TWSKey, + ): Promise[]>; + + /** + * Abstraction called to sort ws events into emittable event types (response to a request, data update, etc) + */ + protected abstract resolveEmittableEvents( + wsKey: TWSKey, + event: MessageEventLike, + ): EmittableEvent[]; + + /** + * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library + */ + protected abstract connectAll(): Promise[]; + + protected isPrivateWsKey(wsKey: TWSKey): boolean { + return this.getPrivateWSKeys().includes(wsKey); + } + + /** Returns auto-incrementing request ID, used to track promise references for async requests */ + protected getNewRequestId(): string { + return `${++this.wsApiRequestId}`; + } + + protected abstract sendWSAPIRequest( + wsKey: TWSKey, + channel: string, + params?: any, + ): Promise; + + protected abstract sendWSAPIRequest( + wsKey: TWSKey, + channel: string, + params: any, + ): Promise; + + public getTimeOffsetMs() { + return this.timeOffsetMs; + } + + public setTimeOffsetMs(newOffset: number) { + this.timeOffsetMs = newOffset; + } + + private getWsKeyPendingSubscriptionStore(wsKey: TWSKey) { + if (!this.pendingTopicSubscriptionRequests[wsKey]) { + this.pendingTopicSubscriptionRequests[wsKey] = {}; + } + + return this.pendingTopicSubscriptionRequests[wsKey]!; + } + + protected upsertPendingTopicSubscribeRequests( + wsKey: TWSKey, + requestData: MidflightWsRequestEvent, + ) { + // a unique identifier for this subscription request (e.g. csv of topics, or request id, etc) + const requestKey = requestData.requestKey; + + // Should not be possible to see a requestKey collision in the current design, since the req ID increments automatically with every request, so this should never be true, but just in case a future mistake happens... + + const pendingSubReqs = this.getWsKeyPendingSubscriptionStore(wsKey); + if (pendingSubReqs[requestKey]) { + throw new Error( + 'Implementation error: attempted to upsert pending topics with duplicate request ID!', + ); + } + + return new Promise( + ( + resolver: TopicsPendingSubscriptionsResolver, + rejector: TopicsPendingSubscriptionsRejector, + ) => { + const pendingSubReqs = this.getWsKeyPendingSubscriptionStore(wsKey); + pendingSubReqs[requestKey] = { + requestData: requestData.requestEvent, + resolver, + rejector, + }; + }, + ); + } + + protected removeTopicPendingSubscription(wsKey: TWSKey, requestKey: string) { + const pendingSubReqs = this.getWsKeyPendingSubscriptionStore(wsKey); + delete pendingSubReqs[requestKey]; + } + + private clearTopicsPendingSubscriptions( + wsKey: TWSKey, + rejectAll: boolean, + rejectReason: string, + ) { + if (rejectAll) { + const pendingSubReqs = this.getWsKeyPendingSubscriptionStore(wsKey); + + for (const requestKey in pendingSubReqs) { + const request = pendingSubReqs[requestKey]; + request?.rejector(request.requestData, rejectReason); + } + } + + this.pendingTopicSubscriptionRequests[wsKey] = {}; + } + + /** + * Resolve/reject the promise for a midflight request. + * + * This will typically execute before the event is emitted. + */ + protected updatePendingTopicSubscriptionStatus( + wsKey: TWSKey, + requestKey: string, + msg: object, + isTopicSubscriptionSuccessEvent: boolean, + ) { + const wsKeyPendingRequests = this.getWsKeyPendingSubscriptionStore(wsKey); + if (!wsKeyPendingRequests) { + return; + } + + const pendingSubscriptionRequest = wsKeyPendingRequests[requestKey]; + if (!pendingSubscriptionRequest) { + return; + } + + if (isTopicSubscriptionSuccessEvent) { + pendingSubscriptionRequest.resolver( + pendingSubscriptionRequest.requestData, + ); + } else { + pendingSubscriptionRequest.rejector( + pendingSubscriptionRequest.requestData, + msg, + ); + } + + this.removeTopicPendingSubscription(wsKey, requestKey); + } + + /** + * Don't call directly! Use subscribe() instead! + * + * Subscribe to one or more topics on a WS connection (identified by WS Key). + * + * - Topics are automatically cached + * - Connections are automatically opened, if not yet connected + * - Authentication is automatically handled + * - Topics are automatically resubscribed to, if something happens to the connection, unless you call unsubsribeTopicsForWsKey(topics, key). + * + * @param wsRequests array of topics to subscribe to + * @param wsKey ws key referring to the ws connection these topics should be subscribed on + */ + protected async subscribeTopicsForWsKey( + wsTopicRequests: WsTopicRequestOrStringTopic[], + wsKey: TWSKey, + ) { + const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests); + + // Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically + for (const topic of normalisedTopicRequests) { + this.wsStore.addTopic(wsKey, topic); + } + + const isConnected = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + const isConnectionInProgress = + this.wsStore.isConnectionAttemptInProgress(wsKey); + + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect + if (!isConnected && !isConnectionInProgress) { + return this.connect(wsKey); + } + + // Subscribe should happen automatically once connected, nothing to do here after topics are added to wsStore. + if (!isConnected) { + /** + * Are we in the process of connection? Nothing to send yet. + */ + this.logger.trace( + 'WS not connected - requests queued for retry once connected.', + { + ...WS_LOGGER_CATEGORY, + wsKey, + wsTopicRequests, + }, + ); + return; + } + + // We're connected. Check if auth is needed and if already authenticated + const isPrivateConnection = this.isPrivateWsKey(wsKey); + const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; + if (isPrivateConnection && !isAuthenticated) { + /** + * If not authenticated yet and auth is required, don't request topics yet. + * + * Auth should already automatically be in progress, so no action needed from here. Topics will automatically subscribe post-auth success. + */ + return false; + } + + // Finally, request subscription to topics if the connection is healthy and ready + return this.requestSubscribeTopics(wsKey, normalisedTopicRequests); + } + + protected async unsubscribeTopicsForWsKey( + wsTopicRequests: WsTopicRequestOrStringTopic[], + wsKey: TWSKey, + ): Promise { + const normalisedTopicRequests = getNormalisedTopicRequests(wsTopicRequests); + + // Store topics, so future automation (post-auth, post-reconnect) has everything needed to resubscribe automatically + for (const topic of normalisedTopicRequests) { + this.wsStore.deleteTopic(wsKey, topic); + } + + const isConnected = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + // If not connected, don't need to do anything. + // Removing the topic from the store is enough to stop it from being resubscribed to on reconnect. + if (!isConnected) { + return; + } + + // We're connected. Check if auth is needed and if already authenticated + const isPrivateConnection = this.isPrivateWsKey(wsKey); + const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; + if (isPrivateConnection && !isAuthenticated) { + /** + * If not authenticated yet and auth is required, don't need to do anything. + * We don't subscribe to topics until auth is complete anyway. + */ + return; + } + + // Finally, request subscription to topics if the connection is healthy and ready + return this.requestUnsubscribeTopics(wsKey, normalisedTopicRequests); + } + + /** + * Splits topic requests into two groups, public & private topic requests + */ + private sortTopicRequestsIntoPublicPrivate( + wsTopicRequests: WsTopicRequest[], + wsKey: TWSKey, + ): { + publicReqs: WsTopicRequest[]; + privateReqs: WsTopicRequest[]; + } { + const publicTopicRequests: WsTopicRequest[] = []; + const privateTopicRequests: WsTopicRequest[] = []; + + for (const topic of wsTopicRequests) { + if (this.isPrivateTopicRequest(topic, wsKey)) { + privateTopicRequests.push(topic); + } else { + publicTopicRequests.push(topic); + } + } + + return { + publicReqs: publicTopicRequests, + privateReqs: privateTopicRequests, + }; + } + + /** Get the WsStore that tracks websockets & topics */ + public getWsStore(): WsStore> { + return this.wsStore; + } + + public close(wsKey: TWSKey, force?: boolean) { + this.logger.info('Closing connection', { ...WS_LOGGER_CATEGORY, wsKey }); + this.setWsState(wsKey, WsConnectionStateEnum.CLOSING); + this.clearTimers(wsKey); + + const ws = this.getWs(wsKey); + ws?.close(); + if (force) { + safeTerminateWs(ws); + } + } + + public closeAll(force?: boolean) { + const keys = this.wsStore.getKeys(); + + this.logger.info(`Closing all ws connections: ${keys}`); + keys.forEach((key: TWSKey) => { + this.close(key, force); + }); + } + + public isConnected(wsKey: TWSKey): boolean { + return this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + } + + /** + * Request connection to a specific websocket, instead of waiting for automatic connection. + */ + public async connect(wsKey: TWSKey): Promise { + try { + if (this.wsStore.isWsOpen(wsKey)) { + this.logger.error( + 'Refused to connect to ws with existing active connection', + { ...WS_LOGGER_CATEGORY, wsKey }, + ); + return { wsKey }; + } + + if ( + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) + ) { + this.logger.error( + 'Refused to connect to ws, connection attempt already active', + { ...WS_LOGGER_CATEGORY, wsKey }, + ); + return; + } + + if ( + !this.wsStore.getConnectionState(wsKey) || + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.INITIAL) + ) { + this.setWsState(wsKey, WsConnectionStateEnum.CONNECTING); + } + + if (!this.wsStore.getConnectionInProgressPromise(wsKey)) { + this.wsStore.createConnectionInProgressPromise(wsKey, false); + } + + const url = await this.getWsUrl(wsKey); + const ws = this.connectToWsUrl(url, wsKey); + + this.wsStore.setWs(wsKey, ws); + + return this.wsStore.getConnectionInProgressPromise(wsKey)?.promise; + } catch (err) { + this.parseWsError('Connection failed', err, wsKey); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + } + } + + private connectToWsUrl(url: string, wsKey: TWSKey): WebSocket { + this.logger.trace(`Opening WS connection to URL: ${url}`, { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + + const agent = this.options.requestOptions?.agent; + const ws = new WebSocket(url, undefined, agent ? { agent } : undefined); + + ws.onopen = (event: any) => this.onWsOpen(event, wsKey); + ws.onmessage = (event: any) => this.onWsMessage(event, wsKey, ws); + ws.onerror = (event: any) => + this.parseWsError('Websocket onWsError', event, wsKey); + ws.onclose = (event: any) => this.onWsClose(event, wsKey); + + return ws; + } + + private parseWsError(context: string, error: any, wsKey: TWSKey) { + if (!error.message) { + this.logger.error(`${context} due to unexpected error: `, error); + this.emit('response', { ...error, wsKey }); + this.emit('error', { ...error, wsKey }); + return; + } + + switch (error.message) { + case 'Unexpected server response: 401': + this.logger.error(`${context} due to 401 authorization failure.`, { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + break; + + default: + if ( + this.wsStore.getConnectionState(wsKey) !== + WsConnectionStateEnum.CLOSING + ) { + this.logger.error( + `${context} due to unexpected response error: "${ + error?.msg || error?.message || error + }"`, + { ...WS_LOGGER_CATEGORY, wsKey, error }, + ); + this.executeReconnectableClose(wsKey, 'unhandled onWsError'); + } else { + this.logger.info( + `${wsKey} socket forcefully closed. Will not reconnect.`, + ); + } + break; + } + + this.emit('response', { ...error, wsKey }); + this.emit('error', { ...error, wsKey }); + } + + /** Get a signature, build the auth request and send it */ + private async sendAuthRequest(wsKey: TWSKey): Promise { + try { + this.logger.trace('Sending auth request...', { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + + await this.assertIsConnected(wsKey); + + if (!this.wsStore.getAuthenticationInProgressPromise(wsKey)) { + this.wsStore.createAuthenticationInProgressPromise(wsKey, false); + } + + const request = await this.getWsAuthRequestEvent(wsKey); + + // console.log('ws auth req', request); + + this.tryWsSend(wsKey, JSON.stringify(request)); + + return this.wsStore.getAuthenticationInProgressPromise(wsKey)?.promise; + } catch (e) { + this.logger.trace(e, { ...WS_LOGGER_CATEGORY, wsKey }); + } + } + + private reconnectWithDelay(wsKey: TWSKey, connectionDelayMs: number) { + this.clearTimers(wsKey); + + if (!this.wsStore.isConnectionAttemptInProgress(wsKey)) { + this.setWsState(wsKey, WsConnectionStateEnum.RECONNECTING); + } + + if (this.wsStore.get(wsKey)?.activeReconnectTimer) { + this.clearReconnectTimer(wsKey); + } + + this.wsStore.get(wsKey, true).activeReconnectTimer = setTimeout(() => { + this.logger.info('Reconnecting to websocket', { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + this.clearReconnectTimer(wsKey); + this.connect(wsKey); + }, connectionDelayMs); + } + + private ping(wsKey: TWSKey) { + if (this.wsStore.get(wsKey, true).activePongTimer) { + return; + } + + this.clearPongTimer(wsKey); + + this.logger.trace('Sending ping', { ...WS_LOGGER_CATEGORY, wsKey }); + const ws = this.wsStore.get(wsKey, true).ws; + + if (!ws) { + this.logger.error( + `Unable to send ping for wsKey "${wsKey}" - no connection found`, + ); + return; + } + this.sendPingEvent(wsKey, ws); + + this.wsStore.get(wsKey, true).activePongTimer = setTimeout( + () => this.executeReconnectableClose(wsKey, 'Pong timeout'), + this.options.pongTimeout, + ); + } + + /** + * Closes a connection, if it's even open. If open, this will trigger a reconnect asynchronously. + * If closed, trigger a reconnect immediately + */ + private executeReconnectableClose(wsKey: TWSKey, reason: string) { + this.logger.info(`${reason} - closing socket to reconnect`, { + ...WS_LOGGER_CATEGORY, + wsKey, + reason, + }); + + const wasOpen = this.wsStore.isWsOpen(wsKey); + + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); + + const ws = this.getWs(wsKey); + + if (ws) { + ws.close(); + safeTerminateWs(ws); + } + + if (!wasOpen) { + this.logger.info( + `${reason} - socket already closed - trigger immediate reconnect`, + { + ...WS_LOGGER_CATEGORY, + wsKey, + reason, + }, + ); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout); + } + } + + private clearTimers(wsKey: TWSKey) { + this.clearPingTimer(wsKey); + this.clearPongTimer(wsKey); + this.clearReconnectTimer(wsKey); + } + + // Send a ping at intervals + private clearPingTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePingTimer) { + clearInterval(wsState.activePingTimer); + wsState.activePingTimer = undefined; + } + } + + // Expect a pong within a time limit + private clearPongTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activePongTimer) { + clearTimeout(wsState.activePongTimer); + wsState.activePongTimer = undefined; + // this.logger.trace(`Cleared pong timeout for "${wsKey}"`); + } else { + // this.logger.trace(`No active pong timer for "${wsKey}"`); + } + } + + private clearReconnectTimer(wsKey: TWSKey) { + const wsState = this.wsStore.get(wsKey); + if (wsState?.activeReconnectTimer) { + clearTimeout(wsState.activeReconnectTimer); + wsState.activeReconnectTimer = undefined; + } + } + + /** + * Returns a list of string events that can be individually sent upstream to complete subscribing/unsubscribing/etc to these topics + * + * If events are an object, these should be stringified (`return JSON.stringify(event);`) + * Each event returned by this will be sent one at a time + * + * Events are automatically split into smaller batches, by this method, if needed. + */ + protected async getWsOperationEventsForTopics( + topics: WsTopicRequest[], + wsKey: TWSKey, + operation: WsOperation, + ): Promise[]> { + // console.log(new Date(), `called getWsSubscribeEventsForTopics()`, topics); + // console.trace(); + if (!topics.length) { + return []; + } + + // Events that are ready to send (usually stringified JSON) + const requestEvents: MidflightWsRequestEvent[] = []; + const market: WsMarket = 'all'; + + const maxTopicsPerEvent = this.getMaxTopicsPerSubscribeEvent(wsKey); + if ( + maxTopicsPerEvent && + maxTopicsPerEvent !== null && + topics.length > maxTopicsPerEvent + ) { + for (let i = 0; i < topics.length; i += maxTopicsPerEvent) { + const batch = topics.slice(i, i + maxTopicsPerEvent); + const subscribeRequestEvents = await this.getWsRequestEvents( + market, + operation, + batch, + wsKey, + ); + + requestEvents.push(...subscribeRequestEvents); + } + + return requestEvents; + } + + const subscribeRequestEvents = await this.getWsRequestEvents( + market, + operation, + topics, + wsKey, + ); + + return subscribeRequestEvents; + } + + /** + * Simply builds and sends subscribe events for a list of topics for a ws key + * + * @private Use the `subscribe(topics)` or `subscribeTopicsForWsKey(topics, wsKey)` method to subscribe to topics. + */ + private async requestSubscribeTopics( + wsKey: TWSKey, + wsTopicRequests: WsTopicRequest[], + ) { + if (!wsTopicRequests.length) { + return; + } + + // Automatically splits requests into smaller batches, if needed + const subscribeWsMessages = await this.getWsOperationEventsForTopics( + wsTopicRequests, + wsKey, + 'subscribe', + ); + + this.logger.trace( + `Subscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, // Events: "${JSON.stringify(topics)}" + ); + + // console.log(`batches: `, JSON.stringify(subscribeWsMessages, null, 2)); + + const promises: Promise[] = []; + + for (const midflightRequest of subscribeWsMessages) { + const wsMessage = midflightRequest.requestEvent; + + promises.push( + this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest), + ); + + this.logger.trace( + `Sending batch via message: "${JSON.stringify(wsMessage)}"`, + ); + this.tryWsSend(wsKey, JSON.stringify(wsMessage)); + } + + this.logger.trace( + `Finished subscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, + ); + + return Promise.all(promises); + } + + /** + * Simply builds and sends unsubscribe events for a list of topics for a ws key + * + * @private Use the `unsubscribe(topics)` method to unsubscribe from topics. Send WS message to unsubscribe from topics. + */ + private async requestUnsubscribeTopics( + wsKey: TWSKey, + wsTopicRequests: WsTopicRequest[], + ) { + if (!wsTopicRequests.length) { + return; + } + + const subscribeWsMessages = await this.getWsOperationEventsForTopics( + wsTopicRequests, + wsKey, + 'unsubscribe', + ); + + this.logger.trace( + `Unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches. Events: "${JSON.stringify(wsTopicRequests)}"`, + ); + + const promises: Promise[] = []; + + for (const midflightRequest of subscribeWsMessages) { + const wsMessage = midflightRequest.requestEvent; + + promises.push( + this.upsertPendingTopicSubscribeRequests(wsKey, midflightRequest), + ); + + this.logger.trace(`Sending batch via message: "${wsMessage}"`); + this.tryWsSend(wsKey, JSON.stringify(wsMessage)); + } + + this.logger.trace( + `Finished unsubscribing to ${wsTopicRequests.length} "${wsKey}" topics in ${subscribeWsMessages.length} batches.`, + ); + + return Promise.all(promises); + } + + /** + * Try sending a string event on a WS connection (identified by the WS Key) + */ + public tryWsSend( + wsKey: TWSKey, + wsMessage: string, + throwExceptions?: boolean, + ) { + try { + this.logger.trace('Sending upstream ws message: ', { + ...WS_LOGGER_CATEGORY, + wsMessage, + wsKey, + }); + if (!wsKey) { + throw new Error( + 'Cannot send message due to no known websocket for this wsKey', + ); + } + const ws = this.getWs(wsKey); + if (!ws) { + throw new Error( + `${wsKey} socket not connected yet, call "connectAll()" first then try again when the "open" event arrives`, + ); + } + ws.send(wsMessage); + } catch (e) { + this.logger.error('Failed to send WS message', { + ...WS_LOGGER_CATEGORY, + wsMessage, + wsKey, + exception: e, + }); + if (throwExceptions) { + throw e; + } + } + } + + private async onWsOpen(event: any, wsKey: TWSKey) { + const isFreshConnectionAttempt = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTING, + ); + + const isReconnectionAttempt = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ); + + if (isFreshConnectionAttempt) { + this.logger.info('Websocket connected', { + ...WS_LOGGER_CATEGORY, + wsKey, + testnet: this.options.testnet === true, + market: this.options.market, + }); + + this.emit('open', { wsKey, event }); + } else if (isReconnectionAttempt) { + this.logger.info('Websocket reconnected', { + ...WS_LOGGER_CATEGORY, + wsKey, + testnet: this.options.testnet === true, + market: this.options.market, + }); + + this.emit('reconnected', { wsKey, event }); + } + + this.setWsState(wsKey, WsConnectionStateEnum.CONNECTED); + + this.logger.trace('Enabled ping timer', { ...WS_LOGGER_CATEGORY, wsKey }); + this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( + () => this.ping(wsKey), + this.options.pingInterval, + ); + + // Resolve & cleanup deferred "connection attempt in progress" promise + try { + const connectionInProgressPromise = + this.wsStore.getConnectionInProgressPromise(wsKey); + if (connectionInProgressPromise?.resolve) { + connectionInProgressPromise.resolve({ + wsKey, + }); + } + } catch (e) { + this.logger.error( + 'Exception trying to resolve "connectionInProgress" promise', + ); + } + + // Remove before continuing, in case there's more requests queued + this.wsStore.removeConnectingInProgressPromise(wsKey); + + // Reconnect to topics known before it connected + const { privateReqs, publicReqs } = this.sortTopicRequestsIntoPublicPrivate( + [...this.wsStore.getTopics(wsKey)], + wsKey, + ); + + // Request sub to public topics, if any + this.requestSubscribeTopics(wsKey, publicReqs); + + // Request sub to private topics, if auth on connect isn't needed + // Else, this is automatic after authentication is successfully confirmed + if (!this.options.authPrivateConnectionsOnConnect) { + this.requestSubscribeTopics(wsKey, privateReqs); + } + + // Some websockets require an auth packet to be sent after opening the connection + if ( + this.isAuthOnConnectWsKey(wsKey) && + this.options.authPrivateConnectionsOnConnect + ) { + await this.sendAuthRequest(wsKey); + } + } + + /** + * Handle subscription to private topics _after_ authentication successfully completes asynchronously. + * + * Only used for exchanges that require auth before sending private topic subscription requests + */ + private onWsAuthenticated(wsKey: TWSKey, event: unknown) { + const wsState = this.wsStore.get(wsKey, true); + wsState.isAuthenticated = true; + + // Resolve & cleanup deferred "connection attempt in progress" promise + try { + const inProgressPromise = + this.wsStore.getAuthenticationInProgressPromise(wsKey); + + if (inProgressPromise?.resolve) { + inProgressPromise.resolve({ + wsKey, + event, + }); + } + } catch (e) { + this.logger.error( + 'Exception trying to resolve "connectionInProgress" promise', + ); + } + + // Remove before continuing, in case there's more requests queued + this.wsStore.removeAuthenticationInProgressPromise(wsKey); + + if (this.options.authPrivateConnectionsOnConnect) { + const topics = [...this.wsStore.getTopics(wsKey)]; + const privateTopics = topics.filter((topic) => + this.isPrivateTopicRequest(topic, wsKey), + ); + + if (privateTopics.length) { + this.subscribeTopicsForWsKey(privateTopics, wsKey); + } + } + } + + private onWsMessage(event: unknown, wsKey: TWSKey, ws: WebSocket) { + try { + // console.log('onMessageRaw: ', (event as any).data); + // any message can clear the pong timer - wouldn't get a message if the ws wasn't working + this.clearPongTimer(wsKey); + + if (this.isWsPong(event)) { + this.logger.trace('Received pong', { + ...WS_LOGGER_CATEGORY, + wsKey, + event: (event as any)?.data, + }); + return; + } + + if (this.isWsPing(event)) { + this.logger.trace('Received ping', { + ...WS_LOGGER_CATEGORY, + wsKey, + event, + }); + this.sendPongEvent(wsKey, ws); + return; + } + + if (isMessageEvent(event)) { + const data = event.data; + const dataType = event.type; + + const emittableEvents = this.resolveEmittableEvents(wsKey, event); + + if (!emittableEvents.length) { + // console.log(`raw event: `, { data, dataType, emittableEvents }); + this.logger.error( + 'Unhandled/unrecognised ws event message - returned no emittable data', + { + ...WS_LOGGER_CATEGORY, + message: data || 'no message', + dataType, + event, + wsKey, + }, + ); + + return this.emit('update', { ...event, wsKey }); + } + + for (const emittable of emittableEvents) { + // if (emittable.event?.op) { + // console.log('emittable: ', emittable); + // } + + if (this.isWsPong(emittable)) { + this.logger.trace('Received pong2', { + ...WS_LOGGER_CATEGORY, + wsKey, + data, + }); + continue; + } + const emittableFinalEvent = { + ...emittable.event, + wsKey, + isWSAPIResponse: emittable.isWSAPIResponse, + }; + + if (emittable.eventType === 'authenticated') { + this.logger.trace('Successfully authenticated', { + ...WS_LOGGER_CATEGORY, + wsKey, + emittable, + }); + this.emit(emittable.eventType, emittableFinalEvent); + this.onWsAuthenticated(wsKey, emittable.event); + continue; + } + + this.emit(emittable.eventType, emittableFinalEvent); + } + + return; + } + + this.logger.error( + 'Unhandled/unrecognised ws event message - unexpected message format', + { + ...WS_LOGGER_CATEGORY, + message: event || 'no message', + event, + wsKey, + }, + ); + } catch (e) { + this.logger.error('Failed to parse ws event message', { + ...WS_LOGGER_CATEGORY, + error: e, + event, + wsKey, + }); + } + } + + private onWsClose(event: unknown, wsKey: TWSKey) { + this.logger.info('Websocket connection closed', { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + + const wsState = this.wsStore.get(wsKey, true); + wsState.isAuthenticated = false; + + if ( + this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING + ) { + // clean up any pending promises for this connection + this.getWsStore().rejectAllDeferredPromises( + wsKey, + 'connection lost, reconnecting', + ); + + this.clearTopicsPendingSubscriptions(wsKey, true, 'WS Closed'); + + this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); + + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); + this.emit('reconnect', { wsKey, event }); + } else { + // clean up any pending promises for this connection + this.getWsStore().rejectAllDeferredPromises(wsKey, 'disconnected'); + this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); + this.emit('close', { wsKey, event }); + } + } + + private getWs(wsKey: TWSKey) { + return this.wsStore.getWs(wsKey); + } + + private setWsState(wsKey: TWSKey, state: WsConnectionStateEnum) { + this.wsStore.setConnectionState(wsKey, state); + } + + /** + * Promise-driven method to assert that a ws has successfully connected (will await until connection is open) + */ + public async assertIsConnected(wsKey: TWSKey): Promise { + const isConnected = this.getWsStore().isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + if (!isConnected) { + const inProgressPromise = + this.getWsStore().getConnectionInProgressPromise(wsKey); + + // Already in progress? Await shared promise and retry + if (inProgressPromise) { + this.logger.trace('assertIsConnected(): awaiting...'); + await inProgressPromise.promise; + this.logger.trace('assertIsConnected(): connected!'); + return inProgressPromise.promise; + } + + // Start connection, it should automatically store/return a promise. + this.logger.trace('assertIsConnected(): connecting...'); + + await this.connect(wsKey); + + this.logger.trace('assertIsConnected(): newly connected!'); + } + } + + /** + * Promise-driven method to assert that a ws has been successfully authenticated (will await until auth is confirmed) + */ + public async assertIsAuthenticated(wsKey: TWSKey): Promise { + const isConnected = this.getWsStore().isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + if (!isConnected) { + this.logger.trace('assertIsAuthenticated(): connecting...'); + await this.assertIsConnected(wsKey); + } + + const inProgressPromise = + this.getWsStore().getAuthenticationInProgressPromise(wsKey); + + // Already in progress? Await shared promise and retry + if (inProgressPromise) { + this.logger.trace('assertIsAuthenticated(): awaiting...'); + await inProgressPromise.promise; + this.logger.trace('assertIsAuthenticated(): authenticated!'); + return; + } + + const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; + if (isAuthenticated) { + this.logger.trace('assertIsAuthenticated(): ok'); + return; + } + + // Start authentication, it should automatically store/return a promise. + this.logger.trace('assertIsAuthenticated(): authenticating...'); + + await this.sendAuthRequest(wsKey); + + this.logger.trace('assertIsAuthenticated(): newly authenticated!'); + } +} diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts deleted file mode 100644 index eb5b2278..00000000 --- a/src/util/WsStore.ts +++ /dev/null @@ -1,167 +0,0 @@ -import WebSocket from 'isomorphic-ws'; - -import { DefaultLogger } from '../logger'; -import { WsKey } from '../websocket-client'; - -export enum WsConnectionStateEnum { - INITIAL = 0, - CONNECTING = 1, - CONNECTED = 2, - CLOSING = 3, - RECONNECTING = 4, - // ERROR = 5, -} - -/** A "topic" is always a string */ -type WsTopic = string; - -/** - * A "Set" is used to ensure we only subscribe to a topic once (tracking a list of unique topics we're expected to be connected to) - * Note: Accurate duplicate tracking only works for plaintext topics. E.g. JSON objects may not be seen as duplicates if keys are in different orders. If that's needed, check the FTX implementation. - */ -type WsTopicList = Set; - -interface WsStoredState { - /** The currently active websocket connection */ - ws?: WebSocket; - /** The current lifecycle state of the connection (enum) */ - connectionState?: WsConnectionStateEnum; - /** A timer that will send an upstream heartbeat (ping) when it expires */ - activePingTimer?: ReturnType | undefined; - /** A timer tracking that an upstream heartbeat was sent, expecting a reply before it expires */ - activePongTimer?: ReturnType | undefined; - /** If a reconnection is in progress, this will have the timer for the delayed reconnect */ - activeReconnectTimer?: ReturnType | undefined; - /** - * All the topics we are expected to be subscribed to (and we automatically resubscribe to if the connection drops) - */ - subscribedTopics: WsTopicList; -} - -export default class WsStore { - private wsState: Record; - - private logger: typeof DefaultLogger; - - constructor(logger: typeof DefaultLogger) { - this.logger = logger || DefaultLogger; - this.wsState = {}; - } - - /** Get WS stored state for key, optionally create if missing */ - get(key: WsKey, createIfMissing?: true): WsStoredState; - - get(key: WsKey, createIfMissing?: false): WsStoredState | undefined; - - get(key: WsKey, createIfMissing?: boolean): WsStoredState | undefined { - if (this.wsState[key]) { - return this.wsState[key]; - } - - if (createIfMissing) { - return this.create(key); - } - } - - getKeys(): WsKey[] { - return Object.keys(this.wsState) as WsKey[]; - } - - create(key: WsKey): WsStoredState | undefined { - if (this.hasExistingActiveConnection(key)) { - this.logger.warning( - 'WsStore setConnection() overwriting existing open connection: ', - this.getWs(key), - ); - } - this.wsState[key] = { - subscribedTopics: new Set(), - connectionState: WsConnectionStateEnum.INITIAL, - }; - return this.get(key); - } - - delete(key: WsKey) { - if (this.hasExistingActiveConnection(key)) { - const ws = this.getWs(key); - this.logger.warning( - 'WsStore deleting state for connection still open: ', - ws, - ); - ws?.close(); - } - delete this.wsState[key]; - } - - /* connection websocket */ - - hasExistingActiveConnection(key: WsKey) { - return this.get(key) && this.isWsOpen(key); - } - - getWs(key: WsKey): WebSocket | undefined { - return this.get(key)?.ws; - } - - setWs(key: WsKey, wsConnection: WebSocket): WebSocket { - if (this.isWsOpen(key)) { - this.logger.warning( - 'WsStore setConnection() overwriting existing open connection: ', - this.getWs(key), - ); - } - this.get(key, true)!.ws = wsConnection; - return wsConnection; - } - - /* connection state */ - - isWsOpen(key: WsKey): boolean { - const existingConnection = this.getWs(key); - return ( - !!existingConnection && - existingConnection.readyState === existingConnection.OPEN - ); - } - - getConnectionState(key: WsKey): WsConnectionStateEnum { - return this.get(key, true)!.connectionState!; - } - - setConnectionState(key: WsKey, state: WsConnectionStateEnum) { - this.get(key, true)!.connectionState = state; - } - - isConnectionState(key: WsKey, state: WsConnectionStateEnum): boolean { - return this.getConnectionState(key) === state; - } - - isWsConnecting(key: WsKey): boolean { - return ( - this.isConnectionState(key, WsConnectionStateEnum.CONNECTING) || - this.isConnectionState(key, WsConnectionStateEnum.RECONNECTING) - ); - } - - /* subscribed topics */ - - getTopics(key: WsKey): WsTopicList { - return this.get(key, true)!.subscribedTopics; - } - - getTopicsByKey(): Record { - const result = {}; - for (const refKey in this.wsState) { - result[refKey] = this.getTopics(refKey); - } - return result; - } - - addTopic(key: WsKey, topic: WsTopic) { - return this.getTopics(key).add(topic); - } - - deleteTopic(key: WsKey, topic: WsTopic) { - return this.getTopics(key).delete(topic); - } -} diff --git a/src/util/logger.ts b/src/util/logger.ts new file mode 100644 index 00000000..3c30d766 --- /dev/null +++ b/src/util/logger.ts @@ -0,0 +1,18 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ + +export type LogParams = null | any; + +export type DefaultLogger = typeof DefaultLogger; + +export const DefaultLogger = { + /** Ping/pong events and other raw messages that might be noisy. Enable this while troubleshooting. */ + trace: (..._params: LogParams): void => { + // console.log(_params); + }, + info: (...params: LogParams): void => { + console.info(params); + }, + error: (...params: LogParams): void => { + console.error(params); + }, +}; diff --git a/src/util/typeGuards.ts b/src/util/typeGuards.ts index 83709dc2..7ec7ac14 100644 --- a/src/util/typeGuards.ts +++ b/src/util/typeGuards.ts @@ -30,6 +30,10 @@ import { WsUserDataEvents, } from '../types/websockets'; +export function neverGuard(x: never, msg: string): Error { + return new Error(`Unhandled value exception "${x}", ${msg}`); +} + /** * Use type guards to narrow down types with minimal efforts. * diff --git a/src/util/webCryptoAPI.ts b/src/util/webCryptoAPI.ts new file mode 100644 index 00000000..2743aac8 --- /dev/null +++ b/src/util/webCryptoAPI.ts @@ -0,0 +1,84 @@ +import { neverGuard } from './typeGuards'; + +function bufferToB64(buffer: ArrayBuffer): string { + let binary = ''; + const bytes = new Uint8Array(buffer); + const len = bytes.byteLength; + for (let i = 0; i < len; i++) { + binary += String.fromCharCode(bytes[i]); + } + return globalThis.btoa(binary); +} + +export type SignEncodeMethod = 'hex' | 'base64'; +export type SignAlgorithm = 'SHA-256' | 'SHA-512'; + +/** + * Similar to node crypto's `createHash()` function + */ +export async function hashMessage( + message: string, + method: SignEncodeMethod, + algorithm: SignAlgorithm, +): Promise { + const encoder = new TextEncoder(); + + const buffer = await globalThis.crypto.subtle.digest( + algorithm, + encoder.encode(message), + ); + + switch (method) { + case 'hex': { + return Array.from(new Uint8Array(buffer)) + .map((byte) => byte.toString(16).padStart(2, '0')) + .join(''); + } + case 'base64': { + return bufferToB64(buffer); + } + default: { + throw neverGuard(method, `Unhandled sign method: "${method}"`); + } + } +} + +/** + * Sign a message, with a secret, using the Web Crypto API + */ +export async function signMessage( + message: string, + secret: string, + method: SignEncodeMethod, + algorithm: SignAlgorithm, +): Promise { + const encoder = new TextEncoder(); + + const key = await globalThis.crypto.subtle.importKey( + 'raw', + encoder.encode(secret), + { name: 'HMAC', hash: algorithm }, + false, + ['sign'], + ); + + const buffer = await globalThis.crypto.subtle.sign( + 'HMAC', + key, + encoder.encode(message), + ); + + switch (method) { + case 'hex': { + return Array.from(new Uint8Array(buffer)) + .map((byte) => byte.toString(16).padStart(2, '0')) + .join(''); + } + case 'base64': { + return bufferToB64(buffer); + } + default: { + throw neverGuard(method, `Unhandled sign method: "${method}"`); + } + } +} diff --git a/src/util/websockets/WsStore.ts b/src/util/websockets/WsStore.ts new file mode 100644 index 00000000..b34d8759 --- /dev/null +++ b/src/util/websockets/WsStore.ts @@ -0,0 +1,421 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import WebSocket from 'isomorphic-ws'; + +import { DefaultLogger } from '../logger'; +import { + DeferredPromise, + WSConnectedResult, + WsConnectionStateEnum, + WsStoredState, +} from './WsStore.types'; + +/** + * Simple comparison of two objects, only checks 1-level deep (nested objects won't match) + */ +export function isDeepObjectMatch(object1: unknown, object2: unknown): boolean { + if (typeof object1 === 'string' && typeof object2 === 'string') { + return object1 === object2; + } + + if (typeof object1 !== 'object' || typeof object2 !== 'object') { + return false; + } + + for (const key in object1) { + const value1 = (object1 as any)[key]; + const value2 = (object2 as any)[key]; + + if (value1 !== value2) { + return false; + } + } + return true; +} + +export const DEFERRED_PROMISE_REF = { + CONNECTION_IN_PROGRESS: 'CONNECTION_IN_PROGRESS', + AUTHENTICATION_IN_PROGRESS: 'AUTHENTICATION_IN_PROGRESS', +} as const; + +type DeferredPromiseRef = + (typeof DEFERRED_PROMISE_REF)[keyof typeof DEFERRED_PROMISE_REF]; + +export class WsStore< + WsKey extends string, + TWSTopicSubscribeEventArgs extends string | object, +> { + private wsState: Record> = + {}; + + private logger: DefaultLogger; + + constructor(logger: DefaultLogger) { + this.logger = logger || DefaultLogger; + } + + /** Get WS stored state for key, optionally create if missing */ + get( + key: WsKey, + createIfMissing?: true, + ): WsStoredState; + + get( + key: WsKey, + createIfMissing?: false, + ): WsStoredState | undefined; + + get( + key: WsKey, + createIfMissing?: boolean, + ): WsStoredState | undefined { + if (this.wsState[key]) { + return this.wsState[key]; + } + + if (createIfMissing) { + return this.create(key); + } + } + + getKeys(): WsKey[] { + return Object.keys(this.wsState) as WsKey[]; + } + + create(key: WsKey): WsStoredState | undefined { + if (this.hasExistingActiveConnection(key)) { + this.logger.info( + 'WsStore setConnection() overwriting existing open connection: ', + this.getWs(key), + ); + } + this.wsState[key] = { + subscribedTopics: new Set(), + connectionState: WsConnectionStateEnum.INITIAL, + deferredPromiseStore: {}, + }; + return this.get(key); + } + + delete(key: WsKey): void { + // TODO: should we allow this at all? Perhaps block this from happening... + if (this.hasExistingActiveConnection(key)) { + const ws = this.getWs(key); + this.logger.info( + 'WsStore deleting state for connection still open: ', + ws, + ); + ws?.close(); + } + delete this.wsState[key]; + } + + /* connection websocket */ + + hasExistingActiveConnection(key: WsKey): boolean { + return this.get(key) && this.isWsOpen(key); + } + + getWs(key: WsKey): WebSocket | undefined { + return this.get(key)?.ws; + } + + setWs(key: WsKey, wsConnection: WebSocket): WebSocket { + if (this.isWsOpen(key)) { + this.logger.info( + 'WsStore setConnection() overwriting existing open connection: ', + this.getWs(key), + ); + } + + this.get(key, true).ws = wsConnection; + return wsConnection; + } + + /** + * deferred promises + */ + + getDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + ): DeferredPromise | undefined { + const storeForKey = this.get(wsKey); + if (!storeForKey) { + return; + } + + const deferredPromiseStore = storeForKey.deferredPromiseStore; + return deferredPromiseStore[promiseRef]; + } + + createDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + throwIfExists: boolean, + ): DeferredPromise { + const existingPromise = this.getDeferredPromise( + wsKey, + promiseRef, + ); + if (existingPromise) { + if (throwIfExists) { + throw new Error(`Promise exists for "${wsKey}"`); + } else { + // console.log('existing promise'); + return existingPromise; + } + } + + // console.log('create promise'); + const createIfMissing = true; + const storeForKey = this.get(wsKey, createIfMissing); + + // TODO: Once stable, use Promise.withResolvers in future + const deferredPromise: DeferredPromise = {}; + + deferredPromise.promise = new Promise((resolve, reject) => { + deferredPromise.resolve = resolve; + deferredPromise.reject = reject; + }); + + const deferredPromiseStore = storeForKey.deferredPromiseStore; + + deferredPromiseStore[promiseRef] = deferredPromise; + + return deferredPromise; + } + + resolveDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + value: unknown, + removeAfter: boolean, + ): void { + const promise = this.getDeferredPromise(wsKey, promiseRef); + if (promise?.resolve) { + promise.resolve(value); + } + if (removeAfter) { + this.removeDeferredPromise(wsKey, promiseRef); + } + } + + rejectDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + value: unknown, + removeAfter: boolean, + ): void { + const promise = this.getDeferredPromise(wsKey, promiseRef); + + if (promise?.reject) { + promise.reject(value); + } + + if (removeAfter) { + this.removeDeferredPromise(wsKey, promiseRef); + } + } + + removeDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + ): void { + const storeForKey = this.get(wsKey); + if (!storeForKey) { + return; + } + + const deferredPromise = storeForKey.deferredPromiseStore[promiseRef]; + if (deferredPromise) { + // Just in case it's pending + if (deferredPromise.resolve) { + deferredPromise.resolve('promiseRemoved'); + } + + delete storeForKey.deferredPromiseStore[promiseRef]; + } + } + + rejectAllDeferredPromises(wsKey: WsKey, reason: string): void { + const storeForKey = this.get(wsKey); + const deferredPromiseStore = storeForKey.deferredPromiseStore; + if (!storeForKey || !deferredPromiseStore) { + return; + } + + const reservedKeys = Object.values(DEFERRED_PROMISE_REF) as string[]; + + for (const promiseRef in deferredPromiseStore) { + // Skip reserved keys, such as the connection promise + if (reservedKeys.includes(promiseRef)) { + continue; + } + + try { + this.rejectDeferredPromise(wsKey, promiseRef, reason, true); + } catch (e) { + this.logger.error( + 'rejectAllDeferredPromises(): Exception rejecting deferred promise', + { wsKey: wsKey, reason, promiseRef, exception: e }, + ); + } + } + } + + /** Get promise designed to track a connection attempt in progress. Resolves once connected. */ + getConnectionInProgressPromise( + wsKey: WsKey, + ): DeferredPromise | undefined { + return this.getDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + ); + } + + getAuthenticationInProgressPromise( + wsKey: WsKey, + ): DeferredPromise | undefined { + return this.getDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.AUTHENTICATION_IN_PROGRESS, + ); + } + + /** + * Create a deferred promise designed to track a connection attempt in progress. + * + * Will throw if existing promise is found. + */ + createConnectionInProgressPromise( + wsKey: WsKey, + throwIfExists: boolean, + ): DeferredPromise { + return this.createDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + throwIfExists, + ); + } + + createAuthenticationInProgressPromise( + wsKey: WsKey, + throwIfExists: boolean, + ): DeferredPromise { + return this.createDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.AUTHENTICATION_IN_PROGRESS, + throwIfExists, + ); + } + + /** Remove promise designed to track a connection attempt in progress */ + removeConnectingInProgressPromise(wsKey: WsKey): void { + return this.removeDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + ); + } + + removeAuthenticationInProgressPromise(wsKey: WsKey): void { + return this.removeDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.AUTHENTICATION_IN_PROGRESS, + ); + } + + /* connection state */ + + isWsOpen(key: WsKey): boolean { + const existingConnection = this.getWs(key); + return ( + !!existingConnection && + existingConnection.readyState === existingConnection.OPEN + ); + } + + getConnectionState(key: WsKey): WsConnectionStateEnum { + return this.get(key, true).connectionState!; + } + + setConnectionState(key: WsKey, state: WsConnectionStateEnum) { + this.get(key, true).connectionState = state; + } + + isConnectionState(key: WsKey, state: WsConnectionStateEnum): boolean { + return this.getConnectionState(key) === state; + } + + /** + * Check if we're currently in the process of opening a connection for any reason. Safer than only checking "CONNECTING" as the state + * @param key + * @returns + */ + isConnectionAttemptInProgress(key: WsKey): boolean { + const isConnectionInProgress = + this.isConnectionState(key, WsConnectionStateEnum.CONNECTING) || + this.isConnectionState(key, WsConnectionStateEnum.RECONNECTING); + + return isConnectionInProgress; + } + + /* subscribed topics */ + + getTopics(key: WsKey): Set { + return this.get(key, true).subscribedTopics; + } + + getTopicsByKey(): Record> { + const result: any = {}; + for (const refKey in this.wsState) { + result[refKey] = this.getTopics(refKey as WsKey); + } + return result; + } + + // Since topics are objects we can't rely on the set to detect duplicates + /** + * Find matching "topic" request from the store + * @param key + * @param topic + * @returns + */ + getMatchingTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { + // if (typeof topic === 'string') { + // return this.getMatchingTopic(key, { channel: topic }); + // } + + const allTopics = this.getTopics(key).values(); + for (const storedTopic of allTopics) { + if (isDeepObjectMatch(topic, storedTopic)) { + return storedTopic; + } + } + } + + addTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { + // if (typeof topic === 'string') { + // return this.addTopic(key, { + // instType: 'sp', + // channel: topic, + // instId: 'default', + // }; + // } + // Check for duplicate topic. If already tracked, don't store this one + const existingTopic = this.getMatchingTopic(key, topic); + if (existingTopic) { + return this.getTopics(key); + } + return this.getTopics(key).add(topic); + } + + deleteTopic(key: WsKey, topic: TWSTopicSubscribeEventArgs) { + // Check if we're subscribed to a topic like this + const storedTopic = this.getMatchingTopic(key, topic); + if (storedTopic) { + this.getTopics(key).delete(storedTopic); + } + + return this.getTopics(key); + } +} diff --git a/src/util/websockets/WsStore.types.ts b/src/util/websockets/WsStore.types.ts new file mode 100644 index 00000000..d2a4d493 --- /dev/null +++ b/src/util/websockets/WsStore.types.ts @@ -0,0 +1,58 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import WebSocket from 'isomorphic-ws'; + +export enum WsConnectionStateEnum { + INITIAL = 0, + CONNECTING = 1, + CONNECTED = 2, + CLOSING = 3, + RECONNECTING = 4, + // ERROR_RECONNECTING = 5, + // ERROR = 5, +} + +export interface DeferredPromise { + resolve?: (value: TSuccess) => TSuccess; + reject?: (value: TError) => TError; + promise?: Promise; +} + +export interface WSConnectedResult { + wsKey: string; +} + +export interface WsStoredState { + /** The currently active websocket connection */ + ws?: WebSocket; + /** The current lifecycle state of the connection (enum) */ + connectionState?: WsConnectionStateEnum; + /** A timer that will send an upstream heartbeat (ping) when it expires */ + activePingTimer?: ReturnType | undefined; + /** A timer tracking that an upstream heartbeat was sent, expecting a reply before it expires */ + activePongTimer?: ReturnType | undefined; + /** If a reconnection is in progress, this will have the timer for the delayed reconnect */ + activeReconnectTimer?: ReturnType | undefined; + /** + * When a connection attempt is in progress (even for reconnect), a promise is stored here. + * + * This promise will resolve once connected (and will then get removed); + */ + // connectionInProgressPromise?: DeferredPromise | undefined; + deferredPromiseStore: Record; + /** + * All the topics we are expected to be subscribed to on this connection (and we automatically resubscribe to if the connection drops) + * + * A "Set" and a deep-object-match are used to ensure we only subscribe to a + * topic once (tracking a list of unique topics we're expected to be connected to) + */ + subscribedTopics: Set; + /** Whether this connection has completed authentication (only applies to private connections) */ + isAuthenticated?: boolean; + /** + * Whether this connection has completed authentication before for the Websocket API, so it k + * nows to automatically reauth if reconnected + */ + didAuthWSAPI?: boolean; + /** To reauthenticate on the WS API, which channel do we send to? */ + WSAPIAuthChannel?: string; +} diff --git a/src/util/websockets/websocket-util.ts b/src/util/websockets/websocket-util.ts new file mode 100644 index 00000000..d344941c --- /dev/null +++ b/src/util/websockets/websocket-util.ts @@ -0,0 +1,444 @@ +import WebSocket from 'isomorphic-ws'; + +import { WSAPIRequest } from '../../types/websockets/ws-api'; +import { APIMarket, WsTopic } from '../../types/websockets/ws-general'; +import { WebsocketClientOptions, WsKey } from '../../websocket-client'; +import { DefaultLogger } from '../logger'; +import { neverGuard } from '../typeGuards'; + +export const WS_LOGGER_CATEGORY = { category: 'binance-ws' }; + +export const WS_KEY_MAP = { + // https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams + main: 'main', // spot, margin, isolated margin, user data + main2: 'main2', // spot, margin, isolated margin, user data | alternative + main3: 'main3', // spot, margin, isolated margin | alternative | MARKET DATA ONLY | NO USER DATA + + // https://developers.binance.com/docs/binance-spot-api-docs/web-socket-api/general-api-information + mainWSAPI: 'mainWSAPI', // trading over WS in spot, margin, isolated margin. User data supported too. + mainWSAPI2: 'mainWSAPI2', // trading over WS in spot, margin, isolated margin. User data supported too. + mainWSAPITestnet: 'mainWSAPITestnet', // trading over WS in spot, margin, isolated margin | TESTNET + + // https://developers.binance.com/docs/margin_trading/risk-data-stream + riskUserData: 'riskUserData', + + // https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams + // market data, user data + usdm: 'usdm', + // https://developers.binance.com/docs/derivatives/usds-margined-futures/general-info + usdmTestnet: 'usdmTestnet', + + // https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-api-general-info + // ONLY WS API | NO USER DATA + usdmWSAPI: 'usdmWSAPI', + usdmWSAPITestnet: 'usdmWSAPITestnet', + + coinm: 'coinm', + coinm2: 'coinm2', + coinmTestnet: 'coinmTestnet', + + options: 'options', + optionsTestnet: 'optionsTestnet', + + portfolioMargin: 'portfolioMargin', + portfolioMarginPro: 'portfolioMarginPro', +} as const; + +/** + * + * Listen key sub on an active connection +{ + "method": "SUBSCRIBE", + "params": [ + "listenkey" + ], + "id": 1 +} + + * + * + */ + +const allWsURLs: Record = { + // https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams + main: 'wss://stream.binance.com:9443', // spot, margin, isolated margin, user data + main2: 'wss://stream.binance.com:443', // spot, margin, isolated margin, user data | alternative + main3: 'wss://data-stream.binance.vision', // spot, margin, isolated margin | alternative | MARKET DATA ONLY | NO USER DATA + + // https://developers.binance.com/docs/binance-spot-api-docs/testnet/web-socket-streams#general-wss-information + mainTestnetPublic: 'wss://testnet.binance.vision/ws', + mainTestnetUserData: 'wss://stream.testnet.binance.vision:9443', + + // https://developers.binance.com/docs/binance-spot-api-docs/web-socket-api/general-api-information + mainWSAPI: 'wss://ws-api.binance.com:443/ws-api/v3', + mainWSAPI2: 'wss://ws-api.binance.com:9443/ws-api/v3', + mainWSAPITestnet: 'wss://testnet.binance.vision/ws-api/v3', + + // https://developers.binance.com/docs/margin_trading/risk-data-stream + marginRiskUserData: 'wss://margin-stream.binance.com', + + // https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams + // market data, user data + usdm: 'wss://fstream.binance.com', + + // https://developers.binance.com/docs/derivatives/usds-margined-futures/general-info + usdmTestnet: 'wss://stream.binancefuture.com', + + // https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-api-general-info + // ONLY WS API + usdmWSAPI: 'wss://ws-fapi.binance.com/ws-fapi/v1', + usdmWSAPITestnet: 'wss://testnet.binancefuture.com/ws-fapi/v1', + + // https://developers.binance.com/docs/derivatives/coin-margined-futures/websocket-market-streams + // market data, user data + coinm: 'wss://dstream.binance.com', + coinm2: 'wss://dstream-auth.binance.com', + // https://developers.binance.com/docs/derivatives/coin-margined-futures/general-info + coinmTestnet: 'wss://dstream.binancefuture.com', + + // https://developers.binance.com/docs/derivatives/option/user-data-streams + options: 'wss://nbstream.binance.com/eoptions', + // optionsTestnet: 'wss://testnetws.binanceops.com', + + // https://developers.binance.com/docs/derivatives/portfolio-margin/user-data-streams + portfolioMargin: 'wss://fstream.binance.com/pm', // /ws/listekeyhere + + // https://developers.binance.com/docs/derivatives/portfolio-margin-pro/portfolio-margin-pro-user-data-stream + portfolioMarginPro: 'wss://fstream.binance.com/pm-classic', // /ws/listenkeyhere +}; + +export const WS_AUTH_ON_CONNECT_KEYS: WsKey[] = [ + WS_KEY_MAP.v5Private, + WS_KEY_MAP.v5PrivateTrade, +]; + +export const PUBLIC_WS_KEYS = [ + WS_KEY_MAP.v5SpotPublic, + WS_KEY_MAP.v5LinearPublic, + WS_KEY_MAP.v5InversePublic, + WS_KEY_MAP.v5OptionPublic, +] as string[]; + +/** Used to automatically determine if a sub request should be to the public or private ws (when there's two) */ +const PRIVATE_TOPICS = [ + 'stop_order', + 'outboundAccountInfo', + 'executionReport', + 'ticketInfo', + // copy trading apis + 'copyTradePosition', + 'copyTradeOrder', + 'copyTradeExecution', + 'copyTradeWallet', + // usdc options + 'user.openapi.option.position', + 'user.openapi.option.trade', + 'user.order', + 'user.openapi.option.order', + 'user.service', + 'user.openapi.greeks', + 'user.mmp.event', + // usdc perps + 'user.openapi.perp.position', + 'user.openapi.perp.trade', + 'user.openapi.perp.order', + 'user.service', + // unified margin + 'user.position.unifiedAccount', + 'user.execution.unifiedAccount', + 'user.order.unifiedAccount', + 'user.wallet.unifiedAccount', + 'user.greeks.unifiedAccount', + // contract v3 + 'user.position.contractAccount', + 'user.execution.contractAccount', + 'user.order.contractAccount', + 'user.wallet.contractAccount', + // v5 + 'position', + 'execution', + 'order', + 'wallet', + 'greeks', +]; + +/** + * Normalised internal format for a request (subscribe/unsubscribe/etc) on a topic, with optional parameters. + * + * - Topic: the topic this event is for + * - Payload: the parameters to include, optional. E.g. auth requires key + sign. Some topics allow configurable parameters. + * - Category: required for bybit, since different categories have different public endpoints + */ +export interface WsTopicRequest< + TWSTopic extends string = string, + TWSPayload = unknown, +> { + topic: TWSTopic; + payload?: TWSPayload; +} + +/** + * Conveniently allow users to request a topic either as string topics or objects (containing string topic + params) + */ +export type WsTopicRequestOrStringTopic< + TWSTopic extends string, + TWSPayload = unknown, +> = WsTopicRequest | string; + +interface NetworkMapV3 { + livenet: string; + livenet2?: string; + testnet: string; + testnet2?: string; +} + +type PublicPrivateNetwork = 'public' | 'private'; + +/** + * The following WS keys are logical. + * + * They're not directly used as a market. They usually have one private endpoint but many public ones, + * so they need a bit of extra handling for seamless messaging between endpoints. + * + * For the unified keys, the "split" happens using the symbol. Symbols suffixed with USDT are obviously USDT topics. + * For the v5 endpoints, the subscribe/unsubscribe call must specify the category the subscription should route to. + */ +type PublicOnlyWsKeys = + | 'v5SpotPublic' + | 'v5LinearPublic' + | 'v5InversePublic' + | 'v5OptionPublic'; + +export const WS_BASE_URL_MAP: Record< + APIMarket, + Record +> & + Record> & + Record< + typeof WS_KEY_MAP.v5PrivateTrade, + Record + > = { + v5: { + public: { + livenet: 'public topics are routed internally via the public wskeys', + testnet: 'public topics are routed internally via the public wskeys', + }, + private: { + livenet: 'wss://stream.bybit.com/v5/private', + testnet: 'wss://stream-testnet.bybit.com/v5/private', + }, + }, + v5PrivateTrade: { + public: { + livenet: 'public topics are routed internally via the public wskeys', + testnet: 'public topics are routed internally via the public wskeys', + }, + private: { + livenet: 'wss://stream.bybit.com/v5/trade', + testnet: 'wss://stream-testnet.bybit.com/v5/trade', + }, + }, + v5SpotPublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/spot', + testnet: 'wss://stream-testnet.bybit.com/v5/public/spot', + }, + }, + v5LinearPublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/linear', + testnet: 'wss://stream-testnet.bybit.com/v5/public/linear', + }, + }, + v5InversePublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/inverse', + testnet: 'wss://stream-testnet.bybit.com/v5/public/inverse', + }, + }, + v5OptionPublic: { + public: { + livenet: 'wss://stream.bybit.com/v5/public/option', + testnet: 'wss://stream-testnet.bybit.com/v5/public/option', + }, + }, +}; + +export function isPrivateWsTopic(topic: string): boolean { + return PRIVATE_TOPICS.includes(topic); +} + +export function getWsKeyForTopic( + market: APIMarket, + topic: string, + isPrivate?: boolean, +): WsKey { + const isPrivateTopic = isPrivate === true || PRIVATE_TOPICS.includes(topic); + switch (market) { + case 'v5': { + if (isPrivateTopic) { + return WS_KEY_MAP.v5Private; + } + return WS_KEY_MAP.v5Private; + } + default: { + throw neverGuard(market, 'getWsKeyForTopic(): Unhandled market'); + } + } +} + +export function getWsUrl( + wsKey: WsKey, + wsClientOptions: WebsocketClientOptions, + logger: typeof DefaultLogger, +): string { + const wsUrl = wsClientOptions.wsUrl; + if (wsUrl) { + return wsUrl; + } + + // https://bybit-exchange.github.io/docs/v5/demo + // const isDemoTrading = wsClientOptions.demoTrading; + // if (isDemoTrading) { + // return 'wss://stream-demo.bybit.com/v5/private'; + // } + + const isTestnet = wsClientOptions.testnet; + const networkKey = isTestnet ? 'testnet' : 'livenet'; + + switch (wsKey) { + case WS_KEY_MAP.v5Private: { + return WS_BASE_URL_MAP.v5.private[networkKey]; + } + case WS_KEY_MAP.v5PrivateTrade: { + return WS_BASE_URL_MAP[wsKey].private[networkKey]; + } + case WS_KEY_MAP.v5SpotPublic: { + return WS_BASE_URL_MAP.v5SpotPublic.public[networkKey]; + } + case WS_KEY_MAP.v5LinearPublic: { + return WS_BASE_URL_MAP.v5LinearPublic.public[networkKey]; + } + case WS_KEY_MAP.v5InversePublic: { + return WS_BASE_URL_MAP.v5InversePublic.public[networkKey]; + } + case WS_KEY_MAP.v5OptionPublic: { + return WS_BASE_URL_MAP.v5OptionPublic.public[networkKey]; + } + default: { + logger.error('getWsUrl(): Unhandled wsKey: ', { + category: 'bybit-ws', + wsKey, + }); + throw neverGuard(wsKey, 'getWsUrl(): Unhandled wsKey'); + } + } +} + +export function getMaxTopicsPerSubscribeEvent( + market: APIMarket, + wsKey: WsKey, +): number | null { + switch (market) { + case 'v5': { + if (wsKey === WS_KEY_MAP.v5SpotPublic) { + return 10; + } + return null; + } + default: { + throw neverGuard(market, 'getWsKeyForTopic(): Unhandled market'); + } + } +} + +export const WS_ERROR_ENUM = { + NOT_AUTHENTICATED_SPOT_V3: '-1004', + API_ERROR_GENERIC: '10001', + API_SIGN_AUTH_FAILED: '10003', + USDC_OPTION_AUTH_FAILED: '3303006', +}; + +/** + * #305: ws.terminate() is undefined in browsers. + * This only works in node.js, not in browsers. + * Does nothing if `ws` is undefined. + */ +export function safeTerminateWs(ws?: WebSocket | unknown) { + // #305: ws.terminate() undefined in browsers + if (ws && typeof ws['terminate'] === 'function') { + ws.terminate(); + } +} +/** + * WS API promises are stored using a primary key. This key is constructed using + * properties found in every request & reply. + */ +export function getPromiseRefForWSAPIRequest( + requestEvent: WSAPIRequest, +): string { + const promiseRef = [requestEvent.op, requestEvent.reqId].join('_'); + return promiseRef; +} + +/** + * Users can conveniently pass topics as strings or objects (object has topic name + optional params). + * + * This method normalises topics into objects (object has topic name + optional params). + */ +export function getNormalisedTopicRequests( + wsTopicRequests: WsTopicRequestOrStringTopic[], +): WsTopicRequest[] { + const normalisedTopicRequests: WsTopicRequest[] = []; + + for (const wsTopicRequest of wsTopicRequests) { + // passed as string, convert to object + if (typeof wsTopicRequest === 'string') { + const topicRequest: WsTopicRequest = { + topic: wsTopicRequest, + payload: undefined, + }; + normalisedTopicRequests.push(topicRequest); + continue; + } + + // already a normalised object, thanks to user + normalisedTopicRequests.push(wsTopicRequest); + } + return normalisedTopicRequests; +} + +/** + * Groups topics in request into per-wsKey groups + * @param normalisedTopicRequests + * @param wsKey + * @param isPrivateTopic + * @returns + */ +export function getTopicsPerWSKey( + market: APIMarket, + normalisedTopicRequests: WsTopicRequest[], + wsKey?: WsKey, + isPrivateTopic?: boolean, +): { + [key in WsKey]?: WsTopicRequest[]; +} { + const perWsKeyTopics: { [key in WsKey]?: WsTopicRequest[] } = {}; + + // Sort into per wsKey arrays, in case topics are mixed together for different wsKeys + for (const topicRequest of normalisedTopicRequests) { + const derivedWsKey = + wsKey || getWsKeyForTopic(market, topicRequest.topic, isPrivateTopic); + + if ( + !perWsKeyTopics[derivedWsKey] || + !Array.isArray(perWsKeyTopics[derivedWsKey]) + ) { + perWsKeyTopics[derivedWsKey] = []; + } + + perWsKeyTopics[derivedWsKey]!.push(topicRequest); + } + + return perWsKeyTopics; +} diff --git a/src/util/ws-utils.ts b/src/util/ws-utils.ts deleted file mode 100644 index 5935958e..00000000 --- a/src/util/ws-utils.ts +++ /dev/null @@ -1,13 +0,0 @@ -import WebSocket from 'isomorphic-ws'; - -/** - * #168: ws.terminate() is undefined in browsers. - * This only works in node.js, not in browsers. - * Does nothing if `ws` is undefined. - */ -export function safeTerminateWs(ws: WebSocket | unknown) { - // #168: ws.terminate() undefined in browsers - if (typeof ws?.terminate === 'function' && ws) { - ws.terminate(); - } -} diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 08ee1ebd..ddbe3dd0 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -4,7 +4,6 @@ import { EventEmitter } from 'events'; import WebSocket from 'isomorphic-ws'; import { CoinMClient } from './coinm-client'; -import { DefaultLogger } from './logger'; import { MainClient } from './main-client'; import { KlineInterval } from './types/shared'; import { @@ -16,6 +15,7 @@ import { } from './types/websockets'; import { USDMClient } from './usdm-client'; import Beautifier from './util/beautifier'; +import { DefaultLogger } from './util/logger'; import { appendEventIfMissing, appendEventMarket, @@ -23,8 +23,9 @@ import { getWsKeyWithContext, RestClientOptions, } from './util/requestUtils'; -import { safeTerminateWs } from './util/ws-utils'; -import WsStore, { WsConnectionStateEnum } from './util/WsStore'; +import { safeTerminateWs } from './util/websockets/websocket-util'; +import { WsStore } from './util/websockets/WsStore'; +import { WsConnectionStateEnum } from './util/websockets/WsStore.types'; const wsBaseEndpoints: Record = { spot: 'wss://stream.binance.com:9443', @@ -163,7 +164,7 @@ export class WebsocketClient extends EventEmitter { private options: WebsocketClientOptions; - private wsStore: WsStore; + private wsStore: WsStore; private beautifier: Beautifier; @@ -217,14 +218,14 @@ export class WebsocketClient extends EventEmitter { const oldWs = this.wsStore.getWs(wsRefKey); if (oldWs && this.wsStore.isWsOpen(wsRefKey) && !forceNewConnection) { - this.logger.silly( + this.logger.trace( 'connectToWsUrl(): Returning existing open WS connection', { ...loggerCategory, wsRefKey }, ); return oldWs; } - this.logger.silly( + this.logger.trace( `connectToWsUrl(): Opening WS connection to URL: ${url}`, { ...loggerCategory, wsRefKey }, ); @@ -259,7 +260,7 @@ export class WebsocketClient extends EventEmitter { public tryWsSend(wsKey: WsKey, wsMessage: string) { try { - this.logger.silly('Sending upstream ws message: ', { + this.logger.trace('Sending upstream ws message: ', { ...loggerCategory, wsMessage, wsKey, @@ -287,7 +288,7 @@ export class WebsocketClient extends EventEmitter { public tryWsPing(wsKey: WsKey) { try { - // this.logger.silly(`Sending upstream ping: `, { ...loggerCategory, wsKey }); + // this.logger.trace(`Sending upstream ping: `, { ...loggerCategory, wsKey }); if (!wsKey) { throw new Error('No wsKey provided'); } @@ -304,7 +305,7 @@ export class WebsocketClient extends EventEmitter { ws.ping(); ws.pong(); } else { - this.logger.silly( + this.logger.trace( 'WS ready state not open - refusing to send WS ping', { ...loggerCategory, wsKey, readyState: ws?.readyState }, ); @@ -319,7 +320,7 @@ export class WebsocketClient extends EventEmitter { } private onWsOpen(ws: WebSocket, wsKey: WsKey, wsUrl: string) { - this.logger.silly(`onWsOpen(): ${wsUrl} : ${wsKey}`); + this.logger.trace(`onWsOpen(): ${wsUrl} : ${wsKey}`); if ( this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.RECONNECTING) ) { @@ -455,7 +456,7 @@ export class WebsocketClient extends EventEmitter { return; } - this.logger.warning( + this.logger.error( 'Bug? Unhandled ws message event type. Check if appendEventIfMissing needs to parse wsKey.', { ...loggerCategory, @@ -480,7 +481,7 @@ export class WebsocketClient extends EventEmitter { private sendPing(wsKey: WsKey, wsUrl: string) { this.clearPongTimer(wsKey); - this.logger.silly('Sending ping', { ...loggerCategory, wsKey }); + this.logger.trace('Sending ping', { ...loggerCategory, wsKey }); this.tryWsPing(wsKey); this.wsStore.get(wsKey, true).activePongTimer = setTimeout( @@ -495,7 +496,7 @@ export class WebsocketClient extends EventEmitter { ws: WebSocket, source: WsEventInternalSrc, ) { - this.logger.silly('Received ping, sending pong frame', { + this.logger.trace('Received ping, sending pong frame', { ...loggerCategory, wsKey, source, @@ -504,7 +505,7 @@ export class WebsocketClient extends EventEmitter { } private onWsPong(event: any, wsKey: WsKey, source: WsEventInternalSrc) { - this.logger.silly('Received pong, clearing pong timer', { + this.logger.trace('Received pong, clearing pong timer', { ...loggerCategory, wsKey, source, @@ -726,14 +727,14 @@ export class WebsocketClient extends EventEmitter { } if (state.keepAliveTimer) { - this.logger.silly( + this.logger.trace( `Clearing old listen key interval timer for ${listenKey}`, ); clearInterval(state.keepAliveTimer); } if (state.keepAliveRetryTimer) { - this.logger.silly( + this.logger.trace( `Clearing old listen key keepAliveRetry timer for ${listenKey}`, ); clearTimeout(state.keepAliveRetryTimer); @@ -916,7 +917,7 @@ export class WebsocketClient extends EventEmitter { this.clearUserDataKeepAliveTimer(listenKey); - this.logger.silly(`Created new listen key interval timer for ${listenKey}`); + this.logger.trace(`Created new listen key interval timer for ${listenKey}`); // Set timer to keep WS alive every 50 minutes const minutes50 = 1000 * 60 * 50; @@ -1055,7 +1056,7 @@ export class WebsocketClient extends EventEmitter { } const reconnectDelaySeconds = 1000 * 15; - this.logger.warning( + this.logger.info( `Userdata keep alive request failed due to error, trying again with short delay (${reconnectDelaySeconds} seconds)`, { ...loggerCategory, @@ -1789,8 +1790,11 @@ export class WebsocketClient extends EventEmitter { const market: WsMarket = 'spot'; const wsKey = getWsKeyWithContext(market, 'userData', undefined, listenKey); - if (!forceNewConnection && this.wsStore.isWsConnecting(wsKey)) { - this.logger.silly( + if ( + !forceNewConnection && + this.wsStore.isConnectionAttemptInProgress(wsKey) + ) { + this.logger.trace( 'Existing spot user data connection in progress for listen key. Avoiding duplicate', ); return this.getWs(wsKey); @@ -1857,8 +1861,11 @@ export class WebsocketClient extends EventEmitter { listenKey, ); - if (!forceNewConnection && this.wsStore.isWsConnecting(wsKey)) { - this.logger.silly( + if ( + !forceNewConnection && + this.wsStore.isConnectionAttemptInProgress(wsKey) + ) { + this.logger.trace( 'Existing margin user data connection in progress for listen key. Avoiding duplicate', ); return this.getWs(wsKey); @@ -1912,8 +1919,11 @@ export class WebsocketClient extends EventEmitter { listenKey, ); - if (!forceNewConnection && this.wsStore.isWsConnecting(wsKey)) { - this.logger.silly( + if ( + !forceNewConnection && + this.wsStore.isConnectionAttemptInProgress(wsKey) + ) { + this.logger.trace( 'Existing isolated margin user data connection in progress for listen key. Avoiding duplicate', ); return this.getWs(wsKey); @@ -1974,8 +1984,11 @@ export class WebsocketClient extends EventEmitter { listenKey, ); - if (!forceNewConnection && this.wsStore.isWsConnecting(wsKey)) { - this.logger.silly( + if ( + !forceNewConnection && + this.wsStore.isConnectionAttemptInProgress(wsKey) + ) { + this.logger.trace( 'Existing usd futures user data connection in progress for listen key. Avoiding duplicate', ); return this.getWs(wsKey); @@ -2034,8 +2047,11 @@ export class WebsocketClient extends EventEmitter { listenKey, ); - if (!forceNewConnection && this.wsStore.isWsConnecting(wsKey)) { - this.logger.silly( + if ( + !forceNewConnection && + this.wsStore.isConnectionAttemptInProgress(wsKey) + ) { + this.logger.trace( 'Existing usd futures user data connection in progress for listen key. Avoiding duplicate', ); return this.getWs(wsKey); diff --git a/test/websocket-client.test.ts b/test/websocket-client.test.ts index fa3c262c..6a8a2f1a 100644 --- a/test/websocket-client.test.ts +++ b/test/websocket-client.test.ts @@ -1,9 +1,10 @@ -import { parseRawWsMessage } from "../src/websocket-client"; +import { parseRawWsMessage } from '../src/websocket-client'; describe('websocket-client', () => { describe('parseRawWsMessage()', () => { it('should parse & resolve an event with nested data', () => { - const event = '{"stream":"!forceOrder@arr","data":{"e":"forceOrder","E":1634653599186,"o":{"s":"IOTXUSDT","S":"SELL","o":"LIMIT","f":"IOC","q":"3661","p":"0.06606","ap":"0.06669","X":"FILLED","l":"962","z":"3661","T":1634653599180}}}'; + const event = + '{"stream":"!forceOrder@arr","data":{"e":"forceOrder","E":1634653599186,"o":{"s":"IOTXUSDT","S":"SELL","o":"LIMIT","f":"IOC","q":"3661","p":"0.06606","ap":"0.06669","X":"FILLED","l":"962","z":"3661","T":1634653599180}}}'; const result = parseRawWsMessage(event); expect(typeof result).toBe('object'); expect(result.data).toBe(undefined); diff --git a/tsconfig.json b/tsconfig.json index 265ac648..5f4b66b5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -17,6 +17,6 @@ "baseUrl": ".", "outDir": "lib" }, - "include": ["src/**/*", "src/.ts"], + "include": ["src/**/*"], "exclude": ["node_modules", "**/node_modules/*", "coverage", "doc"] }