Skip to content

Commit

Permalink
increase testing across the board
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Feb 23, 2024
1 parent fcdf960 commit 7de0ad1
Show file tree
Hide file tree
Showing 12 changed files with 530 additions and 45 deletions.
15 changes: 4 additions & 11 deletions src/connection/socket-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ export class SocketConnection {
JsonRpcErrorCodes.BadRequest,
(error as Error).message
);

return this.send(errorResponse);
};

Expand All @@ -166,16 +165,10 @@ export class SocketConnection {
}

/**
* Sends a JSON encoded Buffer through the Websocket. Accepts a callback, if none is provided an error logger is used.
* Sends a JSON encoded Buffer through the Websocket.
*/
private send(response: JsonRpcResponse | JsonRpcErrorResponse, cb?: (error?: Error) => void): void {
if (!cb) {
cb = (error):void => {
if(error) { log.error('socket send error', error, response); }
}
}

this.socket.send(Buffer.from(JSON.stringify(response)), cb);
private send(response: JsonRpcResponse | JsonRpcErrorResponse): void {
this.socket.send(Buffer.from(JSON.stringify(response)));
}

/**
Expand All @@ -185,7 +178,7 @@ export class SocketConnection {
*/
private createSubscriptionHandler(id: JsonRpcId): (message: MessageEvent) => void {
return (event) => {
const response = createJsonRpcSuccessResponse(id, { reply: { event } });
const response = createJsonRpcSuccessResponse(id, { event });
this.send(response);
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/json-rpc-handlers/subscription/close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ export const handleSubscriptionsClose: JsonRpcHandler = async (
context,
) => {
const requestId = dwnRequest.id ?? uuidv4();
if (context.socketConnection === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'socket connection does not exist');
return { jsonRpcResponse };
}

if (dwnRequest.subscribe === undefined) {
const jsonRpcResponse = createJsonRpcErrorResponse(requestId, JsonRpcErrorCodes.InvalidRequest, 'subscribe options do not exist');
return { jsonRpcResponse };
}

const { socketConnection } = context;
const { id } = dwnRequest.subscribe as { id: JsonRpcId };

Expand Down
20 changes: 10 additions & 10 deletions src/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import log from 'loglevel';
import { v4 as uuidv4 } from 'uuid';
import WebSocket from 'ws';

import type { JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
import type { JsonRpcId, JsonRpcRequest, JsonRpcResponse } from "./lib/json-rpc.js";
import { createJsonRpcSubscribeRequest } from "./lib/json-rpc.js";

// These were arbitrarily chosen, but can be modified via connect options
Expand Down Expand Up @@ -89,7 +89,7 @@ export class JsonRpcSocket {
* Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive.
* Returns a close method to clean up the listener.
*/
async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{
async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{
response: JsonRpcResponse;
close?: () => Promise<void>;
}> {
Expand All @@ -109,8 +109,8 @@ export class JsonRpcSocket {
if (jsonRpcResponse.error !== undefined) {
// remove the event listener upon receipt of a JSON RPC Error.
this.socket.removeEventListener('message', messageHandler);
this.closeSubscription(subscriptionId);
}

listener(jsonRpcResponse);
}
};
Expand All @@ -125,12 +125,7 @@ export class JsonRpcSocket {
// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.socket.removeEventListener('message', messageHandler);
const requestId = uuidv4();
const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, subscriptionId)
const response = await this.request(request);
if (response.error) {
throw response.error;
}
await this.closeSubscription(subscriptionId);
}

return {
Expand All @@ -139,11 +134,16 @@ export class JsonRpcSocket {
}
}

private closeSubscription(id: JsonRpcId): Promise<JsonRpcResponse> {
const requestId = uuidv4();
const request = createJsonRpcSubscribeRequest(requestId, 'rpc.subscribe.close', {}, id);
return this.request(request);
}

/**
* Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response.
*/
send(request: JsonRpcRequest):void {
this.socket.send(Buffer.from(JSON.stringify(request)));
return;
}
}
4 changes: 2 additions & 2 deletions src/lib/json-rpc-router.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Dwn, MessageEvent } from '@tbd54566975/dwn-sdk-js';
import type { Dwn, EventSubscriptionHandler } from '@tbd54566975/dwn-sdk-js';

import type { Readable } from 'node:stream';

Expand All @@ -13,7 +13,7 @@ export type RequestContext = {
/** The JsonRpcId of the subscription handler */
id: JsonRpcId;
/** The `MessageEvent` handler associated with a subscription request, only used in `ws` requests */
subscriptionHandler: (message: MessageEvent) => void;
subscriptionHandler: EventSubscriptionHandler;
}
/** The `Readable` stream associated with a `RecordsWrite` request only used in `http` requests */
dataStream?: Readable;
Expand Down
8 changes: 0 additions & 8 deletions src/lib/json-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,3 @@ export const createJsonRpcSuccessResponse = (
result: result ?? null,
};
};

export function parseJson(text: string): object | null {
try {
return JSON.parse(text);
} catch {
return null;
}
}
45 changes: 45 additions & 0 deletions tests/connection/connection-manager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { Dwn } from '@tbd54566975/dwn-sdk-js';

import chaiAsPromised from 'chai-as-promised';
import chai, { expect } from 'chai';

import sinon from 'sinon';
import { WebSocket } from 'ws';
import { getTestDwn } from '../test-dwn.js';
import { InMemoryConnectionManager } from '../../src/connection/connection-manager.js';

chai.use(chaiAsPromised);

describe('InMemoryConnectionManager', () => {
let dwn: Dwn;
let connectionManager: InMemoryConnectionManager;

beforeEach(async () => {
dwn = await getTestDwn({ withEvents: true });
connectionManager = new InMemoryConnectionManager(dwn);
});

afterEach(async () => {
await connectionManager.closeAll();
await dwn.close();
sinon.restore();
});

it('adds connection to the connections map and closes all', async () => {
const socket1 = sinon.createStubInstance(WebSocket);
await connectionManager.connect(socket1);
expect((connectionManager as any).connections.size).to.equal(1);

const socket2 = sinon.createStubInstance(WebSocket);
await connectionManager.connect(socket2);
expect((connectionManager as any).connections.size).to.equal(2);
});

xit('closes all connections', async () => {
const socket = sinon.createStubInstance(WebSocket);
await connectionManager.connect(socket);
expect((connectionManager as any).connections.size).to.equal(1);
await connectionManager.closeAll();
expect((connectionManager as any).connections.size).to.equal(0);
});
});
159 changes: 159 additions & 0 deletions tests/connection/socket-connection.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import type { Dwn } from '@tbd54566975/dwn-sdk-js';

import chaiAsPromised from 'chai-as-promised';
import chai, { expect } from 'chai';

import sinon from 'sinon';
import { WebSocket } from 'ws';
import { SocketConnection } from '../../src/connection/socket-connection.js';
import { getTestDwn } from '../test-dwn.js';
import log from 'loglevel';

chai.use(chaiAsPromised);

describe('SocketConnection', () => {
let dwn: Dwn;

before(async () => {
dwn = await getTestDwn();
});

after(async () => {
await dwn.close();
sinon.restore();
});

it('should assign socket handlers', async () => {
const socket = sinon.createStubInstance(WebSocket);
const connection = new SocketConnection(socket, dwn);
expect(socket.on.callCount).to.equal(4);
expect(socket.on.args.map(arg => arg[0])).to.have.members(['message', 'close', 'error', 'pong']);
await connection.close();
});

it('should add a subscription to the subscription manager map', async () => {
const socket = sinon.createStubInstance(WebSocket);
const connection = new SocketConnection(socket, dwn);
const subscriptionRequest = {
id: 'id',
method: 'method',
params: { param1: 'param' },
close: async ():Promise<void> => {}
}

await connection.addSubscription(subscriptionRequest);
expect((connection as any).subscriptions.size).to.equal(1);
await connection.close();
expect((connection as any).subscriptions.size).to.equal(0);
});

it('should reject a subscription with an Id of an existing subscription', async () => {
const socket = sinon.createStubInstance(WebSocket);
const connection = new SocketConnection(socket, dwn);

const id = 'some-id';

const subscriptionRequest = {
id,
method: 'method',
params: { param1: 'param' },
close: async ():Promise<void> => {}
}

await connection.addSubscription(subscriptionRequest);
expect((connection as any).subscriptions.size).to.equal(1);

const addDuplicatePromise = connection.addSubscription(subscriptionRequest);
await expect(addDuplicatePromise).to.eventually.be.rejectedWith(`the subscription with id ${id} already exists`);
expect((connection as any).subscriptions.size).to.equal(1);
await connection.close();
expect((connection as any).subscriptions.size).to.equal(0);
});

it('should close a subscription and remove it from the connection manager map', async () => {
const socket = sinon.createStubInstance(WebSocket);
const connection = new SocketConnection(socket, dwn);

const id = 'some-id';

const subscriptionRequest = {
id,
method: 'method',
params: { param1: 'param' },
close: async ():Promise<void> => {}
}

await connection.addSubscription(subscriptionRequest);
expect((connection as any).subscriptions.size).to.equal(1);

await connection.closeSubscription(id);
expect((connection as any).subscriptions.size).to.equal(0);

const closeAgainPromise = connection.closeSubscription(id);
await expect(closeAgainPromise).to.eventually.be.rejectedWith(`the subscription with id ${id} was not found`);
await connection.close();
});

it('hasSubscription returns whether a subscription with the id already exists', async () => {
const socket = sinon.createStubInstance(WebSocket);
const connection = new SocketConnection(socket, dwn);
const subscriptionRequest = {
id: 'id',
method: 'method',
params: { param1: 'param' },
close: async ():Promise<void> => {}
}

await connection.addSubscription(subscriptionRequest);
expect((connection as any).subscriptions.size).to.equal(1);
expect(connection.hasSubscription(subscriptionRequest.id)).to.be.true;
expect(connection.hasSubscription('does-not-exist')).to.be.false;

await connection.closeSubscription(subscriptionRequest.id);
expect(connection.hasSubscription(subscriptionRequest.id)).to.be.false;
await connection.close();
});

it('should close if pong is not triggered between heartbeat intervals', async () => {
const socket = sinon.createStubInstance(WebSocket);
const clock = sinon.useFakeTimers();
const connection = new SocketConnection(socket, dwn);
const closeSpy = sinon.spy(connection, 'close');

clock.tick(60_100); // interval has to run twice
clock.restore();

expect(closeSpy.callCount).to.equal(1);
});

it('should not close if pong is called within the heartbeat interval', async () => {
const socket = sinon.createStubInstance(WebSocket);
const clock = sinon.useFakeTimers();
const connection = new SocketConnection(socket, dwn);
const closeSpy = sinon.spy(connection, 'close');

(connection as any).pong(); // trigger a pong
clock.tick(30_100); // first interval

(connection as any).pong(); // trigger a pong
clock.tick(30_100); // second interval

expect(closeSpy.callCount).to.equal(0);

clock.tick(30_100); // another interval without a ping
clock.restore();
expect(closeSpy.callCount).to.equal(1);
});

it('logs an error and closes connection if error is triggered', async () => {
const socket = sinon.createStubInstance(WebSocket);
const connection = new SocketConnection(socket, dwn);
const logSpy = sinon.stub(log, 'error');
const closeSpy = sinon.spy(connection, 'close');

(connection as any).error(new Error('some error'));

expect(logSpy.callCount).to.equal(1);
expect(closeSpy.callCount).to.equal(1);
});
});
Loading

0 comments on commit 7de0ad1

Please sign in to comment.