Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 20, 2024
1 parent 2b5869a commit 78e4257
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 2 deletions.
3 changes: 3 additions & 0 deletions packages/sdk/src/subscribe/subscription-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
describe('SubscriptionManager', () => {

});
67 changes: 67 additions & 0 deletions packages/sdk/src/subscribe/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//subscriptionmanager

enum SubscriptionTopic {
EVENTS = 'events',
BLOCKS = 'blocks',
}

type SubscriptionTypes = {
[SubscriptionTopic.EVENTS]: {
args: {
startBlock: number;
endBlock: number;
}
response: {
type: string;
data: any;
}
};
[SubscriptionTopic.BLOCKS]: {
args: {
startBlock: number;
endBlock: number;
},
response: {
type: string;
data: any;
}
};
}

type Subscription = {
unsubscribe: () => void;
}

type SubscriptionTransport = {
subscribe: (topic: string, args: any, callback: (data: any) => void) => string;
unsubscribe: (subscriptionId: string) => void;
}

export class SubscriptionManager {
private subscriptions: Subscription[] = [];

constructor(private readonly transport: SubscriptionTransport) {}

subscribe<T extends SubscriptionTopic>(
topic: T,
args: SubscriptionTypes[T]['args'],
callback: (data: any) => void
): () => void {
const subscription = this.transport.subscribe(topic, args, (data) => {
const decodedData = this.decode(topic, data);
callback(decodedData);
});

return () => {
const index = this.subscriptions.indexOf(subscription);
if (index !== -1) {
this.subscriptions.splice(index, 1);
subscription.unsubscribe();
}
}
}

decode<T extends SubscriptionTopic>(topic: T, data: any): SubscriptionTypes[T]['response'] {
return data;
}
}
58 changes: 58 additions & 0 deletions packages/transport-http/src/subscriptions/models.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
export interface BaseMessageRequest {
action: ActionType
}

export interface BaseMessageResponse {
action?: ActionType
success: boolean
error_message?: string
}

export interface ListSubscriptionsMessageRequest extends BaseMessageRequest {
action: 'list_subscriptions'
}

export interface ListSubscriptionsMessageResponse extends BaseMessageResponse {
action: 'list_subscriptions'
subscriptions?: SubscriptionEntry[]
}

export interface SubscribeMessageRequest extends BaseMessageRequest {
action: 'subscribe'
topic: string
arguments: Record<string, any>
}

export interface SubscribeMessageResponse extends BaseMessageResponse {
action: 'subscribe'
topic: string
id: string
}

export interface UnsubscribeMessageRequest extends BaseMessageRequest {
action: 'unsubscribe'
id: string
}

export type UnsubscribeMessageResponse = BaseMessageResponse & {
action: 'unsubscribe'
id: string
}

export type SubscriptionEntry = {
id: string
topic: string
arguments: Record<string, any>
}

export type ActionType = 'list_subscriptions' | 'subscribe' | 'unsubscribe'

export type MessageRequest = ListSubscriptionsMessageRequest | SubscribeMessageRequest | UnsubscribeMessageRequest

export type MessageResponse = ListSubscriptionsMessageResponse | SubscribeMessageResponse | UnsubscribeMessageResponse

export type SubscriptionDataMessage = {
id: string
data: any
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
describe('SubscriptionManager', () => {

});
112 changes: 112 additions & 0 deletions packages/transport-http/src/subscriptions/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { MessageResponse, SubscriptionDataMessage } from "./models";
import { SubscribeMessageRequest, SubscribeMessageResponse, UnsubscribeMessageRequest } from "./models";
import { Subscription, SubscriptionArguments, SubscriptionResponse, SubscriptionTopic, SubscriptionTransport } from "./types";
import { WebSocket } from "../websocket";

type Sub = {
topic: string;
args: any;
callback: (data: any) => void;
}

export class WebsocketTransport implements SubscriptionTransport {
// Map of subscription IDs to listeners
private subscriptions = new Map<string, (data: any) => void>();

// WebSocket is only opened when there are active subscriptions
private socket: WebSocket | null = null;

constructor(private hostname: string) {}

// Lazy connect to the socket when the first subscription is made
connect() {
// If the socket is already open, do nothing
if (this.socket?.readyState === WebSocket.OPEN) {
return;
}

this.socket = new WebSocket(this.hostname);
this.socket.onmessage = (event) => {
const data = JSON.parse(event.data) as MessageResponse | SubscriptionDataMessage;
if ("action" in data) {
// TODO, waiting for AN team to decide what to do here
} else {
const callback = this.subscriptions.get(data.id);
if (callback) {
callback(data.data);
}
}
}
this.socket.onclose = () => {
this.subscriptions.forEach((_, id) => {
this.unsubscribe(id);
});
this.socket?.close();
}
this.socket.onerror = () => {

}

this.socket.onopen = () => {
this.subscriptions.forEach((sub) => {
this.socket?.send(JSON.stringify(sub));
});
}
}

async subscribe<T extends SubscriptionTopic>(
topic: T,
args: SubscriptionArguments<T>,
callback: (data: SubscriptionResponse<T>) => void
): Promise<Subscription> {
// Connect the socket if it's not already open
this.connect();

// Send the subscription message
const request: SubscribeMessageRequest = {
action: 'subscribe',
topic,
arguments: args,
}
this.socket?.send(JSON.stringify(request));

// Add to the response queue
const response = await this.waitForResponse();

if (!response.success) {
throw new Error(`Failed to subscribe to topic ${topic}, error message: ${response.error_message}`);
}

// Add the subscription to the map
this.subscriptions.set(response.id, callback);

return {
unsubscribe: () => {
this.unsubscribe(response.id);
}
}
}

private async unsubscribe(id: string): Promise<void> {
const request: UnsubscribeMessageRequest = {
action: 'unsubscribe',
id,
}
this.socket?.send(JSON.stringify(request));

const response = await this.waitForResponse();

this.subscriptions.delete(id);

// If there are no subscriptions left, close the socket
if (this.subscriptions.size === 0) {
this.socket?.close();
}
}

private async waitForResponse() {
return new Promise<SubscribeMessageResponse>((resolve) => {
// TODO: NOOP, waiting for AN team to decide what to do here
});
}
}
38 changes: 38 additions & 0 deletions packages/transport-http/src/subscriptions/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
export enum SubscriptionTopic {
EVENTS = 'events',
BLOCKS = 'blocks',
}

export type SubscriptionSchema = {
[SubscriptionTopic.EVENTS]: {
args: {
startBlock: number;
endBlock: number;
}
response: {
type: string;
data: any;
}
};
[SubscriptionTopic.BLOCKS]: {
args: {
startBlock: number;
endBlock: number;
},
response: {
type: string;
data: any;
}
};
}

export type SubscriptionArguments<T extends SubscriptionTopic> = SubscriptionSchema[T]['args'];
export type SubscriptionResponse<T extends SubscriptionTopic> = SubscriptionSchema[T]['response'];

export type Subscription = {
unsubscribe: () => void;
}

export type SubscriptionTransport = {
subscribe: <T extends SubscriptionTopic>(topic: T, args: SubscriptionArguments<T>, callback: (data: SubscriptionResponse<T>) => void) => Promise<Subscription>;
}
4 changes: 2 additions & 2 deletions packages/transport-http/src/websocket.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import _WebSocket from "isomorphic-ws"

export const WebSocket = _WebSocket as new (
export const WebSocket = _WebSocket as (new (
url: string | URL,
protocols?: string | string[] | undefined
) => WebSocket
) => WebSocket) & WebSocket
Empty file.

0 comments on commit 78e4257

Please sign in to comment.