Skip to content

Commit

Permalink
update jrpc subscription support - more error proof and concise
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 23, 2024
1 parent d699035 commit 16694f2
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 148 deletions.
31 changes: 19 additions & 12 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js";
import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js";
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js";

import type { WebSocket } from "ws";
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcApi } from "../json-rpc-api.js";
Expand All @@ -15,12 +15,6 @@ import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";

const HEARTBEAT_INTERVAL = 30_000;

export interface JsonRpcSubscription {
/** JSON RPC Id of the Subscription Request */
id: JsonRpcId;
close: () => Promise<void>;
}

/**
* SocketConnection handles a WebSocket connection to a DWN using JSON RPC.
* It also manages references to the long running RPC subscriptions for the connection.
Expand Down Expand Up @@ -54,6 +48,13 @@ export class SocketConnection {
}, HEARTBEAT_INTERVAL);
}

/**
* Checks to see if the incoming `JsonRpcId` is already in use for a subscription.
*/
hasSubscription(id: JsonRpcId): boolean {
return this.subscriptions.has(id);
}

/**
* Adds a reference for the JSON RPC Subscription to this connection.
* Used for cleanup if the connection is closed.
Expand Down Expand Up @@ -182,7 +183,7 @@ export class SocketConnection {
*
* Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket.
*/
private createSubscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void {
private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void {
return (event) => {
const response = createJsonRpcSuccessResponse(id, { reply: { event } });
this.send(response);
Expand All @@ -195,17 +196,23 @@ export class SocketConnection {
* Adds a `subscriptionHandler` for `Subscribe` messages.
*/
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { id, params, method } = request;
const { params, method } = request;
const { subscribe } = params.rpc || {};

const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
socketConnection : this,
}

if (method === 'dwn.processMessage') {
if (method.startsWith('rpc.subscribe.') && subscribe) {
const { message } = params as { message: GenericMessage };
if (message.descriptor.method === DwnMethodName.Subscribe) {
requestContext.subscriptionHandler = this.createSubscriptionHandler(id).bind(this);
const handlerFunc = this.createSubscriptionHandler(subscribe);
requestContext.subscriptionRequest = {
id: subscribe,
subscriptionHandler: (message): void => handlerFunc(message),
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/json-rpc-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import { handleSubscriptionsClose } from './json-rpc-handlers/subscription/index
export const jsonRpcApi = new JsonRpcRouter();

jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage);
jsonRpcApi.on('subscription.close', handleSubscriptionsClose);
jsonRpcApi.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage);
jsonRpcApi.on('rpc.subscribe.close', handleSubscriptionsClose);
67 changes: 36 additions & 31 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,27 @@ import type { GenericMessage } from '@tbd54566975/dwn-sdk-js';
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';

import type { Readable as IsomorphicReadable } from 'readable-stream';
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';

import type { JsonRpcSubscription } from '../../lib/json-rpc.js';
import type {
HandlerResponse,
JsonRpcHandler,
} from '../../lib/json-rpc-router.js';

import { DwnServerErrorCode } from '../../dwn-error.js';
import {
createJsonRpcErrorResponse,
createJsonRpcSuccessResponse,
JsonRpcErrorCodes,
} from '../../lib/json-rpc.js';
import log from 'loglevel';


export const handleDwnProcessMessage: JsonRpcHandler = async (
dwnRequest,
context,
) => {
const { dwn, dataStream, subscriptionHandler, socketConnection, transport } = context;
const { dwn, dataStream, subscriptionRequest, socketConnection, transport } = context;
const { target, message } = dwnRequest.params as { target: string, message: GenericMessage };
const requestId = dwnRequest.id ?? uuidv4();

Expand All @@ -41,23 +42,41 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
return { jsonRpcResponse };
}

// subscribe methods must come with a subscriptionRequest context
if (message.descriptor.method === DwnMethodName.Subscribe && subscriptionRequest === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InvalidRequest,
`subscribe methods must contain a subscriptionRequest context`
);
return { jsonRpcResponse };
}

// Subscribe methods are only supported on 'ws' (WebSockets)
if (transport !== 'ws' && message.descriptor.method === DwnMethodName.Subscribe) {
if (transport !== 'ws' && subscriptionRequest !== undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InvalidParams,
`Subscribe not supported via ${context.transport}`
`subscriptions are not supported via ${context.transport}`
)
return { jsonRpcResponse };
}

if (subscriptionRequest !== undefined && socketConnection?.hasSubscription(subscriptionRequest.id)) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InvalidParams,
`the subscribe id: ${subscriptionRequest.id} is in use by an active subscription`
)
return { jsonRpcResponse };
}

const reply = await dwn.processMessage(target, message, {
dataStream: dataStream as IsomorphicReadable,
subscriptionHandler,
subscriptionHandler: subscriptionRequest?.subscriptionHandler,
});

const { record, subscription } = reply;

const { record } = reply;
// RecordsRead messages return record data as a stream to for accommodate large amounts of data
let recordDataStream: IsomorphicReadable;
if (record !== undefined && record.data !== undefined) {
Expand All @@ -66,29 +85,16 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
}

// Subscribe messages return a close function to facilitate closing the subscription
if (subscription !== undefined) {
const { close } = subscription;
try {
// adding a reference to the close function for this subscription request to the connection.
// this will facilitate closing the subscription later.
await socketConnection.addSubscription({ id: requestId, close });
delete reply.subscription.close // not serializable via JSON
} catch(error) {
// close the subscription upon receiving an error here
await close();
if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists) {
// a subscription with this request id already exists
const errorResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.BadRequest,
`the request id ${requestId} already has an active subscription`
);
return { jsonRpcResponse: errorResponse };
} else {
// will catch as an unknown error below
throw new Error('unknown error adding subscription');
}
if (subscriptionRequest && reply.subscription) {
const { close } = reply.subscription;
// we add a reference to the close function for this subscription request to the socket connection.
// this will facilitate closing the subscription later.
const subscriptionReply: JsonRpcSubscription = {
id: subscriptionRequest.id,
close,
}
await socketConnection.addSubscription(subscriptionReply);
delete reply.subscription.close // delete the close method from the reply as it's not JSON serializable and has a held reference.
}

const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply });
Expand All @@ -107,7 +113,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (

// log the error response
log.error('handleDwnProcessMessage error', jsonRpcResponse);

return { jsonRpcResponse } as HandlerResponse;
}
};
62 changes: 49 additions & 13 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { v4 as uuidv4 } from 'uuid';
import WebSocket from 'ws';

import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
import { createJsonRpcRequest } from "./lib/json-rpc.js";

// These were arbitrarily chosen, but can be modified via connect options
const CONNECT_TIMEOUT = 3_000;
Expand Down Expand Up @@ -89,26 +90,61 @@ export class JsonRpcSocket {
* Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive.
* Returns a close method to clean up the listener.
*/
subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): { close: () => void } {
request.id ??= uuidv4();
async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{
response: JsonRpcResponse;
close?: () => Promise<void>;
}> {

if (!request.method.startsWith('rpc.subscribe.')) {
throw new Error('subscribe rpc messages must include the `rpc.subscribe` prefix');
}

// extract optional `rpc.subscribe` param
const { rpc } = request.params;
const { subscribe } = rpc || {};
const subscriptionId = subscribe || uuidv4();

// When subscribing to a JSON RPC Message, we want to generate the subscription update Json PRC Id ahead of time and create a listener.
// We then set the subscription Id within a special rpc.subscribe params namespace preserving any other properties
request.params.rpc = {
...rpc,
subscribe: subscriptionId,
};

const messageHandler = (event: { data: any }):void => {
const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse;
if (jsonRpcResponse.id === request.id) {
// if the incoming response id matches the request id, trigger the listener
return listener(jsonRpcResponse);
if (jsonRpcResponse.id === subscriptionId) {
if (jsonRpcResponse.error !== undefined) {
// remove the event listener upon receipt of a JSON RPC Error.
this.socket.removeEventListener('message', messageHandler);
}

listener(jsonRpcResponse);
}
};

// subscribe to the listener before sending the request
this.socket.addEventListener('message', messageHandler);
this.send(request);

return {
close: ():void => {
// removes the listener for this particular request
this.socket.removeEventListener('message', messageHandler);
const response = await this.request(request);
if (response.error) {
this.socket.removeEventListener('message', messageHandler);
return { response }
}

// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.socket.removeEventListener('message', messageHandler);
const requestId = uuidv4();
const request = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { id: subscriptionId });
const response = await this.request(request);
if (response.error) {
throw response.error;
}
};
}

return {
response,
close
}
}

/**
Expand Down
12 changes: 8 additions & 4 deletions src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import type { Dwn, GenericMessage } from '@tbd54566975/dwn-sdk-js';
import type { Dwn, MessageEvent } from '@tbd54566975/dwn-sdk-js';

import type { Readable } from 'node:stream';

import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { SocketConnection } from '../connection/socket-connection.js';

export type RequestContext = {
transport: 'http' | 'ws';
dwn: Dwn;
socketConnection?: SocketConnection;
/** The `GenericMessage` handler associated with a subscription request, only used in `ws` requests */
subscriptionHandler?: (message: GenericMessage) => void;
subscriptionRequest?: {
/** The JsonRpcId of the subscription handler */
id: JsonRpcId;
/** The `MessageEvent` handler associated with a subscription request, only used in `ws` requests */
subscriptionHandler: (message: MessageEvent) => void;
}
/** The `Readable` stream associated with a `RecordsWrite` request only used in `http` requests */
dataStream?: Readable;
};
Expand Down
8 changes: 7 additions & 1 deletion src/lib/json-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ export interface JsonRpcError {
data?: any;
}

export interface JsonRpcSubscription {
/** JSON RPC Id of the Subscription Request */
id: JsonRpcId;
close: () => Promise<void>;
}

export enum JsonRpcErrorCodes {
// JSON-RPC 2.0 pre-defined errors
InvalidRequest = -32600,
Expand All @@ -37,7 +43,7 @@ export interface JsonRpcSuccessResponse {
jsonrpc: JsonRpcVersion;
id: JsonRpcId;
result: any;
error?: undefined;
error?: never;
}

export interface JsonRpcErrorResponse {
Expand Down
1 change: 0 additions & 1 deletion tests/http-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ describe('http api', function () {
expect(body.error).to.not.exist;

const { reply } = body.result;
console.log('reply', reply);
expect(reply.status.code).to.equal(202);
});

Expand Down
22 changes: 15 additions & 7 deletions tests/json-rpc-socket.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import chai, { expect } from 'chai';
import { v4 as uuidv4 } from 'uuid';
import { WebSocketServer } from 'ws';

import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from '../src/lib/json-rpc.js';
import type { JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse } from '../src/lib/json-rpc.js';

import { JsonRpcSocket } from '../src/json-rpc-socket.js';
import { createJsonRpcRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js';
Expand Down Expand Up @@ -69,26 +69,34 @@ describe('JsonRpcSocket', () => {
wsServer.addListener('connection', (socket) => {
socket.on('message', (dataBuffer: Buffer) => {
const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest;
// initial response
const response = createJsonRpcSuccessResponse(request.id, { reply: {} })
socket.send(Buffer.from(JSON.stringify(response)));

const { params } = request;
const { subscribe } = params.rpc || {};
// send 3 messages
for (let i = 0; i < 3; i++) {
const response = createJsonRpcSuccessResponse(request.id, { count: i });
const response = createJsonRpcSuccessResponse(subscribe, { count: i });
socket.send(Buffer.from(JSON.stringify(response)));
}
});
});
const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003');
const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 });
const requestId = uuidv4();
const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' });
const subscribeId = uuidv4();
const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2', rpc: { subscribe: subscribeId } });

let responseCounter = 0;
const responseListener = (response: JsonRpcResponse): void => {
expect(response.id).to.equal(request.id);
const responseListener = (response: JsonRpcSuccessResponse): void => {
expect(response.id).to.equal(subscribeId);
const { count } = response.result;
expect(count).to.equal(responseCounter);
responseCounter++;
}

const subscription = client.subscribe(request, responseListener);
const subscription = await client.subscribe(request, responseListener);
expect(subscription.response.error).to.be.undefined;
// wait for the messages to arrive
await new Promise((resolve) => setTimeout(resolve, 5));
// the original response
Expand Down
Loading

0 comments on commit 16694f2

Please sign in to comment.