Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 20, 2024
1 parent 78e4257 commit d65541b
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 163 deletions.
4 changes: 1 addition & 3 deletions packages/sdk/src/subscribe/subscription-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
describe('SubscriptionManager', () => {

});
describe("SubscriptionManager", () => {})
81 changes: 42 additions & 39 deletions packages/sdk/src/subscribe/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,70 @@
//subscriptionmanager

enum SubscriptionTopic {
EVENTS = 'events',
BLOCKS = 'blocks',
EVENTS = "events",
BLOCKS = "blocks",
}

type SubscriptionTypes = {
[SubscriptionTopic.EVENTS]: {
args: {
startBlock: number;
endBlock: number;
}
response: {
type: string;
data: any;
}
};
[SubscriptionTopic.BLOCKS]: {
args: {
startBlock: number;
endBlock: number;
},
response: {
type: string;
data: any;
}
};
[SubscriptionTopic.EVENTS]: {
args: {
startBlock: number
endBlock: number
}
response: {
type: string
data: any
}
}
[SubscriptionTopic.BLOCKS]: {
args: {
startBlock: number
endBlock: number
}
response: {
type: string
data: any
}
}
}

type Subscription = {
unsubscribe: () => void;
unsubscribe: () => void
}

type SubscriptionTransport = {
subscribe: (topic: string, args: any, callback: (data: any) => void) => string;
unsubscribe: (subscriptionId: string) => void;
subscribe: (topic: string, args: any, callback: (data: any) => void) => string
unsubscribe: (subscriptionId: string) => void
}

export class SubscriptionManager {
private subscriptions: Subscription[] = [];
private subscriptions: Subscription[] = []

constructor(private readonly transport: SubscriptionTransport) {}

subscribe<T extends SubscriptionTopic>(
topic: T,
args: SubscriptionTypes[T]['args'],
args: SubscriptionTypes[T]["args"],
callback: (data: any) => void
): () => void {
const subscription = this.transport.subscribe(topic, args, (data) => {
const decodedData = this.decode(topic, data);
callback(decodedData);
});
const subscription = this.transport.subscribe(topic, args, data => {
const decodedData = this.decode(topic, data)
callback(decodedData)
})

return () => {
const index = this.subscriptions.indexOf(subscription);
if (index !== -1) {
this.subscriptions.splice(index, 1);
subscription.unsubscribe();
}
const index = this.subscriptions.indexOf(subscription)
if (index !== -1) {
this.subscriptions.splice(index, 1)
subscription.unsubscribe()
}
}
}

decode<T extends SubscriptionTopic>(topic: T, data: any): SubscriptionTypes[T]['response'] {
return data;
decode<T extends SubscriptionTopic>(
topic: T,
data: any
): SubscriptionTypes[T]["response"] {
return data
}
}
}
57 changes: 31 additions & 26 deletions packages/transport-http/src/subscriptions/models.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,63 @@
export interface BaseMessageRequest {
action: ActionType
action: ActionType
}

export interface BaseMessageResponse {
action?: ActionType
success: boolean
error_message?: string
action?: ActionType
success: boolean
error_message?: string
}

export interface ListSubscriptionsMessageRequest extends BaseMessageRequest {
action: 'list_subscriptions'
action: "list_subscriptions"
}

export interface ListSubscriptionsMessageResponse extends BaseMessageResponse {
action: 'list_subscriptions'
subscriptions?: SubscriptionEntry[]
action: "list_subscriptions"
subscriptions?: SubscriptionEntry[]
}

export interface SubscribeMessageRequest extends BaseMessageRequest {
action: 'subscribe'
topic: string
arguments: Record<string, any>
action: "subscribe"
topic: string
arguments: Record<string, any>
}

export interface SubscribeMessageResponse extends BaseMessageResponse {
action: 'subscribe'
topic: string
id: string
action: "subscribe"
topic: string
id: string
}

export interface UnsubscribeMessageRequest extends BaseMessageRequest {
action: 'unsubscribe'
id: string
action: "unsubscribe"
id: string
}

export type UnsubscribeMessageResponse = BaseMessageResponse & {
action: 'unsubscribe'
id: string
action: "unsubscribe"
id: string
}

export type SubscriptionEntry = {
id: string
topic: string
arguments: Record<string, any>
id: string
topic: string
arguments: Record<string, any>
}

export type ActionType = 'list_subscriptions' | 'subscribe' | 'unsubscribe'
export type ActionType = "list_subscriptions" | "subscribe" | "unsubscribe"

export type MessageRequest = ListSubscriptionsMessageRequest | SubscribeMessageRequest | UnsubscribeMessageRequest
export type MessageRequest =
| ListSubscriptionsMessageRequest
| SubscribeMessageRequest
| UnsubscribeMessageRequest

export type MessageResponse = ListSubscriptionsMessageResponse | SubscribeMessageResponse | UnsubscribeMessageResponse
export type MessageResponse =
| ListSubscriptionsMessageResponse
| SubscribeMessageResponse
| UnsubscribeMessageResponse

export type SubscriptionDataMessage = {
id: string
data: any
id: string
data: any
}

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class EventsSubscription
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export abstract class SubscriptionType {
id: number
remoteId?: string
topic: string
checkpointArgs: any
callback: (data: any) => void

constructor(opts: {
id: number
remoteId?: string
topic: string
callback: (data: any) => void
}) {
this.id = opts.id
this.remoteId = opts.remoteId
this.topic = opts.topic
this.callback = opts.callback
}

abstract handleMessage(data: any): void

abstract connect(): void
}
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
describe('SubscriptionManager', () => {

});
describe("SubscriptionManager", () => {})
Loading

0 comments on commit d65541b

Please sign in to comment.