diff --git a/package-lock.json b/package-lock.json index 45359d5..f111f15 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35,10 +35,12 @@ "devDependencies": { "@types/bytes": "3.1.1", "@types/chai": "4.3.4", + "@types/chai-as-promised": "7.1.5", "@types/express": "4.17.17", "@types/mocha": "10.0.1", "@types/node": "18.11.18", "@types/readable-stream": "4.0.6", + "@types/sinon": "17.0.3", "@types/supertest": "2.0.12", "@types/ws": "8.5.4", "@typescript-eslint/eslint-plugin": "5.59.0", @@ -62,7 +64,7 @@ "lint-staged": "^14.0.1", "mocha": "^10.2.0", "puppeteer": "^21.4.0", - "sinon": "16.1.0", + "sinon": "17.0.1", "stream-browserify": "^3.0.0", "supertest": "6.3.3", "typescript": "^5.1.6" @@ -743,6 +745,15 @@ "integrity": "sha512-KnRanxnpfpjUTqTCXslZSEdLfXExwgNxYPdiO2WGUj8+HDjFi8R3k5RVKPeSCzLjCcshCAtVO2QBbVuAV4kTnw==", "dev": true }, + "node_modules/@types/chai-as-promised": { + "version": "7.1.5", + "resolved": "https://registry.npmjs.org/@types/chai-as-promised/-/chai-as-promised-7.1.5.tgz", + "integrity": "sha512-jStwss93SITGBwt/niYrkf2C+/1KTeZCZl1LaeezTlqppAKeoQC7jxyqYuP72sxBGKCIbw7oHgbYssIRzT5FCQ==", + "dev": true, + "dependencies": { + "@types/chai": "*" + } + }, "node_modules/@types/connect": { "version": "3.4.38", "resolved": "https://registry.npmjs.org/@types/connect/-/connect-3.4.38.tgz", @@ -888,6 +899,21 @@ "@types/node": "*" } }, + "node_modules/@types/sinon": { + "version": "17.0.3", + "resolved": "https://registry.npmjs.org/@types/sinon/-/sinon-17.0.3.tgz", + "integrity": "sha512-j3uovdn8ewky9kRBG19bOwaZbexJu/XjtkHyjvUgt4xfPFz18dcORIMqnYh66Fx3Powhcr85NT5+er3+oViapw==", + "dev": true, + "dependencies": { + "@types/sinonjs__fake-timers": "*" + } + }, + "node_modules/@types/sinonjs__fake-timers": { + "version": "8.1.5", + "resolved": "https://registry.npmjs.org/@types/sinonjs__fake-timers/-/sinonjs__fake-timers-8.1.5.tgz", + "integrity": "sha512-mQkU2jY8jJEF7YHjHvsQO8+3ughTL1mcnn96igfhONmR+fUPSKIkefQYpSe8bsly2Ep7oQbn/6VG5/9/0qcArQ==", + "dev": true + }, "node_modules/@types/superagent": { "version": "4.1.21", "resolved": "https://registry.npmjs.org/@types/superagent/-/superagent-4.1.21.tgz", @@ -8222,17 +8248,16 @@ } }, "node_modules/sinon": { - "version": "16.1.0", - "resolved": "https://registry.npmjs.org/sinon/-/sinon-16.1.0.tgz", - "integrity": "sha512-ZSgzF0vwmoa8pq0GEynqfdnpEDyP1PkYmEChnkjW0Vyh8IDlyFEJ+fkMhCP0il6d5cJjPl2PUsnUSAuP5sttOQ==", - "deprecated": "16.1.1", + "version": "17.0.1", + "resolved": "https://registry.npmjs.org/sinon/-/sinon-17.0.1.tgz", + "integrity": "sha512-wmwE19Lie0MLT+ZYNpDymasPHUKTaZHUH/pKEubRXIzySv9Atnlw+BUMGCzWgV7b7wO+Hw6f1TEOr0IUnmU8/g==", "dev": true, "dependencies": { "@sinonjs/commons": "^3.0.0", - "@sinonjs/fake-timers": "^10.3.0", + "@sinonjs/fake-timers": "^11.2.2", "@sinonjs/samsam": "^8.0.0", "diff": "^5.1.0", - "nise": "^5.1.4", + "nise": "^5.1.5", "supports-color": "^7.2.0" }, "funding": { @@ -8240,6 +8265,15 @@ "url": "https://opencollective.com/sinon" } }, + "node_modules/sinon/node_modules/@sinonjs/fake-timers": { + "version": "11.2.2", + "resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-11.2.2.tgz", + "integrity": "sha512-G2piCSxQ7oWOxwGSAyFHfPIsyeJGXYtc6mFbnFA+kRXkiEnTl8c/8jul2S329iFBnDI9HGoeWWAZvuvOkZccgw==", + "dev": true, + "dependencies": { + "@sinonjs/commons": "^3.0.0" + } + }, "node_modules/sinon/node_modules/diff": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/diff/-/diff-5.1.0.tgz", diff --git a/package.json b/package.json index 9c5365d..fd60e4c 100644 --- a/package.json +++ b/package.json @@ -50,10 +50,12 @@ "devDependencies": { "@types/bytes": "3.1.1", "@types/chai": "4.3.4", + "@types/chai-as-promised": "7.1.5", "@types/express": "4.17.17", "@types/mocha": "10.0.1", "@types/node": "18.11.18", "@types/readable-stream": "4.0.6", + "@types/sinon": "17.0.3", "@types/supertest": "2.0.12", "@types/ws": "8.5.4", "@typescript-eslint/eslint-plugin": "5.59.0", @@ -77,7 +79,7 @@ "lint-staged": "^14.0.1", "mocha": "^10.2.0", "puppeteer": "^21.4.0", - "sinon": "16.1.0", + "sinon": "17.0.1", "stream-browserify": "^3.0.0", "supertest": "6.3.3", "typescript": "^5.1.6" diff --git a/src/connection/connection-manager.ts b/src/connection/connection-manager.ts new file mode 100644 index 0000000..b98aae8 --- /dev/null +++ b/src/connection/connection-manager.ts @@ -0,0 +1,39 @@ +import type { Dwn } from "@tbd54566975/dwn-sdk-js"; + +import type { IncomingMessage } from "http"; +import type { WebSocket } from 'ws'; + +import { SocketConnection } from "./socket-connection.js"; + +/** + * Interface for managing `WebSocket` connections as they arrive. + */ +export interface ConnectionManager { + /** connect handler used for the `WebSockets` `'connection'` event. */ + connect(socket: WebSocket, request?: IncomingMessage): Promise; + /** closes all of the connections */ + closeAll(): Promise +} + +/** + * A Simple In Memory ConnectionManager implementation. + * It uses a `Map` to manage connections. + */ +export class InMemoryConnectionManager implements ConnectionManager { + constructor(private dwn: Dwn, private connections: Map = new Map()) {} + + async connect(socket: WebSocket): Promise { + const connection = new SocketConnection(socket, this.dwn, () => { + // this is the onClose handler to clean up any closed connections. + this.connections.delete(socket); + }); + + this.connections.set(socket, connection); + } + + async closeAll(): Promise { + const closePromises = []; + this.connections.forEach(connection => closePromises.push(connection.close())); + await Promise.all(closePromises); + } +} \ No newline at end of file diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts new file mode 100644 index 0000000..49625c1 --- /dev/null +++ b/src/connection/socket-connection.ts @@ -0,0 +1,221 @@ +import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js"; +import { DwnMethodName } from "@tbd54566975/dwn-sdk-js"; + +import type { WebSocket } from "ws"; +import log from 'loglevel'; +import { v4 as uuidv4 } from 'uuid'; + +import type { RequestContext } from "../lib/json-rpc-router.js"; +import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js"; + +import { requestCounter } from "../metrics.js"; +import { jsonRpcRouter } from "../json-rpc-api.js"; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcSuccessResponse } from "../lib/json-rpc.js"; +import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; + +const HEARTBEAT_INTERVAL = 30_000; + +/** + * SocketConnection handles a WebSocket connection to a DWN using JSON RPC. + * It also manages references to the long running RPC subscriptions for the connection. + */ +export class SocketConnection { + private heartbeatInterval: NodeJS.Timer; + private subscriptions: Map = new Map(); + private isAlive: boolean; + + constructor( + private socket: WebSocket, + private dwn: Dwn, + private onClose?: () => void + ){ + socket.on('message', this.message.bind(this)); + socket.on('close', this.close.bind(this)); + socket.on('error', this.error.bind(this)); + socket.on('pong', this.pong.bind(this)); + + // Sometimes connections between client <-> server can get borked in such a way that + // leaves both unaware of the borkage. ping messages can be used as a means to verify + // that the remote endpoint is still responsive. Server will ping each socket every 30s + // if a pong hasn't received from a socket by the next ping, the server will terminate + // the socket connection + this.isAlive = true; + this.heartbeatInterval = setInterval(() => { + if (this.isAlive === false) { + this.close(); + } + this.isAlive = false; + this.socket.ping(); + }, HEARTBEAT_INTERVAL); + } + + /** + * Checks to see if the incoming `JsonRpcId` is already in use for a subscription. + */ + hasSubscription(id: JsonRpcId): boolean { + return this.subscriptions.has(id); + } + + /** + * Adds a reference for the JSON RPC Subscription to this connection. + * Used for cleanup if the connection is closed. + */ + async addSubscription(subscription: JsonRpcSubscription): Promise { + if (this.subscriptions.has(subscription.id)) { + throw new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists, + `the subscription with id ${subscription.id} already exists` + ) + } + + this.subscriptions.set(subscription.id, subscription); + } + + /** + * Closes and removes the reference for a given subscription from this connection. + * + * @param id the `JsonRpcId` of the JSON RPC subscription request. + */ + async closeSubscription(id: JsonRpcId): Promise { + if (!this.subscriptions.has(id)) { + throw new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound, + `the subscription with id ${id} was not found` + ) + } + + const connection = this.subscriptions.get(id); + await connection.close(); + this.subscriptions.delete(id); + } + + /** + * Closes the existing connection and cleans up any listeners or subscriptions. + */ + async close(): Promise { + clearInterval(this.heartbeatInterval); + + // clean up all socket event listeners + this.socket.removeAllListeners(); + + const closePromises = []; + for (const [id, subscription] of this.subscriptions) { + closePromises.push(subscription.close()); + this.subscriptions.delete(id); + } + + // close all of the associated subscriptions + await Promise.all(closePromises); + + // close the socket. + this.socket.close(); + + // if there was a close handler passed call it after the connection has been closed + if (this.onClose !== undefined) { + this.onClose(); + } + } + + /** + * Pong messages are automatically sent in response to ping messages as required by + * the websocket spec. So, no need to send explicit pongs. + */ + private pong(): void { + this.isAlive = true; + } + + /** + * Log the error and close the connection. + */ + private async error(error:Error): Promise{ + log.error(`SocketConnection error, terminating connection`, error); + this.socket.terminate(); + await this.close(); + } + + /** + * Handles a `JSON RPC 2.0` encoded message. + */ + private async message(dataBuffer: Buffer): Promise { + const requestData = dataBuffer.toString(); + if (!requestData) { + return this.send(createJsonRpcErrorResponse( + uuidv4(), + JsonRpcErrorCodes.BadRequest, + 'request payload required.' + )) + } + + let jsonRequest: JsonRpcRequest; + try { + jsonRequest = JSON.parse(requestData); + } catch(error) { + const errorResponse = createJsonRpcErrorResponse( + uuidv4(), + JsonRpcErrorCodes.BadRequest, + (error as Error).message + ); + return this.send(errorResponse); + }; + + const requestContext = await this.buildRequestContext(jsonRequest); + const { jsonRpcResponse } = await jsonRpcRouter.handle(jsonRequest, requestContext); + if (jsonRpcResponse.error) { + requestCounter.inc({ method: jsonRequest.method, error: 1 }); + } else { + requestCounter.inc({ + method: jsonRequest.method, + status: jsonRpcResponse?.result?.reply?.status?.code || 0, + }); + } + this.send(jsonRpcResponse); + } + + /** + * Sends a JSON encoded Buffer through the Websocket. + */ + private send(response: JsonRpcResponse | JsonRpcErrorResponse): void { + this.socket.send(Buffer.from(JSON.stringify(response))); + } + + /** + * Creates a subscription handler to send messages matching the subscription requested. + * + * Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket. + */ + private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void { + return (event) => { + const response = createJsonRpcSuccessResponse(id, { event }); + this.send(response); + } + } + + /** + * Builds a `RequestContext` object to use with the `JSON RPC API`. + * + * Adds a `subscriptionHandler` for `Subscribe` messages. + */ + private async buildRequestContext(request: JsonRpcRequest): Promise { + const { params, method, subscription } = request; + + const requestContext: RequestContext = { + transport : 'ws', + dwn : this.dwn, + socketConnection : this, + } + + // methods that expect a long-running subscription begin with `rpc.subscribe.` + if (method.startsWith('rpc.subscribe.') && subscription) { + const { message } = params as { message?: GenericMessage }; + if (message?.descriptor.method === DwnMethodName.Subscribe) { + const handlerFunc = this.createSubscriptionHandler(subscription.id); + requestContext.subscriptionRequest = { + id: subscription.id, + subscriptionHandler: (message): void => handlerFunc(message), + } + } + } + + return requestContext; + } +} diff --git a/src/dwn-error.ts b/src/dwn-error.ts index fa19159..e872524 100644 --- a/src/dwn-error.ts +++ b/src/dwn-error.ts @@ -26,6 +26,8 @@ export class DwnServerError extends Error { * DWN Server error codes. */ export enum DwnServerErrorCode { + ConnectionSubscriptionJsonRpcIdExists = 'ConnectionSubscriptionJsonRpcIdExists', + ConnectionSubscriptionJsonRpcIdNotFound = 'ConnectionSubscriptionJsonRpcIdNotFound', ProofOfWorkInsufficientSolutionNonce = 'ProofOfWorkInsufficientSolutionNonce', ProofOfWorkInvalidOrExpiredChallenge = 'ProofOfWorkInvalidOrExpiredChallenge', ProofOfWorkManagerInvalidChallengeNonce = 'ProofOfWorkManagerInvalidChallengeNonce', diff --git a/src/dwn-server.ts b/src/dwn-server.ts index 15878ef..7a9e23f 100644 --- a/src/dwn-server.ts +++ b/src/dwn-server.ts @@ -1,4 +1,5 @@ -import { Dwn } from '@tbd54566975/dwn-sdk-js'; +import type { EventStream } from '@tbd54566975/dwn-sdk-js'; +import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; import type { Server } from 'http'; import log from 'loglevel'; @@ -39,10 +40,9 @@ export class DwnServer { prefix.apply(log); } - async start(callback?: () => void): Promise { + async start(): Promise { await this.#setupServer(); setProcessHandlers(this); - callback?.(); } /** @@ -61,7 +61,17 @@ export class DwnServer { proofOfWorkInitialMaximumAllowedHash: this.config.registrationProofOfWorkInitialMaxHash, }); - this.dwn = await Dwn.create(getDWNConfig(this.config, registrationManager)); + let eventStream: EventStream | undefined; + if (this.config.webSocketServerEnabled) { + // setting `EventEmitterStream` as default the default `EventStream + // if an alternate implementation is needed, instantiate a `Dwn` with a custom `EventStream` and add it to server options. + eventStream = new EventEmitterStream(); + } + + this.dwn = await Dwn.create(getDWNConfig(this.config, { + tenantGate: registrationManager, + eventStream, + })); } this.#httpApi = new HttpApi(this.config, this.dwn, registrationManager); @@ -76,7 +86,8 @@ export class DwnServer { if (this.config.webSocketServerEnabled) { this.#wsApi = new WsApi(this.#httpApi.server, this.dwn); - this.#wsApi.start(() => log.info(`WebSocketServer ready...`)); + this.#wsApi.start(); + log.info('WebSocketServer ready...'); } } @@ -88,8 +99,8 @@ export class DwnServer { return this.#httpApi.server; } - get wsServer(): WebSocketServer { - return this.#wsApi.server; + get wsServer(): WebSocketServer | undefined { + return this.#wsApi?.server; } /** diff --git a/src/http-api.ts b/src/http-api.ts index 30972ae..e5bbb24 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -17,7 +17,7 @@ import { createJsonRpcErrorResponse, JsonRpcErrorCodes } from './lib/json-rpc.js import type { DwnServerConfig } from './config.js'; import { config } from './config.js'; import { type DwnServerError } from './dwn-error.js'; -import { jsonRpcApi } from './json-rpc-api.js'; +import { jsonRpcRouter } from './json-rpc-api.js'; import { requestCounter, responseHistogram } from './metrics.js'; import type { RegistrationManager } from './registration/registration-manager.js'; @@ -30,7 +30,7 @@ export class HttpApi { registrationManager: RegistrationManager; dwn: Dwn; - constructor(config: DwnServerConfig, dwn: Dwn, registrationManager: RegistrationManager) { + constructor(config: DwnServerConfig, dwn: Dwn, registrationManager?: RegistrationManager) { console.log(config); this.#config = config; @@ -149,7 +149,7 @@ export class HttpApi { transport : 'http', dataStream : requestDataStream, }; - const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcApi.handle(dwnRpcRequest, requestContext as RequestContext); + const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcRouter.handle(dwnRpcRequest, requestContext as RequestContext); // If the handler catches a thrown exception and returns a JSON RPC InternalError, return the equivalent // HTTP 500 Internal Server Error with the response. diff --git a/src/index.ts b/src/index.ts index e77275c..77b43b5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ export { DwnServerConfig } from './config.js'; export { DwnServer, DwnServerOptions } from './dwn-server.js'; export { HttpApi } from './http-api.js'; -export { jsonRpcApi } from './json-rpc-api.js'; +export { jsonRpcRouter } from './json-rpc-api.js'; export { EStoreType, BackendTypes, StoreType } from './storage.js'; export { WsApi } from './ws-api.js'; diff --git a/src/json-rpc-api.ts b/src/json-rpc-api.ts index 41661e5..d3e4d39 100644 --- a/src/json-rpc-api.ts +++ b/src/json-rpc-api.ts @@ -1,7 +1,11 @@ import { JsonRpcRouter } from './lib/json-rpc-router.js'; import { handleDwnProcessMessage } from './json-rpc-handlers/dwn/index.js'; +import { handleSubscriptionsClose } from './json-rpc-handlers/subscription/index.js'; -export const jsonRpcApi = new JsonRpcRouter(); +export const jsonRpcRouter = new JsonRpcRouter(); -jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage); +jsonRpcRouter.on('dwn.processMessage', handleDwnProcessMessage); +jsonRpcRouter.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage); + +jsonRpcRouter.on('rpc.subscribe.close', handleSubscriptionsClose); diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 09be116..bed6db3 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -1,38 +1,103 @@ -import type { RecordsReadReply } from '@tbd54566975/dwn-sdk-js'; +import type { GenericMessage } from '@tbd54566975/dwn-sdk-js'; +import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { Readable as IsomorphicReadable } from 'readable-stream'; +import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; +import type { JsonRpcSubscription } from '../../lib/json-rpc.js'; import type { HandlerResponse, JsonRpcHandler, } from '../../lib/json-rpc-router.js'; + import { createJsonRpcErrorResponse, createJsonRpcSuccessResponse, JsonRpcErrorCodes, } from '../../lib/json-rpc.js'; + export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, ) => { - const { dwn, dataStream } = context; - const { target, message } = dwnRequest.params; + const { dwn, dataStream, subscriptionRequest, socketConnection, transport } = context; + const { target, message } = dwnRequest.params as { target: string, message: GenericMessage }; const requestId = dwnRequest.id ?? uuidv4(); try { - const reply = (await dwn.processMessage( - target, - message, - { dataStream: dataStream as IsomorphicReadable }, - )) as RecordsReadReply; + // RecordsWrite is only supported on 'http' to support data stream for large data + // TODO: https://github.com/TBD54566975/dwn-server/issues/108 + if ( + transport !== 'http' && + message.descriptor.interface === DwnInterfaceName.Records && + message.descriptor.method === DwnMethodName.Write + ) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidParams, + `RecordsWrite is not supported via ${context.transport}` + ) + return { jsonRpcResponse }; + } + + // subscribe methods must come with a subscriptionRequest context + if (message.descriptor.method === DwnMethodName.Subscribe && subscriptionRequest === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidRequest, + `subscribe methods must contain a subscriptionRequest context` + ); + return { jsonRpcResponse }; + } + + // Subscribe methods are only supported on 'ws' (WebSockets) + if (transport !== 'ws' && subscriptionRequest !== undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidParams, + `subscriptions are not supported via ${context.transport}` + ) + return { jsonRpcResponse }; + } + + // if this is a subscription request, we first check if the connection has a subscription with this Id + // we do this ahead of time to prevent opening a subscription on the dwn only to close it after attempting to add it to the subscription manager + // otherwise the subscription manager would throw an error that the Id is already in use and we would close the open subscription on the DWN. + if (subscriptionRequest !== undefined && socketConnection?.hasSubscription(subscriptionRequest.id)) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidParams, + `the subscribe id: ${subscriptionRequest.id} is in use by an active subscription` + ) + return { jsonRpcResponse }; + } + const reply = await dwn.processMessage(target, message, { + dataStream: dataStream as IsomorphicReadable, + subscriptionHandler: subscriptionRequest?.subscriptionHandler, + }); + + const { record } = reply; // RecordsRead messages return record data as a stream to for accommodate large amounts of data - let recordDataStream; - if (reply?.record?.data !== undefined) { + let recordDataStream: IsomorphicReadable; + if (record !== undefined && record.data !== undefined) { recordDataStream = reply.record.data; - delete reply.record.data; + delete reply.record.data; // not serializable via JSON + } + + if (subscriptionRequest && reply.subscription) { + const { close } = reply.subscription; + // Subscribe messages return a close function to facilitate closing the subscription + // we add a reference to the close function for this subscription request to the socket connection. + // this will facilitate closing the subscription later. + const subscriptionReply: JsonRpcSubscription = { + id: subscriptionRequest.id, + close, + } + await socketConnection.addSubscription(subscriptionReply); + delete reply.subscription.close // delete the close method from the reply as it's not JSON serializable and has a held reference. } const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply }); @@ -49,6 +114,8 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( e.message, ); + // log the unhandled error response + log.error('handleDwnProcessMessage error', jsonRpcResponse, e); return { jsonRpcResponse } as HandlerResponse; } }; diff --git a/src/json-rpc-handlers/subscription/close.ts b/src/json-rpc-handlers/subscription/close.ts new file mode 100644 index 0000000..fda3761 --- /dev/null +++ b/src/json-rpc-handlers/subscription/close.ts @@ -0,0 +1,59 @@ +import { v4 as uuidv4 } from 'uuid'; + +import { DwnServerErrorCode } from '../../dwn-error.js'; +import type { + HandlerResponse, + JsonRpcHandler, +} from '../../lib/json-rpc-router.js'; + +import type { JsonRpcId, JsonRpcResponse } from '../../lib/json-rpc.js'; +import { + createJsonRpcErrorResponse, + createJsonRpcSuccessResponse, + JsonRpcErrorCodes, +} from '../../lib/json-rpc.js'; + +/** + * Closes a subscription tied to a specific `SocketConnection`. + * + * @param jsonRpcRequest must include JsonRpcId of the subscription request within a `subscription object`. + * @param context must include the associated `SocketConnection`. + * + */ +export const handleSubscriptionsClose: JsonRpcHandler = async ( + jsonRpcRequest, + context, +) => { + const requestId = jsonRpcRequest.id ?? uuidv4(); + if (context.socketConnection === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'socket connection does not exist'); + return { jsonRpcResponse }; + } + + if (jsonRpcRequest.subscription === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'subscribe options do not exist'); + return { jsonRpcResponse }; + } + + const { socketConnection } = context; + const { id } = jsonRpcRequest.subscription as { id: JsonRpcId }; + + let jsonRpcResponse:JsonRpcResponse; + try { + // closing the subscription and cleaning up the reference within the given connection. + await socketConnection.closeSubscription(id); + jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply: { status: 200, detail: 'Accepted' } }); + } catch(error) { + if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound) { + jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidParams, `subscription ${id} does not exist.`); + } else { + jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InternalError, + `unknown subscription close error for ${id}: ${error.message}` + ); + } + } + + return { jsonRpcResponse } as HandlerResponse; +} \ No newline at end of file diff --git a/src/json-rpc-handlers/subscription/index.ts b/src/json-rpc-handlers/subscription/index.ts new file mode 100644 index 0000000..1225a0f --- /dev/null +++ b/src/json-rpc-handlers/subscription/index.ts @@ -0,0 +1 @@ +export * from './close.js'; diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts new file mode 100644 index 0000000..3b3cc7b --- /dev/null +++ b/src/json-rpc-socket.ts @@ -0,0 +1,150 @@ +import log from 'loglevel'; +import { v4 as uuidv4 } from 'uuid'; +import WebSocket from 'ws'; + +import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js"; +import { createJsonRpcSubscriptionRequest } from "./lib/json-rpc.js"; + +// These were arbitrarily chosen, but can be modified via connect options +const CONNECT_TIMEOUT = 3_000; +const RESPONSE_TIMEOUT = 30_000; + +export interface JsonRpcSocketOptions { + /** socket connection timeout in milliseconds */ + connectTimeout?: number; + /** response timeout for rpc requests in milliseconds */ + responseTimeout?: number; + /** optional connection close handler */ + onclose?: () => void; + /** optional socket error handler */ + onerror?: (error?: any) => void; +} + +/** + * JSON RPC Socket Client for WebSocket request/response and long-running subscriptions. + */ +export class JsonRpcSocket { + private constructor(private socket: WebSocket, private responseTimeout: number) {} + + static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise { + const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options; + + const socket = new WebSocket(url, { timeout: connectTimeout }); + + socket.onclose = onclose; + socket.onerror = onerror; + + if (!socket.onclose) { + socket.onclose = ():void => { + log.info(`JSON RPC Socket close ${url}`); + } + } + + if (!socket.onerror) { + socket.onerror = (error?: any):void => { + log.error(`JSON RPC Socket error ${url}`, error); + } + } + + return new Promise((resolve, reject) => { + socket.on('open', () => { + resolve(new JsonRpcSocket(socket, responseTimeout)); + }); + + setTimeout(() => reject, connectTimeout); + }); + } + + close(): void { + this.socket.close(); + } + + /** + * Sends a JSON-RPC request through the socket and waits for a single response. + */ + async request(request: JsonRpcRequest): Promise { + return new Promise((resolve, reject) => { + request.id ??= uuidv4(); + + const handleResponse = (event: { data: any }):void => { + const jsonRpsResponse = JSON.parse(event.data.toString()) as JsonRpcResponse; + if (jsonRpsResponse.id === request.id) { + // if the incoming response id matches the request id, we will remove the listener and resolve the response + this.socket.removeEventListener('message', handleResponse); + return resolve(jsonRpsResponse); + } + }; + // subscribe to the listener before sending the request + this.socket.addEventListener('message', handleResponse); + this.send(request); + + // reject this promise if we don't receive any response back within the timeout period + setTimeout(() => { + this.socket.removeEventListener('message', handleResponse); + reject(new Error('request timed out')); + }, this.responseTimeout); + }); + } + + /** + * Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive. + * Returns a close method to clean up the listener. + */ + async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{ + response: JsonRpcResponse; + close?: () => Promise; + }> { + + if (!request.method.startsWith('rpc.subscribe.')) { + throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix'); + } + + if (!request.subscription) { + throw new Error('subscribe rpc requests must include subscribe options'); + } + + const subscriptionId = request.subscription.id; + const socketEventListener = (event: { data: any }):void => { + const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse; + if (jsonRpcResponse.id === subscriptionId) { + if (jsonRpcResponse.error !== undefined) { + // remove the event listener upon receipt of a JSON RPC Error. + this.socket.removeEventListener('message', socketEventListener); + this.closeSubscription(subscriptionId); + } + listener(jsonRpcResponse); + } + }; + this.socket.addEventListener('message', socketEventListener); + + const response = await this.request(request); + if (response.error) { + this.socket.removeEventListener('message', socketEventListener); + return { response } + } + + // clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription + const close = async (): Promise => { + this.socket.removeEventListener('message', socketEventListener); + await this.closeSubscription(subscriptionId); + } + + return { + response, + close + } + } + + private closeSubscription(id: JsonRpcId): Promise { + const requestId = uuidv4(); + const request = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + return this.request(request); + } + + /** + * Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response. + */ + send(request: JsonRpcRequest):void { + this.socket.send(Buffer.from(JSON.stringify(request))); + } +} \ No newline at end of file diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index 78036c7..8fd1408 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -1,12 +1,22 @@ -import type { Dwn } from '@tbd54566975/dwn-sdk-js'; +import type { Dwn, EventSubscriptionHandler } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; -import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { SocketConnection } from '../connection/socket-connection.js'; export type RequestContext = { - dwn: Dwn; transport: 'http' | 'ws'; + dwn: Dwn; + /** the socket connection associated with this request if over sockets */ + socketConnection?: SocketConnection; + subscriptionRequest?: { + /** The JsonRpcId of the subscription handler */ + id: JsonRpcId; + /** The `MessageEvent` handler associated with a subscription request, only used in `ws` requests */ + subscriptionHandler: EventSubscriptionHandler; + } + /** The `Readable` stream associated with a `RecordsWrite` request only used in `http` requests */ dataStream?: Readable; }; diff --git a/src/lib/json-rpc.ts b/src/lib/json-rpc.ts index dfd638a..d54a9cf 100644 --- a/src/lib/json-rpc.ts +++ b/src/lib/json-rpc.ts @@ -1,12 +1,15 @@ export type JsonRpcId = string | number | null; -export type JsonRpcParams = any; export type JsonRpcVersion = '2.0'; export interface JsonRpcRequest { jsonrpc: JsonRpcVersion; id?: JsonRpcId; method: string; - params?: JsonRpcParams; + params?: any; + /** JSON RPC Subscription Extension Parameters */ + subscription?: { + id: JsonRpcId + }; } export interface JsonRpcError { @@ -15,6 +18,12 @@ export interface JsonRpcError { data?: any; } +export interface JsonRpcSubscription { + /** JSON RPC Id of the Subscription Request */ + id: JsonRpcId; + close: () => Promise; +} + export enum JsonRpcErrorCodes { // JSON-RPC 2.0 pre-defined errors InvalidRequest = -32600, @@ -23,10 +32,12 @@ export enum JsonRpcErrorCodes { InternalError = -32603, ParseError = -32700, - // App defined errors - BadRequest = -50400, // equivalent to HTTP Status 400 - Unauthorized = -50401, // equivalent to HTTP Status 401 - Forbidden = -50403, // equivalent to HTTP Status 403 + /** App defined error equivalent to HTTP Status 400 */ + BadRequest = -50400, + /** App defined error equivalent to HTTP Status 401 */ + Unauthorized = -50401, + /** App defined error equivalent to HTTP Status 403 */ + Forbidden = -50403, } export type JsonRpcResponse = JsonRpcSuccessResponse | JsonRpcErrorResponse; @@ -35,7 +46,7 @@ export interface JsonRpcSuccessResponse { jsonrpc: JsonRpcVersion; id: JsonRpcId; result: any; - error?: undefined; + error?: never; } export interface JsonRpcErrorResponse { @@ -64,7 +75,7 @@ export const createJsonRpcErrorResponse = ( export const createJsonRpcNotification = ( method: string, - params?: JsonRpcParams, + params?: any, ): JsonRpcRequest => { return { jsonrpc: '2.0', @@ -73,10 +84,27 @@ export const createJsonRpcNotification = ( }; }; +export const createJsonRpcSubscriptionRequest = ( + id: JsonRpcId, + method: string, + params?: any, + subscriptionId?: JsonRpcId +): JsonRpcRequest => { + return { + jsonrpc: '2.0', + id, + method, + params, + subscription: { + id: subscriptionId, + } + } +} + export const createJsonRpcRequest = ( id: JsonRpcId, method: string, - params?: JsonRpcParams, + params?: any, ): JsonRpcRequest => { return { jsonrpc: '2.0', @@ -96,11 +124,3 @@ export const createJsonRpcSuccessResponse = ( result: result ?? null, }; }; - -export function parseJson(text: string): object | null { - try { - return JSON.parse(text); - } catch { - return null; - } -} diff --git a/src/storage.ts b/src/storage.ts index 1988349..a4eea8c 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -9,6 +9,7 @@ import type { DataStore, DwnConfig, EventLog, + EventStream, MessageStore, TenantGate, } from '@tbd54566975/dwn-sdk-js'; @@ -45,9 +46,13 @@ export enum BackendTypes { export type StoreType = DataStore | EventLog | MessageStore; export function getDWNConfig( - config: DwnServerConfig, - tenantGate: TenantGate, + config : DwnServerConfig, + options : { + tenantGate? : TenantGate, + eventStream? : EventStream, + } ): DwnConfig { + const { tenantGate, eventStream } = options; const dataStore: DataStore = getStore(config.dataStore, EStoreType.DataStore); const eventLog: EventLog = getStore(config.eventLog, EStoreType.EventLog); const messageStore: MessageStore = getStore( @@ -55,7 +60,7 @@ export function getDWNConfig( EStoreType.MessageStore, ); - return { eventLog, dataStore, messageStore, tenantGate }; + return { eventStream, eventLog, dataStore, messageStore, tenantGate }; } function getLevelStore( diff --git a/src/ws-api.ts b/src/ws-api.ts index cac1cea..426aadc 100644 --- a/src/ws-api.ts +++ b/src/ws-api.ts @@ -1,175 +1,46 @@ -import { DataStream, type Dwn } from '@tbd54566975/dwn-sdk-js'; -import type { IncomingMessage, Server } from 'http'; -import { base64url } from 'multiformats/bases/base64'; -import { v4 as uuidv4 } from 'uuid'; -import { type AddressInfo, type WebSocket, WebSocketServer } from 'ws'; +import type { + Dwn, +} from '@tbd54566975/dwn-sdk-js'; -import type { RequestContext } from './lib/json-rpc-router.js'; -import { - createJsonRpcErrorResponse, - JsonRpcErrorCodes, - type JsonRpcResponse, -} from './lib/json-rpc.js'; +import type { Server } from 'http'; -import { jsonRpcApi } from './json-rpc-api.js'; -import { requestCounter } from './metrics.js'; +import { WebSocketServer } from 'ws'; -const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive'); -const HEARTBEAT_INTERVAL = 30_000; +import type { ConnectionManager } from './connection/connection-manager.js'; +import { InMemoryConnectionManager } from './connection/connection-manager.js'; export class WsApi { #wsServer: WebSocketServer; dwn: Dwn; + #connectionManager: ConnectionManager - constructor(server: Server, dwn: Dwn) { + constructor(server: Server, dwn: Dwn, connectionManager?: ConnectionManager) { this.dwn = dwn; + this.#connectionManager = connectionManager || new InMemoryConnectionManager(dwn); this.#wsServer = new WebSocketServer({ server }); } - get address(): AddressInfo | string { - return this.#wsServer.address(); - } - get server(): WebSocketServer { return this.#wsServer; } - /** - * Handler for opening websocket event - `connection`. - * Sets listeners for `message`, `pong`, `close`, and `error` events. - */ - #handleConnection(socket: WebSocket, _request: IncomingMessage): void { - const dwn = this.dwn; - - socket[SOCKET_ISALIVE_SYMBOL] = true; - - // Pong messages are automatically sent in response to ping messages as required by - // the websocket spec. So, no need to send explicit pongs from browser - socket.on('pong', function () { - this[SOCKET_ISALIVE_SYMBOL] = true; - }); - - socket.on('close', function () { - // Clean up event listeners - socket.removeAllListeners(); - }); - - socket.on('error', function (error) { - console.error('WebSocket error:', error); - // Close the socket and remove all event listeners - socket.terminate(); - socket.removeAllListeners(); - }); - - socket.on('message', async function (dataBuffer) { - let dwnRequest; - - try { - // deserialize bytes into JSON object - dwnRequest = dataBuffer.toString(); - if (!dwnRequest) { - const jsonRpcResponse = createJsonRpcErrorResponse( - uuidv4(), - JsonRpcErrorCodes.BadRequest, - 'request payload required.', - ); - - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); - return socket.send(responseBuffer); - } - - dwnRequest = JSON.parse(dwnRequest); - } catch (e) { - const jsonRpcResponse = createJsonRpcErrorResponse( - uuidv4(), - JsonRpcErrorCodes.BadRequest, - e.message, - ); - - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); - return socket.send(responseBuffer); - } - - // Check whether data was provided in the request - const { encodedData } = dwnRequest.params; - const requestDataStream = encodedData - ? DataStream.fromBytes(base64url.baseDecode(encodedData)) - : undefined; - - const requestContext: RequestContext = { - dwn, - transport: 'ws', - dataStream: requestDataStream, - }; - const { jsonRpcResponse } = await jsonRpcApi.handle( - dwnRequest, - requestContext, - ); - - if (jsonRpcResponse.error) { - requestCounter.inc({ method: dwnRequest.method, error: 1 }); - } else { - requestCounter.inc({ - method: dwnRequest.method, - status: jsonRpcResponse?.result?.reply?.status?.code || 0, - }); - } - - const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse); - return socket.send(responseBuffer); - }); - } - - /** - * This handler returns an interval to ping clients' socket every 30s - * if a pong hasn't received from a socket by the next ping, the server will terminate the socket connection. - */ - #setupHeartbeat(): NodeJS.Timer { - // Sometimes connections between client <-> server can get borked in such a way that - // leaves both unaware of the borkage. ping messages can be used as a means to verify - // that the remote endpoint is still responsive. Server will ping each socket every 30s - // if a pong hasn't received from a socket by the next ping, the server will terminate - // the socket connection - return setInterval(() => { - this.#wsServer.clients.forEach(function (socket) { - if (socket[SOCKET_ISALIVE_SYMBOL] === false) { - return socket.terminate(); - } - - socket[SOCKET_ISALIVE_SYMBOL] = false; - socket.ping(); - }); - }, HEARTBEAT_INTERVAL); - } - /** * Handler for starting a WebSocket. * Sets listeners for `connection`, `close` events. - * It clears `heartbeatInterval` when a `close` event is made. */ #setupWebSocket(): void { - this.#wsServer.on('connection', this.#handleConnection.bind(this)); - - const heartbeatInterval = this.#setupHeartbeat(); - - this.#wsServer.on('close', function close() { - clearInterval(heartbeatInterval); - }); + this.#wsServer.on('connection', (socket, request) => this.#connectionManager.connect(socket, request)); + this.#wsServer.on('close', () => this.#connectionManager.closeAll()); } - start(callback?: () => void): WebSocketServer { + start(): WebSocketServer { this.#setupWebSocket(); - callback?.(); return this.#wsServer; } - close(): void { + async close(): Promise { this.#wsServer.close(); - } - - static jsonRpcResponseToBuffer(jsonRpcResponse: JsonRpcResponse): Buffer { - const str = JSON.stringify(jsonRpcResponse); - return Buffer.from(str); + await this.#connectionManager.closeAll(); } } diff --git a/tests/connection/connection-manager.spec.ts b/tests/connection/connection-manager.spec.ts new file mode 100644 index 0000000..ba6850b --- /dev/null +++ b/tests/connection/connection-manager.spec.ts @@ -0,0 +1,60 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import sinon from 'sinon'; +import { getTestDwn } from '../test-dwn.js'; +import { InMemoryConnectionManager } from '../../src/connection/connection-manager.js'; +import { config } from '../../src/config.js'; +import { WsApi } from '../../src/ws-api.js'; +import type { Server } from 'http'; +import { HttpApi } from '../../src/http-api.js'; +import { JsonRpcSocket } from '../../src/json-rpc-socket.js'; + +chai.use(chaiAsPromised); + +describe('InMemoryConnectionManager', () => { + let dwn: Dwn; + let connectionManager: InMemoryConnectionManager; + let server: Server + let wsApi: WsApi; + + beforeEach(async () => { + dwn = await getTestDwn({ withEvents: true }); + connectionManager = new InMemoryConnectionManager(dwn); + const httpApi = new HttpApi(config, dwn); + server = await httpApi.start(9002); + wsApi = new WsApi(server, dwn, connectionManager); + wsApi.start(); + }); + + afterEach(async () => { + await connectionManager.closeAll(); + await dwn.close(); + await wsApi.close(); + server.close(); + server.closeAllConnections(); + sinon.restore(); + }); + + it('adds connection to the connections and removes it if that connection is closed', async () => { + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + expect((connectionManager as any).connections.size).to.equal(1); + connection.close(); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to be fired + expect((connectionManager as any).connections.size).to.equal(0); + }); + + it('closes all connections on `closeAll`', async () => { + await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + expect((connectionManager as any).connections.size).to.equal(1); + + await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + expect((connectionManager as any).connections.size).to.equal(2); + + await connectionManager.closeAll(); + expect((connectionManager as any).connections.size).to.equal(0); + }); +}); \ No newline at end of file diff --git a/tests/connection/socket-connection.spec.ts b/tests/connection/socket-connection.spec.ts new file mode 100644 index 0000000..3f033a5 --- /dev/null +++ b/tests/connection/socket-connection.spec.ts @@ -0,0 +1,159 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import sinon from 'sinon'; +import { WebSocket } from 'ws'; +import { SocketConnection } from '../../src/connection/socket-connection.js'; +import { getTestDwn } from '../test-dwn.js'; +import log from 'loglevel'; + +chai.use(chaiAsPromised); + +describe('SocketConnection', () => { + let dwn: Dwn; + + before(async () => { + dwn = await getTestDwn(); + }); + + after(async () => { + await dwn.close(); + sinon.restore(); + }); + + it('should assign socket handlers', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + expect(socket.on.callCount).to.equal(4); + expect(socket.on.args.map(arg => arg[0])).to.have.members(['message', 'close', 'error', 'pong']); + await connection.close(); + }); + + it('should add a subscription to the subscription manager map', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const subscriptionRequest = { + id: 'id', + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + await connection.close(); + expect((connection as any).subscriptions.size).to.equal(0); + }); + + it('should reject a subscription with an Id of an existing subscription', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + + const id = 'some-id'; + + const subscriptionRequest = { + id, + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + + const addDuplicatePromise = connection.addSubscription(subscriptionRequest); + await expect(addDuplicatePromise).to.eventually.be.rejectedWith(`the subscription with id ${id} already exists`); + expect((connection as any).subscriptions.size).to.equal(1); + await connection.close(); + expect((connection as any).subscriptions.size).to.equal(0); + }); + + it('should close a subscription and remove it from the connection manager map', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + + const id = 'some-id'; + + const subscriptionRequest = { + id, + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + + await connection.closeSubscription(id); + expect((connection as any).subscriptions.size).to.equal(0); + + const closeAgainPromise = connection.closeSubscription(id); + await expect(closeAgainPromise).to.eventually.be.rejectedWith(`the subscription with id ${id} was not found`); + await connection.close(); + }); + + it('hasSubscription returns whether a subscription with the id already exists', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const subscriptionRequest = { + id: 'id', + method: 'method', + params: { param1: 'param' }, + close: async ():Promise => {} + } + + await connection.addSubscription(subscriptionRequest); + expect((connection as any).subscriptions.size).to.equal(1); + expect(connection.hasSubscription(subscriptionRequest.id)).to.be.true; + expect(connection.hasSubscription('does-not-exist')).to.be.false; + + await connection.closeSubscription(subscriptionRequest.id); + expect(connection.hasSubscription(subscriptionRequest.id)).to.be.false; + await connection.close(); + }); + + it('should close if pong is not triggered between heartbeat intervals', async () => { + const socket = sinon.createStubInstance(WebSocket); + const clock = sinon.useFakeTimers(); + const connection = new SocketConnection(socket, dwn); + const closeSpy = sinon.spy(connection, 'close'); + + clock.tick(60_100); // interval has to run twice + clock.restore(); + + expect(closeSpy.callCount).to.equal(1); + }); + + it('should not close if pong is called within the heartbeat interval', async () => { + const socket = sinon.createStubInstance(WebSocket); + const clock = sinon.useFakeTimers(); + const connection = new SocketConnection(socket, dwn); + const closeSpy = sinon.spy(connection, 'close'); + + (connection as any).pong(); // trigger a pong + clock.tick(30_100); // first interval + + (connection as any).pong(); // trigger a pong + clock.tick(30_100); // second interval + + expect(closeSpy.callCount).to.equal(0); + + clock.tick(30_100); // another interval without a ping + clock.restore(); + expect(closeSpy.callCount).to.equal(1); + }); + + it('logs an error and closes connection if error is triggered', async () => { + const socket = sinon.createStubInstance(WebSocket); + const connection = new SocketConnection(socket, dwn); + const logSpy = sinon.stub(log, 'error'); + const closeSpy = sinon.spy(connection, 'close'); + + (connection as any).error(new Error('some error')); + + expect(logSpy.callCount).to.equal(1); + expect(closeSpy.callCount).to.equal(1); + }); +}); \ No newline at end of file diff --git a/tests/dwn-process-message.spec.ts b/tests/dwn-process-message.spec.ts index 1447eb8..fe4f3d1 100644 --- a/tests/dwn-process-message.spec.ts +++ b/tests/dwn-process-message.spec.ts @@ -1,9 +1,10 @@ import { expect } from 'chai'; +import sinon from 'sinon'; import { v4 as uuidv4 } from 'uuid'; import { handleDwnProcessMessage } from '../src/json-rpc-handlers/dwn/process-message.js'; import type { RequestContext } from '../src/lib/json-rpc-router.js'; -import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; +import { JsonRpcErrorCodes, createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { getTestDwn } from './test-dwn.js'; import { createRecordsWriteMessage } from './utils.js'; import { TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; @@ -32,6 +33,7 @@ describe('handleDwnProcessMessage', function () { const { reply } = jsonRpcResponse.result; expect(reply.status.code).to.equal(202); expect(reply.status.detail).to.equal('Accepted'); + await dwn.close(); }); it('returns a JSON RPC Success Response when DWN returns a 4XX/5XX status code', async function () { @@ -59,5 +61,76 @@ describe('handleDwnProcessMessage', function () { expect(reply.status.detail).to.exist; expect(reply.data).to.be.undefined; expect(reply.entries).to.be.undefined; + await dwn.close(); + }); + + it('should fail if no subscriptionRequest context exists for a `Subscribe` message', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records', method: 'Subscribe' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws' }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('subscribe methods must contain a subscriptionRequest context'); + await dwn.close(); + }); + + it('should fail on http requests for a `Subscribe` message', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records', method: 'Subscribe' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'http', subscriptionRequest: { id: 'test', subscriptionHandler: () => {}} }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(jsonRpcResponse.error.message).to.equal('subscriptions are not supported via http'); + await dwn.close(); + }); + + it('should return a JsonRpc Internal Error for an unexpected thrown error within the handler', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + message: { + descriptor: { interface: 'Records' }, + }, + target: 'did:key:abc1234', + }); + + const dwn = await getTestDwn(); + sinon.stub(dwn, 'processMessage').throws(new Error('unexpected error')); + const context: RequestContext = { dwn, transport: 'http' }; + + const { jsonRpcResponse } = await handleDwnProcessMessage( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InternalError); + expect(jsonRpcResponse.error.message).to.equal('unexpected error'); + await dwn.close(); }); }); diff --git a/tests/dwn-server.spec.ts b/tests/dwn-server.spec.ts index 0cc2888..9c79327 100644 --- a/tests/dwn-server.spec.ts +++ b/tests/dwn-server.spec.ts @@ -1,3 +1,5 @@ +import type { Dwn } from '@tbd54566975/dwn-sdk-js'; + import { expect } from 'chai'; import { config } from '../src/config.js'; @@ -6,15 +8,50 @@ import { getTestDwn } from './test-dwn.js'; describe('DwnServer', function () { const dwnServerConfig = { ...config }; + let dwn: Dwn; - it('starts with injected dwn', async function () { - const testDwn = await getTestDwn(); + dwn = await getTestDwn(); - const dwnServer = new DwnServer({ config: dwnServerConfig, dwn: testDwn }); + const dwnServer = new DwnServer({ config: dwnServerConfig, dwn }); await dwnServer.start(); dwnServer.stop(() => console.log('server Stop')); expect(dwnServer.httpServer.listening).to.be.false; }); + + describe('webSocketServerEnabled config', function() { + it('should not return a websocket server if disabled', async function() { + dwn = await getTestDwn({ withEvents: true }); + const withoutSocketServer = new DwnServer({ + dwn, + config: { + ...dwnServerConfig, + webSocketServerEnabled: false, + } + }); + + await withoutSocketServer.start(); + expect(withoutSocketServer.httpServer.listening).to.be.true; + expect(withoutSocketServer.wsServer).to.be.undefined; + withoutSocketServer.stop(() => console.log('server Stop')); + expect(withoutSocketServer.httpServer.listening).to.be.false; + }); + + it('should return a websocket server if enabled', async function() { + dwn = await getTestDwn({ withEvents: true }); + const withSocketServer = new DwnServer({ + dwn, + config: { + ...dwnServerConfig, + webSocketServerEnabled: true, + } + }); + + await withSocketServer.start(); + expect(withSocketServer.wsServer).to.not.be.undefined; + withSocketServer.stop(() => console.log('server Stop')); + expect(withSocketServer.httpServer.listening).to.be.false; + }); + }); }); diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index bb9a22b..a64121e 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -62,7 +62,7 @@ describe('http api', function () { const proofOfWorkInitialMaximumAllowedHash = config.registrationProofOfWorkInitialMaxHash; registrationManager = await RegistrationManager.create({ registrationStoreUrl, termsOfServiceFilePath, proofOfWorkInitialMaximumAllowedHash }); - dwn = await getTestDwn(registrationManager); + dwn = await getTestDwn({ tenantGate: registrationManager }); httpApi = new HttpApi(config, dwn, registrationManager); diff --git a/tests/json-rpc-socket.spec.ts b/tests/json-rpc-socket.spec.ts new file mode 100644 index 0000000..6f1c4c4 --- /dev/null +++ b/tests/json-rpc-socket.spec.ts @@ -0,0 +1,234 @@ +import chaiAsPromised from 'chai-as-promised'; +import chai, { expect } from 'chai'; + +import { v4 as uuidv4 } from 'uuid'; +import sinon from 'sinon'; +import { WebSocketServer } from 'ws'; + +import type { JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; + +import { JsonRpcSocket } from '../src/json-rpc-socket.js'; +import { JsonRpcErrorCodes, createJsonRpcErrorResponse, createJsonRpcRequest, createJsonRpcSubscriptionRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; +import log from 'loglevel'; + +chai.use(chaiAsPromised); + +describe('JsonRpcSocket', () => { + let wsServer: WebSocketServer; + + before(async () => { + wsServer = new WebSocketServer({ + port: 9003, + }); + }); + + beforeEach(() => { + wsServer.removeAllListeners(); + }); + + it('connects to a url', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + expect(wsServer.clients.size).to.equal(1); + client.close(); + + // give time for the connection to close on the server. + await new Promise((resolve) => setTimeout(resolve, 5)); + expect(wsServer.clients.size).to.equal(0); + }); + + it('resolves a request with given params', async () => { + wsServer.addListener('connection', (socket) => { + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + const { param1, param2 } = request.params; + expect(param1).to.equal('test-param1'); + expect(param2).to.equal('test-param2'); + + // send response passed tests + const response = createJsonRpcSuccessResponse(request.id, {}); + socket.send(Buffer.from(JSON.stringify(response))); + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const response = await client.request(request); + expect(response.id).to.equal(request.id); + }); + + it('request times out', async () => { + // time out after 1 ms + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 1 }); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const requestPromise = client.request(request); + + await expect(requestPromise).to.eventually.be.rejectedWith('timed out'); + }); + + it('opens a subscription', async () => { + wsServer.addListener('connection', (socket) => { + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + // initial response + const response = createJsonRpcSuccessResponse(request.id, { reply: {} }) + socket.send(Buffer.from(JSON.stringify(response))); + const { subscription } = request; + // send 3 messages + for (let i = 0; i < 3; i++) { + const response = createJsonRpcSuccessResponse(subscription.id, { count: i }); + socket.send(Buffer.from(JSON.stringify(response))); + } + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'rpc.subscribe.test.method', + { param1: 'test-param1', param2: 'test-param2' }, + subscribeId, + ); + + let responseCounter = 0; + const responseListener = (response: JsonRpcSuccessResponse): void => { + expect(response.id).to.equal(subscribeId); + const { count } = response.result; + expect(count).to.equal(responseCounter); + responseCounter++; + } + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; + // wait for the messages to arrive + await new Promise((resolve) => setTimeout(resolve, 5)); + // the original response + expect(responseCounter).to.equal(3); + await subscription.close(); + }); + + it('sends message', async () => { + const receivedPromise = new Promise<{ reply: { id?: JsonRpcId }}>((resolve) => { + wsServer.addListener('connection', (socket) => { + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + const { param1, param2 } = request.params; + expect(param1).to.equal('test-param1'); + expect(param2).to.equal('test-param2'); + resolve({ reply: { id: request.id }}); + }); + }); + }); + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + client.send(request); + await expect(receivedPromise).to.eventually.eql({ reply: { id: request.id }}); + }); + + it('closes subscription upon receiving a JsonRpc Error for a long running subscription', async () => { + let closed = true; + wsServer.addListener('connection', (socket) => { + closed = false; + socket.on('message', (dataBuffer: Buffer) => { + const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + if (request.method.startsWith('rpc.subscribe') && request.method !== 'rpc.subscribe.close') { + // initial response + const response = createJsonRpcSuccessResponse(request.id, { reply: {} }) + socket.send(Buffer.from(JSON.stringify(response))); + const { subscription } = request; + + // send 1 valid message + const message1 = createJsonRpcSuccessResponse(subscription.id, { message: 1 }); + socket.send(Buffer.from(JSON.stringify(message1))); + + // send a json rpc error + const jsonRpcError = createJsonRpcErrorResponse(subscription.id, JsonRpcErrorCodes.InternalError, 'some error'); + socket.send(Buffer.from(JSON.stringify(jsonRpcError))); + + // send a 2nd message that shouldn't be handled + const message2 = createJsonRpcSuccessResponse(subscription.id, { message: 2 }); + socket.send(Buffer.from(JSON.stringify(message2))); + } else if (request.method === 'rpc.subscribe.close') { + closed = true; + } + }); + }); + + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const request = createJsonRpcSubscriptionRequest( + requestId, + 'rpc.subscribe.test.method', + { param1: 'test-param1', param2: 'test-param2' }, + subscribeId, + ); + + let responseCounter = 0; + let errorCounter = 0; + const responseListener = (response: JsonRpcSuccessResponse): void => { + expect(response.id).to.equal(subscribeId); + if (response.error) { + errorCounter++; + } + + if (response.result) { + responseCounter++; + } + } + + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; + // wait for the messages to arrive + await new Promise((resolve) => setTimeout(resolve, 5)); + // the original response + expect(responseCounter).to.equal(1); + expect(errorCounter).to.equal(1); + expect(closed).to.equal(true); + }); + + it('only JSON RPC Methods prefixed with `rpc.subscribe.` are accepted for a subscription', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const subscribePromise = client.subscribe(request, () => {}); + await expect(subscribePromise).to.eventually.be.rejectedWith('subscribe rpc requests must include the `rpc.subscribe` prefix'); + }); + + it('subscribe methods must contain a subscribe object within the request which contains the subscription JsonRpcId', async () => { + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2' }); + const subscribePromise = client.subscribe(request, () => {}); + await expect(subscribePromise).to.eventually.be.rejectedWith('subscribe rpc requests must include subscribe options'); + }); + + it('calls onclose handler', async () => { + // test injected handler + const onCloseHandler = { onclose: ():void => {} }; + const onCloseSpy = sinon.spy(onCloseHandler, 'onclose'); + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { onclose: onCloseHandler.onclose }); + client.close(); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive + expect(onCloseSpy.callCount).to.equal(1); + + // test default logger + const logInfoSpy = sinon.spy(log, 'info'); + const defaultClient = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + defaultClient.close(); + + await new Promise((resolve) => setTimeout(resolve, 5)); // wait for close event to arrive + expect(logInfoSpy.callCount).to.equal(1); + + // extract log message from argument + const logMessage:string = logInfoSpy.args[0][0]!; + expect(logMessage).to.equal('JSON RPC Socket close ws://127.0.0.1:9003'); + }); + + xit('calls onerror handler', async () => {}); +}); diff --git a/tests/registration/proof-of-work-manager.spec.ts b/tests/registration/proof-of-work-manager.spec.ts index 06354a2..986b6df 100644 --- a/tests/registration/proof-of-work-manager.spec.ts +++ b/tests/registration/proof-of-work-manager.spec.ts @@ -46,8 +46,8 @@ describe('ProofOfWorkManager', function () { } }; - const challengeNonceRefreshSpy = sinon.stub(proofOfWorkManager, 'refreshChallengeNonce').callsFake(stub); - const maximumAllowedHashValueRefreshSpy = sinon.stub(proofOfWorkManager, 'refreshMaximumAllowedHashValue').callsFake(stub); + const challengeNonceRefreshSpy = sinon.stub(proofOfWorkManager as any, 'refreshChallengeNonce').callsFake(stub); + const maximumAllowedHashValueRefreshSpy = sinon.stub(proofOfWorkManager as any, 'refreshMaximumAllowedHashValue').callsFake(stub); clock.tick(60 * 60 * 1000); diff --git a/tests/rpc-subscribe-close.spec.ts b/tests/rpc-subscribe-close.spec.ts new file mode 100644 index 0000000..5877387 --- /dev/null +++ b/tests/rpc-subscribe-close.spec.ts @@ -0,0 +1,126 @@ +import { expect } from 'chai'; +import sinon from 'sinon'; +import { v4 as uuidv4 } from 'uuid'; + +import type { RequestContext } from '../src/lib/json-rpc-router.js'; +import { JsonRpcErrorCodes, createJsonRpcRequest, createJsonRpcSubscriptionRequest } from '../src/lib/json-rpc.js'; +import { getTestDwn } from './test-dwn.js'; +import { handleSubscriptionsClose } from '../src/json-rpc-handlers/subscription/close.js'; +import { SocketConnection } from '../src/connection/socket-connection.js'; +import { DwnServerError, DwnServerErrorCode } from '../src/dwn-error.js'; + +describe('handleDwnProcessMessage', function () { + it('should return an error if no socket connection exists', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { }); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws' }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('socket connection does not exist'); + }); + + it('should return an error if no subscribe options exist', async function () { + const requestId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { }); + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidRequest); + expect(jsonRpcResponse.error.message).to.equal('subscribe options do not exist'); + }); + + it('should return an error if close subscription throws ConnectionSubscriptionJsonRpcIdNotFound', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + socketConnection.closeSubscription.throws(new DwnServerError( + DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdNotFound, + '' + )); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(jsonRpcResponse.error.message).to.equal(`subscription ${id} does not exist.`); + }); + + it('should return an error if close subscription throws ConnectionSubscriptionJsonRpcIdNotFound', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + socketConnection.closeSubscription.throws(new Error('unknown error')); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + + expect(jsonRpcResponse.error).to.exist; + expect(jsonRpcResponse.error.code).to.equal(JsonRpcErrorCodes.InternalError); + expect(jsonRpcResponse.error.message).to.equal(`unknown subscription close error for ${id}: unknown error`); + }); + + it('should return a success', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + expect(jsonRpcResponse.error).to.not.exist; + }); + + it('handler should generate a request Id if one is not provided with the request', async function () { + const requestId = uuidv4(); + const id = 'some-id'; + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.close', {}, id); + delete dwnRequest.id; // delete request id + + const socketConnection = sinon.createStubInstance(SocketConnection); + + const dwn = await getTestDwn(); + const context: RequestContext = { dwn, transport: 'ws', socketConnection }; + + const { jsonRpcResponse } = await handleSubscriptionsClose( + dwnRequest, + context, + ); + expect(jsonRpcResponse.error).to.not.exist; + expect(jsonRpcResponse.id).to.exist; + expect(jsonRpcResponse.id).to.not.equal(id); + }); +}); diff --git a/tests/test-dwn.ts b/tests/test-dwn.ts index b286c68..713fa55 100644 --- a/tests/test-dwn.ts +++ b/tests/test-dwn.ts @@ -1,5 +1,5 @@ import type { TenantGate } from '@tbd54566975/dwn-sdk-js'; -import { Dwn } from '@tbd54566975/dwn-sdk-js'; +import { Dwn, EventEmitterStream } from '@tbd54566975/dwn-sdk-js'; import { DataStoreSql, EventLogSql, @@ -9,26 +9,29 @@ import { import { getDialectFromURI } from '../src/storage.js'; import { DidDht, DidIon, DidKey, DidResolver } from '@web5/dids'; -export async function getTestDwn( - tenantGate?: TenantGate -): Promise { +export async function getTestDwn(options: { + tenantGate?: TenantGate, + withEvents?: boolean, +} = {}): Promise { + const { tenantGate, withEvents = false } = options; + const db = getDialectFromURI(new URL('sqlite://')); + const dataStore = new DataStoreSql(db); + const eventLog = new EventLogSql(db); + const messageStore = new MessageStoreSql(db); + const eventStream = withEvents ? new EventEmitterStream() : undefined; // NOTE: no resolver cache used here to avoid locking LevelDB const didResolver = new DidResolver({ didResolvers : [DidDht, DidIon, DidKey], }); - const db = getDialectFromURI(new URL('sqlite://')); - const dataStore = new DataStoreSql(db); - const eventLog = new EventLogSql(db); - const messageStore = new MessageStoreSql(db); - let dwn: Dwn; try { dwn = await Dwn.create({ eventLog, dataStore, messageStore, + eventStream, tenantGate, didResolver }); diff --git a/tests/utils.ts b/tests/utils.ts index 3ded3c4..af10b0a 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -1,14 +1,19 @@ -import type { Persona } from '@tbd54566975/dwn-sdk-js'; +import type { GenericMessage, Persona, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; import { Cid, DataStream, RecordsWrite } from '@tbd54566975/dwn-sdk-js'; import type { ReadStream } from 'node:fs'; import fs from 'node:fs'; import http from 'node:http'; import path from 'path'; +import { v4 as uuidv4 } from 'uuid'; +import fetch from 'node-fetch'; import type { Readable } from 'readable-stream'; import { fileURLToPath } from 'url'; import { WebSocket } from 'ws'; +import type { JsonRpcResponse } from '../src/lib/json-rpc.js'; +import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; + // __filename and __dirname are not defined in ES module scope const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -139,6 +144,65 @@ export function streamHttpRequest( }); } +export async function sendHttpMessage(options: { + url: string, + target: string, + message: GenericMessage, + data?: any, +}): Promise { + const { url, target, message, data } = options; + // First RecordsWrite that creates the record. + const requestId = uuidv4(); + const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + target, + message, + }); + + const fetchOpts = { + method : 'POST', + headers : { + 'dwn-request': JSON.stringify(jsonRpcRequest) + } + }; + + if (data !== undefined) { + fetchOpts.headers['content-type'] = 'application/octet-stream'; + fetchOpts['body'] = data; + } + + const resp = await fetch(url, fetchOpts); + let dwnRpcResponse: JsonRpcResponse; + + // check to see if response is in header first. if it is, that means the response is a ReadableStream + let dataStream; + const { headers } = resp; + if (headers.has('dwn-response')) { + const jsonRpcResponse = JSON.parse(headers.get('dwn-response')) as JsonRpcResponse; + + if (jsonRpcResponse == null) { + throw new Error(`failed to parse json rpc response. dwn url: ${url}`); + } + + dataStream = resp.body; + dwnRpcResponse = jsonRpcResponse; + } else { + const responseBody = await resp.text(); + dwnRpcResponse = JSON.parse(responseBody); + } + + if (dwnRpcResponse.error) { + const { code, message } = dwnRpcResponse.error; + throw new Error(`(${code}) - ${message}`); + } + + const { reply } = dwnRpcResponse.result; + if (dataStream) { + reply['record']['data'] = dataStream; + } + + return reply as UnionMessageReply; +} + export async function sendWsMessage( address: string, message: any, diff --git a/tests/ws-api.spec.ts b/tests/ws-api.spec.ts index e2b338f..34a2150 100644 --- a/tests/ws-api.spec.ts +++ b/tests/ws-api.spec.ts @@ -1,36 +1,56 @@ -import { DataStream, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; + +import type { Dwn, MessageEvent } from '@tbd54566975/dwn-sdk-js'; +import { DataStream, Message, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; + +import type { Server } from 'http'; import { expect } from 'chai'; import { base64url } from 'multiformats/bases/base64'; -import http from 'node:http'; +import type { SinonFakeTimers } from 'sinon'; +import { useFakeTimers } from 'sinon'; import { v4 as uuidv4 } from 'uuid'; -import { type WebSocketServer } from 'ws'; import { createJsonRpcRequest, + createJsonRpcSubscriptionRequest, JsonRpcErrorCodes, } from '../src/lib/json-rpc.js'; +import { config } from '../src/config.js'; import { WsApi } from '../src/ws-api.js'; import { getTestDwn } from './test-dwn.js'; -import { createRecordsWriteMessage, sendWsMessage } from './utils.js'; +import { createRecordsWriteMessage, sendWsMessage, sendHttpMessage } from './utils.js'; +import { HttpApi } from '../src/http-api.js'; +import { JsonRpcSocket } from '../src/json-rpc-socket.js'; -let server: http.Server; -let wsServer: WebSocketServer; describe('websocket api', function () { - before(async function () { - server = http.createServer(); - server.listen(9002, '127.0.0.1'); + let server: Server; + let httpApi: HttpApi; + let wsApi: WsApi; + let dwn: Dwn; + let clock: SinonFakeTimers; + + before(() => { + clock = useFakeTimers({ shouldAdvanceTime: true }); + }); - const testDwn = await getTestDwn(); - const wsApi = new WsApi(server, testDwn); - wsServer = wsApi.start(); + after(() => { + clock.restore(); }); - after(function () { - wsServer.close(); + beforeEach(async function () { + dwn = await getTestDwn({ withEvents: true }); + httpApi = new HttpApi(config, dwn); + server = await httpApi.start(9002); + wsApi = new WsApi(server, dwn); + wsApi.start(); + }); + + afterEach(async function () { + await wsApi.close(); server.close(); server.closeAllConnections(); + await dwn.close(); }); it('returns an error response if no request payload is provided', async function () { @@ -52,7 +72,7 @@ describe('websocket api', function () { expect(resp.error.message).to.include('JSON'); }); - it('handles RecordsWrite messages', async function () { + it('RecordsWrite messages are not supported', async function () { const alice = await TestDataGenerator.generateDidKeyPersona(); const { recordsWrite, dataStream } = await createRecordsWriteMessage(alice); @@ -66,16 +86,326 @@ describe('websocket api', function () { encodedData, }); - const data = await sendWsMessage( - 'ws://127.0.0.1:9002', - JSON.stringify(dwnRequest), - ); - const resp = JSON.parse(data.toString()); - expect(resp.id).to.equal(requestId); - console.log(resp.error); - expect(resp.error).to.not.exist; + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const response = await connection.request(dwnRequest); + + expect(response.id).to.equal(requestId); + expect(response.error).to.not.be.undefined; + expect(response.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(response.error.message).to.include('RecordsWrite is not supported via ws'); + }); + + it('subscribes to records and receives updates', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did, + }); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { response, close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + expect(response.error).to.be.undefined; + expect(response.result.reply.status.code).to.equal(200); + expect(close).to.not.be.undefined; + + const write1Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write1Message.message, + data : write1Message.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + const write2Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult2 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write2Message.message, + data : write2Message.dataBytes, + }) + expect(writeResult2.status.code).to.equal(202); + + // close the subscription + await close(); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed + expect(records).to.have.members([ + await Message.getCid(write1Message.message), + await Message.getCid(write2Message.message) + ]); + }); + + it('stops receiving updates when subscription is closed', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event; + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did, + }, subscribeId); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { response, close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + expect(response.error).to.be.undefined; + expect(response.result.reply.status.code).to.equal(200); + expect(close).to.not.be.undefined; + + const write1Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write1Message.message, + data : write1Message.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + // close the subscription after only 1 message + await close(); + + // write more messages that won't show up in the subscription + const write2Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult2 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write2Message.message, + data : write2Message.dataBytes, + }) + expect(writeResult2.status.code).to.equal(202); + + const write3Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult3 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write3Message.message, + data : write3Message.dataBytes, + }) + expect(writeResult3.status.code).to.equal(202); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed + expect(records).to.have.members([ await Message.getCid(write1Message.message) ]); + }); + + it('should fail to add subscription using a `JsonRpcId` that already exists for a subscription in that socket', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did + }, subscribeId); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + const { message: message2 } = await TestDataGenerator.generateRecordsSubscribe({ filter: { schema: 'bar/baz' }, author: alice }); + + // We are checking for the subscription Id not the request Id + const request2Id = uuidv4(); + const dwnRequest2 = createJsonRpcSubscriptionRequest(request2Id, 'rpc.subscribe.dwn.processMessage', { + message: message2, + target: alice.did + }, subscribeId); + + const { response: response2 } = await connection.subscribe(dwnRequest2, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + expect(response2.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(response2.error.message).to.contain(`${subscribeId} is in use by an active subscription`); + + const write1Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write1Message.message, + data : write1Message.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + const write2Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult2 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write2Message.message, + data : write2Message.dataBytes, + }) + expect(writeResult2.status.code).to.equal(202); + + // close the subscription + await close(); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed + expect(records).to.have.members([ + await Message.getCid(write1Message.message), + await Message.getCid(write2Message.message) + ]); + }); + + it('should receive an updated message as well as the initial write when subscribing to a record', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + // write an initial message + const initialWrite = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : initialWrite.message, + data : initialWrite.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + // subscribe to 'foo/bar' messages + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message, initialWrite } = event + if (initialWrite) { + records.push(await Message.getCid(initialWrite)); + } + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const dwnRequest = createJsonRpcSubscriptionRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did + }, subscribeId); + + const connection = await JsonRpcSocket.connect('ws://127.0.0.1:9002'); + const { close } = await connection.subscribe(dwnRequest, (response) => { + const { event } = response.result; + subscriptionHandler(event); + }); + + // wait for potential records to process and confirm that initial write has not been processed + await new Promise(resolve => setTimeout(resolve, 5)); + expect(records.length).length.to.equal(0); + + // update the initial message + const updatedMessage = await TestDataGenerator.generateFromRecordsWrite({ + author : alice, + existingWrite : initialWrite.recordsWrite, + }); + + const updateResult = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : updatedMessage.message, + data : updatedMessage.dataBytes, + }); + expect(updateResult.status.code).to.equal(202); + + await close(); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed - const { reply } = resp.result; - expect(reply.status.code).to.equal(202); + // both initial and update should exist now + expect(records).to.have.members([ + await Message.getCid(initialWrite.message), + await Message.getCid(updatedMessage.message) + ]); }); });