Skip to content

Commit

Permalink
refactor SocketConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 13, 2024
1 parent 4f659d3 commit 7ee2004
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 92 deletions.
4 changes: 1 addition & 3 deletions src/connection/connection-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import type { IncomingMessage } from "http";
import type { WebSocket } from 'ws';

import { SocketConnection } from "./socket-connection.js";
import { InMemorySubscriptionManager } from "../subscription-manager.js";

/**
* Interface for managing `WebSocket` connections as they arrive.
Expand All @@ -21,7 +20,6 @@ export interface ConnectionManager {
/**
* A Simple In Memory ConnectionManager implementation.
* It uses a `Map<WebSocket, SocketConnection>` to manage connections.
* It uses am `InMemorySubscriptionManager` for individual subscription management within the connection.
*/
export class InMemoryConnectionManager implements ConnectionManager {
constructor(private dwn: Dwn, private connections: Map<WebSocket, SocketConnection> = new Map()) {}
Expand All @@ -31,7 +29,7 @@ export class InMemoryConnectionManager implements ConnectionManager {
* Sets listeners for `message`, `pong`, `close`, and `error` events.
*/
async connect(socket: WebSocket): Promise<void> {
const connection = new SocketConnection(socket, this.dwn, new InMemorySubscriptionManager());
const connection = new SocketConnection(socket, this.dwn);
this.connections.set(socket, connection);
// attach to the socket's close handler to clean up this connection.
socket.on('close', () => {
Expand Down
47 changes: 40 additions & 7 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,31 @@ import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';

import type { RequestContext } from "../lib/json-rpc-router.js";
import type { SubscriptionManager } from "../subscription-manager.js";
import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js";

import { requestCounter } from "../metrics.js";
import { jsonRpcApi } from "../json-rpc-api.js";
import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js";
import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js";

const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');
const HEARTBEAT_INTERVAL = 30_000;

export interface Subscription {
id: JsonRpcId;
close: () => Promise<void>;
}

/**
* SocketConnection class sets up a socket connection along with a `ping/pong` heartbeat.
*/
export class SocketConnection {
private heartbeatInterval: NodeJS.Timer;
private subscriptions: Map<JsonRpcId, Subscription> = new Map();

constructor(
private socket: WebSocket,
private dwn: Dwn,
private subscriptions: SubscriptionManager
private dwn: Dwn
){
socket.on('close', this.close.bind(this));
socket.on('pong', this.pong.bind(this));
Expand All @@ -46,6 +52,28 @@ export class SocketConnection {
}, HEARTBEAT_INTERVAL);
}

async subscribe(subscription: Subscription): Promise<void> {
if (this.subscriptions.has(subscription.id)) {
throw new DwnServerError(
DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists,
`the subscription with id ${subscription.id} already exists`
)
}

this.subscriptions.set(subscription.id, subscription);
}

async closeSubscription(id: JsonRpcId): Promise<void> {
if (!this.subscriptions.has(id)) {
throw new DwnServerError(
DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdNotFound,
`the subscription with id ${id} was not found`
)
}

this.subscriptions.delete(id);
}

/**
* Closes the existing connection and cleans up any listeners or subscriptions.
*/
Expand All @@ -54,8 +82,13 @@ export class SocketConnection {
// clean up all socket event listeners
this.socket.removeAllListeners();

const closePromises = [];
for (const [_target, subscription] of this.subscriptions) {
closePromises.push(subscription.close());
}

// close all of the associated subscriptions
await this.subscriptions.closeAll();
await Promise.all(closePromises);

// close the socket.
this.socket.close();
Expand Down Expand Up @@ -144,9 +177,9 @@ export class SocketConnection {
private async buildRequestContext(request: JsonRpcRequest): Promise<RequestContext> {
const { id, params, method} = request;
const requestContext: RequestContext = {
transport : 'ws',
dwn : this.dwn,
subscriptionManager : this.subscriptions,
transport : 'ws',
dwn : this.dwn,
socketConnection : this,
}

if (method === 'dwn.processMessage') {
Expand Down
3 changes: 2 additions & 1 deletion src/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ export class DwnServerError extends Error {
* DWN Server error codes.
*/
export enum DwnServerErrorCode {
ConnectionSubscriptionJsonRPCIdExists = 'ConnectionSubscriptionJsonRPCIdExists',
ConnectionSubscriptionJsonRPCIdNotFound = 'ConnectionSubscriptionJsonRPCIdNotFound',
ProofOfWorkInsufficientSolutionNonce = 'ProofOfWorkInsufficientSolutionNonce',
ProofOfWorkInvalidOrExpiredChallenge = 'ProofOfWorkInvalidOrExpiredChallenge',
ProofOfWorkManagerInvalidChallengeNonce = 'ProofOfWorkManagerInvalidChallengeNonce',
ProofOfWorkManagerInvalidResponseNonceFormat = 'ProofOfWorkManagerInvalidResponseNonceFormat',
ProofOfWorkManagerResponseNonceReused = 'ProofOfWorkManagerResponseNonceReused',
RegistrationManagerInvalidOrOutdatedTermsOfServiceHash = 'RegistrationManagerInvalidOrOutdatedTermsOfServiceHash',
SubscriptionManagerSubscriptionNotFound = 'SubscriptionManagerSubscriptionNotFound',
TenantRegistrationOutdatedTermsOfService = 'TenantRegistrationOutdatedTermsOfService',
}
33 changes: 30 additions & 3 deletions src/json-rpc-handlers/dwn/process-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js';
import type { Readable as IsomorphicReadable } from 'readable-stream';
import { v4 as uuidv4 } from 'uuid';

import type {
JsonRpcErrorResponse,
} from '../../lib/json-rpc.js';
import type {
HandlerResponse,
JsonRpcHandler,
} from '../../lib/json-rpc-router.js';

import { DwnServerErrorCode } from '../../dwn-error.js';
import {
createJsonRpcErrorResponse,
createJsonRpcSuccessResponse,
Expand All @@ -19,7 +23,7 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (
dwnRequest,
context,
) => {
const { dwn, dataStream, subscriptionHandler, subscriptionManager, transport } = context;
const { dwn, dataStream, subscriptionHandler, socketConnection, transport } = context;
const { target, message } = dwnRequest.params as { target: string, message: GenericMessage };
const requestId = dwnRequest.id ?? uuidv4();

Expand Down Expand Up @@ -65,8 +69,31 @@ export const handleDwnProcessMessage: JsonRpcHandler = async (

// Subscribe messages return a close function to facilitate closing the subscription
if (subscription !== undefined) {
const { id, close } = subscription;
subscriptionManager.subscribe(target, { id, close });
const { close } = subscription;
try {
await socketConnection.subscribe({
id: requestId,
close,
})
} catch(error) {
let errorResponse: JsonRpcErrorResponse;
if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdExists) {
// a subscription with this request id already exists
errorResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.BadRequest,
`the request id ${requestId} already has an active subscription`
);
} else {
// will catch as an unknown error below
throw new Error('unknown error adding subscription');
}

// close the subscription that was just opened and return an error
await close();
return { jsonRpcResponse: errorResponse };
}

delete reply.subscription.close // not serializable via JSON
}

Expand Down
21 changes: 11 additions & 10 deletions src/json-rpc-handlers/subscriptions/close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,40 @@ import type {
JsonRpcHandler,
} from '../../lib/json-rpc-router.js';

import type { JsonRpcResponse } from '../../lib/json-rpc.js';
import type { JsonRpcId, JsonRpcResponse } from '../../lib/json-rpc.js';
import {
createJsonRpcErrorResponse,
createJsonRpcSuccessResponse,
JsonRpcErrorCodes,
} from '../../lib/json-rpc.js';

/**
* Closes a subscription for a given `target` and `subscriptionId` within a given connection's `SubscriptionManager`.
* @param dwnRequest must include the `target` and `subscriptionId` within the `params`.
* @param context must include the `subscriptionManager` for the associated connection.
* Closes a subscription for a given `id` for a given `SocketConnection`
*
* @param dwnRequest must include the `id` of the subscription to close within the `params`.
* @param context must include the associated `SocketConnection`.
*
*/
export const handleSubscriptionsClose: JsonRpcHandler = async (
dwnRequest,
context,
) => {
const requestId = dwnRequest.id ?? uuidv4();
const { subscriptionManager } = context;
const { target, subscriptionId } = dwnRequest.params as { target: string, subscriptionId: string };
const { socketConnection } = context;
const { id } = dwnRequest.params as { id: JsonRpcId};

let jsonRpcResponse:JsonRpcResponse;
try {
await subscriptionManager.close(target, subscriptionId);
await socketConnection.closeSubscription(id);
jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply: { status: 200, detail: 'Accepted' } });
} catch(error) {
if (error.code === DwnServerErrorCode.SubscriptionManagerSubscriptionNotFound) {
jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidParams, `subscription ${subscriptionId} does not exist.`);
if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRPCIdNotFound) {
jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidParams, `subscription ${id} does not exist.`);
} else {
jsonRpcResponse = createJsonRpcErrorResponse(
requestId,
JsonRpcErrorCodes.InternalError,
`unknown subscription close error for ${subscriptionId}: ${error.message}`
`unknown subscription close error for ${id}: ${error.message}`
);
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ import type { Dwn, MessageSubscriptionHandler } from '@tbd54566975/dwn-sdk-js';

import type { Readable } from 'node:stream';

import type { SubscriptionManager } from '../subscription-manager.js';
import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js';
import type { SocketConnection } from '../connection/socket-connection.js';

export type RequestContext = {
transport: 'http' | 'ws';
dwn: Dwn;
/** The `SubscriptionManager` associated with a subscription request, only used in `ws` requests */
subscriptionManager?: SubscriptionManager;
socketConnection?: SocketConnection;
/** The `MessageSubscriptionHandler` associated with a subscription request, only used in `ws` requests */
subscriptionHandler?: MessageSubscriptionHandler;
/** The `Readable` stream associated with a `RecordsWrite` request only used in `ws` requests */
Expand Down
60 changes: 0 additions & 60 deletions src/subscription-manager.ts

This file was deleted.

10 changes: 5 additions & 5 deletions tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { Readable } from 'readable-stream';
import { fileURLToPath } from 'url';
import { WebSocket } from 'ws';

import type { JsonRpcResponse, JsonRpcRequest } from '../src/lib/json-rpc.js';
import type { JsonRpcResponse, JsonRpcRequest, JsonRpcId } from '../src/lib/json-rpc.js';
import { createJsonRpcRequest } from '../src/lib/json-rpc.js';
import { JSONRPCSocket } from '../src/json-rpc-socket.js';

Expand Down Expand Up @@ -230,12 +230,12 @@ export async function subscriptionRequest(
messageHandler: MessageSubscriptionHandler
): Promise<{ status: any, subscription?: { id: string, close: () => Promise<void> } }> {
let resolved: boolean = false;
const { params: { target } } = request;
const { id: requestId } = request;
const connection = await JSONRPCSocket.connect(url);

const closeSubscription = async (id: string, target: string, connection: JSONRPCSocket): Promise<JsonRpcResponse> => {
const closeSubscription = async (id: JsonRpcId, connection: JSONRPCSocket): Promise<JsonRpcResponse> => {
const requestId = uuidv4();
const request = createJsonRpcRequest(requestId, 'subscriptions.close', { subscriptionId: id, target });
const request = createJsonRpcRequest(requestId, 'subscriptions.close', { id });
return await connection.request(request);
}

Expand Down Expand Up @@ -263,7 +263,7 @@ export async function subscriptionRequest(
...subscription,
close: async (): Promise<void> => {
subscriptionClose();
const closeResponse = await closeSubscription(subscription.id, target, connection);
const closeResponse = await closeSubscription(requestId, connection);
if (closeResponse.error?.message !== undefined) {
throw new Error(`unable to close subscription: ${closeResponse.error.message}`);
}
Expand Down

0 comments on commit 7ee2004

Please sign in to comment.