From 59bf81cc9ab1345ed2cf3f295be495d5595c3a2b Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Tue, 26 Nov 2024 11:33:28 -0800 Subject: [PATCH 01/25] Add SDK subscribe function --- packages/sdk/src/account/account.js | 2 +- packages/sdk/src/block/block.js | 2 +- packages/sdk/src/contract.test.js | 2 +- .../node-version-info/node-version-info.ts | 2 +- packages/sdk/src/sdk.ts | 4 +- .../sdk/src/{send => transport}/send.test.js | 0 .../src/{send/send.js => transport/send.ts} | 26 ++++----- packages/sdk/src/transport/subscribe.test.ts | 48 ++++++++++++++++ packages/sdk/src/transport/subscribe.ts | 57 +++++++++++++++++++ packages/sdk/src/transport/transport.ts | 44 ++++++++++++++ 10 files changed, 169 insertions(+), 18 deletions(-) rename packages/sdk/src/{send => transport}/send.test.js (100%) rename packages/sdk/src/{send/send.js => transport/send.ts} (52%) create mode 100644 packages/sdk/src/transport/subscribe.test.ts create mode 100644 packages/sdk/src/transport/subscribe.ts create mode 100644 packages/sdk/src/transport/transport.ts diff --git a/packages/sdk/src/account/account.js b/packages/sdk/src/account/account.js index f3a2a75ed..54094ca86 100644 --- a/packages/sdk/src/account/account.js +++ b/packages/sdk/src/account/account.js @@ -3,7 +3,7 @@ import {atBlockId} from "../build/build-at-block-id.js" import {getAccount} from "../build/build-get-account.js" import {invariant} from "@onflow/util-invariant" import {decodeResponse as decode} from "../decode/decode.js" -import {send} from "../send/send.js" +import {send} from "../transport/send" /** * @typedef {import("@onflow/typedefs").Account} Account diff --git a/packages/sdk/src/block/block.js b/packages/sdk/src/block/block.js index 6cc92df8e..0ecdfd03e 100644 --- a/packages/sdk/src/block/block.js +++ b/packages/sdk/src/block/block.js @@ -1,4 +1,4 @@ -import {send} from "../send/send.js" +import {send} from "../transport/send" import {getBlock} from "../build/build-get-block" import {atBlockHeight} from "../build/build-at-block-height.js" import {atBlockId} from "../build/build-at-block-id.js" diff --git a/packages/sdk/src/contract.test.js b/packages/sdk/src/contract.test.js index 40f95cea8..a49d70d9f 100644 --- a/packages/sdk/src/contract.test.js +++ b/packages/sdk/src/contract.test.js @@ -2,7 +2,7 @@ import * as root from "./sdk" import * as decode from "./decode/decode.js" import * as encode from "./encode/encode" import * as interaction from "./interaction/interaction" -import * as send from "./send/send.js" +import * as send from "./transport/send" import * as template from "@onflow/util-template" const interfaceContract = diff --git a/packages/sdk/src/node-version-info/node-version-info.ts b/packages/sdk/src/node-version-info/node-version-info.ts index c905a76cd..9c73081b1 100644 --- a/packages/sdk/src/node-version-info/node-version-info.ts +++ b/packages/sdk/src/node-version-info/node-version-info.ts @@ -1,4 +1,4 @@ -import {send} from "../send/send.js" +import {send} from "../transport/send" import {decodeResponse as decode} from "../decode/decode.js" import {getNodeVersionInfo} from "../build/build-get-node-version-info" import {NodeVersionInfo} from "@onflow/typedefs" diff --git a/packages/sdk/src/sdk.ts b/packages/sdk/src/sdk.ts index 9ea40de35..f3ff5fa50 100644 --- a/packages/sdk/src/sdk.ts +++ b/packages/sdk/src/sdk.ts @@ -2,7 +2,7 @@ import * as logger from "@onflow/util-logger" // Base export {build} from "./build/build.js" export {resolve} from "./resolve/resolve.js" -export {send} from "./send/send.js" +export {send} from "./transport/send" export {decode} from "./decode/sdk-decode.js" export { encodeTransactionPayload, @@ -113,3 +113,5 @@ import * as TestUtils from "./test-utils" export {TestUtils} export {VERSION} from "./VERSION" + +export {subscribe} from "./transport/subscribe" diff --git a/packages/sdk/src/send/send.test.js b/packages/sdk/src/transport/send.test.js similarity index 100% rename from packages/sdk/src/send/send.test.js rename to packages/sdk/src/transport/send.test.js diff --git a/packages/sdk/src/send/send.js b/packages/sdk/src/transport/send.ts similarity index 52% rename from packages/sdk/src/send/send.js rename to packages/sdk/src/transport/send.ts index eab60a9e4..94a0aea23 100644 --- a/packages/sdk/src/send/send.js +++ b/packages/sdk/src/transport/send.ts @@ -1,23 +1,23 @@ import {Buffer} from "@onflow/rlp" -import {send as defaultSend} from "@onflow/transport-http" import {initInteraction, pipe} from "../interaction/interaction" import * as ixModule from "../interaction/interaction" -import {invariant} from "../build/build-invariant.js" +import {invariant} from "../build/build-invariant" import {response} from "../response/response" import {config} from "@onflow/config" -import {resolve as defaultResolve} from "../resolve/resolve.js" +import {resolve as defaultResolve} from "../resolve/resolve" +import {getTransport} from "./transport" /** * @description - Sends arbitrary scripts, transactions, and requests to Flow - * @param {Array. | Function} args - An array of functions that take interaction and return interaction - * @param {object} opts - Optional parameters - * @returns {Promise<*>} - A promise that resolves to a response + * @param args - An array of functions that take interaction and return interaction + * @param opts - Optional parameters + * @returns - A promise that resolves to a response */ -export const send = async (args = [], opts = {}) => { - const sendFn = await config.first( - ["sdk.transport", "sdk.send"], - opts.send || defaultSend - ) +export const send = async ( + args: Function | Function[] = [], + opts: any = {} +): Promise => { + const {send: sendFn} = await getTransport(opts) invariant( sendFn, @@ -31,10 +31,10 @@ export const send = async (args = [], opts = {}) => { opts.node = opts.node || (await config().get("accessNode.api")) - if (Array.isArray(args)) args = pipe(initInteraction(), args) + if (Array.isArray(args)) args = pipe(initInteraction(), args as any) as any return sendFn( await resolveFn(args), - {config, response, ix: ixModule, Buffer}, + {config, response, ix: ixModule, Buffer} as any, opts ) } diff --git a/packages/sdk/src/transport/subscribe.test.ts b/packages/sdk/src/transport/subscribe.test.ts new file mode 100644 index 000000000..3cf7f515a --- /dev/null +++ b/packages/sdk/src/transport/subscribe.test.ts @@ -0,0 +1,48 @@ +import {config} from "@onflow/config" +import {subscribe} from "./subscribe" +import {SdkTransport} from "@onflow/typedefs" +import {getTransport} from "./transport" + +jest.mock("./transport") + +describe("subscribe", () => { + let mockTransport: jest.Mocked + + beforeEach(() => { + jest.resetAllMocks() + + mockTransport = { + subscribe: jest.fn().mockReturnValue({ + unsubscribe: jest.fn(), + }), + send: jest.fn(), + } + jest.mocked(getTransport).mockResolvedValue(mockTransport) + }) + + test("subscribes to a topic and returns a subscription", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const sub = await config().overload( + { + "accessNode.api": "http://localhost:8080", + }, + async () => { + return await subscribe({topic, args, onData, onError}) + } + ) + + expect(mockTransport.subscribe).toHaveBeenCalledTimes(1) + expect(mockTransport.subscribe).toHaveBeenCalledWith( + {topic, args, onData: expect.any(Function), onError}, + {node: "http://localhost:8080"} + ) + + expect(sub).toStrictEqual({ + unsubscribe: expect.any(Function), + }) + }) +}) diff --git a/packages/sdk/src/transport/subscribe.ts b/packages/sdk/src/transport/subscribe.ts new file mode 100644 index 000000000..ea74756b6 --- /dev/null +++ b/packages/sdk/src/transport/subscribe.ts @@ -0,0 +1,57 @@ +import {config} from "@onflow/config" +import {SdkTransport} from "@onflow/typedefs" +import {getTransport} from "./transport" +import {invariant} from "@onflow/util-invariant" + +// TODO: OPTS FUNCTION +export async function subscribe( + { + topic, + args, + onData, + onError, + }: { + topic: T + args: SdkTransport.SubscriptionArguments + onData: (data: SdkTransport.SubscriptionData) => void + onError: (error: Error) => void + }, + opts: { + node?: string + send?: SdkTransport.SendFn + transport?: SdkTransport.Transport + } = {} +) { + const transport = await getTransport(opts) + const node = opts?.node || (await config().get("accessNode.api")) + + invariant( + !!node, + `SDK Send Error: Either opts.node or "accessNode.api" in config must be defined.` + ) + + // TODO: handle onError + // Subscribe using the resolved transport + return transport.subscribe( + { + topic, + args, + onData: data => { + // TODO: decode function + onData(decode(topic, data)) + }, + onError, + }, + { + node, + ...opts, + } + ) +} + +export function decode( + topic: T, + data: SdkTransport.SubscriptionData +): any { + return data +} diff --git a/packages/sdk/src/transport/transport.ts b/packages/sdk/src/transport/transport.ts new file mode 100644 index 000000000..aa23ba5d6 --- /dev/null +++ b/packages/sdk/src/transport/transport.ts @@ -0,0 +1,44 @@ +import {config} from "@onflow/config" +import {httpTransport as defaultTransport} from "@onflow/transport-http" +import {SdkTransport} from "@onflow/typedefs" +import {invariant} from "@onflow/util-invariant" + +export async function getTransport( + opts: { + send?: SdkTransport.SendFn + transport?: SdkTransport.Transport + } = {} +): Promise { + invariant( + opts.send == null || opts.transport == null, + `SDK Transport Error: Cannot provide both "transport" and legacy "send" options.` + ) + + const transportOrSend = await config().first< + SdkTransport.Transport | SdkTransport.SendFn + >( + ["sdk.transport", "sdk.send"], + opts.transport || opts.send || defaultTransport + ) + + if (isTransportObject(transportOrSend)) { + // This is a transport object, return it directly + return transportOrSend + } else { + // This is a legacy send function, wrap it in a transport object + return { + send: transportOrSend, + subscribe: () => { + throw new Error( + "Subscribe not supported with legacy send function transport, please provide a transport object." + ) + }, + } + } +} + +function isTransportObject( + transport: any +): transport is SdkTransport.Transport { + return transport.send !== undefined && transport.subscribe !== undefined +} From 547a6497b5976d25bd8a08e2c271b6ff67c0f318 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Tue, 26 Nov 2024 19:06:48 -0800 Subject: [PATCH 02/25] refactor --- packages/sdk/src/account/account.js | 2 +- packages/sdk/src/block/block.js | 2 +- packages/sdk/src/contract.test.js | 2 +- .../node-version-info/node-version-info.ts | 2 +- packages/sdk/src/sdk.ts | 4 +- .../{transport.ts => get-transport.ts} | 19 +-- packages/sdk/src/transport/index.ts | 3 + .../sdk/src/transport/{ => send}/send.test.js | 0 packages/sdk/src/transport/{ => send}/send.ts | 15 +-- .../raw-subscribe.test.ts} | 23 ++-- .../raw-subscribe.ts} | 10 +- .../src/transport/subscribe/subscribe.test.ts | 112 ++++++++++++++++++ .../sdk/src/transport/subscribe/subscribe.ts | 40 +++++++ 13 files changed, 194 insertions(+), 40 deletions(-) rename packages/sdk/src/transport/{transport.ts => get-transport.ts} (72%) create mode 100644 packages/sdk/src/transport/index.ts rename packages/sdk/src/transport/{ => send}/send.test.js (100%) rename packages/sdk/src/transport/{ => send}/send.ts (69%) rename packages/sdk/src/transport/{subscribe.test.ts => subscribe/raw-subscribe.test.ts} (63%) rename packages/sdk/src/transport/{subscribe.ts => subscribe/raw-subscribe.ts} (80%) create mode 100644 packages/sdk/src/transport/subscribe/subscribe.test.ts create mode 100644 packages/sdk/src/transport/subscribe/subscribe.ts diff --git a/packages/sdk/src/account/account.js b/packages/sdk/src/account/account.js index 54094ca86..59709f96a 100644 --- a/packages/sdk/src/account/account.js +++ b/packages/sdk/src/account/account.js @@ -3,7 +3,7 @@ import {atBlockId} from "../build/build-at-block-id.js" import {getAccount} from "../build/build-get-account.js" import {invariant} from "@onflow/util-invariant" import {decodeResponse as decode} from "../decode/decode.js" -import {send} from "../transport/send" +import {send} from "../transport" /** * @typedef {import("@onflow/typedefs").Account} Account diff --git a/packages/sdk/src/block/block.js b/packages/sdk/src/block/block.js index 0ecdfd03e..b039a9334 100644 --- a/packages/sdk/src/block/block.js +++ b/packages/sdk/src/block/block.js @@ -1,4 +1,4 @@ -import {send} from "../transport/send" +import {send} from "../transport/send/send" import {getBlock} from "../build/build-get-block" import {atBlockHeight} from "../build/build-at-block-height.js" import {atBlockId} from "../build/build-at-block-id.js" diff --git a/packages/sdk/src/contract.test.js b/packages/sdk/src/contract.test.js index a49d70d9f..374257ef5 100644 --- a/packages/sdk/src/contract.test.js +++ b/packages/sdk/src/contract.test.js @@ -2,7 +2,7 @@ import * as root from "./sdk" import * as decode from "./decode/decode.js" import * as encode from "./encode/encode" import * as interaction from "./interaction/interaction" -import * as send from "./transport/send" +import * as send from "./transport" import * as template from "@onflow/util-template" const interfaceContract = diff --git a/packages/sdk/src/node-version-info/node-version-info.ts b/packages/sdk/src/node-version-info/node-version-info.ts index 9c73081b1..76d38e25f 100644 --- a/packages/sdk/src/node-version-info/node-version-info.ts +++ b/packages/sdk/src/node-version-info/node-version-info.ts @@ -1,4 +1,4 @@ -import {send} from "../transport/send" +import {send} from "../transport/send/send" import {decodeResponse as decode} from "../decode/decode.js" import {getNodeVersionInfo} from "../build/build-get-node-version-info" import {NodeVersionInfo} from "@onflow/typedefs" diff --git a/packages/sdk/src/sdk.ts b/packages/sdk/src/sdk.ts index f3ff5fa50..0a8bc9eeb 100644 --- a/packages/sdk/src/sdk.ts +++ b/packages/sdk/src/sdk.ts @@ -2,7 +2,7 @@ import * as logger from "@onflow/util-logger" // Base export {build} from "./build/build.js" export {resolve} from "./resolve/resolve.js" -export {send} from "./transport/send" +export {send} from "./transport" export {decode} from "./decode/sdk-decode.js" export { encodeTransactionPayload, @@ -114,4 +114,4 @@ export {TestUtils} export {VERSION} from "./VERSION" -export {subscribe} from "./transport/subscribe" +export {subscribe} from "./transport" diff --git a/packages/sdk/src/transport/transport.ts b/packages/sdk/src/transport/get-transport.ts similarity index 72% rename from packages/sdk/src/transport/transport.ts rename to packages/sdk/src/transport/get-transport.ts index aa23ba5d6..90557b1a7 100644 --- a/packages/sdk/src/transport/transport.ts +++ b/packages/sdk/src/transport/get-transport.ts @@ -3,14 +3,17 @@ import {httpTransport as defaultTransport} from "@onflow/transport-http" import {SdkTransport} from "@onflow/typedefs" import {invariant} from "@onflow/util-invariant" -export async function getTransport( - opts: { - send?: SdkTransport.SendFn - transport?: SdkTransport.Transport - } = {} -): Promise { +/** + * Get the SDK transport object, either from the provided override or from the global config. + * @param overrides + * @returns + */ +export async function getTransport(override: { + send?: SdkTransport.SendFn + transport?: SdkTransport.Transport +}): Promise { invariant( - opts.send == null || opts.transport == null, + override.send == null || override.transport == null, `SDK Transport Error: Cannot provide both "transport" and legacy "send" options.` ) @@ -18,7 +21,7 @@ export async function getTransport( SdkTransport.Transport | SdkTransport.SendFn >( ["sdk.transport", "sdk.send"], - opts.transport || opts.send || defaultTransport + override.transport || override.send || defaultTransport ) if (isTransportObject(transportOrSend)) { diff --git a/packages/sdk/src/transport/index.ts b/packages/sdk/src/transport/index.ts new file mode 100644 index 000000000..cf3d3ae64 --- /dev/null +++ b/packages/sdk/src/transport/index.ts @@ -0,0 +1,3 @@ +export {send} from "./send/send" +export {subscribe} from "./subscribe/subscribe" +export {rawSubscribe} from "./subscribe/raw-subscribe" diff --git a/packages/sdk/src/transport/send.test.js b/packages/sdk/src/transport/send/send.test.js similarity index 100% rename from packages/sdk/src/transport/send.test.js rename to packages/sdk/src/transport/send/send.test.js diff --git a/packages/sdk/src/transport/send.ts b/packages/sdk/src/transport/send/send.ts similarity index 69% rename from packages/sdk/src/transport/send.ts rename to packages/sdk/src/transport/send/send.ts index 94a0aea23..02ce8f1a6 100644 --- a/packages/sdk/src/transport/send.ts +++ b/packages/sdk/src/transport/send/send.ts @@ -1,11 +1,11 @@ import {Buffer} from "@onflow/rlp" -import {initInteraction, pipe} from "../interaction/interaction" -import * as ixModule from "../interaction/interaction" -import {invariant} from "../build/build-invariant" -import {response} from "../response/response" +import {initInteraction, pipe} from "../../interaction/interaction" +import * as ixModule from "../../interaction/interaction" +import {invariant} from "../../build/build-invariant" +import {response} from "../../response/response" import {config} from "@onflow/config" -import {resolve as defaultResolve} from "../resolve/resolve" -import {getTransport} from "./transport" +import {resolve as defaultResolve} from "../../resolve/resolve" +import {getTransport} from "../get-transport" /** * @description - Sends arbitrary scripts, transactions, and requests to Flow @@ -17,7 +17,8 @@ export const send = async ( args: Function | Function[] = [], opts: any = {} ): Promise => { - const {send: sendFn} = await getTransport(opts) + const transport = await getTransport(opts) + const sendFn = transport.send.bind(transport) invariant( sendFn, diff --git a/packages/sdk/src/transport/subscribe.test.ts b/packages/sdk/src/transport/subscribe/raw-subscribe.test.ts similarity index 63% rename from packages/sdk/src/transport/subscribe.test.ts rename to packages/sdk/src/transport/subscribe/raw-subscribe.test.ts index 3cf7f515a..bbf134730 100644 --- a/packages/sdk/src/transport/subscribe.test.ts +++ b/packages/sdk/src/transport/subscribe/raw-subscribe.test.ts @@ -1,26 +1,27 @@ import {config} from "@onflow/config" -import {subscribe} from "./subscribe" +import {rawSubscribe} from "./raw-subscribe" import {SdkTransport} from "@onflow/typedefs" -import {getTransport} from "./transport" +import {getTransport} from "../get-transport" -jest.mock("./transport") +jest.mock("../get-transport") describe("subscribe", () => { let mockTransport: jest.Mocked + let mockSub: jest.Mocked = { + unsubscribe: jest.fn(), + } beforeEach(() => { jest.resetAllMocks() mockTransport = { - subscribe: jest.fn().mockReturnValue({ - unsubscribe: jest.fn(), - }), + subscribe: jest.fn().mockReturnValue(mockSub), send: jest.fn(), } jest.mocked(getTransport).mockResolvedValue(mockTransport) }) - test("subscribes to a topic and returns a subscription", async () => { + test("subscribes to a topic and returns subscription from transport", async () => { const topic = "topic" as SdkTransport.SubscriptionTopic const args = {foo: "bar"} as SdkTransport.SubscriptionArguments const onData = jest.fn() @@ -31,18 +32,16 @@ describe("subscribe", () => { "accessNode.api": "http://localhost:8080", }, async () => { - return await subscribe({topic, args, onData, onError}) + return await rawSubscribe({topic, args, onData, onError}) } ) expect(mockTransport.subscribe).toHaveBeenCalledTimes(1) expect(mockTransport.subscribe).toHaveBeenCalledWith( - {topic, args, onData: expect.any(Function), onError}, + {topic, args, onData: onData, onError}, {node: "http://localhost:8080"} ) - expect(sub).toStrictEqual({ - unsubscribe: expect.any(Function), - }) + expect(sub).toBe(mockSub) }) }) diff --git a/packages/sdk/src/transport/subscribe.ts b/packages/sdk/src/transport/subscribe/raw-subscribe.ts similarity index 80% rename from packages/sdk/src/transport/subscribe.ts rename to packages/sdk/src/transport/subscribe/raw-subscribe.ts index ea74756b6..3bd1d5633 100644 --- a/packages/sdk/src/transport/subscribe.ts +++ b/packages/sdk/src/transport/subscribe/raw-subscribe.ts @@ -1,10 +1,10 @@ import {config} from "@onflow/config" import {SdkTransport} from "@onflow/typedefs" -import {getTransport} from "./transport" +import {getTransport} from "../get-transport" import {invariant} from "@onflow/util-invariant" // TODO: OPTS FUNCTION -export async function subscribe( +export async function rawSubscribe( { topic, args, @@ -18,7 +18,6 @@ export async function subscribe( }, opts: { node?: string - send?: SdkTransport.SendFn transport?: SdkTransport.Transport } = {} ) { @@ -36,10 +35,7 @@ export async function subscribe( { topic, args, - onData: data => { - // TODO: decode function - onData(decode(topic, data)) - }, + onData, onError, }, { diff --git a/packages/sdk/src/transport/subscribe/subscribe.test.ts b/packages/sdk/src/transport/subscribe/subscribe.test.ts new file mode 100644 index 000000000..ad7be7950 --- /dev/null +++ b/packages/sdk/src/transport/subscribe/subscribe.test.ts @@ -0,0 +1,112 @@ +import {SdkTransport} from "@onflow/typedefs" +import {subscribe} from "./subscribe" +import {rawSubscribe} from "./raw-subscribe" + +jest.mock("./raw-subscribe") +const mockRawSubscribe = jest.mocked(rawSubscribe) + +describe("subscribe", () => { + let mockSub: jest.Mocked = { + unsubscribe: jest.fn(), + } + + beforeEach(() => { + jest.resetAllMocks() + mockRawSubscribe.mockResolvedValue(mockSub) + }) + + test("subscribes to a topic and returns a subscription", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const sub = await subscribe({ + topic, + args, + onData, + onError, + }) + + expect(sub).toBe(mockSub) + expect(mockRawSubscribe).toHaveBeenCalledTimes(1) + expect(mockRawSubscribe).toHaveBeenCalledWith( + {topic, args, onData: expect.any(Function), onError}, + {} + ) + }) + + test("unsubscribes from a subscription", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const sub = await subscribe({ + topic, + args, + onData, + onError, + }) + + sub.unsubscribe() + + expect(mockSub.unsubscribe).toHaveBeenCalledTimes(1) + }) + + test("subscribes to a topic with a node", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const node = "http://localhost:8080" + + const sub = await subscribe( + { + topic, + args, + onData, + onError, + }, + {node} + ) + + expect(sub).toBe(mockSub) + expect(mockRawSubscribe).toHaveBeenCalledTimes(1) + expect(mockRawSubscribe).toHaveBeenCalledWith( + {topic, args, onData: expect.any(Function), onError}, + {node} + ) + }) + + test("subscribes to a topic with custom node and transport", async () => { + const topic = "topic" as SdkTransport.SubscriptionTopic + const args = {foo: "bar"} as SdkTransport.SubscriptionArguments + const onData = jest.fn() + const onError = jest.fn() + + const node = "http://localhost:8080" + const transport = { + send: jest.fn(), + subscribe: jest.fn().mockResolvedValue(mockSub), + } as jest.Mocked + + const sub = await subscribe( + { + topic, + args, + onData, + onError, + }, + {node, transport} + ) + + expect(sub).toBe(mockSub) + expect(mockRawSubscribe).toHaveBeenCalledTimes(1) + expect(mockRawSubscribe).toHaveBeenCalledWith( + {topic, args, onData: expect.any(Function), onError}, + {node, transport} + ) + }) +}) diff --git a/packages/sdk/src/transport/subscribe/subscribe.ts b/packages/sdk/src/transport/subscribe/subscribe.ts new file mode 100644 index 000000000..ff8a0b4b5 --- /dev/null +++ b/packages/sdk/src/transport/subscribe/subscribe.ts @@ -0,0 +1,40 @@ +import {SdkTransport} from "@onflow/typedefs" +import {rawSubscribe} from "./raw-subscribe" +import {decodeResponse} from "../../decode/decode" + +export async function subscribe( + { + topic, + args, + onData, + onError, + }: { + topic: T + args: SdkTransport.SubscriptionArguments + onData: (data: SdkTransport.SubscriptionData) => void + onError: (error: Error) => void + }, + opts: { + node?: string + transport?: SdkTransport.Transport + } = {} +): Promise { + const sub = await rawSubscribe( + { + topic, + args, + onData: data => { + decodeResponse(data) + .then(onData) + .catch(e => { + onError(new Error(`Failed to subscription data: ${e}`)) + sub.unsubscribe() + }) + }, + onError, + }, + opts + ) + + return sub +} From 6aa731876bffc54c49ba4d704fdb2adec510d3b3 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Tue, 26 Nov 2024 20:11:36 -0800 Subject: [PATCH 03/25] cleanup --- packages/sdk/src/transport/get-transport.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/sdk/src/transport/get-transport.ts b/packages/sdk/src/transport/get-transport.ts index 90557b1a7..1ef253dee 100644 --- a/packages/sdk/src/transport/get-transport.ts +++ b/packages/sdk/src/transport/get-transport.ts @@ -5,8 +5,8 @@ import {invariant} from "@onflow/util-invariant" /** * Get the SDK transport object, either from the provided override or from the global config. - * @param overrides - * @returns + * @param overrides - Override default configuration with custom transport or send function. + * @returns The SDK transport object. */ export async function getTransport(override: { send?: SdkTransport.SendFn @@ -24,11 +24,8 @@ export async function getTransport(override: { override.transport || override.send || defaultTransport ) - if (isTransportObject(transportOrSend)) { - // This is a transport object, return it directly - return transportOrSend - } else { - // This is a legacy send function, wrap it in a transport object + // Backwards compatibility with legacy send function + if (!isTransportObject(transportOrSend)) { return { send: transportOrSend, subscribe: () => { @@ -38,6 +35,8 @@ export async function getTransport(override: { }, } } + + return transportOrSend } function isTransportObject( From a2caec8dacca6d894a8bcf087a3a3ea80047e7b9 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Tue, 26 Nov 2024 21:23:07 -0800 Subject: [PATCH 04/25] Add tests --- .../sdk/src/transport/get-transport.test.ts | 135 ++++++++++++++++++ packages/sdk/src/transport/get-transport.ts | 10 +- 2 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 packages/sdk/src/transport/get-transport.test.ts diff --git a/packages/sdk/src/transport/get-transport.test.ts b/packages/sdk/src/transport/get-transport.test.ts new file mode 100644 index 000000000..4470093b2 --- /dev/null +++ b/packages/sdk/src/transport/get-transport.test.ts @@ -0,0 +1,135 @@ +import {SdkTransport} from "@onflow/typedefs" +import {getTransport} from "./get-transport" +import {httpTransport} from "@onflow/transport-http" +import {config} from "@onflow/config" + +jest.mock("@onflow/transport-http", () => ({ + httpTransport: { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked, +})) + +describe("getTransport", () => { + beforeEach(() => { + jest.resetAllMocks() + }) + + test("fallback to http transport", async () => { + const transport = await getTransport() + expect(transport).toBe(httpTransport) + }) + + test("override with custom transport", async () => { + const customTransport = { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked + + const transport = await getTransport({transport: customTransport}) + expect(transport).toBe(customTransport) + }) + + test("override with custom send function", async () => { + const customSend = jest.fn() + + const transport = await getTransport({send: customSend}) + expect(transport).toEqual({ + send: customSend, + subscribe: expect.any(Function), + }) + }) + + test("override with both custom transport and send function", async () => { + await expect( + getTransport({ + send: jest.fn(), + transport: { + send: jest.fn(), + subscribe: jest.fn(), + }, + }) + ).rejects.toThrow( + /Cannot provide both "transport" and legacy "send" options/ + ) + }) + + test("transport from global config - sdk.transport", async () => { + const customTransport = { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked + + const tranpsort = await config().overload( + { + "sdk.transport": customTransport, + }, + async () => { + return await getTransport() + } + ) + + expect(tranpsort).toBe(customTransport) + }) + + test("send function from global config - sdk.transport", async () => { + const customSend = jest.fn() + + const transport = await config().overload( + { + "sdk.transport": customSend, + }, + async () => { + return await getTransport() + } + ) + expect(transport).toEqual({ + send: customSend, + subscribe: expect.any(Function), + }) + }) + + test("send function from global config - sdk.send", async () => { + const customSend = jest.fn() + + const transport = await config().overload( + { + "sdk.send": customSend, + }, + async () => { + return await getTransport() + } + ) + + expect(transport).toEqual({ + send: customSend, + subscribe: expect.any(Function), + }) + }) + + /** + * TODO: (jribbink) Figure out what to do with this logic. + * + * Currently, and previously, this logic is the reverse where the global config has priority over the custom transport. + * I disagree with this logic and believe that the custom transport should have priority over the global config. + * However, it would be a breaking change to change this logic. + * + */ + /*test("custom transport has priority over global config", async () => { + const customTransport = { + send: jest.fn(), + subscribe: jest.fn(), + } as jest.Mocked + + const transport = await config().overload( + { + "sdk.transport": httpTransport, + }, + async () => { + return await getTransport({transport: customTransport}) + } + ) + + expect(transport).toBe(customTransport) + })*/ +}) diff --git a/packages/sdk/src/transport/get-transport.ts b/packages/sdk/src/transport/get-transport.ts index 1ef253dee..fd1de2f98 100644 --- a/packages/sdk/src/transport/get-transport.ts +++ b/packages/sdk/src/transport/get-transport.ts @@ -8,10 +8,12 @@ import {invariant} from "@onflow/util-invariant" * @param overrides - Override default configuration with custom transport or send function. * @returns The SDK transport object. */ -export async function getTransport(override: { - send?: SdkTransport.SendFn - transport?: SdkTransport.Transport -}): Promise { +export async function getTransport( + override: { + send?: SdkTransport.SendFn + transport?: SdkTransport.Transport + } = {} +): Promise { invariant( override.send == null || override.transport == null, `SDK Transport Error: Cannot provide both "transport" and legacy "send" options.` From 48d0acbcd77ede5362ce5303f591ae3c1582ed8a Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Wed, 27 Nov 2024 15:43:58 -0800 Subject: [PATCH 05/25] stash --- .../src/subscribe/data-providers/blocks.ts | 1 + .../subscribe/data-providers/data-provider.ts | 20 +++++++++++++++++++ .../src/subscribe/subscription-manager.ts | 8 +++++--- .../src/subscribe/websocket-controller.ts | 0 .../src/sdk-transport/subscriptions.ts | 8 +++++--- 5 files changed, 31 insertions(+), 6 deletions(-) create mode 100644 packages/transport-http/src/subscribe/data-providers/blocks.ts create mode 100644 packages/transport-http/src/subscribe/data-providers/data-provider.ts create mode 100644 packages/transport-http/src/subscribe/websocket-controller.ts diff --git a/packages/transport-http/src/subscribe/data-providers/blocks.ts b/packages/transport-http/src/subscribe/data-providers/blocks.ts new file mode 100644 index 000000000..9c1739ee6 --- /dev/null +++ b/packages/transport-http/src/subscribe/data-providers/blocks.ts @@ -0,0 +1 @@ +export const blocksProvider = {} diff --git a/packages/transport-http/src/subscribe/data-providers/data-provider.ts b/packages/transport-http/src/subscribe/data-providers/data-provider.ts new file mode 100644 index 000000000..062fbcab7 --- /dev/null +++ b/packages/transport-http/src/subscribe/data-providers/data-provider.ts @@ -0,0 +1,20 @@ +export interface DataProvider< + Topic = any, + Args = any, + Data = any, + RawData = any, +> { + /** + * The topic of the data type + */ + topic: Topic + /** + * The function to get the checkpoint to resume the subscription from if a reconnection is needed + * @param data The data to derive the reconnection checkpoint from + */ + getReconnectionArgs: (data: Data) => Args + /** + * The callback to call when a data is received + */ + parseData: (data: RawData) => Data +} diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index b7ea02436..df3c46642 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -9,8 +9,9 @@ import { SubscribeMessageResponse, UnsubscribeMessageRequest, } from "./models" -import type {SdkTransport} from "@onflow/typedefs" +import {SdkTransport} from "@onflow/typedefs" import {WebSocket} from "./websocket" +import {DataProvider} from "./data-providers/data-provider" import * as logger from "@onflow/util-logger" const WS_OPEN = 1 @@ -63,8 +64,8 @@ export interface SubscriptionManagerConfig { export class SubscriptionManager { private counter = 0 - private subscriptions: SubscriptionInfo[] = [] private socket: WebSocket | null = null + private subscriptions: SubscriptionInfo[] = [] private config: DeepRequired private reconnectAttempts = 0 @@ -104,7 +105,8 @@ export class SubscriptionManager { this.updateSubscriptionCheckpoint(sub, data) // Call the subscription callback - sub.onData(data.data) + if (sub.topic === SdkTransport.SubscriptionTopic.BLOCKS) { + } } } this.socket.onclose = () => { diff --git a/packages/transport-http/src/subscribe/websocket-controller.ts b/packages/transport-http/src/subscribe/websocket-controller.ts new file mode 100644 index 000000000..e69de29bb diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index 53240d183..522073f4d 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -5,12 +5,14 @@ type SchemaItem = { // TODO: PLACEHOLDER - Replace with actual subscription topics export enum SubscriptionTopic { - PLACEHOLDER = "PLACEHOLDER", + BLOCKS = "blocks", } export type SubscriptionSchema = { - [SubscriptionTopic.PLACEHOLDER]: SchemaItem< - {}, + [SubscriptionTopic.BLOCKS]: SchemaItem< + { + start: number + }, { placeholder: string } From ac9254f66137c4bdef47af85d50b94af0eb4af63 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Thu, 28 Nov 2024 11:06:22 -0800 Subject: [PATCH 06/25] stash --- .../src/subscribe/data-providers/blocks.ts | 2 +- .../subscribe/data-providers/data-provider.ts | 2 +- .../src/subscribe/subscription-manager.ts | 58 +++++++++++++------ .../src/subscribe/subscription-registry.ts | 0 4 files changed, 42 insertions(+), 20 deletions(-) create mode 100644 packages/transport-http/src/subscribe/subscription-registry.ts diff --git a/packages/transport-http/src/subscribe/data-providers/blocks.ts b/packages/transport-http/src/subscribe/data-providers/blocks.ts index 9c1739ee6..3dbed25de 100644 --- a/packages/transport-http/src/subscribe/data-providers/blocks.ts +++ b/packages/transport-http/src/subscribe/data-providers/blocks.ts @@ -1 +1 @@ -export const blocksProvider = {} +export const blocksProvider: DataProvider = {} diff --git a/packages/transport-http/src/subscribe/data-providers/data-provider.ts b/packages/transport-http/src/subscribe/data-providers/data-provider.ts index 062fbcab7..10d630570 100644 --- a/packages/transport-http/src/subscribe/data-providers/data-provider.ts +++ b/packages/transport-http/src/subscribe/data-providers/data-provider.ts @@ -12,7 +12,7 @@ export interface DataProvider< * The function to get the checkpoint to resume the subscription from if a reconnection is needed * @param data The data to derive the reconnection checkpoint from */ - getReconnectionArgs: (data: Data) => Args + getReconnectArgs: (data: Data) => Args /** * The callback to call when a data is received */ diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index df3c46642..7f23b7e24 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -27,8 +27,8 @@ interface SubscriptionInfo { remoteId?: string // The topic of the subscription topic: T - // The checkpoint to resume the subscription from - checkpoint: SdkTransport.SubscriptionArguments + // The reconection arguments to resume the subscription from + reconnectArgs: SdkTransport.SubscriptionArguments // The callback to call when a data is received onData: (data: any) => void // The callback to call when an error occurs @@ -68,8 +68,12 @@ export class SubscriptionManager { private subscriptions: SubscriptionInfo[] = [] private config: DeepRequired private reconnectAttempts = 0 + private dataProviders: Record - constructor(config: SubscriptionManagerConfig) { + constructor( + config: SubscriptionManagerConfig, + dataProviders: DataProvider[] + ) { this.config = { ...config, reconnectOptions: { @@ -79,6 +83,15 @@ export class SubscriptionManager { ...config.reconnectOptions, }, } + + // Map data providers by topic + this.dataProviders = dataProviders.reduce( + (acc, provider) => { + acc[provider.topic] = provider + return acc + }, + {} as Record + ) } // Lazy connect to the socket when the first subscription is made @@ -91,22 +104,15 @@ export class SubscriptionManager { this.socket = new WebSocket(this.config.node) this.socket.onmessage = event => { - const data = JSON.parse(event.data) as + const message = JSON.parse(event.data) as | MessageResponse | SubscriptionDataMessage - if ("action" in data) { + if ("action" in message) { // TODO, waiting for AN team to decide what to do here } else { - const sub = this.subscriptions.find(sub => sub.remoteId === data.id) - if (!sub) return - // Update the block height to checkpoint for disconnects - this.updateSubscriptionCheckpoint(sub, data) - - // Call the subscription callback - if (sub.topic === SdkTransport.SubscriptionTopic.BLOCKS) { - } + this.handleSubscriptionData(message) } } this.socket.onclose = () => { @@ -197,7 +203,7 @@ export class SubscriptionManager { const sub: SubscriptionInfo = { id: this.counter++, topic: opts.topic, - checkpoint: opts.args, + reconnectArgs: opts.args, onData: opts.onData, onError: opts.onError, } @@ -248,7 +254,7 @@ export class SubscriptionManager { const request: SubscribeMessageRequest = { action: Action.SUBSCRIBE, topic: sub.topic, - arguments: sub.checkpoint, + arguments: sub.reconnectArgs, } this.socket?.send(JSON.stringify(request)) @@ -299,10 +305,26 @@ export class SubscriptionManager { // Update the subscription checkpoint when a message is received // These checkpoints are used to resume subscriptions after disconnects - private updateSubscriptionCheckpoint< + private handleSubscriptionData< T extends SdkTransport.SubscriptionTopic = SdkTransport.SubscriptionTopic, - >(sub: SubscriptionInfo, message: SubscriptionDataMessage) { - // TODO: Will be implemented with each subscription topic + >(message: SubscriptionDataMessage) { + // Get the subscription + const sub = this.subscriptions.find(sub => sub.remoteId === message.id) + if (!sub) { + throw new Error(`No subscription found for id ${message.id}`) + } + + // Get the data provider for the subscription + const provider = this.dataProviders[sub.topic] + if (!provider) { + throw new Error(`No data provider for topic ${sub.topic}`) + } + + // Update the checkpoint + sub.reconnectArgs = provider.getReconnectArgs(message) + + // Call the subscription callback + sub.onData(provider.parseData(message)) } /** diff --git a/packages/transport-http/src/subscribe/subscription-registry.ts b/packages/transport-http/src/subscribe/subscription-registry.ts new file mode 100644 index 000000000..e69de29bb From 178e25015102c64c5b9e28e6b67c4ed93d5b947a Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Thu, 28 Nov 2024 21:11:30 -0800 Subject: [PATCH 07/25] big updates --- .../src/subscribe/data-providers/blocks.ts | 1 - .../subscribe/data-providers/data-provider.ts | 20 ----- .../src/subscribe/handlers/blocks.ts | 37 ++++++++ .../src/subscribe/handlers/types.ts | 48 ++++++++++ .../transport-http/src/subscribe/subscribe.ts | 18 ++-- .../subscribe/subscription-manager.test.ts | 68 +++++++++----- .../src/subscribe/subscription-manager.ts | 88 +++++++++---------- .../src/subscribe/subscription-registry.ts | 0 .../src/subscribe/websocket-controller.ts | 0 .../src/sdk-transport/subscriptions.ts | 9 ++ 10 files changed, 194 insertions(+), 95 deletions(-) delete mode 100644 packages/transport-http/src/subscribe/data-providers/blocks.ts delete mode 100644 packages/transport-http/src/subscribe/data-providers/data-provider.ts create mode 100644 packages/transport-http/src/subscribe/handlers/blocks.ts create mode 100644 packages/transport-http/src/subscribe/handlers/types.ts delete mode 100644 packages/transport-http/src/subscribe/subscription-registry.ts delete mode 100644 packages/transport-http/src/subscribe/websocket-controller.ts diff --git a/packages/transport-http/src/subscribe/data-providers/blocks.ts b/packages/transport-http/src/subscribe/data-providers/blocks.ts deleted file mode 100644 index 3dbed25de..000000000 --- a/packages/transport-http/src/subscribe/data-providers/blocks.ts +++ /dev/null @@ -1 +0,0 @@ -export const blocksProvider: DataProvider = {} diff --git a/packages/transport-http/src/subscribe/data-providers/data-provider.ts b/packages/transport-http/src/subscribe/data-providers/data-provider.ts deleted file mode 100644 index 10d630570..000000000 --- a/packages/transport-http/src/subscribe/data-providers/data-provider.ts +++ /dev/null @@ -1,20 +0,0 @@ -export interface DataProvider< - Topic = any, - Args = any, - Data = any, - RawData = any, -> { - /** - * The topic of the data type - */ - topic: Topic - /** - * The function to get the checkpoint to resume the subscription from if a reconnection is needed - * @param data The data to derive the reconnection checkpoint from - */ - getReconnectArgs: (data: Data) => Args - /** - * The callback to call when a data is received - */ - parseData: (data: RawData) => Data -} diff --git a/packages/transport-http/src/subscribe/handlers/blocks.ts b/packages/transport-http/src/subscribe/handlers/blocks.ts new file mode 100644 index 000000000..76af1a309 --- /dev/null +++ b/packages/transport-http/src/subscribe/handlers/blocks.ts @@ -0,0 +1,37 @@ +import {SdkTransport} from "@onflow/typedefs" +import {createSubscriptionHandler} from "./types" + +export const blocksHandler = createSubscriptionHandler<{ + Topic: SdkTransport.SubscriptionTopic.BLOCKS + Args: SdkTransport.SubscriptionArguments + Data: SdkTransport.SubscriptionData + RawData: { + data: SdkTransport.SubscriptionData + } +}>({ + topic: SdkTransport.SubscriptionTopic.BLOCKS, + createSubscriber: (initialArgs, onData, onError) => { + let resumeArgs: SdkTransport.SubscriptionArguments = + initialArgs + + return { + sendData(data: { + data: SdkTransport.SubscriptionData + }) { + const parsedData = data.data + + resumeArgs = { + start: parsedData.placeholder.length, + } + + onData(data.data) + }, + sendError(error: Error) { + onError(error) + }, + get connectionArgs() { + return resumeArgs + }, + } + }, +}) diff --git a/packages/transport-http/src/subscribe/handlers/types.ts b/packages/transport-http/src/subscribe/handlers/types.ts new file mode 100644 index 000000000..39e445854 --- /dev/null +++ b/packages/transport-http/src/subscribe/handlers/types.ts @@ -0,0 +1,48 @@ +export interface DataConsumer { + onData(data: Data): void + onError(error: Error): void +} + +export interface SubscriptionHandler< + T extends { + Topic: string + Args: any + Data: any + RawData: any + }, +> { + readonly topic: T["Topic"] + createSubscriber( + initialArgs: T["Args"], + onData: (data: T["Data"]) => void, + onError: (error: Error) => void + ): DataSubscriber +} + +export interface DataSubscriber { + /** + * The callback to call when a data is received + */ + sendData(data: RawData): void + + /** + * The callback to call when an error is received + */ + sendError(error: Error): void + + /** + * Get the arguments to connect or reconnect to the subscription + */ + get connectionArgs(): Args +} + +export function createSubscriptionHandler< + T extends { + Topic: string + Args: any + Data: any + RawData: any + }, +>(handler: SubscriptionHandler): SubscriptionHandler { + return handler +} diff --git a/packages/transport-http/src/subscribe/subscribe.ts b/packages/transport-http/src/subscribe/subscribe.ts index 50e3ae3d4..65dfa77b3 100644 --- a/packages/transport-http/src/subscribe/subscribe.ts +++ b/packages/transport-http/src/subscribe/subscribe.ts @@ -1,8 +1,14 @@ import {SdkTransport} from "@onflow/typedefs" import {SubscriptionManager} from "./subscription-manager" +import {blocksHandler} from "./handlers/blocks" + +const SUBSCRIPTION_HANDLERS = [blocksHandler] // Map of SubscriptionManager instances by access node URL -let subscriptionManagerMap: Map = new Map() +let subscriptionManagerMap: Map< + string, + SubscriptionManager +> = new Map() export async function subscribe( { @@ -18,12 +24,12 @@ export async function subscribe( }, opts: {node: string} ): Promise { + // Get the SubscriptionManager instance for the access node, or create a new one + const node = opts.node const manager = - subscriptionManagerMap.get(opts.node) || - new SubscriptionManager({ - node: opts.node, - }) - subscriptionManagerMap.set(opts.node, manager) + subscriptionManagerMap.get(node) || + new SubscriptionManager(SUBSCRIPTION_HANDLERS, {node}) + subscriptionManagerMap.set(node, manager) return manager.subscribe({ topic, diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts index 48d2b81cc..6a706d78a 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.test.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -12,15 +12,33 @@ import { SubscriptionManagerConfig, } from "./subscription-manager" import {SdkTransport} from "@onflow/typedefs" +import {DataSubscriber, SubscriptionHandler} from "./handlers/types" jest.mock("./websocket", () => ({ WebSocket: mockSocket, })) -describe("WsSubscriptionTransport", () => { +describe("WebSocket Manager", () => { let mockWs: WS + let mockSubscriber: jest.Mocked> + let mockHandler: jest.Mocked> + const mockConnectionArgs = {mock: "connection args"} + beforeEach(() => { + jest.resetAllMocks() + mockWs = new WS("wss://localhost:8080") + mockSubscriber = { + sendData: jest.fn(), + sendError: jest.fn(), + get connectionArgs() { + return mockConnectionArgs + }, + } + mockHandler = { + topic: "topic", + createSubscriber: jest.fn().mockReturnValue(mockSubscriber), + } }) afterEach(() => { @@ -28,11 +46,7 @@ describe("WsSubscriptionTransport", () => { }) test("does not connect to the socket when no subscriptions are made", async () => { - const config: SubscriptionManagerConfig = { - node: "wss://localhost:8080", - } - - new SubscriptionManager(config) + new SubscriptionManager([mockHandler], {node: "wss://localhost:8080"}) await new Promise(resolve => setTimeout(resolve, 0)) expect(mockWs.server.clients).toHaveLength(0) @@ -42,7 +56,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -56,7 +70,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -69,7 +83,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -92,7 +106,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -106,7 +120,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -119,7 +133,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -141,9 +155,9 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(1) - expect(onData).toHaveBeenCalledWith({key: "value"}) - expect(onError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.sendData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.sendData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.sendError).toHaveBeenCalledTimes(0) serverPromise = (async () => { const msg = (await mockWs.nextMessage) as string @@ -162,7 +176,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -176,7 +190,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -189,7 +203,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -200,6 +214,12 @@ describe("WsSubscriptionTransport", () => { expect(subscription).toBeDefined() expect(subscription.unsubscribe).toBeInstanceOf(Function) + expect(mockHandler.createSubscriber).toHaveBeenCalledTimes(1) + expect(mockHandler.createSubscriber).toHaveBeenCalledWith( + args, + onData, + onError + ) serverPromise = (async () => { const data = { @@ -211,9 +231,9 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(1) - expect(onData).toHaveBeenCalledWith({key: "value"}) - expect(onError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.sendData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.sendData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.sendError).toHaveBeenCalledTimes(0) // Close the connection and create a new one mockWs.close() @@ -227,7 +247,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -254,7 +274,7 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(2) - expect(onData.mock.calls[1]).toEqual([{key: "value2"}]) + expect(mockSubscriber.sendData).toHaveBeenCalledTimes(2) + expect(mockSubscriber.sendData.mock.calls[1]).toEqual([{key: "value2"}]) }) }) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index 7f23b7e24..f5f988df0 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -11,7 +11,7 @@ import { } from "./models" import {SdkTransport} from "@onflow/typedefs" import {WebSocket} from "./websocket" -import {DataProvider} from "./data-providers/data-provider" +import {DataSubscriber, SubscriptionHandler} from "./handlers/types" import * as logger from "@onflow/util-logger" const WS_OPEN = 1 @@ -20,19 +20,17 @@ type DeepRequired = Required<{ [K in keyof T]: DeepRequired }> -interface SubscriptionInfo { +type InferHandler = T extends SubscriptionHandler ? H : never + +interface SubscriptionInfo> { // Internal ID for the subscription id: number // Remote ID assigned by the server used for message routing and unsubscribing remoteId?: string // The topic of the subscription - topic: T - // The reconection arguments to resume the subscription from - reconnectArgs: SdkTransport.SubscriptionArguments - // The callback to call when a data is received - onData: (data: any) => void - // The callback to call when an error occurs - onError: (error: Error) => void + topic: string + // Data provider for the subscription + subscriber: DataSubscriber } export interface SubscriptionManagerConfig { @@ -62,18 +60,17 @@ export interface SubscriptionManagerConfig { } } -export class SubscriptionManager { +export class SubscriptionManager< + Handlers extends [...SubscriptionHandler[]], +> { private counter = 0 private socket: WebSocket | null = null - private subscriptions: SubscriptionInfo[] = [] + private subscriptions: SubscriptionInfo>[] = [] private config: DeepRequired private reconnectAttempts = 0 - private dataProviders: Record + private handlers: Record> - constructor( - config: SubscriptionManagerConfig, - dataProviders: DataProvider[] - ) { + constructor(handlers: Handlers, config: SubscriptionManagerConfig) { this.config = { ...config, reconnectOptions: { @@ -85,12 +82,12 @@ export class SubscriptionManager { } // Map data providers by topic - this.dataProviders = dataProviders.reduce( - (acc, provider) => { - acc[provider.topic] = provider + this.handlers = handlers.reduce( + (acc, handler) => { + acc[handler.topic] = handler return acc }, - {} as Record + {} as Record> ) } @@ -165,7 +162,7 @@ export class SubscriptionManager { }) this.subscriptions.forEach(sub => { - sub.onError( + sub.subscriber.sendError( new Error( `Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}` ) @@ -190,22 +187,28 @@ export class SubscriptionManager { } } - async subscribe(opts: { - topic: T - args: SdkTransport.SubscriptionArguments - onData: (data: SdkTransport.SubscriptionData) => void + async subscribe(opts: { + topic: InferHandler["Topic"] + args: InferHandler["Args"] + onData: (data: InferHandler["Data"]) => void onError: (error: Error) => void }): Promise { // Connect the socket if it's not already open await this.connect() + // Get the data provider for the topic + const topicHandler = this.getHandler(opts.topic) + const subscriber = topicHandler.createSubscriber( + opts.args, + opts.onData, + opts.onError + ) + // Track the subscription locally - const sub: SubscriptionInfo = { + const sub: SubscriptionInfo> = { id: this.counter++, topic: opts.topic, - reconnectArgs: opts.args, - onData: opts.onData, - onError: opts.onError, + subscriber: subscriber, } this.subscriptions.push(sub) @@ -247,14 +250,12 @@ export class SubscriptionManager { } } - private async sendSubscribe( - sub: SubscriptionInfo - ) { + private async sendSubscribe(sub: SubscriptionInfo>) { // Send the subscription message const request: SubscribeMessageRequest = { action: Action.SUBSCRIBE, topic: sub.topic, - arguments: sub.reconnectArgs, + arguments: sub.subscriber.connectionArgs, } this.socket?.send(JSON.stringify(request)) @@ -270,7 +271,7 @@ export class SubscriptionManager { } private async sendUnsubscribe( - sub: SubscriptionInfo + sub: SubscriptionInfo> ) { // Send the unsubscribe message if the subscription has a remote id const {remoteId} = sub @@ -314,17 +315,16 @@ export class SubscriptionManager { throw new Error(`No subscription found for id ${message.id}`) } - // Get the data provider for the subscription - const provider = this.dataProviders[sub.topic] - if (!provider) { - throw new Error(`No data provider for topic ${sub.topic}`) - } - - // Update the checkpoint - sub.reconnectArgs = provider.getReconnectArgs(message) + // Send data to the subscriber + sub.subscriber.sendData(message.data) + } - // Call the subscription callback - sub.onData(provider.parseData(message)) + private getHandler(topic: string) { + const handler = this.handlers[topic] + if (!handler) { + throw new Error(`No handler found for topic ${topic}`) + } + return handler } /** diff --git a/packages/transport-http/src/subscribe/subscription-registry.ts b/packages/transport-http/src/subscribe/subscription-registry.ts deleted file mode 100644 index e69de29bb..000000000 diff --git a/packages/transport-http/src/subscribe/websocket-controller.ts b/packages/transport-http/src/subscribe/websocket-controller.ts deleted file mode 100644 index e69de29bb..000000000 diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index 522073f4d..8ccc80d59 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -6,6 +6,7 @@ type SchemaItem = { // TODO: PLACEHOLDER - Replace with actual subscription topics export enum SubscriptionTopic { BLOCKS = "blocks", + FOO = "foo", } export type SubscriptionSchema = { @@ -17,6 +18,14 @@ export type SubscriptionSchema = { placeholder: string } > + [SubscriptionTopic.FOO]: SchemaItem< + { + x: number + }, + { + x: string + } + > } export type SubscriptionArguments = From 088eeac6868b29e1d334ebc98f5f100141c850c5 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Thu, 28 Nov 2024 21:12:05 -0800 Subject: [PATCH 08/25] fix build --- packages/typedefs/src/sdk-transport/subscriptions.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index 8ccc80d59..522073f4d 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -6,7 +6,6 @@ type SchemaItem = { // TODO: PLACEHOLDER - Replace with actual subscription topics export enum SubscriptionTopic { BLOCKS = "blocks", - FOO = "foo", } export type SubscriptionSchema = { @@ -18,14 +17,6 @@ export type SubscriptionSchema = { placeholder: string } > - [SubscriptionTopic.FOO]: SchemaItem< - { - x: number - }, - { - x: string - } - > } export type SubscriptionArguments = From 42a95f468d699a1318f62fb4f8d0eb727f337dcf Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Thu, 28 Nov 2024 22:08:03 -0800 Subject: [PATCH 09/25] Update decoding --- .../src/subscribe/handlers/blocks.ts | 56 +++++++++++++++---- packages/typedefs/src/index.ts | 6 +- .../src/sdk-transport/subscriptions.ts | 16 ++++-- 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/packages/transport-http/src/subscribe/handlers/blocks.ts b/packages/transport-http/src/subscribe/handlers/blocks.ts index 76af1a309..eba1f9c52 100644 --- a/packages/transport-http/src/subscribe/handlers/blocks.ts +++ b/packages/transport-http/src/subscribe/handlers/blocks.ts @@ -1,30 +1,66 @@ import {SdkTransport} from "@onflow/typedefs" import {createSubscriptionHandler} from "./types" +type BlockDataModel = { + block: { + id: string + parent_id: string + height: number + timestamp: string + collection_guarantees: { + collection_id: string + signer_ids: string[] + }[] + block_seals: { + block_id: string + result_id: string + }[] + } +} + export const blocksHandler = createSubscriptionHandler<{ Topic: SdkTransport.SubscriptionTopic.BLOCKS Args: SdkTransport.SubscriptionArguments Data: SdkTransport.SubscriptionData - RawData: { - data: SdkTransport.SubscriptionData - } + RawData: BlockDataModel }>({ topic: SdkTransport.SubscriptionTopic.BLOCKS, createSubscriber: (initialArgs, onData, onError) => { let resumeArgs: SdkTransport.SubscriptionArguments = - initialArgs + { + ...initialArgs, + } return { - sendData(data: { - data: SdkTransport.SubscriptionData - }) { - const parsedData = data.data + sendData(data: BlockDataModel) { + // Parse the raw data + const parsedData: SdkTransport.SubscriptionData = + { + block: { + id: data.block.id, + parentId: data.block.parent_id, + height: data.block.height, + timestamp: data.block.timestamp, + collectionGuarantees: data.block.collection_guarantees.map( + guarantee => ({ + collectionId: guarantee.collection_id, + signerIds: guarantee.signer_ids, + }) + ), + blockSeals: data.block.block_seals.map(seal => ({ + blockId: seal.block_id, + executionReceiptId: seal.result_id, + })), + }, + } + // Update the resume args resumeArgs = { - start: parsedData.placeholder.length, + block_status: resumeArgs.block_status, + start_block_id: data.block.id, } - onData(data.data) + onData(parsedData) }, sendError(error: Error) { onError(error) diff --git a/packages/typedefs/src/index.ts b/packages/typedefs/src/index.ts index b9d516de3..4f92c10f9 100644 --- a/packages/typedefs/src/index.ts +++ b/packages/typedefs/src/index.ts @@ -99,10 +99,6 @@ export type Block = { * - The details of which nodes executed and sealed the blocks */ blockSeals: Array - /** - * - The cryptographic signature of the block - */ - signatures: Array } export type CollectionGuarantee = { /** @@ -112,7 +108,7 @@ export type CollectionGuarantee = { /** * - The signer ids of the block */ - signerIds: Array + signerIds: Array } export type BlockSeal = { /** diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index 522073f4d..6b5ef8d02 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -1,9 +1,10 @@ +import {Block} from ".." + type SchemaItem = { args: TArgs data: TData } -// TODO: PLACEHOLDER - Replace with actual subscription topics export enum SubscriptionTopic { BLOCKS = "blocks", } @@ -11,10 +12,17 @@ export enum SubscriptionTopic { export type SubscriptionSchema = { [SubscriptionTopic.BLOCKS]: SchemaItem< { - start: number - }, + block_status?: number + } & ( + | { + start_block_id?: string + } + | { + start_block_height?: number + } + ), { - placeholder: string + block: Block } > } From aaf89ca3841038e3bcb1353e2e1846c68de6ca7f Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Thu, 28 Nov 2024 22:31:34 -0800 Subject: [PATCH 10/25] update arg format --- .../src/subscribe/handlers/blocks.ts | 40 +++++++++++++++++-- .../src/subscribe/handlers/types.ts | 17 +++++--- .../subscribe/subscription-manager.test.ts | 3 +- .../src/subscribe/subscription-manager.ts | 14 ++++--- .../src/sdk-transport/subscriptions.ts | 18 ++++----- 5 files changed, 67 insertions(+), 25 deletions(-) diff --git a/packages/transport-http/src/subscribe/handlers/blocks.ts b/packages/transport-http/src/subscribe/handlers/blocks.ts index eba1f9c52..41595263b 100644 --- a/packages/transport-http/src/subscribe/handlers/blocks.ts +++ b/packages/transport-http/src/subscribe/handlers/blocks.ts @@ -18,11 +18,22 @@ type BlockDataModel = { } } +type BlockArgsModel = + | { + block_status?: number + start_block_id?: string + } + | { + block_status?: number + start_block_height?: number + } + export const blocksHandler = createSubscriptionHandler<{ Topic: SdkTransport.SubscriptionTopic.BLOCKS Args: SdkTransport.SubscriptionArguments Data: SdkTransport.SubscriptionData - RawData: BlockDataModel + ArgsModel: BlockArgsModel + DataModel: BlockDataModel }>({ topic: SdkTransport.SubscriptionTopic.BLOCKS, createSubscriber: (initialArgs, onData, onError) => { @@ -56,8 +67,8 @@ export const blocksHandler = createSubscriptionHandler<{ // Update the resume args resumeArgs = { - block_status: resumeArgs.block_status, - start_block_id: data.block.id, + blockStatus: resumeArgs.blockStatus, + startBlockHeight: data.block.height + 1, } onData(parsedData) @@ -65,6 +76,29 @@ export const blocksHandler = createSubscriptionHandler<{ sendError(error: Error) { onError(error) }, + encodeArgs( + args: SdkTransport.SubscriptionArguments + ) { + let encodedArgs: BlockArgsModel = { + block_status: args.blockStatus, + } + + if ("startBlockHeight" in args) { + return { + ...encodedArgs, + start_block_height: args.startBlockHeight, + } + } + + if ("startBlockId" in args) { + return { + ...encodedArgs, + start_block_id: args.startBlockId, + } + } + + return encodedArgs + }, get connectionArgs() { return resumeArgs }, diff --git a/packages/transport-http/src/subscribe/handlers/types.ts b/packages/transport-http/src/subscribe/handlers/types.ts index 39e445854..1ab7048d5 100644 --- a/packages/transport-http/src/subscribe/handlers/types.ts +++ b/packages/transport-http/src/subscribe/handlers/types.ts @@ -8,7 +8,8 @@ export interface SubscriptionHandler< Topic: string Args: any Data: any - RawData: any + DataModel: any + ArgsModel: any }, > { readonly topic: T["Topic"] @@ -16,20 +17,25 @@ export interface SubscriptionHandler< initialArgs: T["Args"], onData: (data: T["Data"]) => void, onError: (error: Error) => void - ): DataSubscriber + ): DataSubscriber } -export interface DataSubscriber { +export interface DataSubscriber { /** * The callback to call when a data is received */ - sendData(data: RawData): void + sendData(data: Data): void /** * The callback to call when an error is received */ sendError(error: Error): void + /** + * The arguments to connect or reconnect to the subscription + */ + encodeArgs(args: Args): ArgsModel + /** * Get the arguments to connect or reconnect to the subscription */ @@ -41,7 +47,8 @@ export function createSubscriptionHandler< Topic: string Args: any Data: any - RawData: any + DataModel: any + ArgsModel: any }, >(handler: SubscriptionHandler): SubscriptionHandler { return handler diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts index 6a706d78a..0973f4048 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.test.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -20,7 +20,7 @@ jest.mock("./websocket", () => ({ describe("WebSocket Manager", () => { let mockWs: WS - let mockSubscriber: jest.Mocked> + let mockSubscriber: jest.Mocked> let mockHandler: jest.Mocked> const mockConnectionArgs = {mock: "connection args"} @@ -31,6 +31,7 @@ describe("WebSocket Manager", () => { mockSubscriber = { sendData: jest.fn(), sendError: jest.fn(), + encodeArgs: jest.fn().mockReturnValue(mockConnectionArgs), get connectionArgs() { return mockConnectionArgs }, diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index f5f988df0..dc36da429 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -22,7 +22,7 @@ type DeepRequired = Required<{ type InferHandler = T extends SubscriptionHandler ? H : never -interface SubscriptionInfo> { +interface SubscriptionInfo> { // Internal ID for the subscription id: number // Remote ID assigned by the server used for message routing and unsubscribing @@ -30,7 +30,7 @@ interface SubscriptionInfo> { // The topic of the subscription topic: string // Data provider for the subscription - subscriber: DataSubscriber + subscriber: DataSubscriber } export interface SubscriptionManagerConfig { @@ -65,7 +65,7 @@ export class SubscriptionManager< > { private counter = 0 private socket: WebSocket | null = null - private subscriptions: SubscriptionInfo>[] = [] + private subscriptions: SubscriptionInfo>[] = [] private config: DeepRequired private reconnectAttempts = 0 private handlers: Record> @@ -205,7 +205,7 @@ export class SubscriptionManager< ) // Track the subscription locally - const sub: SubscriptionInfo> = { + const sub: SubscriptionInfo> = { id: this.counter++, topic: opts.topic, subscriber: subscriber, @@ -250,7 +250,9 @@ export class SubscriptionManager< } } - private async sendSubscribe(sub: SubscriptionInfo>) { + private async sendSubscribe( + sub: SubscriptionInfo> + ) { // Send the subscription message const request: SubscribeMessageRequest = { action: Action.SUBSCRIBE, @@ -271,7 +273,7 @@ export class SubscriptionManager< } private async sendUnsubscribe( - sub: SubscriptionInfo> + sub: SubscriptionInfo> ) { // Send the unsubscribe message if the subscription has a remote id const {remoteId} = sub diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index 6b5ef8d02..d80c6a367 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -11,16 +11,14 @@ export enum SubscriptionTopic { export type SubscriptionSchema = { [SubscriptionTopic.BLOCKS]: SchemaItem< - { - block_status?: number - } & ( - | { - start_block_id?: string - } - | { - start_block_height?: number - } - ), + | { + blockStatus?: number + startBlockId?: string + } + | { + blockStatus?: number + startBlockHeight?: number + }, { block: Block } From b150d6b3cc6272c4ea71e3775b711c2b5958b86a Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 13:05:03 -0800 Subject: [PATCH 11/25] change naming --- .../src/subscribe/handlers/blocks.ts | 70 ++++++++++--------- .../src/subscribe/handlers/types.ts | 10 +-- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/packages/transport-http/src/subscribe/handlers/blocks.ts b/packages/transport-http/src/subscribe/handlers/blocks.ts index 41595263b..7d36255bc 100644 --- a/packages/transport-http/src/subscribe/handlers/blocks.ts +++ b/packages/transport-http/src/subscribe/handlers/blocks.ts @@ -1,7 +1,13 @@ import {SdkTransport} from "@onflow/typedefs" import {createSubscriptionHandler} from "./types" -type BlockDataModel = { +type BlocksArgs = + SdkTransport.SubscriptionArguments + +type BlocksData = + SdkTransport.SubscriptionData + +type BlocksDataDto = { block: { id: string parent_id: string @@ -18,7 +24,7 @@ type BlockDataModel = { } } -type BlockArgsModel = +type BlocksArgsDto = | { block_status?: number start_block_id?: string @@ -30,40 +36,38 @@ type BlockArgsModel = export const blocksHandler = createSubscriptionHandler<{ Topic: SdkTransport.SubscriptionTopic.BLOCKS - Args: SdkTransport.SubscriptionArguments - Data: SdkTransport.SubscriptionData - ArgsModel: BlockArgsModel - DataModel: BlockDataModel + Args: BlocksArgs + Data: BlocksData + ArgsDto: BlocksArgsDto + DataDto: BlocksDataDto }>({ topic: SdkTransport.SubscriptionTopic.BLOCKS, createSubscriber: (initialArgs, onData, onError) => { - let resumeArgs: SdkTransport.SubscriptionArguments = - { - ...initialArgs, - } + let resumeArgs: BlocksArgs = { + ...initialArgs, + } return { - sendData(data: BlockDataModel) { + sendData(data: BlocksDataDto) { // Parse the raw data - const parsedData: SdkTransport.SubscriptionData = - { - block: { - id: data.block.id, - parentId: data.block.parent_id, - height: data.block.height, - timestamp: data.block.timestamp, - collectionGuarantees: data.block.collection_guarantees.map( - guarantee => ({ - collectionId: guarantee.collection_id, - signerIds: guarantee.signer_ids, - }) - ), - blockSeals: data.block.block_seals.map(seal => ({ - blockId: seal.block_id, - executionReceiptId: seal.result_id, - })), - }, - } + const parsedData: BlocksData = { + block: { + id: data.block.id, + parentId: data.block.parent_id, + height: data.block.height, + timestamp: data.block.timestamp, + collectionGuarantees: data.block.collection_guarantees.map( + guarantee => ({ + collectionId: guarantee.collection_id, + signerIds: guarantee.signer_ids, + }) + ), + blockSeals: data.block.block_seals.map(seal => ({ + blockId: seal.block_id, + executionReceiptId: seal.result_id, + })), + }, + } // Update the resume args resumeArgs = { @@ -76,10 +80,8 @@ export const blocksHandler = createSubscriptionHandler<{ sendError(error: Error) { onError(error) }, - encodeArgs( - args: SdkTransport.SubscriptionArguments - ) { - let encodedArgs: BlockArgsModel = { + encodeArgs(args: BlocksArgs) { + let encodedArgs: BlocksArgsDto = { block_status: args.blockStatus, } diff --git a/packages/transport-http/src/subscribe/handlers/types.ts b/packages/transport-http/src/subscribe/handlers/types.ts index 1ab7048d5..df4a3ae9c 100644 --- a/packages/transport-http/src/subscribe/handlers/types.ts +++ b/packages/transport-http/src/subscribe/handlers/types.ts @@ -8,8 +8,8 @@ export interface SubscriptionHandler< Topic: string Args: any Data: any - DataModel: any - ArgsModel: any + ArgsDto: any + DataDto: any }, > { readonly topic: T["Topic"] @@ -17,7 +17,7 @@ export interface SubscriptionHandler< initialArgs: T["Args"], onData: (data: T["Data"]) => void, onError: (error: Error) => void - ): DataSubscriber + ): DataSubscriber } export interface DataSubscriber { @@ -47,8 +47,8 @@ export function createSubscriptionHandler< Topic: string Args: any Data: any - DataModel: any - ArgsModel: any + ArgsDto: any + DataDto: any }, >(handler: SubscriptionHandler): SubscriptionHandler { return handler From 44470d60410d299740b8d28738d1d8816b5546ef Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 13:09:36 -0800 Subject: [PATCH 12/25] Update handlers --- .../src/subscribe/handlers/types.ts | 55 +++++++++++ .../transport-http/src/subscribe/subscribe.ts | 18 ++-- .../subscribe/subscription-manager.test.ts | 69 ++++++++----- .../src/subscribe/subscription-manager.ts | 98 ++++++++++++------- 4 files changed, 174 insertions(+), 66 deletions(-) create mode 100644 packages/transport-http/src/subscribe/handlers/types.ts diff --git a/packages/transport-http/src/subscribe/handlers/types.ts b/packages/transport-http/src/subscribe/handlers/types.ts new file mode 100644 index 000000000..df4a3ae9c --- /dev/null +++ b/packages/transport-http/src/subscribe/handlers/types.ts @@ -0,0 +1,55 @@ +export interface DataConsumer { + onData(data: Data): void + onError(error: Error): void +} + +export interface SubscriptionHandler< + T extends { + Topic: string + Args: any + Data: any + ArgsDto: any + DataDto: any + }, +> { + readonly topic: T["Topic"] + createSubscriber( + initialArgs: T["Args"], + onData: (data: T["Data"]) => void, + onError: (error: Error) => void + ): DataSubscriber +} + +export interface DataSubscriber { + /** + * The callback to call when a data is received + */ + sendData(data: Data): void + + /** + * The callback to call when an error is received + */ + sendError(error: Error): void + + /** + * The arguments to connect or reconnect to the subscription + */ + encodeArgs(args: Args): ArgsModel + + /** + * Get the arguments to connect or reconnect to the subscription + */ + get connectionArgs(): Args +} + +export function createSubscriptionHandler< + T extends { + Topic: string + Args: any + Data: any + ArgsDto: any + DataDto: any + }, +>(handler: SubscriptionHandler): SubscriptionHandler { + return handler +} diff --git a/packages/transport-http/src/subscribe/subscribe.ts b/packages/transport-http/src/subscribe/subscribe.ts index 50e3ae3d4..9f47a0f5d 100644 --- a/packages/transport-http/src/subscribe/subscribe.ts +++ b/packages/transport-http/src/subscribe/subscribe.ts @@ -1,8 +1,13 @@ import {SdkTransport} from "@onflow/typedefs" import {SubscriptionManager} from "./subscription-manager" +const SUBSCRIPTION_HANDLERS: any[] = [] + // Map of SubscriptionManager instances by access node URL -let subscriptionManagerMap: Map = new Map() +let subscriptionManagerMap: Map< + string, + SubscriptionManager +> = new Map() export async function subscribe( { @@ -18,16 +23,17 @@ export async function subscribe( }, opts: {node: string} ): Promise { + // Get the SubscriptionManager instance for the access node, or create a new one + const node = opts.node const manager = - subscriptionManagerMap.get(opts.node) || - new SubscriptionManager({ - node: opts.node, - }) - subscriptionManagerMap.set(opts.node, manager) + subscriptionManagerMap.get(node) || + new SubscriptionManager(SUBSCRIPTION_HANDLERS, {node}) + subscriptionManagerMap.set(node, manager) return manager.subscribe({ topic, args, + // @ts-ignore - TODO: This is temporary until we start implementing the handlers onData, onError, }) diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts index 48d2b81cc..0973f4048 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.test.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -12,15 +12,34 @@ import { SubscriptionManagerConfig, } from "./subscription-manager" import {SdkTransport} from "@onflow/typedefs" +import {DataSubscriber, SubscriptionHandler} from "./handlers/types" jest.mock("./websocket", () => ({ WebSocket: mockSocket, })) -describe("WsSubscriptionTransport", () => { +describe("WebSocket Manager", () => { let mockWs: WS + let mockSubscriber: jest.Mocked> + let mockHandler: jest.Mocked> + const mockConnectionArgs = {mock: "connection args"} + beforeEach(() => { + jest.resetAllMocks() + mockWs = new WS("wss://localhost:8080") + mockSubscriber = { + sendData: jest.fn(), + sendError: jest.fn(), + encodeArgs: jest.fn().mockReturnValue(mockConnectionArgs), + get connectionArgs() { + return mockConnectionArgs + }, + } + mockHandler = { + topic: "topic", + createSubscriber: jest.fn().mockReturnValue(mockSubscriber), + } }) afterEach(() => { @@ -28,11 +47,7 @@ describe("WsSubscriptionTransport", () => { }) test("does not connect to the socket when no subscriptions are made", async () => { - const config: SubscriptionManagerConfig = { - node: "wss://localhost:8080", - } - - new SubscriptionManager(config) + new SubscriptionManager([mockHandler], {node: "wss://localhost:8080"}) await new Promise(resolve => setTimeout(resolve, 0)) expect(mockWs.server.clients).toHaveLength(0) @@ -42,7 +57,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -56,7 +71,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -69,7 +84,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -92,7 +107,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -106,7 +121,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -119,7 +134,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -141,9 +156,9 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(1) - expect(onData).toHaveBeenCalledWith({key: "value"}) - expect(onError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.sendData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.sendData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.sendError).toHaveBeenCalledTimes(0) serverPromise = (async () => { const msg = (await mockWs.nextMessage) as string @@ -162,7 +177,7 @@ describe("WsSubscriptionTransport", () => { const config: SubscriptionManagerConfig = { node: "wss://localhost:8080", } - const streamController = new SubscriptionManager(config) + const subscriptionManager = new SubscriptionManager([mockHandler], config) const topic = "topic" as SdkTransport.SubscriptionTopic const args = {key: "value"} as any const onData = jest.fn() @@ -176,7 +191,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -189,7 +204,7 @@ describe("WsSubscriptionTransport", () => { })() const [subscription] = await Promise.all([ - streamController.subscribe({ + subscriptionManager.subscribe({ topic, args, onData, @@ -200,6 +215,12 @@ describe("WsSubscriptionTransport", () => { expect(subscription).toBeDefined() expect(subscription.unsubscribe).toBeInstanceOf(Function) + expect(mockHandler.createSubscriber).toHaveBeenCalledTimes(1) + expect(mockHandler.createSubscriber).toHaveBeenCalledWith( + args, + onData, + onError + ) serverPromise = (async () => { const data = { @@ -211,9 +232,9 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(1) - expect(onData).toHaveBeenCalledWith({key: "value"}) - expect(onError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.sendData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.sendData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.sendError).toHaveBeenCalledTimes(0) // Close the connection and create a new one mockWs.close() @@ -227,7 +248,7 @@ describe("WsSubscriptionTransport", () => { expect(data).toEqual({ action: "subscribe", topic, - arguments: args, + arguments: mockConnectionArgs, }) const response: SubscribeMessageResponse = { @@ -254,7 +275,7 @@ describe("WsSubscriptionTransport", () => { await serverPromise - expect(onData).toHaveBeenCalledTimes(2) - expect(onData.mock.calls[1]).toEqual([{key: "value2"}]) + expect(mockSubscriber.sendData).toHaveBeenCalledTimes(2) + expect(mockSubscriber.sendData.mock.calls[1]).toEqual([{key: "value2"}]) }) }) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index b7ea02436..dc36da429 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -9,8 +9,9 @@ import { SubscribeMessageResponse, UnsubscribeMessageRequest, } from "./models" -import type {SdkTransport} from "@onflow/typedefs" +import {SdkTransport} from "@onflow/typedefs" import {WebSocket} from "./websocket" +import {DataSubscriber, SubscriptionHandler} from "./handlers/types" import * as logger from "@onflow/util-logger" const WS_OPEN = 1 @@ -19,19 +20,17 @@ type DeepRequired = Required<{ [K in keyof T]: DeepRequired }> -interface SubscriptionInfo { +type InferHandler = T extends SubscriptionHandler ? H : never + +interface SubscriptionInfo> { // Internal ID for the subscription id: number // Remote ID assigned by the server used for message routing and unsubscribing remoteId?: string // The topic of the subscription - topic: T - // The checkpoint to resume the subscription from - checkpoint: SdkTransport.SubscriptionArguments - // The callback to call when a data is received - onData: (data: any) => void - // The callback to call when an error occurs - onError: (error: Error) => void + topic: string + // Data provider for the subscription + subscriber: DataSubscriber } export interface SubscriptionManagerConfig { @@ -61,14 +60,17 @@ export interface SubscriptionManagerConfig { } } -export class SubscriptionManager { +export class SubscriptionManager< + Handlers extends [...SubscriptionHandler[]], +> { private counter = 0 - private subscriptions: SubscriptionInfo[] = [] private socket: WebSocket | null = null + private subscriptions: SubscriptionInfo>[] = [] private config: DeepRequired private reconnectAttempts = 0 + private handlers: Record> - constructor(config: SubscriptionManagerConfig) { + constructor(handlers: Handlers, config: SubscriptionManagerConfig) { this.config = { ...config, reconnectOptions: { @@ -78,6 +80,15 @@ export class SubscriptionManager { ...config.reconnectOptions, }, } + + // Map data providers by topic + this.handlers = handlers.reduce( + (acc, handler) => { + acc[handler.topic] = handler + return acc + }, + {} as Record> + ) } // Lazy connect to the socket when the first subscription is made @@ -90,21 +101,15 @@ export class SubscriptionManager { this.socket = new WebSocket(this.config.node) this.socket.onmessage = event => { - const data = JSON.parse(event.data) as + const message = JSON.parse(event.data) as | MessageResponse | SubscriptionDataMessage - if ("action" in data) { + if ("action" in message) { // TODO, waiting for AN team to decide what to do here } else { - const sub = this.subscriptions.find(sub => sub.remoteId === data.id) - if (!sub) return - // Update the block height to checkpoint for disconnects - this.updateSubscriptionCheckpoint(sub, data) - - // Call the subscription callback - sub.onData(data.data) + this.handleSubscriptionData(message) } } this.socket.onclose = () => { @@ -157,7 +162,7 @@ export class SubscriptionManager { }) this.subscriptions.forEach(sub => { - sub.onError( + sub.subscriber.sendError( new Error( `Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}` ) @@ -182,22 +187,28 @@ export class SubscriptionManager { } } - async subscribe(opts: { - topic: T - args: SdkTransport.SubscriptionArguments - onData: (data: SdkTransport.SubscriptionData) => void + async subscribe(opts: { + topic: InferHandler["Topic"] + args: InferHandler["Args"] + onData: (data: InferHandler["Data"]) => void onError: (error: Error) => void }): Promise { // Connect the socket if it's not already open await this.connect() + // Get the data provider for the topic + const topicHandler = this.getHandler(opts.topic) + const subscriber = topicHandler.createSubscriber( + opts.args, + opts.onData, + opts.onError + ) + // Track the subscription locally - const sub: SubscriptionInfo = { + const sub: SubscriptionInfo> = { id: this.counter++, topic: opts.topic, - checkpoint: opts.args, - onData: opts.onData, - onError: opts.onError, + subscriber: subscriber, } this.subscriptions.push(sub) @@ -240,13 +251,13 @@ export class SubscriptionManager { } private async sendSubscribe( - sub: SubscriptionInfo + sub: SubscriptionInfo> ) { // Send the subscription message const request: SubscribeMessageRequest = { action: Action.SUBSCRIBE, topic: sub.topic, - arguments: sub.checkpoint, + arguments: sub.subscriber.connectionArgs, } this.socket?.send(JSON.stringify(request)) @@ -262,7 +273,7 @@ export class SubscriptionManager { } private async sendUnsubscribe( - sub: SubscriptionInfo + sub: SubscriptionInfo> ) { // Send the unsubscribe message if the subscription has a remote id const {remoteId} = sub @@ -297,10 +308,25 @@ export class SubscriptionManager { // Update the subscription checkpoint when a message is received // These checkpoints are used to resume subscriptions after disconnects - private updateSubscriptionCheckpoint< + private handleSubscriptionData< T extends SdkTransport.SubscriptionTopic = SdkTransport.SubscriptionTopic, - >(sub: SubscriptionInfo, message: SubscriptionDataMessage) { - // TODO: Will be implemented with each subscription topic + >(message: SubscriptionDataMessage) { + // Get the subscription + const sub = this.subscriptions.find(sub => sub.remoteId === message.id) + if (!sub) { + throw new Error(`No subscription found for id ${message.id}`) + } + + // Send data to the subscriber + sub.subscriber.sendData(message.data) + } + + private getHandler(topic: string) { + const handler = this.handlers[topic] + if (!handler) { + throw new Error(`No handler found for topic ${topic}`) + } + return handler } /** From 7bacf85c8c00ecd021450741d1acb1d33972459f Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 13:10:57 -0800 Subject: [PATCH 13/25] update types --- packages/transport-http/src/subscribe/handlers/types.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/transport-http/src/subscribe/handlers/types.ts b/packages/transport-http/src/subscribe/handlers/types.ts index df4a3ae9c..83e1ded6b 100644 --- a/packages/transport-http/src/subscribe/handlers/types.ts +++ b/packages/transport-http/src/subscribe/handlers/types.ts @@ -1,8 +1,3 @@ -export interface DataConsumer { - onData(data: Data): void - onError(error: Error): void -} - export interface SubscriptionHandler< T extends { Topic: string From 02da135d5a58d1212ec6a40b87d1af782c294779 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 13:11:56 -0800 Subject: [PATCH 14/25] fix err --- packages/sdk/src/transport/subscribe/subscribe.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/transport/subscribe/subscribe.ts b/packages/sdk/src/transport/subscribe/subscribe.ts index ff8a0b4b5..10885994b 100644 --- a/packages/sdk/src/transport/subscribe/subscribe.ts +++ b/packages/sdk/src/transport/subscribe/subscribe.ts @@ -27,7 +27,7 @@ export async function subscribe( decodeResponse(data) .then(onData) .catch(e => { - onError(new Error(`Failed to subscription data: ${e}`)) + onError(new Error(`Failed to decode response: ${e.message}`)) sub.unsubscribe() }) }, From 4749c8dcdeb767bcde0feafb9f54d26ea4455bb9 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 13:12:34 -0800 Subject: [PATCH 15/25] export raw sub --- packages/sdk/src/sdk.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/sdk.ts b/packages/sdk/src/sdk.ts index 0a8bc9eeb..de82220d1 100644 --- a/packages/sdk/src/sdk.ts +++ b/packages/sdk/src/sdk.ts @@ -114,4 +114,4 @@ export {TestUtils} export {VERSION} from "./VERSION" -export {subscribe} from "./transport" +export {subscribe, rawSubscribe} from "./transport" From 6456d00a92985f5883d43505da9f71e074bc2672 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 13:15:33 -0800 Subject: [PATCH 16/25] fix test --- .../transport-http/src/subscribe/subscription-manager.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts index 0973f4048..77f705bf4 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.test.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -18,7 +18,7 @@ jest.mock("./websocket", () => ({ WebSocket: mockSocket, })) -describe("WebSocket Manager", () => { +describe("SubscriptionManager", () => { let mockWs: WS let mockSubscriber: jest.Mocked> let mockHandler: jest.Mocked> From fb645d871dca9deb06eb4745b0292d566a99dabb Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 14:09:24 -0800 Subject: [PATCH 17/25] address feedback --- .../sdk/src/transport/subscribe/raw-subscribe.ts | 15 ++++++--------- packages/sdk/src/transport/subscribe/subscribe.ts | 6 ++++++ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/packages/sdk/src/transport/subscribe/raw-subscribe.ts b/packages/sdk/src/transport/subscribe/raw-subscribe.ts index 3bd1d5633..10ecb6559 100644 --- a/packages/sdk/src/transport/subscribe/raw-subscribe.ts +++ b/packages/sdk/src/transport/subscribe/raw-subscribe.ts @@ -3,7 +3,12 @@ import {SdkTransport} from "@onflow/typedefs" import {getTransport} from "../get-transport" import {invariant} from "@onflow/util-invariant" -// TODO: OPTS FUNCTION +/** + * Subscribe to a topic without decoding the data. + * @param params - The parameters for the subscription. + * @param opts - The options for the subscription. + * @returns A promise that resolves when the subscription is complete. + */ export async function rawSubscribe( { topic, @@ -29,7 +34,6 @@ export async function rawSubscribe( `SDK Send Error: Either opts.node or "accessNode.api" in config must be defined.` ) - // TODO: handle onError // Subscribe using the resolved transport return transport.subscribe( { @@ -44,10 +48,3 @@ export async function rawSubscribe( } ) } - -export function decode( - topic: T, - data: SdkTransport.SubscriptionData -): any { - return data -} diff --git a/packages/sdk/src/transport/subscribe/subscribe.ts b/packages/sdk/src/transport/subscribe/subscribe.ts index 10885994b..edf534301 100644 --- a/packages/sdk/src/transport/subscribe/subscribe.ts +++ b/packages/sdk/src/transport/subscribe/subscribe.ts @@ -2,6 +2,12 @@ import {SdkTransport} from "@onflow/typedefs" import {rawSubscribe} from "./raw-subscribe" import {decodeResponse} from "../../decode/decode" +/** + * Subscribe to a topic and decode the data. + * @param params - The parameters for the subscription. + * @param opts - The options for the subscription. + * @returns A promise that resolves when the subscription is complete. + */ export async function subscribe( { topic, From c884dabee40fcbfadfebafa0919b82eab12f98c5 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 14:11:22 -0800 Subject: [PATCH 18/25] address feedback --- packages/sdk/src/transport/get-transport.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/transport/get-transport.ts b/packages/sdk/src/transport/get-transport.ts index fd1de2f98..2843b5acf 100644 --- a/packages/sdk/src/transport/get-transport.ts +++ b/packages/sdk/src/transport/get-transport.ts @@ -44,5 +44,10 @@ export async function getTransport( function isTransportObject( transport: any ): transport is SdkTransport.Transport { - return transport.send !== undefined && transport.subscribe !== undefined + return ( + transport.send !== undefined && + transport.subscribe !== undefined && + typeof transport.send === "function" && + typeof transport.subscribe === "function" + ) } From 5aaa90da9421ff1c2896c4dacbd460383943f114 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 14:30:19 -0800 Subject: [PATCH 19/25] fixup function signatures --- .../src/transport/subscribe/raw-subscribe.ts | 17 ++++------------- .../sdk/src/transport/subscribe/subscribe.ts | 17 ++++------------- packages/sdk/src/transport/subscribe/types.ts | 8 ++++++++ 3 files changed, 16 insertions(+), 26 deletions(-) create mode 100644 packages/sdk/src/transport/subscribe/types.ts diff --git a/packages/sdk/src/transport/subscribe/raw-subscribe.ts b/packages/sdk/src/transport/subscribe/raw-subscribe.ts index 10ecb6559..6956fc649 100644 --- a/packages/sdk/src/transport/subscribe/raw-subscribe.ts +++ b/packages/sdk/src/transport/subscribe/raw-subscribe.ts @@ -2,25 +2,16 @@ import {config} from "@onflow/config" import {SdkTransport} from "@onflow/typedefs" import {getTransport} from "../get-transport" import {invariant} from "@onflow/util-invariant" +import {SubscribeParams} from "./types" /** * Subscribe to a topic without decoding the data. * @param params - The parameters for the subscription. - * @param opts - The options for the subscription. - * @returns A promise that resolves when the subscription is complete. + * @param opts - Additional options for the subscription. + * @returns A promise that resolves once the subscription is active. */ export async function rawSubscribe( - { - topic, - args, - onData, - onError, - }: { - topic: T - args: SdkTransport.SubscriptionArguments - onData: (data: SdkTransport.SubscriptionData) => void - onError: (error: Error) => void - }, + {topic, args, onData, onError}: SubscribeParams, opts: { node?: string transport?: SdkTransport.Transport diff --git a/packages/sdk/src/transport/subscribe/subscribe.ts b/packages/sdk/src/transport/subscribe/subscribe.ts index edf534301..07112374d 100644 --- a/packages/sdk/src/transport/subscribe/subscribe.ts +++ b/packages/sdk/src/transport/subscribe/subscribe.ts @@ -1,25 +1,16 @@ import {SdkTransport} from "@onflow/typedefs" import {rawSubscribe} from "./raw-subscribe" import {decodeResponse} from "../../decode/decode" +import {SubscribeParams} from "./types" /** * Subscribe to a topic and decode the data. * @param params - The parameters for the subscription. - * @param opts - The options for the subscription. - * @returns A promise that resolves when the subscription is complete. + * @param opts - Additional options for the subscription. + * @returns A promise that resolves when the subscription is active. */ export async function subscribe( - { - topic, - args, - onData, - onError, - }: { - topic: T - args: SdkTransport.SubscriptionArguments - onData: (data: SdkTransport.SubscriptionData) => void - onError: (error: Error) => void - }, + {topic, args, onData, onError}: SubscribeParams, opts: { node?: string transport?: SdkTransport.Transport diff --git a/packages/sdk/src/transport/subscribe/types.ts b/packages/sdk/src/transport/subscribe/types.ts new file mode 100644 index 000000000..0069edf74 --- /dev/null +++ b/packages/sdk/src/transport/subscribe/types.ts @@ -0,0 +1,8 @@ +import {SdkTransport} from "@onflow/typedefs" + +export type SubscribeParams = { + topic: T + args: SdkTransport.SubscriptionArguments + onData: (data: SdkTransport.SubscriptionData) => void + onError: (error: Error) => void +} From 0f6a0187addfd4c90906ee8e3f7388f2004aa80d Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 14:33:06 -0800 Subject: [PATCH 20/25] remove comment --- packages/sdk/src/transport/get-transport.test.ts | 12 ++---------- packages/sdk/src/transport/get-transport.ts | 13 +++++++------ 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/packages/sdk/src/transport/get-transport.test.ts b/packages/sdk/src/transport/get-transport.test.ts index 4470093b2..9e78b8d30 100644 --- a/packages/sdk/src/transport/get-transport.test.ts +++ b/packages/sdk/src/transport/get-transport.test.ts @@ -107,15 +107,7 @@ describe("getTransport", () => { }) }) - /** - * TODO: (jribbink) Figure out what to do with this logic. - * - * Currently, and previously, this logic is the reverse where the global config has priority over the custom transport. - * I disagree with this logic and believe that the custom transport should have priority over the global config. - * However, it would be a breaking change to change this logic. - * - */ - /*test("custom transport has priority over global config", async () => { + test("custom transport has priority over global config", async () => { const customTransport = { send: jest.fn(), subscribe: jest.fn(), @@ -131,5 +123,5 @@ describe("getTransport", () => { ) expect(transport).toBe(customTransport) - })*/ + }) }) diff --git a/packages/sdk/src/transport/get-transport.ts b/packages/sdk/src/transport/get-transport.ts index 2843b5acf..635d6e054 100644 --- a/packages/sdk/src/transport/get-transport.ts +++ b/packages/sdk/src/transport/get-transport.ts @@ -19,12 +19,13 @@ export async function getTransport( `SDK Transport Error: Cannot provide both "transport" and legacy "send" options.` ) - const transportOrSend = await config().first< - SdkTransport.Transport | SdkTransport.SendFn - >( - ["sdk.transport", "sdk.send"], - override.transport || override.send || defaultTransport - ) + const transportOrSend = + override.transport || + override.send || + (await config().first( + ["sdk.transport", "sdk.send"], + defaultTransport + )) // Backwards compatibility with legacy send function if (!isTransportObject(transportOrSend)) { From 49cecf0eeec930145b1b9c5600058363320d59c0 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 14:48:48 -0800 Subject: [PATCH 21/25] rename fields --- .../src/subscribe/handlers/types.ts | 8 +++---- .../subscribe/subscription-manager.test.ts | 22 +++++++++---------- .../src/subscribe/subscription-manager.ts | 4 ++-- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/packages/transport-http/src/subscribe/handlers/types.ts b/packages/transport-http/src/subscribe/handlers/types.ts index 83e1ded6b..939b6c59f 100644 --- a/packages/transport-http/src/subscribe/handlers/types.ts +++ b/packages/transport-http/src/subscribe/handlers/types.ts @@ -15,21 +15,21 @@ export interface SubscriptionHandler< ): DataSubscriber } -export interface DataSubscriber { +export interface DataSubscriber { /** * The callback to call when a data is received */ - sendData(data: Data): void + onData(data: DataDto): void /** * The callback to call when an error is received */ - sendError(error: Error): void + onError(error: Error): void /** * The arguments to connect or reconnect to the subscription */ - encodeArgs(args: Args): ArgsModel + argsToDto(args: Args): ArgsDto /** * Get the arguments to connect or reconnect to the subscription diff --git a/packages/transport-http/src/subscribe/subscription-manager.test.ts b/packages/transport-http/src/subscribe/subscription-manager.test.ts index 77f705bf4..0d656ebb9 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.test.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.test.ts @@ -29,9 +29,9 @@ describe("SubscriptionManager", () => { mockWs = new WS("wss://localhost:8080") mockSubscriber = { - sendData: jest.fn(), - sendError: jest.fn(), - encodeArgs: jest.fn().mockReturnValue(mockConnectionArgs), + onData: jest.fn(), + onError: jest.fn(), + argsToDto: jest.fn().mockReturnValue(mockConnectionArgs), get connectionArgs() { return mockConnectionArgs }, @@ -156,9 +156,9 @@ describe("SubscriptionManager", () => { await serverPromise - expect(mockSubscriber.sendData).toHaveBeenCalledTimes(1) - expect(mockSubscriber.sendData).toHaveBeenCalledWith({key: "value"}) - expect(mockSubscriber.sendError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.onData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.onData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.onError).toHaveBeenCalledTimes(0) serverPromise = (async () => { const msg = (await mockWs.nextMessage) as string @@ -232,9 +232,9 @@ describe("SubscriptionManager", () => { await serverPromise - expect(mockSubscriber.sendData).toHaveBeenCalledTimes(1) - expect(mockSubscriber.sendData).toHaveBeenCalledWith({key: "value"}) - expect(mockSubscriber.sendError).toHaveBeenCalledTimes(0) + expect(mockSubscriber.onData).toHaveBeenCalledTimes(1) + expect(mockSubscriber.onData).toHaveBeenCalledWith({key: "value"}) + expect(mockSubscriber.onError).toHaveBeenCalledTimes(0) // Close the connection and create a new one mockWs.close() @@ -275,7 +275,7 @@ describe("SubscriptionManager", () => { await serverPromise - expect(mockSubscriber.sendData).toHaveBeenCalledTimes(2) - expect(mockSubscriber.sendData.mock.calls[1]).toEqual([{key: "value2"}]) + expect(mockSubscriber.onData).toHaveBeenCalledTimes(2) + expect(mockSubscriber.onData.mock.calls[1]).toEqual([{key: "value2"}]) }) }) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index dc36da429..2c5000312 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -162,7 +162,7 @@ export class SubscriptionManager< }) this.subscriptions.forEach(sub => { - sub.subscriber.sendError( + sub.subscriber.onError( new Error( `Failed to reconnect to the server after ${this.reconnectAttempts + 1} attempts: ${error}` ) @@ -318,7 +318,7 @@ export class SubscriptionManager< } // Send data to the subscriber - sub.subscriber.sendData(message.data) + sub.subscriber.onData(message.data) } private getHandler(topic: string) { From c98d8ed09aeb3a5ebcc192abf5dd0bf72613c5ff Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 15:27:06 -0800 Subject: [PATCH 22/25] fix build --- packages/transport-http/src/subscribe/handlers/blocks.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/transport-http/src/subscribe/handlers/blocks.ts b/packages/transport-http/src/subscribe/handlers/blocks.ts index 7d36255bc..5ab96627c 100644 --- a/packages/transport-http/src/subscribe/handlers/blocks.ts +++ b/packages/transport-http/src/subscribe/handlers/blocks.ts @@ -48,7 +48,7 @@ export const blocksHandler = createSubscriptionHandler<{ } return { - sendData(data: BlocksDataDto) { + onData(data: BlocksDataDto) { // Parse the raw data const parsedData: BlocksData = { block: { @@ -77,10 +77,10 @@ export const blocksHandler = createSubscriptionHandler<{ onData(parsedData) }, - sendError(error: Error) { + onError(error: Error) { onError(error) }, - encodeArgs(args: BlocksArgs) { + argsToDto(args: BlocksArgs) { let encodedArgs: BlocksArgsDto = { block_status: args.blockStatus, } From 5a58e0fa41a8ffa77fe08b5a18e45f2bfd9fb366 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 2 Dec 2024 15:32:08 -0800 Subject: [PATCH 23/25] fix conflict --- packages/transport-http/src/subscribe/subscribe.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/transport-http/src/subscribe/subscribe.ts b/packages/transport-http/src/subscribe/subscribe.ts index 174de0d4a..3daf07823 100644 --- a/packages/transport-http/src/subscribe/subscribe.ts +++ b/packages/transport-http/src/subscribe/subscribe.ts @@ -4,8 +4,6 @@ import {blocksHandler} from "./handlers/blocks" const SUBSCRIPTION_HANDLERS = [blocksHandler] -const SUBSCRIPTION_HANDLERS: any[] = [] - // Map of SubscriptionManager instances by access node URL let subscriptionManagerMap: Map< string, From 2af8a8b62dc067e617117b42a4d21edb3dd13806 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Wed, 4 Dec 2024 16:32:48 -0800 Subject: [PATCH 24/25] Fix type --- packages/transport-http/src/subscribe/subscription-manager.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index 2c5000312..bc119abce 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -60,9 +60,7 @@ export interface SubscriptionManagerConfig { } } -export class SubscriptionManager< - Handlers extends [...SubscriptionHandler[]], -> { +export class SubscriptionManager[]> { private counter = 0 private socket: WebSocket | null = null private subscriptions: SubscriptionInfo>[] = [] From fafa634ec174c198b8a4212eb0059906e081b80d Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Wed, 4 Dec 2024 16:36:16 -0800 Subject: [PATCH 25/25] fix types --- packages/sdk/src/sdk.ts | 4 +--- .../src/subscribe/subscription-manager.ts | 14 +++++--------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/packages/sdk/src/sdk.ts b/packages/sdk/src/sdk.ts index de82220d1..3f3b8550b 100644 --- a/packages/sdk/src/sdk.ts +++ b/packages/sdk/src/sdk.ts @@ -2,7 +2,7 @@ import * as logger from "@onflow/util-logger" // Base export {build} from "./build/build.js" export {resolve} from "./resolve/resolve.js" -export {send} from "./transport" +export {send, subscribe, rawSubscribe} from "./transport" export {decode} from "./decode/sdk-decode.js" export { encodeTransactionPayload, @@ -113,5 +113,3 @@ import * as TestUtils from "./test-utils" export {TestUtils} export {VERSION} from "./VERSION" - -export {subscribe, rawSubscribe} from "./transport" diff --git a/packages/transport-http/src/subscribe/subscription-manager.ts b/packages/transport-http/src/subscribe/subscription-manager.ts index bc119abce..cc9c3bf3e 100644 --- a/packages/transport-http/src/subscribe/subscription-manager.ts +++ b/packages/transport-http/src/subscribe/subscription-manager.ts @@ -22,7 +22,7 @@ type DeepRequired = Required<{ type InferHandler = T extends SubscriptionHandler ? H : never -interface SubscriptionInfo> { +interface SubscriptionInfo { // Internal ID for the subscription id: number // Remote ID assigned by the server used for message routing and unsubscribing @@ -63,7 +63,7 @@ export interface SubscriptionManagerConfig { export class SubscriptionManager[]> { private counter = 0 private socket: WebSocket | null = null - private subscriptions: SubscriptionInfo>[] = [] + private subscriptions: SubscriptionInfo[] = [] private config: DeepRequired private reconnectAttempts = 0 private handlers: Record> @@ -203,7 +203,7 @@ export class SubscriptionManager[]> { ) // Track the subscription locally - const sub: SubscriptionInfo> = { + const sub: SubscriptionInfo = { id: this.counter++, topic: opts.topic, subscriber: subscriber, @@ -248,9 +248,7 @@ export class SubscriptionManager[]> { } } - private async sendSubscribe( - sub: SubscriptionInfo> - ) { + private async sendSubscribe(sub: SubscriptionInfo) { // Send the subscription message const request: SubscribeMessageRequest = { action: Action.SUBSCRIBE, @@ -270,9 +268,7 @@ export class SubscriptionManager[]> { return response } - private async sendUnsubscribe( - sub: SubscriptionInfo> - ) { + private async sendUnsubscribe(sub: SubscriptionInfo) { // Send the unsubscribe message if the subscription has a remote id const {remoteId} = sub if (remoteId) {