From 16694f27b62224b8989a976ab840187c17c879f4 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 21 Feb 2024 15:47:23 -0500 Subject: [PATCH] update jrpc subscription support - more error proof and concise --- src/connection/socket-connection.ts | 31 ++-- src/json-rpc-api.ts | 3 +- src/json-rpc-handlers/dwn/process-message.ts | 67 ++++---- src/json-rpc-socket.ts | 62 ++++++-- src/lib/json-rpc-router.ts | 12 +- src/lib/json-rpc.ts | 8 +- tests/http-api.spec.ts | 1 - tests/json-rpc-socket.spec.ts | 22 ++- tests/utils.ts | 83 ++++------ tests/ws-api.spec.ts | 159 ++++++++++++++++--- 10 files changed, 300 insertions(+), 148 deletions(-) diff --git a/src/connection/socket-connection.ts b/src/connection/socket-connection.ts index 0567daf..e890075 100644 --- a/src/connection/socket-connection.ts +++ b/src/connection/socket-connection.ts @@ -1,4 +1,4 @@ -import type { Dwn, GenericMessage } from "@tbd54566975/dwn-sdk-js"; +import type { Dwn, GenericMessage, MessageEvent } from "@tbd54566975/dwn-sdk-js"; import { DwnMethodName } from "@tbd54566975/dwn-sdk-js"; import type { WebSocket } from "ws"; @@ -6,7 +6,7 @@ import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; import type { RequestContext } from "../lib/json-rpc-router.js"; -import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "../lib/json-rpc.js"; +import type { JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcResponse, JsonRpcSubscription } from "../lib/json-rpc.js"; import { requestCounter } from "../metrics.js"; import { jsonRpcApi } from "../json-rpc-api.js"; @@ -15,12 +15,6 @@ import { DwnServerError, DwnServerErrorCode } from "../dwn-error.js"; const HEARTBEAT_INTERVAL = 30_000; -export interface JsonRpcSubscription { - /** JSON RPC Id of the Subscription Request */ - id: JsonRpcId; - close: () => Promise; -} - /** * SocketConnection handles a WebSocket connection to a DWN using JSON RPC. * It also manages references to the long running RPC subscriptions for the connection. @@ -54,6 +48,13 @@ export class SocketConnection { }, HEARTBEAT_INTERVAL); } + /** + * Checks to see if the incoming `JsonRpcId` is already in use for a subscription. + */ + hasSubscription(id: JsonRpcId): boolean { + return this.subscriptions.has(id); + } + /** * Adds a reference for the JSON RPC Subscription to this connection. * Used for cleanup if the connection is closed. @@ -182,7 +183,7 @@ export class SocketConnection { * * Wraps the incoming `message` in a `JSON RPC Success Response` using the original subscription`JSON RPC Id` to send through the WebSocket. */ - private createSubscriptionHandler(id: JsonRpcId): (message: GenericMessage) => void { + private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void { return (event) => { const response = createJsonRpcSuccessResponse(id, { reply: { event } }); this.send(response); @@ -195,17 +196,23 @@ export class SocketConnection { * Adds a `subscriptionHandler` for `Subscribe` messages. */ private async buildRequestContext(request: JsonRpcRequest): Promise { - const { id, params, method } = request; + const { params, method } = request; + const { subscribe } = params.rpc || {}; + const requestContext: RequestContext = { transport : 'ws', dwn : this.dwn, socketConnection : this, } - if (method === 'dwn.processMessage') { + if (method.startsWith('rpc.subscribe.') && subscribe) { const { message } = params as { message: GenericMessage }; if (message.descriptor.method === DwnMethodName.Subscribe) { - requestContext.subscriptionHandler = this.createSubscriptionHandler(id).bind(this); + const handlerFunc = this.createSubscriptionHandler(subscribe); + requestContext.subscriptionRequest = { + id: subscribe, + subscriptionHandler: (message): void => handlerFunc(message), + } } } diff --git a/src/json-rpc-api.ts b/src/json-rpc-api.ts index 647ff21..cc1dc07 100644 --- a/src/json-rpc-api.ts +++ b/src/json-rpc-api.ts @@ -6,4 +6,5 @@ import { handleSubscriptionsClose } from './json-rpc-handlers/subscription/index export const jsonRpcApi = new JsonRpcRouter(); jsonRpcApi.on('dwn.processMessage', handleDwnProcessMessage); -jsonRpcApi.on('subscription.close', handleSubscriptionsClose); +jsonRpcApi.on('rpc.subscribe.dwn.processMessage', handleDwnProcessMessage); +jsonRpcApi.on('rpc.subscribe.close', handleSubscriptionsClose); diff --git a/src/json-rpc-handlers/dwn/process-message.ts b/src/json-rpc-handlers/dwn/process-message.ts index 2f88c6f..0d419c7 100644 --- a/src/json-rpc-handlers/dwn/process-message.ts +++ b/src/json-rpc-handlers/dwn/process-message.ts @@ -2,26 +2,27 @@ import type { GenericMessage } from '@tbd54566975/dwn-sdk-js'; import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; import type { Readable as IsomorphicReadable } from 'readable-stream'; +import log from 'loglevel'; import { v4 as uuidv4 } from 'uuid'; +import type { JsonRpcSubscription } from '../../lib/json-rpc.js'; import type { HandlerResponse, JsonRpcHandler, } from '../../lib/json-rpc-router.js'; -import { DwnServerErrorCode } from '../../dwn-error.js'; import { createJsonRpcErrorResponse, createJsonRpcSuccessResponse, JsonRpcErrorCodes, } from '../../lib/json-rpc.js'; -import log from 'loglevel'; + export const handleDwnProcessMessage: JsonRpcHandler = async ( dwnRequest, context, ) => { - const { dwn, dataStream, subscriptionHandler, socketConnection, transport } = context; + const { dwn, dataStream, subscriptionRequest, socketConnection, transport } = context; const { target, message } = dwnRequest.params as { target: string, message: GenericMessage }; const requestId = dwnRequest.id ?? uuidv4(); @@ -41,23 +42,41 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( return { jsonRpcResponse }; } + // subscribe methods must come with a subscriptionRequest context + if (message.descriptor.method === DwnMethodName.Subscribe && subscriptionRequest === undefined) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidRequest, + `subscribe methods must contain a subscriptionRequest context` + ); + return { jsonRpcResponse }; + } + // Subscribe methods are only supported on 'ws' (WebSockets) - if (transport !== 'ws' && message.descriptor.method === DwnMethodName.Subscribe) { + if (transport !== 'ws' && subscriptionRequest !== undefined) { const jsonRpcResponse = createJsonRpcErrorResponse( requestId, JsonRpcErrorCodes.InvalidParams, - `Subscribe not supported via ${context.transport}` + `subscriptions are not supported via ${context.transport}` + ) + return { jsonRpcResponse }; + } + + if (subscriptionRequest !== undefined && socketConnection?.hasSubscription(subscriptionRequest.id)) { + const jsonRpcResponse = createJsonRpcErrorResponse( + requestId, + JsonRpcErrorCodes.InvalidParams, + `the subscribe id: ${subscriptionRequest.id} is in use by an active subscription` ) return { jsonRpcResponse }; } const reply = await dwn.processMessage(target, message, { dataStream: dataStream as IsomorphicReadable, - subscriptionHandler, + subscriptionHandler: subscriptionRequest?.subscriptionHandler, }); - const { record, subscription } = reply; - + const { record } = reply; // RecordsRead messages return record data as a stream to for accommodate large amounts of data let recordDataStream: IsomorphicReadable; if (record !== undefined && record.data !== undefined) { @@ -66,29 +85,16 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( } // Subscribe messages return a close function to facilitate closing the subscription - if (subscription !== undefined) { - const { close } = subscription; - try { - // adding a reference to the close function for this subscription request to the connection. - // this will facilitate closing the subscription later. - await socketConnection.addSubscription({ id: requestId, close }); - delete reply.subscription.close // not serializable via JSON - } catch(error) { - // close the subscription upon receiving an error here - await close(); - if (error.code === DwnServerErrorCode.ConnectionSubscriptionJsonRpcIdExists) { - // a subscription with this request id already exists - const errorResponse = createJsonRpcErrorResponse( - requestId, - JsonRpcErrorCodes.BadRequest, - `the request id ${requestId} already has an active subscription` - ); - return { jsonRpcResponse: errorResponse }; - } else { - // will catch as an unknown error below - throw new Error('unknown error adding subscription'); - } + if (subscriptionRequest && reply.subscription) { + const { close } = reply.subscription; + // we add a reference to the close function for this subscription request to the socket connection. + // this will facilitate closing the subscription later. + const subscriptionReply: JsonRpcSubscription = { + id: subscriptionRequest.id, + close, } + await socketConnection.addSubscription(subscriptionReply); + delete reply.subscription.close // delete the close method from the reply as it's not JSON serializable and has a held reference. } const jsonRpcResponse = createJsonRpcSuccessResponse(requestId, { reply }); @@ -107,7 +113,6 @@ export const handleDwnProcessMessage: JsonRpcHandler = async ( // log the error response log.error('handleDwnProcessMessage error', jsonRpcResponse); - return { jsonRpcResponse } as HandlerResponse; } }; diff --git a/src/json-rpc-socket.ts b/src/json-rpc-socket.ts index 826a353..d84197d 100644 --- a/src/json-rpc-socket.ts +++ b/src/json-rpc-socket.ts @@ -3,6 +3,7 @@ import { v4 as uuidv4 } from 'uuid'; import WebSocket from 'ws'; import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js"; +import { createJsonRpcRequest } from "./lib/json-rpc.js"; // These were arbitrarily chosen, but can be modified via connect options const CONNECT_TIMEOUT = 3_000; @@ -89,26 +90,61 @@ export class JsonRpcSocket { * Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive. * Returns a close method to clean up the listener. */ - subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): { close: () => void } { - request.id ??= uuidv4(); + async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{ + response: JsonRpcResponse; + close?: () => Promise; + }> { + + if (!request.method.startsWith('rpc.subscribe.')) { + throw new Error('subscribe rpc messages must include the `rpc.subscribe` prefix'); + } + + // extract optional `rpc.subscribe` param + const { rpc } = request.params; + const { subscribe } = rpc || {}; + const subscriptionId = subscribe || uuidv4(); + + // When subscribing to a JSON RPC Message, we want to generate the subscription update Json PRC Id ahead of time and create a listener. + // We then set the subscription Id within a special rpc.subscribe params namespace preserving any other properties + request.params.rpc = { + ...rpc, + subscribe: subscriptionId, + }; + const messageHandler = (event: { data: any }):void => { const jsonRpcResponse = JSON.parse(event.data.toString()) as JsonRpcResponse; - if (jsonRpcResponse.id === request.id) { - // if the incoming response id matches the request id, trigger the listener - return listener(jsonRpcResponse); + if (jsonRpcResponse.id === subscriptionId) { + if (jsonRpcResponse.error !== undefined) { + // remove the event listener upon receipt of a JSON RPC Error. + this.socket.removeEventListener('message', messageHandler); + } + + listener(jsonRpcResponse); } }; - - // subscribe to the listener before sending the request this.socket.addEventListener('message', messageHandler); - this.send(request); - return { - close: ():void => { - // removes the listener for this particular request - this.socket.removeEventListener('message', messageHandler); + const response = await this.request(request); + if (response.error) { + this.socket.removeEventListener('message', messageHandler); + return { response } + } + + // clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription + const close = async (): Promise => { + this.socket.removeEventListener('message', messageHandler); + const requestId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'rpc.subscribe.close', { id: subscriptionId }); + const response = await this.request(request); + if (response.error) { + throw response.error; } - }; + } + + return { + response, + close + } } /** diff --git a/src/lib/json-rpc-router.ts b/src/lib/json-rpc-router.ts index ec63cee..580d375 100644 --- a/src/lib/json-rpc-router.ts +++ b/src/lib/json-rpc-router.ts @@ -1,16 +1,20 @@ -import type { Dwn, GenericMessage } from '@tbd54566975/dwn-sdk-js'; +import type { Dwn, MessageEvent } from '@tbd54566975/dwn-sdk-js'; import type { Readable } from 'node:stream'; -import type { JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; +import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from './json-rpc.js'; import type { SocketConnection } from '../connection/socket-connection.js'; export type RequestContext = { transport: 'http' | 'ws'; dwn: Dwn; socketConnection?: SocketConnection; - /** The `GenericMessage` handler associated with a subscription request, only used in `ws` requests */ - subscriptionHandler?: (message: GenericMessage) => void; + subscriptionRequest?: { + /** The JsonRpcId of the subscription handler */ + id: JsonRpcId; + /** The `MessageEvent` handler associated with a subscription request, only used in `ws` requests */ + subscriptionHandler: (message: MessageEvent) => void; + } /** The `Readable` stream associated with a `RecordsWrite` request only used in `http` requests */ dataStream?: Readable; }; diff --git a/src/lib/json-rpc.ts b/src/lib/json-rpc.ts index fbffd99..a460873 100644 --- a/src/lib/json-rpc.ts +++ b/src/lib/json-rpc.ts @@ -15,6 +15,12 @@ export interface JsonRpcError { data?: any; } +export interface JsonRpcSubscription { + /** JSON RPC Id of the Subscription Request */ + id: JsonRpcId; + close: () => Promise; +} + export enum JsonRpcErrorCodes { // JSON-RPC 2.0 pre-defined errors InvalidRequest = -32600, @@ -37,7 +43,7 @@ export interface JsonRpcSuccessResponse { jsonrpc: JsonRpcVersion; id: JsonRpcId; result: any; - error?: undefined; + error?: never; } export interface JsonRpcErrorResponse { diff --git a/tests/http-api.spec.ts b/tests/http-api.spec.ts index 1b391a7..a64121e 100644 --- a/tests/http-api.spec.ts +++ b/tests/http-api.spec.ts @@ -228,7 +228,6 @@ describe('http api', function () { expect(body.error).to.not.exist; const { reply } = body.result; - console.log('reply', reply); expect(reply.status.code).to.equal(202); }); diff --git a/tests/json-rpc-socket.spec.ts b/tests/json-rpc-socket.spec.ts index b0177d6..1279aa4 100644 --- a/tests/json-rpc-socket.spec.ts +++ b/tests/json-rpc-socket.spec.ts @@ -4,7 +4,7 @@ import chai, { expect } from 'chai'; import { v4 as uuidv4 } from 'uuid'; import { WebSocketServer } from 'ws'; -import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from '../src/lib/json-rpc.js'; +import type { JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; import { JsonRpcSocket } from '../src/json-rpc-socket.js'; import { createJsonRpcRequest, createJsonRpcSuccessResponse } from '../src/lib/json-rpc.js'; @@ -69,26 +69,34 @@ describe('JsonRpcSocket', () => { wsServer.addListener('connection', (socket) => { socket.on('message', (dataBuffer: Buffer) => { const request = JSON.parse(dataBuffer.toString()) as JsonRpcRequest; + // initial response + const response = createJsonRpcSuccessResponse(request.id, { reply: {} }) + socket.send(Buffer.from(JSON.stringify(response))); + + const { params } = request; + const { subscribe } = params.rpc || {}; // send 3 messages for (let i = 0; i < 3; i++) { - const response = createJsonRpcSuccessResponse(request.id, { count: i }); + const response = createJsonRpcSuccessResponse(subscribe, { count: i }); socket.send(Buffer.from(JSON.stringify(response))); } }); }); - const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003'); + const client = await JsonRpcSocket.connect('ws://127.0.0.1:9003', { responseTimeout: 5 }); const requestId = uuidv4(); - const request = createJsonRpcRequest(requestId, 'test.method', { param1: 'test-param1', param2: 'test-param2' }); + const subscribeId = uuidv4(); + const request = createJsonRpcRequest(requestId, 'rpc.subscribe.test.method', { param1: 'test-param1', param2: 'test-param2', rpc: { subscribe: subscribeId } }); let responseCounter = 0; - const responseListener = (response: JsonRpcResponse): void => { - expect(response.id).to.equal(request.id); + const responseListener = (response: JsonRpcSuccessResponse): void => { + expect(response.id).to.equal(subscribeId); const { count } = response.result; expect(count).to.equal(responseCounter); responseCounter++; } - const subscription = client.subscribe(request, responseListener); + const subscription = await client.subscribe(request, responseListener); + expect(subscription.response.error).to.be.undefined; // wait for the messages to arrive await new Promise((resolve) => setTimeout(resolve, 5)); // the original response diff --git a/tests/utils.ts b/tests/utils.ts index 1e1dda3..7655a57 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -1,4 +1,4 @@ -import type { GenericMessage, Persona, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; +import type { GenericMessage, MessageEvent, Persona, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; import { Cid, DataStream, RecordsWrite } from '@tbd54566975/dwn-sdk-js'; import type { ReadStream } from 'node:fs'; @@ -11,7 +11,7 @@ import type { Readable } from 'readable-stream'; import { fileURLToPath } from 'url'; import { WebSocket } from 'ws'; -import type { JsonRpcResponse, JsonRpcRequest, JsonRpcId } from '../src/lib/json-rpc.js'; +import type { JsonRpcResponse, JsonRpcRequest } from '../src/lib/json-rpc.js'; import { createJsonRpcRequest } from '../src/lib/json-rpc.js'; import { JsonRpcSocket } from '../src/json-rpc-socket.js'; @@ -222,60 +222,33 @@ export async function sendWsMessage( }); } -const MAX_RESPONSE_TIMEOUT = 1_500; - -export async function subscriptionRequest( - url: string, +export async function sendWsRequest(options: { + url?: string, + connection?: JsonRpcSocket, request: JsonRpcRequest, - messageHandler: (message: GenericMessage) => void, -): Promise<{ status: any, subscription?: { id: string, close: () => Promise } }> { - const { id: requestId } = request; - const connection = await JsonRpcSocket.connect(url); - - const closeSubscription = async (id: JsonRpcId, connection: JsonRpcSocket): Promise => { - const requestId = uuidv4(); - const request = createJsonRpcRequest(requestId, 'subscription.close', { id }); - return await connection.request(request); - } - - return new Promise<{ status: any, subscription?: { id: string, close: () => Promise } }>((resolve, reject) => { - const { close: subscriptionClose } = connection.subscribe(request, (response) => { - const { result, error } = response; - - // this is an error specific to the `JsonRpcRequest` requesting the subscription - if (error) { - reject(error); - return; - } - - // at this point the reply should be DwnRpcResponse - const { status, event, subscription } = result.reply; - if (event) { - messageHandler(event); - return; - } - - if (subscription) { - resolve({ - status, - subscription: { - ...subscription, - close: async (): Promise => { - subscriptionClose(); - const closeResponse = await closeSubscription(requestId, connection); - if (closeResponse.error?.message !== undefined) { - throw new Error(`unable to close subscription: ${closeResponse.error.message}`); - } - } - } - }) - } - - resolve({ status }); - }); + responseTimeout?: number, +}): Promise { + const { url, connection: incomingConnection , request, responseTimeout } = options; + const connection = incomingConnection ?? await JsonRpcSocket.connect(url, { responseTimeout }); + return connection.request(request); +} - setTimeout(() => { - return reject('subscription request timeout'); - }, MAX_RESPONSE_TIMEOUT); +export async function subscriptionRequest(options: { + url?: string, + connection?: JsonRpcSocket, + request: JsonRpcRequest, + messageHandler: (event: MessageEvent) => void, + responseTimeout?: number; +}): Promise<{ close?: () => Promise, response: JsonRpcResponse, connection?: JsonRpcSocket }> { + const { url, connection: incomingConnection, request, messageHandler, responseTimeout } = options; + const connection = incomingConnection ?? await JsonRpcSocket.connect(url, { responseTimeout }); + + const { close, response } = await connection.subscribe(request, (response) => { + const { event } = response.result.reply; + messageHandler(event); }); + + return { + response, close, connection + } } diff --git a/tests/ws-api.spec.ts b/tests/ws-api.spec.ts index 58a989f..486b5fd 100644 --- a/tests/ws-api.spec.ts +++ b/tests/ws-api.spec.ts @@ -1,11 +1,13 @@ -import type { Dwn, GenericMessage } from '@tbd54566975/dwn-sdk-js'; +import type { Dwn, MessageEvent } from '@tbd54566975/dwn-sdk-js'; import { DataStream, Message, TestDataGenerator } from '@tbd54566975/dwn-sdk-js'; import type { Server } from 'http'; import { expect } from 'chai'; import { base64url } from 'multiformats/bases/base64'; +import type { SinonFakeTimers } from 'sinon'; +import { useFakeTimers } from 'sinon'; import { v4 as uuidv4 } from 'uuid'; import { @@ -15,7 +17,7 @@ import { import { config } from '../src/config.js'; import { WsApi } from '../src/ws-api.js'; import { getTestDwn } from './test-dwn.js'; -import { createRecordsWriteMessage, sendWsMessage, sendHttpMessage, subscriptionRequest } from './utils.js'; +import { createRecordsWriteMessage, sendWsMessage, sendHttpMessage, subscriptionRequest, sendWsRequest } from './utils.js'; import { HttpApi } from '../src/http-api.js'; @@ -24,6 +26,15 @@ describe('websocket api', function () { let httpApi: HttpApi; let wsApi: WsApi; let dwn: Dwn; + let clock: SinonFakeTimers; + + before(() => { + clock = useFakeTimers({ shouldAdvanceTime: true }); + }); + + after(() => { + clock.restore(); + }); beforeEach(async function () { dwn = await getTestDwn({ withEvents: true }); @@ -73,15 +84,14 @@ describe('websocket api', function () { encodedData, }); - const data = await sendWsMessage( - 'ws://127.0.0.1:9002', - JSON.stringify(dwnRequest), - ); - const resp = JSON.parse(data.toString()); - expect(resp.id).to.equal(requestId); - expect(resp.error).to.not.be.undefined; - expect(resp.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); - expect(resp.error.message).to.include('RecordsWrite is not supported via ws'); + const response = await sendWsRequest({ + url:'ws://127.0.0.1:9002', + request: dwnRequest, + }); + expect(response.id).to.equal(requestId); + expect(response.error).to.not.be.undefined; + expect(response.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(response.error.message).to.include('RecordsWrite is not supported via ws'); }); it('subscribes to records and receives updates', async () => { @@ -95,19 +105,25 @@ describe('websocket api', function () { }); const records: string[] = []; - const subscriptionHandler = async (message: GenericMessage): Promise => { + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event records.push(await Message.getCid(message)); }; const requestId = uuidv4(); - const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.dwn.processMessage', { message: message, target: alice.did, }); - const response = await subscriptionRequest('ws://127.0.0.1:9002', dwnRequest, subscriptionHandler); - expect(response.status.code).to.equal(200); - expect(response.subscription).to.not.be.undefined; + const { response, close } = await subscriptionRequest({ + url : 'ws://127.0.0.1:9002', + request : dwnRequest, + messageHandler : subscriptionHandler + }); + expect(response.error).to.be.undefined; + expect(response.result.reply.status.code).to.equal(200); + expect(close).to.not.be.undefined; const write1Message = await TestDataGenerator.generateRecordsWrite({ author : alice, @@ -138,7 +154,7 @@ describe('websocket api', function () { expect(writeResult2.status.code).to.equal(202); // close the subscription - await response.subscription.close(); + await close(); await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed expect(records).to.have.members([ @@ -158,18 +174,25 @@ describe('websocket api', function () { }); const records: string[] = []; - const subscriptionHandler = async (message: GenericMessage): Promise => { + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event; records.push(await Message.getCid(message)); }; const requestId = uuidv4(); - const dwnRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', { + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.dwn.processMessage', { message: message, target: alice.did, }); - const response = await subscriptionRequest('ws://127.0.0.1:9002', dwnRequest, subscriptionHandler); - expect(response.status.code).to.equal(200); - expect(response.subscription).to.not.be.undefined; + + const { response, close } = await subscriptionRequest({ + url : 'ws://127.0.0.1:9002', + request : dwnRequest, + messageHandler : subscriptionHandler + }); + expect(response.error).to.be.undefined; + expect(response.result.reply.status.code).to.equal(200); + expect(close).to.not.be.undefined; const write1Message = await TestDataGenerator.generateRecordsWrite({ author : alice, @@ -186,7 +209,7 @@ describe('websocket api', function () { expect(writeResult1.status.code).to.equal(202); // close the subscription after only 1 message - await response.subscription.close(); + await close(); // write more messages that won't show up in the subscription const write2Message = await TestDataGenerator.generateRecordsWrite({ @@ -220,4 +243,94 @@ describe('websocket api', function () { await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed expect(records).to.have.members([ await Message.getCid(write1Message.message) ]); }); + + it('should fail to add subscription using a `JsonRpcId` that already exists for a subscription in that socket', async () => { + const alice = await TestDataGenerator.generateDidKeyPersona(); + + const { message } = await TestDataGenerator.generateRecordsSubscribe({ + author: alice, + filter: { + schema: 'foo/bar' + } + }); + + const records: string[] = []; + const subscriptionHandler = async (event: MessageEvent): Promise => { + const { message } = event + records.push(await Message.getCid(message)); + }; + + const requestId = uuidv4(); + const subscribeId = uuidv4(); + const dwnRequest = createJsonRpcRequest(requestId, 'rpc.subscribe.dwn.processMessage', { + message: message, + target: alice.did, + rpc: { subscribe: subscribeId } + }); + + const { response, close, connection } = await subscriptionRequest({ + url : 'ws://127.0.0.1:9002', + request : dwnRequest, + messageHandler : subscriptionHandler + }); + expect(response.error).to.be.undefined; + expect(response.result.reply.status.code).to.equal(200); + expect(close).to.not.be.undefined; + + + const { message: message2 } = await TestDataGenerator.generateRecordsSubscribe({ filter: { schema: 'bar/baz' }, author: alice }); + + // We are checking for the subscription Id not the request Id + const request2Id = uuidv4(); + const dwnRequest2 = createJsonRpcRequest(request2Id, 'rpc.subscribe.dwn.processMessage', { + message: message2, + target: alice.did, + rpc: { subscribe: subscribeId } + }); + + const { response: response2 } = await subscriptionRequest({ + connection, + request: dwnRequest2, + messageHandler: subscriptionHandler, + }); + expect(response2.error.code).to.equal(JsonRpcErrorCodes.InvalidParams); + expect(response2.error.message).to.contain(`${subscribeId} is in use by an active subscription`); + + const write1Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult1 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write1Message.message, + data : write1Message.dataBytes, + }); + expect(writeResult1.status.code).to.equal(202); + + const write2Message = await TestDataGenerator.generateRecordsWrite({ + author : alice, + schema : 'foo/bar', + dataFormat : 'text/plain' + }); + + const writeResult2 = await sendHttpMessage({ + url : 'http://localhost:9002', + target : alice.did, + message : write2Message.message, + data : write2Message.dataBytes, + }) + expect(writeResult2.status.code).to.equal(202); + + // close the subscription + await close(); + + await new Promise(resolve => setTimeout(resolve, 5)); // wait for records to be processed + expect(records).to.have.members([ + await Message.getCid(write1Message.message), + await Message.getCid(write2Message.message) + ]); + }); });