Skip to content

Commit

Permalink
Add SDK subscribe function
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 26, 2024
1 parent 7081ebf commit 59bf81c
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/sdk/src/account/account.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {atBlockId} from "../build/build-at-block-id.js"
import {getAccount} from "../build/build-get-account.js"
import {invariant} from "@onflow/util-invariant"
import {decodeResponse as decode} from "../decode/decode.js"
import {send} from "../send/send.js"
import {send} from "../transport/send"

/**
* @typedef {import("@onflow/typedefs").Account} Account
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/block/block.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {send} from "../send/send.js"
import {send} from "../transport/send"
import {getBlock} from "../build/build-get-block"
import {atBlockHeight} from "../build/build-at-block-height.js"
import {atBlockId} from "../build/build-at-block-id.js"
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/contract.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as root from "./sdk"
import * as decode from "./decode/decode.js"
import * as encode from "./encode/encode"
import * as interaction from "./interaction/interaction"
import * as send from "./send/send.js"
import * as send from "./transport/send"
import * as template from "@onflow/util-template"

const interfaceContract =
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/node-version-info/node-version-info.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {send} from "../send/send.js"
import {send} from "../transport/send"
import {decodeResponse as decode} from "../decode/decode.js"
import {getNodeVersionInfo} from "../build/build-get-node-version-info"
import {NodeVersionInfo} from "@onflow/typedefs"
Expand Down
4 changes: 3 additions & 1 deletion packages/sdk/src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as logger from "@onflow/util-logger"
// Base
export {build} from "./build/build.js"
export {resolve} from "./resolve/resolve.js"
export {send} from "./send/send.js"
export {send} from "./transport/send"
export {decode} from "./decode/sdk-decode.js"
export {
encodeTransactionPayload,
Expand Down Expand Up @@ -113,3 +113,5 @@ import * as TestUtils from "./test-utils"
export {TestUtils}

export {VERSION} from "./VERSION"

export {subscribe} from "./transport/subscribe"
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import {Buffer} from "@onflow/rlp"
import {send as defaultSend} from "@onflow/transport-http"
import {initInteraction, pipe} from "../interaction/interaction"
import * as ixModule from "../interaction/interaction"
import {invariant} from "../build/build-invariant.js"
import {invariant} from "../build/build-invariant"
import {response} from "../response/response"
import {config} from "@onflow/config"
import {resolve as defaultResolve} from "../resolve/resolve.js"
import {resolve as defaultResolve} from "../resolve/resolve"
import {getTransport} from "./transport"

/**
* @description - Sends arbitrary scripts, transactions, and requests to Flow
* @param {Array.<Function> | Function} args - An array of functions that take interaction and return interaction
* @param {object} opts - Optional parameters
* @returns {Promise<*>} - A promise that resolves to a response
* @param args - An array of functions that take interaction and return interaction
* @param opts - Optional parameters
* @returns - A promise that resolves to a response
*/
export const send = async (args = [], opts = {}) => {
const sendFn = await config.first(
["sdk.transport", "sdk.send"],
opts.send || defaultSend
)
export const send = async (
args: Function | Function[] = [],
opts: any = {}
): Promise<any> => {
const {send: sendFn} = await getTransport(opts)

invariant(
sendFn,
Expand All @@ -31,10 +31,10 @@ export const send = async (args = [], opts = {}) => {

opts.node = opts.node || (await config().get("accessNode.api"))

if (Array.isArray(args)) args = pipe(initInteraction(), args)
if (Array.isArray(args)) args = pipe(initInteraction(), args as any) as any
return sendFn(
await resolveFn(args),
{config, response, ix: ixModule, Buffer},
{config, response, ix: ixModule, Buffer} as any,
opts
)
}
48 changes: 48 additions & 0 deletions packages/sdk/src/transport/subscribe.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import {config} from "@onflow/config"
import {subscribe} from "./subscribe"
import {SdkTransport} from "@onflow/typedefs"
import {getTransport} from "./transport"

jest.mock("./transport")

describe("subscribe", () => {
let mockTransport: jest.Mocked<SdkTransport.Transport>

beforeEach(() => {
jest.resetAllMocks()

mockTransport = {
subscribe: jest.fn().mockReturnValue({
unsubscribe: jest.fn(),
}),
send: jest.fn(),
}
jest.mocked(getTransport).mockResolvedValue(mockTransport)
})

test("subscribes to a topic and returns a subscription", async () => {
const topic = "topic" as SdkTransport.SubscriptionTopic
const args = {foo: "bar"} as SdkTransport.SubscriptionArguments<any>
const onData = jest.fn()
const onError = jest.fn()

const sub = await config().overload(
{
"accessNode.api": "http://localhost:8080",
},
async () => {
return await subscribe({topic, args, onData, onError})
}
)

expect(mockTransport.subscribe).toHaveBeenCalledTimes(1)
expect(mockTransport.subscribe).toHaveBeenCalledWith(
{topic, args, onData: expect.any(Function), onError},
{node: "http://localhost:8080"}
)

expect(sub).toStrictEqual({
unsubscribe: expect.any(Function),
})
})
})
57 changes: 57 additions & 0 deletions packages/sdk/src/transport/subscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import {config} from "@onflow/config"
import {SdkTransport} from "@onflow/typedefs"
import {getTransport} from "./transport"
import {invariant} from "@onflow/util-invariant"

// TODO: OPTS FUNCTION
export async function subscribe<T extends SdkTransport.SubscriptionTopic>(
{
topic,
args,
onData,
onError,
}: {
topic: T
args: SdkTransport.SubscriptionArguments<T>
onData: (data: SdkTransport.SubscriptionData<T>) => void
onError: (error: Error) => void
},
opts: {
node?: string
send?: SdkTransport.SendFn
transport?: SdkTransport.Transport
} = {}
) {
const transport = await getTransport(opts)
const node = opts?.node || (await config().get("accessNode.api"))

invariant(
!!node,
`SDK Send Error: Either opts.node or "accessNode.api" in config must be defined.`
)

// TODO: handle onError
// Subscribe using the resolved transport
return transport.subscribe(
{
topic,
args,
onData: data => {
// TODO: decode function
onData(decode(topic, data))
},
onError,
},
{
node,
...opts,
}
)
}

export function decode<T extends SdkTransport.SubscriptionTopic>(
topic: T,
data: SdkTransport.SubscriptionData<T>
): any {
return data
}
44 changes: 44 additions & 0 deletions packages/sdk/src/transport/transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {config} from "@onflow/config"
import {httpTransport as defaultTransport} from "@onflow/transport-http"
import {SdkTransport} from "@onflow/typedefs"
import {invariant} from "@onflow/util-invariant"

export async function getTransport(
opts: {
send?: SdkTransport.SendFn
transport?: SdkTransport.Transport
} = {}
): Promise<SdkTransport.Transport> {
invariant(
opts.send == null || opts.transport == null,
`SDK Transport Error: Cannot provide both "transport" and legacy "send" options.`
)

const transportOrSend = await config().first<
SdkTransport.Transport | SdkTransport.SendFn
>(
["sdk.transport", "sdk.send"],
opts.transport || opts.send || defaultTransport
)

if (isTransportObject(transportOrSend)) {
// This is a transport object, return it directly
return transportOrSend
} else {
// This is a legacy send function, wrap it in a transport object
return {
send: transportOrSend,
subscribe: () => {
throw new Error(
"Subscribe not supported with legacy send function transport, please provide a transport object."
)
},
}
}
}

function isTransportObject(
transport: any
): transport is SdkTransport.Transport {
return transport.send !== undefined && transport.subscribe !== undefined
}

0 comments on commit 59bf81c

Please sign in to comment.