Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket Subscriptions via JRPC #107

Merged
merged 40 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dec147b
initial pass at server supporting long running subscriptions via sockets
LiranCohen Feb 9, 2024
aaa6aa2
json rpc cleanup
LiranCohen Feb 10, 2024
6f46c01
brought back requestCounter functionality for jsonrpc response/error
LiranCohen Feb 11, 2024
523558d
add JSONRPCSocket class to handle requests and long-running subscript…
LiranCohen Feb 12, 2024
5b79176
handle connection close not found error explicitly, general clean up
LiranCohen Feb 12, 2024
81f3ccd
replace crypto.randomUUID()
LiranCohen Feb 12, 2024
1dd2943
clean up JSON RPC Socket client
LiranCohen Feb 12, 2024
919df3b
optional tenant gate and event stream in options
LiranCohen Feb 12, 2024
eac8d3b
refactor into separate files, account for null errors, use loglevel
LiranCohen Feb 12, 2024
0669f04
uncesseary chagnge
LiranCohen Feb 12, 2024
f2b71d3
update comments
LiranCohen Feb 13, 2024
47279cd
refactor SocketConnection
LiranCohen Feb 13, 2024
d9b731a
clearer comments related to connetion reference handling and close
LiranCohen Feb 13, 2024
c14ab4f
increase coverage in ws-api and dwn-server
LiranCohen Feb 13, 2024
9451a26
clean up listener on reject
LiranCohen Feb 14, 2024
5ed80e0
additional JSON RPC Socket client coverage
LiranCohen Feb 14, 2024
0f0a223
remove only test decorator
LiranCohen Feb 14, 2024
be4a981
addressed PR suggestions
LiranCohen Feb 14, 2024
50dbb89
remove close connection
LiranCohen Feb 14, 2024
18e4a5c
optional socket send logger for SocketConnection.send()
LiranCohen Feb 14, 2024
7bb207a
rename onError to callback
LiranCohen Feb 14, 2024
c837cd7
rename some classes for consistency, update coments as per PR review
LiranCohen Feb 15, 2024
b2eacec
update chai, added types for chai and sinon
LiranCohen Feb 15, 2024
e12028f
address review suggestions
LiranCohen Feb 15, 2024
97a1ef9
review comments
LiranCohen Feb 15, 2024
1c9adee
match class name in test
LiranCohen Feb 15, 2024
d699035
fix tests
LiranCohen Feb 16, 2024
16694f2
update jrpc subscription support - more error proof and concise
LiranCohen Feb 21, 2024
fcdf960
added `rpc.subscribe.close` method and handling
LiranCohen Feb 22, 2024
7de0ad1
increase testing across the board
LiranCohen Feb 23, 2024
f54fac2
remove utils that were not very useful
LiranCohen Feb 23, 2024
a1caaae
increase test covrage for connection manager and add comments
LiranCohen Feb 27, 2024
7c9983f
review comments
LiranCohen Feb 27, 2024
6bb0ede
review suggestions
LiranCohen Feb 27, 2024
db403d6
add coverage to json-rpc-socet
LiranCohen Feb 27, 2024
72c8a1d
removed test code
LiranCohen Feb 27, 2024
fdc0ee5
unecessary socket injection
LiranCohen Feb 27, 2024
077db21
remove unecessary JsonRpcParams = any export
LiranCohen Feb 27, 2024
ad90dd2
update comment
LiranCohen Feb 27, 2024
95bb520
remove unused test internals
LiranCohen Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 24 additions & 25 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js";
import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js";
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js";

import type { WebSocket } from "ws";
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcApi } from "../json-rpc-api.js";
Expand All @@ -15,12 +15,6 @@ import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";

const HEARTBEAT_INTERVAL = 30_000;

export interface JsonRpcSubscription {
/** JSON RPC Id of the Subscription Request */
id: JsonRpcId;
close: () => Promise<void>;
}

/**
* SocketConnection handles a WebSocket connection to a DWN using JSON RPC.
* It also manages references to the long running RPC subscriptions for the connection.
Expand Down Expand Up @@ -54,6 +48,13 @@ export class SocketConnection {
}, HEARTBEAT_INTERVAL);
}

/**
* Checks to see if the incoming `JsonRpcId` is already in use for a subscription.
*/
hasSubscription(id: JsonRpcId): boolean {
return this.subscriptions.has(id);
}

/**
* Adds a reference for the JSON RPC Subscription to this connection.
* Used for cleanup if the connection is closed.
Expand Down Expand Up @@ -147,7 +148,6 @@ export class SocketConnection {
JsonRpcErrorCodes.BadRequest,
(error as Error).message
);

return this.send(errorResponse);
};

Expand All @@ -165,26 +165,20 @@ export class SocketConnection {
}

/**
* Sends a JSON encoded Buffer through the Websocket. Accepts a callback, if none is provided an error logger is used.
* Sends a JSON encoded Buffer through the Websocket.
*/
private send(response: JsonRpcResponse | JsonRpcErrorResponse, cb?: (error?: Error) => void): void {
if (!cb) {
cb = (error):void => {
if(error) { log.error('socket send error', error, response); }
}
}

this.socket.send(Buffer.from(JSON.stringify(response)), cb);
private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)));
}

/**
* Creates a subscription handler to send messages matching the subscription requested.
*
* Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket.
*/
private createSubscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void {
private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void {
return (event) => {
const response = createJsonRpcSuccessResponse(id, { reply: { event } });
const response = createJsonRpcSuccessResponse(id, { event });
this.send(response);
}
}
Expand All @@ -195,17 +189,22 @@ export class SocketConnection {
* Adds a `subscriptionHandler` for `Subscribe` messages.
*/
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { id, params, method } = request;
const { params, method, subscribe } = request;

const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
socketConnection : this,
}

if (method === 'dwn.processMessage') {
const { message } = params as { message: GenericMessage };
if (message.descriptor.method === DwnMethodName.Subscribe) {
requestContext.subscriptionHandler = this.createSubscriptionHandler(id).bind(this);
if (method.startsWith('rpc.subscribe.') && subscribe) {
const { message } = params as { message?: GenericMessage };
if (message?.descriptor.method === DwnMethodName.Subscribe) {
const handlerFunc = this.createSubscriptionHandler(subscribe.id);
requestContext.subscriptionRequest = {
id: subscribe.id,
subscriptionHandler: (message): void => handlerFunc(message),
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/json-rpc-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ import { handleSubscriptionsClose } from './json-rpc-handlers/subscription/index
export const jsonRpcApi = new JsonRpcRouter();
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved

jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage);
jsonRpcApi.on('subscription.close', handleSubscriptionsClose);
jsonRpcApi.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage);
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved

jsonRpcApi.on('rpc.subscribe.close', handleSubscriptionsClose);
67 changes: 36 additions & 31 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,27 @@ import type { GenericMessage } from '@tbd54566975/dwn-sdk-js';
import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';

import type { Readable as IsomorphicReadable } from 'readable-stream';
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';

import type { JsonRpcSubscription } from '../../lib/json-rpc.js';
import type {
HandlerResponse,
JsonRpcHandler,
} from '../../lib/json-rpc-router.js';

import { DwnServerErrorCode } from '../../dwn-error.js';
import {
createJsonRpcErrorResponse,
createJsonRpcSuccessResponse,
JsonRpcErrorCodes,
} from '../../lib/json-rpc.js';
import log from 'loglevel';


export const handleDwnProcessMessage: JsonRpcHandler = async (
dwnRequest,
context,
) => {
const { dwn, dataStream, subscriptionHandler, socketConnection, transport } = context;
const { dwn, dataStream, subscriptionRequest, socketConnection, transport } = context;
const { target, message } = dwnRequest.params as { target: string, message: GenericMessage };
const requestId = dwnRequest.id ?? uuidv4();

Expand All @@ -41,23 +42,41 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
return { jsonRpcResponse };
}

// subscribe methods must come with a subscriptionRequest context
if (message.descriptor.method === DwnMethodName.Subscribe && subscriptionRequest === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InvalidRequest,
`subscribe methods must contain a subscriptionRequest context`
);
return { jsonRpcResponse };
}

// Subscribe methods are only supported on 'ws' (WebSockets)
if (transport !== 'ws' && message.descriptor.method === DwnMethodName.Subscribe) {
if (transport !== 'ws' && subscriptionRequest !== undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InvalidParams,
`Subscribe not supported via ${context.transport}`
`subscriptions are not supported via ${context.transport}`
)
return { jsonRpcResponse };
}

if (subscriptionRequest !== undefined && socketConnection?.hasSubscription(subscriptionRequest.id)) {
const jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InvalidParams,
`the subscribe id: ${subscriptionRequest.id} is in use by an active subscription`
)
return { jsonRpcResponse };
}

const reply = await dwn.processMessage(target, message, {
dataStream: dataStream as IsomorphicReadable,
subscriptionHandler,
subscriptionHandler: subscriptionRequest?.subscriptionHandler,
});

const { record, subscription } = reply;

const { record } = reply;
// RecordsRead messages return record data as a stream to for accommodate large amounts of data
let recordDataStream: IsomorphicReadable;
if (record !== undefined && record.data !== undefined) {
Expand All @@ -66,29 +85,16 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
}

// Subscribe messages return a close function to facilitate closing the subscription
if (subscription !== undefined) {
const { close } = subscription;
try {
// adding a reference to the close function for this subscription request to the connection.
// this will facilitate closing the subscription later.
await socketConnection.addSubscription({ id: requestId, close });
delete reply.subscription.close // not serializable via JSON
} catch(error) {
// close the subscription upon receiving an error here
await close();
if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists) {
// a subscription with this request id already exists
const errorResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.BadRequest,
`the request id ${requestId} already has an active subscription`
);
return { jsonRpcResponse: errorResponse };
} else {
// will catch as an unknown error below
throw new Error('unknown error adding subscription');
}
if (subscriptionRequest && reply.subscription) {
const { close } = reply.subscription;
// we add a reference to the close function for this subscription request to the socket connection.
// this will facilitate closing the subscription later.
const subscriptionReply: JsonRpcSubscription = {
id: subscriptionRequest.id,
close,
}
await socketConnection.addSubscription(subscriptionReply);
delete reply.subscription.close // delete the close method from the reply as it's not JSON serializable and has a held reference.
}

const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply });
Expand All @@ -107,7 +113,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (

// log the error response
log.error('handleDwnProcessMessage error', jsonRpcResponse);

return { jsonRpcResponse } as HandlerResponse;
}
};
12 changes: 11 additions & 1 deletion src/json-rpc-handlers/subscription/close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,18 @@ export const handleSubscriptionsClose: JsonRpcHandler = async (
context,
) => {
const requestId = dwnRequest.id ?? uuidv4();
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
if (context.socketConnection === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'socket connection does not exist');
return { jsonRpcResponse };
}

if (dwnRequest.subscribe === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'subscribe options do not exist');
return { jsonRpcResponse };
}

const { socketConnection } = context;
const { id } = dwnRequest.params as { id: JsonRpcId};
const { id } = dwnRequest.subscribe as { id: JsonRpcId };

let jsonRpcResponse:JsonRpcResponse;
try {
Expand Down
60 changes: 44 additions & 16 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';
import WebSocket from 'ws';

import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
import { createJsonRpcSubscribeRequest } from "./lib/json-rpc.js";

// These were arbitrarily chosen, but can be modified via connect options
const CONNECT_TIMEOUT = 3_000;
Expand Down Expand Up @@ -72,7 +73,6 @@ export class JsonRpcSocket {
return resolve(jsonRpsResponse);
}
};

// subscribe to the listener before sending the request
this.socket.addEventListener('message', handleResponse);
this.send(request);
Expand All @@ -89,33 +89,61 @@ export class JsonRpcSocket {
* Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive.
* Returns a close method to clean up the listener.
*/
subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): { close: () => void } {
request.id ??= uuidv4();
async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{
response: JsonRpcResponse;
close?: () => Promise<void>;
}> {

if (!request.method.startsWith('rpc.subscribe.')) {
throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix');
}

if (!request.subscribe) {
throw new Error('subscribe rpc requests must include subscribe options');
}

const subscriptionId = request.subscribe.id;
const messageHandler = (event: { data: any }):void => {
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse;
if (jsonRpcResponse.id === request.id) {
// if the incoming response id matches the request id, trigger the listener
return listener(jsonRpcResponse);
if (jsonRpcResponse.id === subscriptionId) {
if (jsonRpcResponse.error !== undefined) {
// remove the event listener upon receipt of a JSON RPC Error.
this.socket.removeEventListener('message', messageHandler);
this.closeSubscription(subscriptionId);
}
listener(jsonRpcResponse);
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
}
};

// subscribe to the listener before sending the request
this.socket.addEventListener('message', messageHandler);
this.send(request);

const response = await this.request(request);
if (response.error) {
this.socket.removeEventListener('message', messageHandler);
return { response }
}

// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.socket.removeEventListener('message', messageHandler);
await this.closeSubscription(subscriptionId);
}

return {
close: ():void => {
// removes the listener for this particular request
this.socket.removeEventListener('message', messageHandler);
}
};
response,
close
}
}

private closeSubscription(id: JsonRpcId): Promise<JsonRpcResponse> {
const requestId = uuidv4();
const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id);
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
return this.request(request);
}

/**
* Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response.
*/
send(request: JsonRpcRequest):void {
this.socket.send(Buffer.from(JSON.stringify(request)));
return;
}
}
12 changes: 8 additions & 4 deletions src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import type { Dwn, GenericMessage } from '@tbd54566975/dwn-sdk-js';
import type { Dwn, EventSubscriptionHandler } from '@tbd54566975/dwn-sdk-js';

import type { Readable } from 'node:stream';

import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { SocketConnection } from '../connection/socket-connection.js';

export type RequestContext = {
transport: 'http' | 'ws';
dwn: Dwn;
socketConnection?: SocketConnection;
/** The `GenericMessage` handler associated with a subscription request, only used in `ws` requests */
subscriptionHandler?: (message: GenericMessage) => void;
subscriptionRequest?: {
/** The JsonRpcId of the subscription handler */
id: JsonRpcId;
/** The `MessageEvent` handler associated with a subscription request, only used in `ws` requests */
subscriptionHandler: EventSubscriptionHandler;
}
/** The `Readable` stream associated with a `RecordsWrite` request only used in `http` requests */
dataStream?: Readable;
};
Expand Down
Loading
Loading