diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index e890075..1178506 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -196,8 +196,7 @@ export class SocketConnection { * Adds a `subscriptionHandler` for `Subscribe` messages. */ private async buildRequestContext(request: JsonRpcRequest): Promise { - const { params, method } = request; - const { subscribe } = params.rpc || {}; + const { params, method, subscribe } = request; const requestContext: RequestContext = { transport : 'ws', @@ -206,11 +205,11 @@ export class SocketConnection { } if (method.startsWith('rpc.subscribe.') && subscribe) { - const { message } = params as { message: GenericMessage }; - if (message.descriptor.method === DwnMethodName.Subscribe) { - const handlerFunc = this.createSubscriptionHandler(subscribe); + const { message } = params as { message?: GenericMessage }; + if (message?.descriptor.method === DwnMethodName.Subscribe) { + const handlerFunc = this.createSubscriptionHandler(subscribe.id); requestContext.subscriptionRequest = { - id: subscribe, + id: subscribe.id, subscriptionHandler: (message): void => handlerFunc(message), } } diff --git a/src/json-rpc-api.ts b/src/json-rpc-api.ts index cc1dc07..bb96f29 100644 --- a/src/json-rpc-api.ts +++ b/src/json-rpc-api.ts @@ -7,4 +7,5 @@ export const jsonRpcApi = new JsonRpcRouter(); jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage); jsonRpcApi.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage); + jsonRpcApi.on('rpc.subscribe.close', handleSubscriptionsClose); diff --git a/src/json-rpc-handlers/subscription/close.ts b/src/json-rpc-handlers/subscription/close.ts index 3c47fe1..fa15853 100644 --- a/src/json-rpc-handlers/subscription/close.ts +++ b/src/json-rpc-handlers/subscription/close.ts @@ -26,7 +26,7 @@ export const handleSubscriptionsClose: JsonRpcHandler = async ( ) => { const requestId = dwnRequest.id ?? uuidv4(); const { socketConnection } = context; - const { id } = dwnRequest.params as { id: JsonRpcId}; + const { id } = dwnRequest.subscribe as { id: JsonRpcId }; let jsonRpcResponse:JsonRpcResponse; try { diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts index d84197d..ce327c8 100644 --- a/src/json-rpc-socket.ts +++ b/src/json-rpc-socket.ts @@ -3,7 +3,7 @@ import { v4 as uuidv4 } from 'uuid'; import WebSocket from 'ws'; import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js"; -import { createJsonRpcRequest } from "./lib/json-rpc.js"; +import { createJsonRpcSubscribeRequest } from "./lib/json-rpc.js"; // These were arbitrarily chosen, but can be modified via connect options const CONNECT_TIMEOUT = 3_000; @@ -73,7 +73,6 @@ export class JsonRpcSocket { return resolve(jsonRpsResponse); } }; - // subscribe to the listener before sending the request this.socket.addEventListener('message', handleResponse); this.send(request); @@ -96,21 +95,14 @@ export class JsonRpcSocket { }> { if (!request.method.startsWith('rpc.subscribe.')) { - throw new Error('subscribe rpc messages must include the `rpc.subscribe` prefix'); + throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix'); } - // extract optional `rpc.subscribe` param - const { rpc } = request.params; - const { subscribe } = rpc || {}; - const subscriptionId = subscribe || uuidv4(); - - // When subscribing to a JSON RPC Message, we want to generate the subscription update Json PRC Id ahead of time and create a listener. - // We then set the subscription Id within a special rpc.subscribe params namespace preserving any other properties - request.params.rpc = { - ...rpc, - subscribe: subscriptionId, - }; + if (!request.subscribe) { + throw new Error('subscribe rpc requests must include subscribe options'); + } + const subscriptionId = request.subscribe.id; const messageHandler = (event: { data: any }):void => { const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse; if (jsonRpcResponse.id === subscriptionId) { @@ -134,7 +126,7 @@ export class JsonRpcSocket { const close = async (): Promise => { this.socket.removeEventListener('message', messageHandler); const requestId = uuidv4(); - const request = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { id: subscriptionId }); + const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, subscriptionId) const response = await this.request(request); if (response.error) { throw response.error; diff --git a/src/lib/json-rpc.ts b/src/lib/json-rpc.ts index a460873..bb4021d 100644 --- a/src/lib/json-rpc.ts +++ b/src/lib/json-rpc.ts @@ -1,3 +1,5 @@ +import { v4 as uuidv4 } from 'uuid'; + export type JsonRpcId = string | number | null; export type JsonRpcParams = any; export type JsonRpcVersion = '2.0'; @@ -7,6 +9,10 @@ export interface JsonRpcRequest { id?: JsonRpcId; method: string; params?: JsonRpcParams; + /** JSON RPC Subscribe Extension Parameters */ + subscribe?: { + id: JsonRpcId + }; } export interface JsonRpcError { @@ -81,6 +87,23 @@ export const createJsonRpcNotification = ( }; }; +export const createJsonRpcSubscribeRequest = ( + id: JsonRpcId, + method: string, + params?: JsonRpcParams, + subscriptionId?: JsonRpcId +): JsonRpcRequest => { + return { + jsonrpc: '2.0', + id, + method, + params, + subscribe: { + id: subscriptionId ?? uuidv4(), + } + } +} + export const createJsonRpcRequest = ( id: JsonRpcId, method: string, diff --git a/tests/json-rpc-socket.spec.ts b/tests/json-rpc-socket.spec.ts index 1279aa4..20f2daf 100644 --- a/tests/json-rpc-socket.spec.ts +++ b/tests/json-rpc-socket.spec.ts @@ -7,7 +7,7 @@ import { WebSocketServer } from 'ws'; import type { JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; import { JsonRpcSocket } from '../src/json-rpc-socket.js'; -import { createJsonRpcRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; +import { createJsonRpcRequest, createJsonRpcSubscribeRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; chai.use(chaiAsPromised); @@ -72,12 +72,10 @@ describe('JsonRpcSocket', () => { // initial response const response = createJsonRpcSuccessResponse(request.id, { reply: {} }) socket.send(Buffer.from(JSON.stringify(response))); - - const { params } = request; - const { subscribe } = params.rpc || {}; + const { subscribe } = request; // send 3 messages for (let i = 0; i < 3; i++) { - const response = createJsonRpcSuccessResponse(subscribe, { count: i }); + const response = createJsonRpcSuccessResponse(subscribe.id, { count: i }); socket.send(Buffer.from(JSON.stringify(response))); } }); @@ -85,7 +83,12 @@ describe('JsonRpcSocket', () => { const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); const requestId = uuidv4(); const subscribeId = uuidv4(); - const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2', rpc: { subscribe: subscribeId } }); + const request = createJsonRpcSubscribeRequest( + requestId, + 'rpc.subscribe.test.method', + { param1: 'test-param1', param2: 'test-param2' }, + subscribeId, + ); let responseCounter = 0; const responseListener = (response: JsonRpcSuccessResponse): void => { @@ -101,7 +104,7 @@ describe('JsonRpcSocket', () => { await new Promise((resolve) => setTimeout(resolve, 5)); // the original response expect(responseCounter).to.equal(3); - subscription.close(); + await subscription.close(); }); it('sends message', async () => { diff --git a/tests/utils.ts b/tests/utils.ts index 7655a57..55761e6 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -242,6 +242,9 @@ export async function subscriptionRequest(options: { }): Promise<{ close?: () => Promise, response: JsonRpcResponse, connection?: JsonRpcSocket }> { const { url, connection: incomingConnection, request, messageHandler, responseTimeout } = options; const connection = incomingConnection ?? await JsonRpcSocket.connect(url, { responseTimeout }); + request.subscribe ??= { + id: uuidv4(), + }; const { close, response } = await connection.subscribe(request, (response) => { const { event } = response.result.reply; diff --git a/tests/ws-api.spec.ts b/tests/ws-api.spec.ts index 486b5fd..502ce7c 100644 --- a/tests/ws-api.spec.ts +++ b/tests/ws-api.spec.ts @@ -12,6 +12,7 @@ import { v4 as uuidv4 } from 'uuid'; import { createJsonRpcRequest, + createJsonRpcSubscribeRequest, JsonRpcErrorCodes, } from '../src/lib/json-rpc.js'; import { config } from '../src/config.js'; @@ -111,7 +112,7 @@ describe('websocket api', function () { }; const requestId = uuidv4(); - const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.dwn.processMessage', { message: message, target: alice.did, }); @@ -262,11 +263,10 @@ describe('websocket api', function () { const requestId = uuidv4(); const subscribeId = uuidv4(); - const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + const dwnRequest = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.dwn.processMessage', { message: message, - target: alice.did, - rpc: { subscribe: subscribeId } - }); + target: alice.did + }, subscribeId); const { response, close, connection } = await subscriptionRequest({ url : 'ws://127.0.0.1:9002', @@ -282,11 +282,10 @@ describe('websocket api', function () { // We are checking for the subscription Id not the request Id const request2Id = uuidv4(); - const dwnRequest2 = createJsonRpcRequest(request2Id, 'rpc.subscribe.dwn.processMessage', { + const dwnRequest2 = createJsonRpcSubscribeRequest(request2Id, 'rpc.subscribe.dwn.processMessage', { message: message2, - target: alice.did, - rpc: { subscribe: subscribeId } - }); + target: alice.did + }, subscribeId); const { response: response2 } = await subscriptionRequest({ connection,