Skip to content

Commit

Permalink
clearer comments related to connetion reference handling and close
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 13, 2024
1 parent 7ee2004 commit 55674ae
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 39 deletions.
30 changes: 22 additions & 8 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,28 @@ import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";
const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
const HEARTBEAT_INTERVAL = 30_000;

export interface Subscription {
export interface JsonRPCSubscription {
/** JSON RPC Id of the Subscription Request */
id: JsonRpcId;
close: () => Promise<void>;
}

/**
* SocketConnection class sets up a socket connection along with a `ping/pong` heartbeat.
* SocketConnection handles a WebSocket connection to a DWN using JSON RPC.
* It also manages references to the long running RPC subscriptions for the connection.
*/
export class SocketConnection {
private heartbeatInterval: NodeJS.Timer;
private subscriptions: Map<JsonRpcId, Subscription> = new Map();
private subscriptions: Map<JsonRpcId, JsonRPCSubscription> = new Map();

constructor(
private socket: WebSocket,
private dwn: Dwn
){
socket.on('message', this.message.bind(this));
socket.on('close', this.close.bind(this));
socket.on('pong', this.pong.bind(this));
socket.on('error', this.error.bind(this));
socket.on('message', this.message.bind(this));
socket.on('pong', this.pong.bind(this));

// Sometimes connections between client <-> server can get borked in such a way that
// leaves both unaware of the borkage. ping messages can be used as a means to verify
Expand All @@ -52,7 +54,11 @@ export class SocketConnection {
}, HEARTBEAT_INTERVAL);
}

async subscribe(subscription: Subscription): Promise<void> {
/**
* Adds a reference for the JSON RPC Subscription to this connection.
* Used for cleanup if the connection is closed.
*/
async subscribe(subscription: JsonRPCSubscription): Promise<void> {
if (this.subscriptions.has(subscription.id)) {
throw new DwnServerError(
DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists,
Expand All @@ -63,6 +69,11 @@ export class SocketConnection {
this.subscriptions.set(subscription.id, subscription);
}

/**
* Closes and removes the reference for a given subscription from this connection.
*
* @param id the `JsonRpcId` of the JSON RPC subscription request.
*/
async closeSubscription(id: JsonRpcId): Promise<void> {
if (!this.subscriptions.has(id)) {
throw new DwnServerError(
Expand All @@ -71,6 +82,8 @@ export class SocketConnection {
)
}

const connection = this.subscriptions.get(id);
await connection.close();
this.subscriptions.delete(id);
}

Expand All @@ -83,8 +96,9 @@ export class SocketConnection {
this.socket.removeAllListeners();

const closePromises = [];
for (const [_target, subscription] of this.subscriptions) {
for (const [id, subscription] of this.subscriptions) {
closePromises.push(subscription.close());
this.subscriptions.delete(id);
}

// close all of the associated subscriptions
Expand Down Expand Up @@ -158,7 +172,7 @@ export class SocketConnection {

/**
* Subscription Handler used to build the context for a `JSON RPC` API call.
* Wraps the incoming `message` in a `JSON RPC Success Response` using the origin subscription`JSON RPC Id` to send through the WebSocket.
* Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket.
*/
private subscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void {
return (message) => {
Expand Down
2 changes: 1 addition & 1 deletion src/dwn-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class DwnServer {
let eventStream: EventStream | undefined;
if (this.config.webSocketServerEnabled) {
// setting `EventEmitterStream` as default the default `EventStream
// if an alternate implementation is needed instantiate a `Dwn` with a custom `EventStream` and add it to server options.
// if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options.
eventStream = new EventEmitterStream();
}

Expand Down
2 changes: 1 addition & 1 deletion src/json-rpc-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import { handleSubscriptionsClose } from './json-rpc-handlers/subscriptions/inde
export const jsonRpcApi = new JsonRpcRouter();

jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage);
jsonRpcApi.on('subscriptions.close', handleSubscriptionsClose);
jsonRpcApi.on('subscription.close', handleSubscriptionsClose);
27 changes: 10 additions & 17 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';
import type { Readable as IsomorphicReadable } from 'readable-stream';
import { v4 as uuidv4 } from 'uuid';

import type {
JsonRpcErrorResponse,
} from '../../lib/json-rpc.js';
import type {
HandlerResponse,
JsonRpcHandler,
Expand All @@ -28,8 +25,8 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
const requestId = dwnRequest.id ?? uuidv4();

try {

// RecordsWrite is only supported on 'http'
// RecordsWrite is only supported on 'http' to support data stream for large data
// TODO: https://github.com/TBD54566975/dwn-server/issues/108
if (
transport !== 'http' &&
message.descriptor.interface === DwnInterfaceName.Records &&
Expand Down Expand Up @@ -71,30 +68,26 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
if (subscription !== undefined) {
const { close } = subscription;
try {
await socketConnection.subscribe({
id: requestId,
close,
})
// adding a reference to the close function for this subscription request to the connection.
// this will facilitate closing the subscription later.
await socketConnection.subscribe({ id: requestId, close });
delete reply.subscription.close // not serializable via JSON
} catch(error) {
let errorResponse: JsonRpcErrorResponse;
// close the subscription upon receiving an error here
await close();
if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists) {
// a subscription with this request id already exists
errorResponse = createJsonRpcErrorResponse(
const errorResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.BadRequest,
`the request id ${requestId} already has an active subscription`
);
return { jsonRpcResponse: errorResponse };
} else {
// will catch as an unknown error below
throw new Error('unknown error adding subscription');
}

// close the subscription that was just opened and return an error
await close();
return { jsonRpcResponse: errorResponse };
}

delete reply.subscription.close // not serializable via JSON
}

const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply });
Expand Down
5 changes: 3 additions & 2 deletions src/json-rpc-handlers/subscriptions/close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import {
} from '../../lib/json-rpc.js';

/**
* Closes a subscription for a given `id` for a given `SocketConnection`
* Closes a subscription tied to a specific `SocketConnection`.
*
* @param dwnRequest must include the `id` of the subscription to close within the `params`.
* @param dwnRequest must include JsonRpcId of the subscription request within the `params`.
* @param context must include the associated `SocketConnection`.
*
*/
Expand All @@ -30,6 +30,7 @@ export const handleSubscriptionsClose: JsonRpcHandler = async (

let jsonRpcResponse:JsonRpcResponse;
try {
// closing the subscription and cleaning up the reference within the given connection.
await socketConnection.closeSubscription(id);
jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply: { status: 200, detail: 'Accepted' } });
} catch(error) {
Expand Down
24 changes: 15 additions & 9 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
const CONNECT_TIMEOUT = 3_000;
const RESPONSE_TIMEOUT = 30_000;

export type JSONRPCSocketOptions = {
export interface JSONRPCSocketOptions {
connectTimeout?: number;
responseTimeout?: number;
onclose?: () => void;
onerror?: (error?: any) => void;
}

/**
Expand All @@ -20,17 +22,21 @@ export class JSONRPCSocket {
private constructor(private socket: WebSocket, private responseTimeout: number) {}

static async connect(url: string, options: JSONRPCSocketOptions = {}): Promise<JSONRPCSocket> {
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT } = options;
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options;

const onclose = ():void => {
log.info(`JSON RPC Socket close ${url}`);
};
const socket = new WebSocket(url);
if (onclose === undefined) {
socket.onclose = ():void => {
log.info(`JSON RPC Socket close ${url}`);
}
}

const onerror = (event: any):void => {
log.error(`JSON RPC Socket error ${url}`, event);
};
if (onerror === undefined) {
socket.onerror = (error?: any):void => {
log.error(`JSON RPC Socket error ${url}`, error);
}
}

const socket = new WebSocket(url);
socket.onclose = onclose;
socket.onerror = onerror;

Expand Down
2 changes: 1 addition & 1 deletion tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ export async function subscriptionRequest(

const closeSubscription = async (id: JsonRpcId, connection: JSONRPCSocket): Promise<JsonRpcResponse> => {
const requestId = uuidv4();
const request = createJsonRpcRequest(requestId, 'subscriptions.close', { id });
const request = createJsonRpcRequest(requestId, 'subscription.close', { id });
return await connection.request(request);
}

Expand Down

0 comments on commit 55674ae

Please sign in to comment.