From 78e4257fcbde15d4e3a812acbab3b19eed96c7b2 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Wed, 20 Nov 2024 10:22:30 -0800 Subject: [PATCH] WIP --- .../subscribe/subscription-manager.test.ts | 3 + .../sdk/src/subscribe/subscription-manager.ts | 67 +++++++++++ .../src/subscriptions/models.ts | 58 +++++++++ .../subscription-manager.test.ts | 3 + .../src/subscriptions/subscription-manager.ts | 112 ++++++++++++++++++ .../transport-http/src/subscriptions/types.ts | 38 ++++++ packages/transport-http/src/websocket.ts | 4 +- packages/typedefs/src/transports.ts | 0 8 files changed, 283 insertions(+), 2 deletions(-) create mode 100644 packages/sdk/src/subscribe/subscription-manager.test.ts create mode 100644 packages/sdk/src/subscribe/subscription-manager.ts create mode 100644 packages/transport-http/src/subscriptions/models.ts create mode 100644 packages/transport-http/src/subscriptions/subscription-manager.test.ts create mode 100644 packages/transport-http/src/subscriptions/subscription-manager.ts create mode 100644 packages/transport-http/src/subscriptions/types.ts create mode 100644 packages/typedefs/src/transports.ts diff --git a/packages/sdk/src/subscribe/subscription-manager.test.ts b/packages/sdk/src/subscribe/subscription-manager.test.ts new file mode 100644 index 000000000..f5a213d99 --- /dev/null +++ b/packages/sdk/src/subscribe/subscription-manager.test.ts @@ -0,0 +1,3 @@ +describe('SubscriptionManager', () => { + +}); \ No newline at end of file diff --git a/packages/sdk/src/subscribe/subscription-manager.ts b/packages/sdk/src/subscribe/subscription-manager.ts new file mode 100644 index 000000000..14efec5e4 --- /dev/null +++ b/packages/sdk/src/subscribe/subscription-manager.ts @@ -0,0 +1,67 @@ +//subscriptionmanager + +enum SubscriptionTopic { + EVENTS = 'events', + BLOCKS = 'blocks', +} + +type SubscriptionTypes = { + [SubscriptionTopic.EVENTS]: { + args: { + startBlock: number; + endBlock: number; + } + response: { + type: string; + data: any; + } + }; + [SubscriptionTopic.BLOCKS]: { + args: { + startBlock: number; + endBlock: number; + }, + response: { + type: string; + data: any; + } + }; +} + +type Subscription = { + unsubscribe: () => void; +} + +type SubscriptionTransport = { + subscribe: (topic: string, args: any, callback: (data: any) => void) => string; + unsubscribe: (subscriptionId: string) => void; +} + +export class SubscriptionManager { + private subscriptions: Subscription[] = []; + + constructor(private readonly transport: SubscriptionTransport) {} + + subscribe( + topic: T, + args: SubscriptionTypes[T]['args'], + callback: (data: any) => void + ): () => void { + const subscription = this.transport.subscribe(topic, args, (data) => { + const decodedData = this.decode(topic, data); + callback(decodedData); + }); + + return () => { + const index = this.subscriptions.indexOf(subscription); + if (index !== -1) { + this.subscriptions.splice(index, 1); + subscription.unsubscribe(); + } + } + } + + decode(topic: T, data: any): SubscriptionTypes[T]['response'] { + return data; + } +} \ No newline at end of file diff --git a/packages/transport-http/src/subscriptions/models.ts b/packages/transport-http/src/subscriptions/models.ts new file mode 100644 index 000000000..419e921e5 --- /dev/null +++ b/packages/transport-http/src/subscriptions/models.ts @@ -0,0 +1,58 @@ +export interface BaseMessageRequest { + action: ActionType +} + +export interface BaseMessageResponse { + action?: ActionType + success: boolean + error_message?: string +} + +export interface ListSubscriptionsMessageRequest extends BaseMessageRequest { + action: 'list_subscriptions' +} + +export interface ListSubscriptionsMessageResponse extends BaseMessageResponse { + action: 'list_subscriptions' + subscriptions?: SubscriptionEntry[] +} + +export interface SubscribeMessageRequest extends BaseMessageRequest { + action: 'subscribe' + topic: string + arguments: Record +} + +export interface SubscribeMessageResponse extends BaseMessageResponse { + action: 'subscribe' + topic: string + id: string +} + +export interface UnsubscribeMessageRequest extends BaseMessageRequest { + action: 'unsubscribe' + id: string +} + +export type UnsubscribeMessageResponse = BaseMessageResponse & { + action: 'unsubscribe' + id: string +} + +export type SubscriptionEntry = { + id: string + topic: string + arguments: Record +} + +export type ActionType = 'list_subscriptions' | 'subscribe' | 'unsubscribe' + +export type MessageRequest = ListSubscriptionsMessageRequest | SubscribeMessageRequest | UnsubscribeMessageRequest + +export type MessageResponse = ListSubscriptionsMessageResponse | SubscribeMessageResponse | UnsubscribeMessageResponse + +export type SubscriptionDataMessage = { + id: string + data: any +} + diff --git a/packages/transport-http/src/subscriptions/subscription-manager.test.ts b/packages/transport-http/src/subscriptions/subscription-manager.test.ts new file mode 100644 index 000000000..f5a213d99 --- /dev/null +++ b/packages/transport-http/src/subscriptions/subscription-manager.test.ts @@ -0,0 +1,3 @@ +describe('SubscriptionManager', () => { + +}); \ No newline at end of file diff --git a/packages/transport-http/src/subscriptions/subscription-manager.ts b/packages/transport-http/src/subscriptions/subscription-manager.ts new file mode 100644 index 000000000..d709a0105 --- /dev/null +++ b/packages/transport-http/src/subscriptions/subscription-manager.ts @@ -0,0 +1,112 @@ +import { MessageResponse, SubscriptionDataMessage } from "./models"; +import { SubscribeMessageRequest, SubscribeMessageResponse, UnsubscribeMessageRequest } from "./models"; +import { Subscription, SubscriptionArguments, SubscriptionResponse, SubscriptionTopic, SubscriptionTransport } from "./types"; +import { WebSocket } from "../websocket"; + +type Sub = { + topic: string; + args: any; + callback: (data: any) => void; +} + +export class WebsocketTransport implements SubscriptionTransport { + // Map of subscription IDs to listeners + private subscriptions = new Map void>(); + + // WebSocket is only opened when there are active subscriptions + private socket: WebSocket | null = null; + + constructor(private hostname: string) {} + + // Lazy connect to the socket when the first subscription is made + connect() { + // If the socket is already open, do nothing + if (this.socket?.readyState === WebSocket.OPEN) { + return; + } + + this.socket = new WebSocket(this.hostname); + this.socket.onmessage = (event) => { + const data = JSON.parse(event.data) as MessageResponse | SubscriptionDataMessage; + if ("action" in data) { + // TODO, waiting for AN team to decide what to do here + } else { + const callback = this.subscriptions.get(data.id); + if (callback) { + callback(data.data); + } + } + } + this.socket.onclose = () => { + this.subscriptions.forEach((_, id) => { + this.unsubscribe(id); + }); + this.socket?.close(); + } + this.socket.onerror = () => { + + } + + this.socket.onopen = () => { + this.subscriptions.forEach((sub) => { + this.socket?.send(JSON.stringify(sub)); + }); + } + } + + async subscribe( + topic: T, + args: SubscriptionArguments, + callback: (data: SubscriptionResponse) => void + ): Promise { + // Connect the socket if it's not already open + this.connect(); + + // Send the subscription message + const request: SubscribeMessageRequest = { + action: 'subscribe', + topic, + arguments: args, + } + this.socket?.send(JSON.stringify(request)); + + // Add to the response queue + const response = await this.waitForResponse(); + + if (!response.success) { + throw new Error(`Failed to subscribe to topic ${topic}, error message: ${response.error_message}`); + } + + // Add the subscription to the map + this.subscriptions.set(response.id, callback); + + return { + unsubscribe: () => { + this.unsubscribe(response.id); + } + } + } + + private async unsubscribe(id: string): Promise { + const request: UnsubscribeMessageRequest = { + action: 'unsubscribe', + id, + } + this.socket?.send(JSON.stringify(request)); + + const response = await this.waitForResponse(); + + this.subscriptions.delete(id); + + // If there are no subscriptions left, close the socket + if (this.subscriptions.size === 0) { + this.socket?.close(); + } + } + + private async waitForResponse() { + return new Promise((resolve) => { + // TODO: NOOP, waiting for AN team to decide what to do here + }); + } +} \ No newline at end of file diff --git a/packages/transport-http/src/subscriptions/types.ts b/packages/transport-http/src/subscriptions/types.ts new file mode 100644 index 000000000..1bdd7f7af --- /dev/null +++ b/packages/transport-http/src/subscriptions/types.ts @@ -0,0 +1,38 @@ +export enum SubscriptionTopic { + EVENTS = 'events', + BLOCKS = 'blocks', +} + +export type SubscriptionSchema = { + [SubscriptionTopic.EVENTS]: { + args: { + startBlock: number; + endBlock: number; + } + response: { + type: string; + data: any; + } + }; + [SubscriptionTopic.BLOCKS]: { + args: { + startBlock: number; + endBlock: number; + }, + response: { + type: string; + data: any; + } + }; +} + +export type SubscriptionArguments = SubscriptionSchema[T]['args']; +export type SubscriptionResponse = SubscriptionSchema[T]['response']; + +export type Subscription = { + unsubscribe: () => void; +} + +export type SubscriptionTransport = { + subscribe: (topic: T, args: SubscriptionArguments, callback: (data: SubscriptionResponse) => void) => Promise; +} \ No newline at end of file diff --git a/packages/transport-http/src/websocket.ts b/packages/transport-http/src/websocket.ts index 370efcede..087c2de0e 100644 --- a/packages/transport-http/src/websocket.ts +++ b/packages/transport-http/src/websocket.ts @@ -1,6 +1,6 @@ import _WebSocket from "isomorphic-ws" -export const WebSocket = _WebSocket as new ( +export const WebSocket = _WebSocket as (new ( url: string | URL, protocols?: string | string[] | undefined -) => WebSocket +) => WebSocket) & WebSocket \ No newline at end of file diff --git a/packages/typedefs/src/transports.ts b/packages/typedefs/src/transports.ts new file mode 100644 index 000000000..e69de29bb