diff --git a/packages/transport-http/src/subscribe/handlers/account-statuses.ts b/packages/transport-http/src/subscribe/handlers/account-statuses.ts new file mode 100644 index 000000000..29391ce73 --- /dev/null +++ b/packages/transport-http/src/subscribe/handlers/account-statuses.ts @@ -0,0 +1,125 @@ +import {SdkTransport} from "@onflow/typedefs" +import {createSubscriptionHandler} from "./types" + +type AccountStatusesArgs = + SdkTransport.SubscriptionArguments + +type AccountStatusesData = + SdkTransport.SubscriptionData + +type AccountStatusesArgsDto = { + start_block_id?: string + start_block_height?: number + event_types?: string[] + addresses?: string[] + account_addresses?: string[] +} + +type AccountStatusesDataDto = { + block_id: string + block_height: string + block_timestamp: string + account_events: { + [address: string]: { + type: string + transaction_id: string + transaction_index: string + event_index: string + payload: string + }[] + } + message_index: string +} + +export const accountStatusesHandler = createSubscriptionHandler<{ + Topic: SdkTransport.SubscriptionTopic.ACCOUNT_STATUSES + Args: SdkTransport.SubscriptionArguments + Data: SdkTransport.SubscriptionData + ArgsDto: AccountStatusesArgsDto + DataDto: AccountStatusesDataDto +}>({ + topic: SdkTransport.SubscriptionTopic.ACCOUNT_STATUSES, + createSubscriber: (initialArgs, onData, onError) => { + let resumeArgs: AccountStatusesArgs = { + ...initialArgs, + } + + return { + onData(rawData: AccountStatusesDataDto) { + const data: AccountStatusesData[] = [] + for (const [address, events] of Object.entries( + rawData.account_events + )) { + for (const event of events) { + // Parse the raw data + const parsedData: AccountStatusesData = { + accountStatus: { + accountAddress: address, + blockId: rawData.block_id, + blockHeight: Number(rawData.block_height), + blockTimestamp: rawData.block_timestamp, + type: event.type, + transactionId: event.transaction_id, + transactionIndex: Number(event.transaction_index), + eventIndex: Number(event.event_index), + payload: event.payload, + }, + } + + // Update the resume args + resumeArgs = { + ...resumeArgs, + startBlockHeight: Number( + BigInt(rawData.block_height) + BigInt(1) + ), + startBlockId: undefined, + } + + data.push(parsedData) + } + + // Sort the messages by increasing message index + data.sort((a, b) => { + const txIndexDiff = + a.accountStatus.transactionIndex - + b.accountStatus.transactionIndex + if (txIndexDiff !== 0) return txIndexDiff + + return a.accountStatus.eventIndex - b.accountStatus.eventIndex + }) + + // Emit the messages + for (const message of data) { + onData(message) + } + } + }, + onError(error: Error) { + onError(error) + }, + getConnectionArgs() { + let encodedArgs: AccountStatusesArgsDto = { + event_types: resumeArgs.eventTypes, + addresses: resumeArgs.addresses, + account_addresses: resumeArgs.accountAddresses, + } + + if ("startBlockHeight" in resumeArgs) { + return { + ...encodedArgs, + start_block_height: resumeArgs.startBlockHeight, + } + } + + if ("startBlockId" in resumeArgs) { + return { + ...encodedArgs, + start_block_id: resumeArgs.startBlockId, + } + } + + return encodedArgs + }, + } + }, +}) diff --git a/packages/transport-http/src/subscribe/subscribe.ts b/packages/transport-http/src/subscribe/subscribe.ts index a292237d0..3a7206b50 100644 --- a/packages/transport-http/src/subscribe/subscribe.ts +++ b/packages/transport-http/src/subscribe/subscribe.ts @@ -3,15 +3,17 @@ import {SubscriptionManager} from "./subscription-manager" import {blocksHandler} from "./handlers/blocks" import {blockHeadersHandler} from "./handlers/block-headers" import {blockDigestsHandler} from "./handlers/block-digests" -import {eventsHandler} from "./handlers/events" +import {accountStatusesHandler} from "./handlers/account-statuses" import {transactionStatusesHandler} from "./handlers/transaction-statuses" +import {eventsHandler} from "./handlers/events" const SUBSCRIPTION_HANDLERS = [ blocksHandler, blockHeadersHandler, blockDigestsHandler, - eventsHandler, + accountStatusesHandler, transactionStatusesHandler, + eventsHandler, ] // Map of SubscriptionManager instances by access node URL @@ -44,7 +46,6 @@ export async function subscribe( return manager.subscribe({ topic, args, - // @ts-ignore - TODO: This is temporary until we start implementing the handlers onData, onError, }) diff --git a/packages/typedefs/src/index.ts b/packages/typedefs/src/index.ts index f2f541db9..f1af56cea 100644 --- a/packages/typedefs/src/index.ts +++ b/packages/typedefs/src/index.ts @@ -464,6 +464,7 @@ export type NodeVersionInfo = { */ nodeRootBlockHeight: number } + export interface StreamConnection { on( channel: C, diff --git a/packages/typedefs/src/sdk-transport/subscriptions.ts b/packages/typedefs/src/sdk-transport/subscriptions.ts index ed3039c5a..37fe2a785 100644 --- a/packages/typedefs/src/sdk-transport/subscriptions.ts +++ b/packages/typedefs/src/sdk-transport/subscriptions.ts @@ -26,6 +26,15 @@ export type SubscriptionSchema = { blockDigest: BlockDigest } > + [SubscriptionTopic.ACCOUNT_STATUSES]: SchemaItem< + AccountStatusesArgs, + { + accountStatus: Omit & { + payload: string + accountAddress: string + } + } + > [SubscriptionTopic.TRANSACTION_STATUSES]: SchemaItem< { transactionId: string @@ -55,6 +64,7 @@ export enum SubscriptionTopic { BLOCKS = "blocks", BLOCK_HEADERS = "block_headers", BLOCK_DIGESTS = "block_digests", + ACCOUNT_STATUSES = "account_statuses", TRANSACTION_STATUSES = "transaction_statuses", EVENTS = "events", } @@ -69,6 +79,14 @@ type BlockArgs = startBlockHeight?: number } +type AccountStatusesArgs = { + startBlockId?: string + startBlockHeight?: number + eventTypes?: string[] + addresses?: string[] + accountAddresses?: string[] +} + type SchemaItem = { args: TArgs data: TData