Skip to content

Commit

Permalink
WebSocket Subscriptions via JRPC (#107)
Browse files Browse the repository at this point in the history
This PR enhances `WebSocket` support via `JSON RPC 2.0`.

This now supports long running subscriptions via `JSON RPC`.
The new `JsonRpcSocket` abstracts this by providing two
methods,`request` and `subscribe`.

`request` sends a JSON RPC message over the socket, setting up a message
handler to listen to the response, once the response is received it will
clean up the listener, and resolve the response to the promise. If a
response is not emitted within a given timeout, it will reject the
promise.

`subscribe` does something similar, however after receiving the response
to the subscribe message, it will keep the handler open and look for any
other messages that match the JSON RPC Id provided in the subscription request, and emit those to a
message handler. The subscribe method returns a close method in order to
clean up these listeners.

Some things to note:
- `RecordsWrite` are currently not supported via sockets, this is due to
a poor handling of the data payload in the prior implementation. Would
rather add this as a separate effort. TODO is tagged in code with the
issue listed below.
- `Subscribe` methods are currently only supported via sockets and hot
`http`.
- A `rpc.subscription.close` JSON RPC Method was added to close a
subscription that is active for a connection. I went back and forth
between making this a DWN Message vs some other signal. I landed on this
for the time being and am open to discussion. More notes below.
- As a part of `getDWNConfig`, `tenantGate` and `eventStream` were both
made optional, as well as the `registrationManager` for `HttpApi`. I did
this mostly out of convenience, but it also seems plausible that someone
might run this without any registration manager. Open to discuss/change.
- the `sendHttpMessage` method within `tests/utils.ts` will also be
replaced by full-fledged client, listed in a separate PR below.
- Current implementation allows anyone to connect, this will be
addressed in a subsequent PR, issue listed below.

### Usage of `subscription.close` JSON RPC method.

Q: Why not use a specific `Unsubscribe` DWN message such as
`RecordsUnsubscribe`?
A: This would be the first message of it's kind that would need to
specifically target the DWN and potentially transport of the DWN which
holds the subscription. Instead the DWN `RecordsSubscribe` message
returns a close method which the transport can keep a reference to given
a specific `JSON RPC Id`. This JSON RPC Id represents a specific request
to a transport that was created. Later a user can issue a
`rpc.subscription.close` JSON RPC Method to tell the server to close that
subscription.

Ultimately the owner of the JRPC Socket connection is in charge of
closing the subscription, they can close all subscriptions by simply
disconnecting. So it makes sense to give them the ability to close a
specific JRPC Subscription.

## Initial Effort Subsequent Issues/PRs:
- #111
- #109
- #110

### Separate effort:
-  #108
  • Loading branch information
LiranCohen authored Feb 27, 2024
1 parent 0e14b8d commit e1396cb
Show file tree
Hide file tree
Showing 28 changed files with 1,823 additions and 241 deletions.
48 changes: 41 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
"devDependencies": {
"@types/bytes": "3.1.1",
"@types/chai": "4.3.4",
"@types/chai-as-promised": "7.1.5",
"@types/express": "4.17.17",
"@types/mocha": "10.0.1",
"@types/node": "18.11.18",
"@types/readable-stream": "4.0.6",
"@types/sinon": "17.0.3",
"@types/supertest": "2.0.12",
"@types/ws": "8.5.4",
"@typescript-eslint/eslint-plugin": "5.59.0",
Expand All @@ -77,7 +79,7 @@
"lint-staged": "^14.0.1",
"mocha": "^10.2.0",
"puppeteer": "^21.4.0",
"sinon": "16.1.0",
"sinon": "17.0.1",
"stream-browserify": "^3.0.0",
"supertest": "6.3.3",
"typescript": "^5.1.6"
Expand Down
39 changes: 39 additions & 0 deletions src/connection/connection-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { Dwn } from "@tbd54566975/dwn-sdk-js";

import type { IncomingMessage } from "http";
import type { WebSocket } from 'ws';

import { SocketConnection } from "./socket-connection.js";

/**
* Interface for managing `WebSocket` connections as they arrive.
*/
export interface ConnectionManager {
/** connect handler used for the `WebSockets` `'connection'` event. */
connect(socket: WebSocket, request?: IncomingMessage): Promise<void>;
/** closes all of the connections */
closeAll(): Promise<void>
}

/**
* A Simple In Memory ConnectionManager implementation.
* It uses a `Map<WebSocket, SocketConnection>` to manage connections.
*/
export class InMemoryConnectionManager implements ConnectionManager {
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {}

async connect(socket: WebSocket): Promise<void> {
const connection = new SocketConnection(socket, this.dwn, () => {
// this is the onClose handler to clean up any closed connections.
this.connections.delete(socket);
});

this.connections.set(socket, connection);
}

async closeAll(): Promise<void> {
const closePromises = [];
this.connections.forEach(connection => closePromises.push(connection.close()));
await Promise.all(closePromises);
}
}
221 changes: 221 additions & 0 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js";
import { DwnMethodName } from "@tbd54566975/dwn-sdk-js";

import type { WebSocket } from "ws";
import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcRouter } from "../json-rpc-api.js";
import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js";
import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";

const HEARTBEAT_INTERVAL = 30_000;

/**
* SocketConnection handles a WebSocket connection to a DWN using JSON RPC.
* It also manages references to the long running RPC subscriptions for the connection.
*/
export class SocketConnection {
private heartbeatInterval: NodeJS.Timer;
private subscriptions: Map<JsonRpcId, JsonRpcSubscription> = new Map();
private isAlive: boolean;

constructor(
private socket: WebSocket,
private dwn: Dwn,
private onClose?: () => void
){
socket.on('message', this.message.bind(this));
socket.on('close', this.close.bind(this));
socket.on('error', this.error.bind(this));
socket.on('pong', this.pong.bind(this));

// Sometimes connections between client <-> server can get borked in such a way that
// leaves both unaware of the borkage. ping messages can be used as a means to verify
// that the remote endpoint is still responsive. Server will ping each socket every 30s
// if a pong hasn't received from a socket by the next ping, the server will terminate
// the socket connection
this.isAlive = true;
this.heartbeatInterval = setInterval(() => {
if (this.isAlive === false) {
this.close();
}
this.isAlive = false;
this.socket.ping();
}, HEARTBEAT_INTERVAL);
}

/**
* Checks to see if the incoming `JsonRpcId` is already in use for a subscription.
*/
hasSubscription(id: JsonRpcId): boolean {
return this.subscriptions.has(id);
}

/**
* Adds a reference for the JSON RPC Subscription to this connection.
* Used for cleanup if the connection is closed.
*/
async addSubscription(subscription: JsonRpcSubscription): Promise<void> {
if (this.subscriptions.has(subscription.id)) {
throw new DwnServerError(
DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists,
`the subscription with id ${subscription.id} already exists`
)
}

this.subscriptions.set(subscription.id, subscription);
}

/**
* Closes and removes the reference for a given subscription from this connection.
*
* @param id the `JsonRpcId` of the JSON RPC subscription request.
*/
async closeSubscription(id: JsonRpcId): Promise<void> {
if (!this.subscriptions.has(id)) {
throw new DwnServerError(
DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound,
`the subscription with id ${id} was not found`
)
}

const connection = this.subscriptions.get(id);
await connection.close();
this.subscriptions.delete(id);
}

/**
* Closes the existing connection and cleans up any listeners or subscriptions.
*/
async close(): Promise<void> {
clearInterval(this.heartbeatInterval);

// clean up all socket event listeners
this.socket.removeAllListeners();

const closePromises = [];
for (const [id, subscription] of this.subscriptions) {
closePromises.push(subscription.close());
this.subscriptions.delete(id);
}

// close all of the associated subscriptions
await Promise.all(closePromises);

// close the socket.
this.socket.close();

// if there was a close handler passed call it after the connection has been closed
if (this.onClose !== undefined) {
this.onClose();
}
}

/**
* Pong messages are automatically sent in response to ping messages as required by
* the websocket spec. So, no need to send explicit pongs.
*/
private pong(): void {
this.isAlive = true;
}

/**
* Log the error and close the connection.
*/
private async error(error:Error): Promise<void>{
log.error(`SocketConnection error, terminating connection`, error);
this.socket.terminate();
await this.close();
}

/**
* Handles a `JSON RPC 2.0` encoded message.
*/
private async message(dataBuffer: Buffer): Promise<void> {
const requestData = dataBuffer.toString();
if (!requestData) {
return this.send(createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
'request payload required.'
))
}

let jsonRequest: JsonRpcRequest;
try {
jsonRequest = JSON.parse(requestData);
} catch(error) {
const errorResponse = createJsonRpcErrorResponse(
uuidv4(),
JsonRpcErrorCodes.BadRequest,
(error as Error).message
);
return this.send(errorResponse);
};

const requestContext = await this.buildRequestContext(jsonRequest);
const { jsonRpcResponse } = await jsonRpcRouter.handle(jsonRequest, requestContext);
if (jsonRpcResponse.error) {
requestCounter.inc({ method: jsonRequest.method, error: 1 });
} else {
requestCounter.inc({
method: jsonRequest.method,
status: jsonRpcResponse?.result?.reply?.status?.code || 0,
});
}
this.send(jsonRpcResponse);
}

/**
* Sends a JSON encoded Buffer through the Websocket.
*/
private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)));
}

/**
* Creates a subscription handler to send messages matching the subscription requested.
*
* Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket.
*/
private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void {
return (event) => {
const response = createJsonRpcSuccessResponse(id, { event });
this.send(response);
}
}

/**
* Builds a `RequestContext` object to use with the `JSON RPC API`.
*
* Adds a `subscriptionHandler` for `Subscribe` messages.
*/
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { params, method, subscription } = request;

const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
socketConnection : this,
}

// methods that expect a long-running subscription begin with `rpc.subscribe.`
if (method.startsWith('rpc.subscribe.') && subscription) {
const { message } = params as { message?: GenericMessage };
if (message?.descriptor.method === DwnMethodName.Subscribe) {
const handlerFunc = this.createSubscriptionHandler(subscription.id);
requestContext.subscriptionRequest = {
id: subscription.id,
subscriptionHandler: (message): void => handlerFunc(message),
}
}
}

return requestContext;
}
}
2 changes: 2 additions & 0 deletions src/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export class DwnServerError extends Error {
* DWN Server error codes.
*/
export enum DwnServerErrorCode {
ConnectionSubscriptionJsonRpcIdExists = 'ConnectionSubscriptionJsonRpcIdExists',
ConnectionSubscriptionJsonRpcIdNotFound = 'ConnectionSubscriptionJsonRpcIdNotFound',
ProofOfWorkInsufficientSolutionNonce = 'ProofOfWorkInsufficientSolutionNonce',
ProofOfWorkInvalidOrExpiredChallenge = 'ProofOfWorkInvalidOrExpiredChallenge',
ProofOfWorkManagerInvalidChallengeNonce = 'ProofOfWorkManagerInvalidChallengeNonce',
Expand Down
Loading

0 comments on commit e1396cb

Please sign in to comment.