diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..08d54f1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +# Build +build + +# Node & Dependencies +node_modules +coverage +package-lock.json +pnpm-lock.yaml + +# Build tools specific +npm-debug.log +yarn-error.log + +# Editors specific +.fleet +.idea +.vscode diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..5b02efd --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,9 @@ +# The MIT License + +Copyright 2024 Romain Lanz, contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/bin/test.ts b/bin/test.ts new file mode 100644 index 0000000..8fc2a3f --- /dev/null +++ b/bin/test.ts @@ -0,0 +1,10 @@ +import { configure, processCLIArgs, run } from '@japa/runner' +import { assert } from '@japa/assert' + +processCLIArgs(process.argv.splice(2)) +configure({ + files: ['tests/**/*.spec.ts'], + plugins: [assert()], +}) + +void run() diff --git a/package.json b/package.json new file mode 100644 index 0000000..7f8049a --- /dev/null +++ b/package.json @@ -0,0 +1,97 @@ +{ + "name": "@boringnode/transmit", + "description": "A framework agnostic Server-Sent-Event library", + "version": "0.1.0", + "engines": { + "node": ">=20.11.1" + }, + "main": "build/index.js", + "type": "module", + "files": [ + "build" + ], + "exports": { + ".": "./build/index.js", + "./transports": "./build/transports.js", + "./types": "./build/src/types/main.js" + }, + "scripts": { + "build": "pnpm compile", + "clean": "del-cli build", + "compile": "pnpm clean && tsc", + "format": "prettier --write .", + "lint": "eslint . --ext=.ts", + "precompile": "pnpm lint", + "prepublishOnly": "pnpm build", + "pretest": "pnpm lint", + "quick:test": "node --enable-source-maps --loader=ts-node/esm bin/test.ts", + "release": "release-it", + "test": "c8 pnpm quick:test", + "typecheck": "tsc --noEmit", + "version": "npm run build" + }, + "dependencies": { + "@boringnode/bus": "^0.6.0", + "@poppinss/utils": "^6.7.3", + "emittery": "^1.0.3", + "matchit": "^1.1.0" + }, + "devDependencies": { + "@adonisjs/eslint-config": "^1.3.0", + "@adonisjs/prettier-config": "^1.3.0", + "@adonisjs/tsconfig": "^1.3.0", + "@japa/assert": "^3.0.0", + "@japa/runner": "^3.1.4", + "@swc/core": "^1.6.13", + "@types/node": "^20.14.10", + "c8": "^10.1.2", + "del-cli": "^5.1.0", + "eslint": "^8.57.0", + "prettier": "^3.3.2", + "release-it": "^17.5.0", + "ts-node": "^10.9.2", + "tsup": "^8.1.0", + "typescript": "^5.5.3" + }, + "author": "Romain Lanz ", + "license": "MIT", + "keywords": [ + "sse", + "server-sent-event", + "realtime", + "real-time" + ], + "eslintConfig": { + "extends": "@adonisjs/eslint-config/package" + }, + "prettier": "@adonisjs/prettier-config", + "publishConfig": { + "access": "public", + "tag": "latest" + }, + "release-it": { + "git": { + "commitMessage": "chore(release): ${version}", + "tagAnnotation": "v${version}", + "tagName": "v${version}" + }, + "github": { + "release": true, + "releaseName": "v${version}", + "web": true + } + }, + "c8": { + "reporter": [ + "text", + "html" + ], + "exclude": [ + "tests/**" + ] + }, + "volta": { + "node": "20.11.1" + }, + "packageManager": "pnpm@9.5.0+sha512.140036830124618d624a2187b50d04289d5a087f326c9edfc0ccd733d76c4f52c3a313d4fc148794a2a9d81553016004e6742e8cf850670268a7387fc220c903" +} diff --git a/src/storage.ts b/src/storage.ts new file mode 100644 index 0000000..008c109 --- /dev/null +++ b/src/storage.ts @@ -0,0 +1,135 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import matchit from 'matchit' +import { Stream } from './stream.js' +import type { Route } from 'matchit' + +export class Storage { + /** + * Channels subscribed to a given stream + */ + #subscriptions = new Map>() + + /** + * Channels subscribed to a given Stream UID + */ + #channelsByUid = new Map>() + + /** + * Secured channels definition + */ + #securedChannelsDefinition: Route[] = [] + + /** + * Secure a channel + */ + secure(channel: string) { + const encodedDefinition = matchit.parse(channel) + + this.#securedChannelsDefinition.push(encodedDefinition) + } + + /** + * Check if a channel is secured and return the matched channel + */ + getSecuredChannelDefinition(channel: string) { + const matchedChannel = matchit.match(channel, this.#securedChannelsDefinition) + + if (matchedChannel.length > 0) { + const params = matchit.exec(channel, matchedChannel) + return { params, channel: matchedChannel[0].old } + } + } + + /** + * Get the number of secured channels + */ + getSecuredChannelCount() { + return this.#securedChannelsDefinition.length + } + + /** + * Get the number of streams + */ + getStreamCount() { + return this.#subscriptions.size + } + + /** + * Add a stream to the storage + */ + add(stream: Stream) { + const channels = new Set() + + this.#subscriptions.set(stream, channels) + this.#channelsByUid.set(stream.getUid(), channels) + } + + /** + * Remove a stream from the storage + */ + remove(stream: Stream) { + this.#subscriptions.delete(stream) + this.#channelsByUid.delete(stream.getUid()) + } + + /** + * Add a channel to a stream + */ + subscribe(uid: string, channel: string) { + const channels = this.#channelsByUid.get(uid) + + if (!channels) return false + + channels.add(channel) + + return true + } + + /** + * Remove a channel from a stream + */ + unsubscribe(uid: string, channel: string) { + const channels = this.#channelsByUid.get(uid) + + if (!channels) return false + + channels.delete(channel) + + return true + } + + /** + * Find all subscribers to a channel + */ + findByChannel(channel: string) { + const subscribers = new Set() + + for (const [stream, streamChannels] of this.#subscriptions) { + if (streamChannels.has(channel)) { + subscribers.add(stream) + } + } + + return subscribers + } + + /** + * Get channels for a given client + */ + getChannelByClient(uid: string) { + return this.#channelsByUid.get(uid) + } + + /** + * Get all subscribers + */ + getAllSubscribers() { + return this.#subscriptions + } +} diff --git a/src/stream.ts b/src/stream.ts new file mode 100644 index 0000000..d55b025 --- /dev/null +++ b/src/stream.ts @@ -0,0 +1,90 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { Transform } from 'node:stream' +import { dataToString } from './utils.js' +import type { IncomingMessage, OutgoingHttpHeaders } from 'node:http' +import type { Broadcastable } from './types/main.js' + +interface Message { + data: Broadcastable +} + +interface WriteHeaders { + writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders + flushHeaders?(): void +} + +export type HeaderStream = NodeJS.WritableStream & WriteHeaders + +export class Stream extends Transform { + readonly #uid: string + + constructor(uid: string, request?: IncomingMessage) { + super({ objectMode: true }) + + this.#uid = uid + + if (request?.socket) { + request.socket.setKeepAlive(true) + request.socket.setNoDelay(true) + request.socket.setTimeout(0) + } + } + + getUid() { + return this.#uid + } + + pipe( + destination: T, + options?: { end?: boolean }, + forwardHeaders?: Record + ): T { + if (destination.writeHead) { + // @see https://github.com/dunglas/mercure/blob/9e080c8dc9a141d4294412d14efdecfb15bf7f43/subscribe.go#L219 + destination.writeHead(200, { + ...forwardHeaders, + 'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform', + 'Connection': 'keep-alive', + 'Content-Type': 'text/event-stream', + 'Expire': '0', + 'Pragma': 'no-cache', + // @see https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering + 'X-Accel-Buffering': 'no', + }) + + destination.flushHeaders?.() + } + + // Some clients (Safari) don't trigger onopen until the first frame is received. + destination.write(':ok\n\n') + return super.pipe(destination, options) + } + + _transform( + message: Message, + _encoding: string, + callback: (error?: Error | null, data?: any) => void + ) { + if (message.data) { + this.push(dataToString(message.data)) + } + + this.push('\n') + + callback() + } + + writeMessage( + message: Message, + encoding?: BufferEncoding, + cb?: (error: Error | null | undefined) => void + ): boolean { + return this.write(message, encoding, cb) + } +} diff --git a/src/stream_manager.ts b/src/stream_manager.ts new file mode 100644 index 0000000..1103743 --- /dev/null +++ b/src/stream_manager.ts @@ -0,0 +1,145 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { Stream } from './stream.js' +import { Storage } from './storage.js' +import type { IncomingMessage, ServerResponse } from 'node:http' + +type AccessCallback = (context: T, params?: any) => Promise | boolean + +interface OnConnectParams { + uid: string + context?: T +} + +interface OnDisconnectParams { + uid: string + context?: T +} + +interface OnSubscribeParams { + uid: string + channel: string + context?: T +} + +interface OnUnsubscribeParams { + uid: string + channel: string + context?: T +} + +export interface CreateStreamParams { + uid: string + request: IncomingMessage + response: ServerResponse + context?: T + onConnect?: (params: OnConnectParams) => void + onDisconnect?: (params: OnDisconnectParams) => void +} + +export interface SubscribeParams { + uid: string + channel: string + context?: T + onSubscribe?: (params: OnSubscribeParams) => void +} + +export interface UnsubscribeParams { + uid: string + channel: string + context?: T + onUnsubscribe?: (params: OnUnsubscribeParams) => void +} + +export class StreamManager { + #storage: Storage + + #securedChannels = new Map() + + constructor() { + this.#storage = new Storage() + } + + createStream({ + uid, + context, + request, + response, + onConnect, + onDisconnect, + }: CreateStreamParams) { + const stream = new Stream(uid, request) + stream.pipe(response, undefined, response.getHeaders()) + + this.#storage.add(stream) + + onConnect?.({ uid, context }) + + response.on('close', () => { + this.#storage.remove(stream) + onDisconnect?.({ uid, context }) + }) + + return stream + } + + async subscribe({ uid, channel, context, onSubscribe }: SubscribeParams) { + const canAccessChannel = await this.verifyAccess(channel, context) + + if (!canAccessChannel) { + return false + } + + this.#storage.subscribe(uid, channel) + onSubscribe?.({ uid, channel, context }) + + return true + } + + async unsubscribe({ uid, channel, context, onUnsubscribe }: UnsubscribeParams) { + this.#storage.unsubscribe(uid, channel) + onUnsubscribe?.({ uid, channel, context }) + + return true + } + + authorize(channel: string, callback: AccessCallback) { + this.#storage.secure(channel) + this.#securedChannels.set(channel, callback) + } + + async verifyAccess(channel: string, context: T) { + const definitions = this.#storage.getSecuredChannelDefinition(channel) + + if (!definitions) { + return true + } + + const callback = this.#securedChannels.get(definitions.channel) + + try { + return await callback!(context, definitions.params) + } catch (e) { + return false + } + } + + /** + * Get all subscribers + */ + getAllSubscribers() { + return this.#storage.getAllSubscribers() + } + + /** + * Find all subscribers to a channel + */ + findByChannel(channel: string) { + return this.#storage.findByChannel(channel) + } +} diff --git a/src/transmit.ts b/src/transmit.ts new file mode 100644 index 0000000..66feebf --- /dev/null +++ b/src/transmit.ts @@ -0,0 +1,223 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import Emittery from 'emittery' +import { Bus } from '@boringnode/bus' +import string from '@poppinss/utils/string' +import { StreamManager } from './stream_manager.js' +import { TransportMessageType } from './transport_message_type.js' +import type { Transport } from '@boringnode/bus/types/main' +import type { Broadcastable, TransmitConfig } from './types/main.js' +import type { CreateStreamParams, SubscribeParams, UnsubscribeParams } from './stream_manager.js' +import { clearInterval } from 'node:timers' + +export interface TransmitLifecycleHooks { + connect: { uid: string; context: T } + disconnect: { uid: string; context: T } + broadcast: { channel: string; payload: Broadcastable } + subscribe: { uid: string; channel: string; context: T } + unsubscribe: { uid: string; channel: string; context: T } +} + +type TransmitMessage = + | { + type: typeof TransportMessageType.Broadcast + channel: string + payload: Broadcastable + } + | { + type: typeof TransportMessageType.Subscribe + channel: string + payload: { uid: string } + } + | { + type: typeof TransportMessageType.Unsubscribe + channel: string + payload: { uid: string } + } + +export class Transmit { + /** + * The configuration for the transmit instance + */ + #config: TransmitConfig + + /** + * The stream manager instance + */ + readonly #manager: StreamManager + + /** + * The transport channel to synchronize messages and subscriptions + * across multiple instance. + */ + readonly #transportChannel: string + + /** + * The transport provider to synchronize messages and subscriptions + * across multiple instance. + */ + readonly #bus: Bus | null + + /** + * The emittery instance to emit events. + */ + #emittery: Emittery> + + /** + * The interval to send ping messages to all the subscribers. + */ + readonly #interval: NodeJS.Timeout | undefined + + constructor(config: TransmitConfig, transport?: Transport | null) { + this.#config = config + this.#manager = new StreamManager() + this.#emittery = new Emittery() + this.#bus = transport ? new Bus(transport, { retryQueue: { enabled: true } }) : null + this.#transportChannel = this.#config.transport?.channel ?? 'transmit::broadcast' + + // Subscribe to the transport channel and handle incoming messages + void this.#bus?.subscribe(this.#transportChannel, (message) => { + const { type, channel, payload } = message + + if (type === TransportMessageType.Broadcast) { + void this.#broadcastLocally(channel, payload) + } else if (type === TransportMessageType.Subscribe) { + void this.#manager.subscribe({ uid: payload.uid, channel }) + } else if (type === TransportMessageType.Unsubscribe) { + void this.#manager.unsubscribe({ uid: payload.uid, channel }) + } + }) + + // Start the ping interval if configured + if (this.#config.pingInterval) { + const intervalValue = + typeof this.#config.pingInterval === 'number' + ? this.#config.pingInterval + : string.milliseconds.parse(this.#config.pingInterval) + + this.#interval = setInterval(() => this.#ping(), intervalValue) + } + } + + getManager() { + return this.#manager + } + + createStream(params: Omit, 'onConnect' | 'onDisconnect'>) { + return this.#manager.createStream({ + ...params, + onConnect: () => { + void this.#emittery.emit('connect', { + uid: params.uid, + context: params.context, + }) + }, + onDisconnect: () => { + void this.#emittery.emit('disconnect', { + uid: params.uid, + context: params.context, + }) + }, + }) + } + + subscribe(params: Omit, 'onSubscribe'>) { + return this.#manager.subscribe({ + ...params, + onSubscribe: ({ uid, channel, context }) => { + void this.#emittery.emit('subscribe', { + uid, + channel, + context, + }) + + void this.#bus?.publish(this.#transportChannel, { + type: TransportMessageType.Subscribe, + channel, + payload: { uid }, + }) + }, + }) + } + + unsubscribe(params: Omit, 'onUnsubscribe'>) { + return this.#manager.unsubscribe({ + ...params, + onUnsubscribe: ({ uid, channel, context }) => { + void this.#emittery.emit('unsubscribe', { + uid, + channel, + context, + }) + + void this.#bus?.publish(this.#transportChannel, { + type: TransportMessageType.Unsubscribe, + channel, + payload: { uid }, + }) + }, + }) + } + + #broadcastLocally(channel: string, payload: Broadcastable, senderUid?: string | string[]) { + const subscribers = this.#manager.findByChannel(channel) + + for (const subscriber of subscribers) { + if ( + Array.isArray(senderUid) + ? senderUid.includes(subscriber.getUid()) + : senderUid === subscriber.getUid() + ) { + continue + } + + subscriber.writeMessage({ data: { channel, payload } }) + } + } + + broadcastExcept(channel: string, payload: Broadcastable, senderUid: string | string[]) { + return this.#broadcastLocally(channel, payload, senderUid) + } + + broadcast(channel: string, payload?: Broadcastable) { + if (!payload) { + payload = {} + } + + void this.#bus?.publish(this.#transportChannel, { + type: TransportMessageType.Broadcast, + channel, + payload, + }) + + this.#broadcastLocally(channel, payload) + + void this.#emittery.emit('broadcast', { channel, payload }) + } + + on, C>( + event: T, + callback: (payload: TransmitLifecycleHooks[T]) => void + ) { + return this.#emittery.on(event, callback) + } + + async shutdown() { + if (this.#interval) { + clearInterval(this.#interval) + } + + await this.#bus?.disconnect() + } + + #ping() { + for (const [stream] of this.#manager.getAllSubscribers()) { + stream.writeMessage({ data: { channel: '$$transmit/ping', payload: {} } }) + } + } +} diff --git a/src/transport_message_type.ts b/src/transport_message_type.ts new file mode 100644 index 0000000..06e440c --- /dev/null +++ b/src/transport_message_type.ts @@ -0,0 +1,5 @@ +export const TransportMessageType = { + Broadcast: 1, + Subscribe: 2, + Unsubscribe: 3, +} as const diff --git a/src/types/main.ts b/src/types/main.ts new file mode 100644 index 0000000..20441e9 --- /dev/null +++ b/src/types/main.ts @@ -0,0 +1,43 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import type { TransportFactory } from '@boringnode/bus/types/main' + +/** + * A Duration can be a number in milliseconds or a string formatted as a duration + * + * Formats accepted are : + * - Simple number in milliseconds + * - String formatted as a duration. Uses https://github.com/lukeed/ms under the hood + */ +export type Duration = number | string + +/** + * A Broadcastable is a value that can be broadcasted to other clients + */ +export type Broadcastable = + | { [key: string]: Broadcastable } + | string + | number + | boolean + | null + | Broadcastable[] + +export interface TransmitConfig { + /** + * The interval in milliseconds to send ping messages to the client + */ + pingInterval?: Duration | false + + /** + * The transport driver to use for transmitting messages + */ + transport: null | { + driver: TransportFactory + channel?: string + } +} diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..d3f4e95 --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,23 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { Broadcastable } from './types/main.js' + +export function dataToString(data: Broadcastable): string { + if (typeof data === 'object') { + return dataToString(JSON.stringify(data)) + } + + if (typeof data === 'number' || typeof data === 'boolean') { + return `data: ${data}\n` + } + + return data + .split(/\r\n|\r|\n/) + .map((line) => `data: ${line}\n`) + .join('') +} diff --git a/tests/fixtures/stream.ts b/tests/fixtures/stream.ts new file mode 100644 index 0000000..e9c41df --- /dev/null +++ b/tests/fixtures/stream.ts @@ -0,0 +1,16 @@ +import { randomUUID } from 'node:crypto' +import { IncomingMessage, ServerResponse } from 'node:http' +import { Socket } from '../mocks/socket.js' +import type { Transmit } from '../../src/transmit.js' + +export function makeStream(transmit: Transmit, uid = randomUUID()) { + const socket = new Socket() + const request = new IncomingMessage(socket) + const response = new ServerResponse(request) + + return transmit.createStream({ + uid, + request, + response, + }) +} diff --git a/tests/fixtures/transmit.ts b/tests/fixtures/transmit.ts new file mode 100644 index 0000000..6f36e0f --- /dev/null +++ b/tests/fixtures/transmit.ts @@ -0,0 +1,22 @@ +import { memory } from '@boringnode/bus/transports/memory' +import { Transmit } from '../../src/transmit.js' + +export function makeTransport() { + const transport = memory()() + + return { + transport, + driver: memory(), + } +} + +export function makeTransmitWithTransport(params: ReturnType) { + return new Transmit( + { + transport: { + driver: params.driver, + }, + }, + params.transport + ) +} diff --git a/tests/mocks/sink.ts b/tests/mocks/sink.ts new file mode 100644 index 0000000..31ae5da --- /dev/null +++ b/tests/mocks/sink.ts @@ -0,0 +1,26 @@ +import { Writable } from 'node:stream' +import type { HeaderStream } from '../../src/stream.js' + +export class Sink extends Writable implements HeaderStream { + #chunks: Buffer[] = [] + + constructor() { + super({ objectMode: true }) + } + + assertWriteHead(assertion: (statusCode: number, headers: any) => void) { + // @ts-expect-error - Mocking the writeHead method + this.writeHead = (statusCode, headers) => { + assertion(statusCode, headers) + } + } + + get content() { + return this.#chunks.join('') + } + + _write(chunk: Buffer, _encoding: BufferEncoding, callback: (error?: Error | null) => void): void { + this.#chunks.push(chunk) + callback() + } +} diff --git a/tests/mocks/socket.ts b/tests/mocks/socket.ts new file mode 100644 index 0000000..d39dc00 --- /dev/null +++ b/tests/mocks/socket.ts @@ -0,0 +1,34 @@ +import { Socket as NodeSocket } from 'node:net' + +export class Socket extends NodeSocket { + #keepAlive = false + #noDelay = false + #timeout = 0 + + getKeepAlive() { + return this.#keepAlive + } + + getNoDelay() { + return this.#noDelay + } + + getTimeout() { + return this.#timeout + } + + setKeepAlive(enable?: boolean, initialDelay?: number): this { + this.#keepAlive = enable === true + return super.setKeepAlive(enable, initialDelay) + } + + setNoDelay(noDelay?: boolean): this { + this.#noDelay = noDelay === true + return super.setNoDelay(noDelay) + } + + setTimeout(timeout: number, callback?: () => void): this { + this.#timeout = timeout + return super.setTimeout(timeout, callback) + } +} diff --git a/tests/storage.spec.ts b/tests/storage.spec.ts new file mode 100644 index 0000000..14f7743 --- /dev/null +++ b/tests/storage.spec.ts @@ -0,0 +1,141 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { randomUUID } from 'node:crypto' +import { test } from '@japa/runner' +import { Stream } from '../src/stream.js' +import { Storage } from '../src/storage.js' + +test.group('Storage', () => { + test('should secure a channel', async ({ assert }) => { + const storage = new Storage() + + storage.secure('foo') + assert.equal(storage.getSecuredChannelCount(), 1) + }) + + test('should get the secured channel definition', async ({ assert }) => { + const storage = new Storage() + + storage.secure('foo') + const definition = storage.getSecuredChannelDefinition('foo') + + assert.exists(definition) + assert.equal(definition!.channel, 'foo') + }) + + test('should not get the secured channel definition using params', async ({ assert }) => { + const storage = new Storage() + + storage.secure('foo/:id') + const definition = storage.getSecuredChannelDefinition('foo/1') + + assert.exists(definition) + assert.equal(definition!.channel, 'foo/:id') + assert.equal(definition!.params.id, '1') + }) + + test('should add a stream to the storage', async ({ assert }) => { + const stream1 = new Stream(randomUUID()) + const storage = new Storage() + + storage.add(stream1) + assert.equal(storage.getStreamCount(), 1) + + const stream2 = new Stream(randomUUID()) + storage.add(stream2) + assert.equal(storage.getStreamCount(), 2) + }) + + test('should remove a stream from the storage', async ({ assert }) => { + const stream = new Stream(randomUUID()) + const storage = new Storage() + + storage.add(stream) + assert.equal(storage.getStreamCount(), 1) + + storage.remove(stream) + assert.equal(storage.getStreamCount(), 0) + }) + + test('should subscribe a channel to a stream', async ({ assert }) => { + const stream = new Stream(randomUUID()) + const storage = new Storage() + + storage.add(stream) + + assert.isTrue(storage.subscribe(stream.getUid(), 'foo')) + assert.isTrue(storage.subscribe(stream.getUid(), 'bar')) + }) + + test('should not subscribe a channel to a stream that does not exist', async ({ assert }) => { + const stream = new Stream(randomUUID()) + const storage = new Storage() + + assert.isFalse(storage.subscribe(stream.getUid(), 'foo')) + }) + + test('should unsubscribe a channel from a stream', async ({ assert }) => { + const stream = new Stream(randomUUID()) + const storage = new Storage() + + storage.add(stream) + storage.subscribe(stream.getUid(), 'foo') + + assert.isTrue(storage.unsubscribe(stream.getUid(), 'foo')) + }) + + test('should not unsubscribe a channel from a stream that does not exist', async ({ assert }) => { + const stream = new Stream(randomUUID()) + const storage = new Storage() + + assert.isFalse(storage.unsubscribe(stream.getUid(), 'foo')) + }) + + test('should find all subscribers to a channel', async ({ assert }) => { + const stream1 = new Stream(randomUUID()) + const stream2 = new Stream(randomUUID()) + const storage = new Storage() + + storage.add(stream1) + storage.add(stream2) + + storage.subscribe(stream1.getUid(), 'foo') + storage.subscribe(stream2.getUid(), 'foo') + + const subscribers = storage.findByChannel('foo') + assert.equal(subscribers.size, 2) + }) + + test('should return the channel of a client', async ({ assert }) => { + const stream = new Stream(randomUUID()) + const storage = new Storage() + + storage.add(stream) + storage.subscribe(stream.getUid(), 'foo') + + const channels = storage.getChannelByClient(stream.getUid()) + + assert.exists(channels) + assert.isTrue(channels!.has('foo')) + }) + + test('should return all subscribers', async ({ assert }) => { + const stream1 = new Stream(randomUUID()) + const stream2 = new Stream(randomUUID()) + const storage = new Storage() + + storage.add(stream1) + storage.add(stream2) + + storage.subscribe(stream1.getUid(), 'foo') + storage.subscribe(stream2.getUid(), 'foo') + + const subscribers = storage.getAllSubscribers() + assert.equal(subscribers.size, 2) + }) +}) diff --git a/tests/stream.spec.ts b/tests/stream.spec.ts new file mode 100644 index 0000000..9b6ee47 --- /dev/null +++ b/tests/stream.spec.ts @@ -0,0 +1,93 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { randomUUID } from 'node:crypto' +import { IncomingMessage } from 'node:http' +import { test } from '@japa/runner' +import { Stream } from '../src/stream.js' +import { Sink } from './mocks/sink.js' +import { Socket } from './mocks/socket.js' + +test.group('Stream', () => { + test('should get back the uid', async ({ assert }) => { + const uid = randomUUID() + const stream = new Stream(uid) + + assert.equal(stream.getUid(), uid) + }) + + test('should write multiple chunks to the stream', async ({ assert }) => { + const stream = new Stream(randomUUID()) + const sink = new Sink() + stream.pipe(sink) + + stream.writeMessage({ data: { channel: 'foo', payload: 'bar' } }) + stream.writeMessage({ data: { channel: 'baz', payload: 'qux' } }) + + assert.equal( + sink.content, + [ + `:ok\n\n`, + `data: {"channel":"foo","payload":"bar"}\n\n`, + `data: {"channel":"baz","payload":"qux"}\n\n`, + ].join('') + ) + }) + + test('should sets headers on the response', async ({ assert }) => { + assert.plan(2) + + const stream = new Stream(randomUUID()) + const sink = new Sink() + + sink.assertWriteHead((statusCode, headers) => { + assert.equal(statusCode, 200) + assert.deepEqual(headers, { + 'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform', + 'Connection': 'keep-alive', + 'Content-Type': 'text/event-stream', + 'Expire': '0', + 'Pragma': 'no-cache', + 'X-Accel-Buffering': 'no', + }) + }) + + stream.pipe(sink) + }) + + test('should forward headers to the response', async ({ assert }) => { + assert.plan(2) + + const stream = new Stream(randomUUID()) + const sink = new Sink() + + sink.assertWriteHead((statusCode, headers) => { + assert.equal(statusCode, 200) + assert.deepEqual(headers, { + 'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform', + 'Connection': 'keep-alive', + 'Content-Type': 'text/event-stream', + 'Expire': '0', + 'Pragma': 'no-cache', + 'X-Accel-Buffering': 'no', + 'X-Foo': 'bar', + }) + }) + + stream.pipe(sink, undefined, { 'X-Foo': 'bar' }) + }) + + test('should set the keep alive, no delay and timeout on the socket', async ({ assert }) => { + const socket = new Socket() + const incomingMessage = new IncomingMessage(socket) + new Stream(randomUUID(), incomingMessage) + + assert.isTrue(socket.getKeepAlive()) + assert.isTrue(socket.getNoDelay()) + assert.equal(socket.getTimeout(), 0) + }) +}) diff --git a/tests/stream_manager.spec.ts b/tests/stream_manager.spec.ts new file mode 100644 index 0000000..26a20b7 --- /dev/null +++ b/tests/stream_manager.spec.ts @@ -0,0 +1,222 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { randomUUID } from 'node:crypto' +import { test } from '@japa/runner' +import { Stream } from '../src/stream.js' +import { StreamManager } from '../src/stream_manager.js' +import { Socket } from './mocks/socket.js' +import { IncomingMessage, ServerResponse } from 'node:http' + +test.group('StreamManager', () => { + test('should create stream', async ({ assert }) => { + const socket = new Socket() + const manager = new StreamManager() + const request = new IncomingMessage(socket) + const response = new ServerResponse(request) + + let channelConnected = false + + const stream = manager.createStream({ + uid: randomUUID(), + request, + response, + onConnect() { + channelConnected = true + }, + }) + + assert.instanceOf(stream, Stream) + assert.isTrue(channelConnected) + }) + + test('should remove stream if the response end', async ({ assert }) => { + const socket = new Socket() + const manager = new StreamManager() + const request = new IncomingMessage(socket) + const response = new ServerResponse(request) + + let channelDisconnected = false + + manager.createStream({ + uid: randomUUID(), + request, + response, + onDisconnect() { + channelDisconnected = true + }, + }) + + response.emit('close') + + assert.isTrue(channelDisconnected) + }) + + test('should authorize channel', async ({ assert }) => { + const manager = new StreamManager() + + manager.authorize('foo', () => true) + manager.authorize('bar', () => false) + + assert.isTrue(await manager.verifyAccess('foo', {})) + assert.isFalse(await manager.verifyAccess('bar', {})) + }) + + test('should return true if channel is not secured', async ({ assert }) => { + const manager = new StreamManager() + + assert.isTrue(await manager.verifyAccess('foo', {})) + }) + + test('should return false if callback throws an error', async ({ assert }) => { + const manager = new StreamManager() + + manager.authorize('foo', () => { + throw new Error('Error') + }) + + assert.isFalse(await manager.verifyAccess('foo', {})) + }) + + test('should send context to the callback', async ({ assert }) => { + assert.plan(2) + + const manager = new StreamManager() + const context = { foo: 'bar' } + + manager.authorize('foo', (ctx) => { + assert.deepEqual(ctx, context) + return true + }) + + assert.isTrue(await manager.verifyAccess('foo', context)) + }) + + test('should retrieve params from the channel', async ({ assert }) => { + const manager = new StreamManager() + + manager.authorize('users/:id', (_ctx, params) => { + assert.deepEqual(params, { id: '1' }) + return true + }) + + assert.isTrue(await manager.verifyAccess('users/1', {})) + }) + + test('should subscribe to a channel', async ({ assert }) => { + const manager = new StreamManager() + + assert.isTrue(await manager.subscribe({ uid: randomUUID(), channel: 'foo', context: {} })) + }) + + test('should not subscribe to a channel if not authorized', async ({ assert }) => { + const manager = new StreamManager() + + manager.authorize('foo', () => false) + + assert.isFalse(await manager.subscribe({ uid: randomUUID(), channel: 'foo', context: {} })) + }) + + test('should call onSubscribe callback', async ({ assert }) => { + const manager = new StreamManager() + let subscribed = false + + await manager.subscribe({ + uid: randomUUID(), + channel: 'foo', + context: {}, + onSubscribe() { + subscribed = true + }, + }) + + assert.isTrue(subscribed) + }) + + test('should unsubscribe from a channel', async ({ assert }) => { + const manager = new StreamManager() + + await manager.subscribe({ uid: randomUUID(), channel: 'foo', context: {} }) + assert.isTrue(await manager.unsubscribe({ uid: randomUUID(), channel: 'foo', context: {} })) + }) + + test('should call onUnsubscribe callback', async ({ assert }) => { + const manager = new StreamManager() + let unsubscribed = false + + await manager.subscribe({ uid: randomUUID(), channel: 'foo', context: {} }) + + await manager.unsubscribe({ + uid: randomUUID(), + channel: 'foo', + context: {}, + onUnsubscribe() { + unsubscribed = true + }, + }) + + assert.isTrue(unsubscribed) + }) + + test('should get all subscribers', async ({ assert }) => { + const manager = new StreamManager() + + const socket1 = new Socket() + const request1 = new IncomingMessage(socket1) + const response1 = new ServerResponse(request1) + const stream1 = manager.createStream({ + uid: randomUUID(), + request: request1, + response: response1, + }) + + const socket2 = new Socket() + const request2 = new IncomingMessage(socket2) + const response2 = new ServerResponse(request2) + const stream2 = manager.createStream({ + uid: randomUUID(), + request: request2, + response: response2, + }) + + await manager.subscribe({ uid: stream1.getUid(), channel: 'foo' }) + await manager.subscribe({ uid: stream2.getUid(), channel: 'bar' }) + + const subscribers = manager.getAllSubscribers() + + assert.equal(subscribers.size, 2) + }) + + test('should find subscribers for a given channel', async ({ assert }) => { + const manager = new StreamManager() + + const socket1 = new Socket() + const request1 = new IncomingMessage(socket1) + const response1 = new ServerResponse(request1) + const stream1 = manager.createStream({ + uid: randomUUID(), + request: request1, + response: response1, + }) + + const socket2 = new Socket() + const request2 = new IncomingMessage(socket2) + const response2 = new ServerResponse(request2) + const stream2 = manager.createStream({ + uid: randomUUID(), + request: request2, + response: response2, + }) + + await manager.subscribe({ uid: stream1.getUid(), channel: 'foo' }) + await manager.subscribe({ uid: stream2.getUid(), channel: 'foo' }) + + const subscribers = manager.findByChannel('foo') + + assert.equal(subscribers.size, 2) + }) +}) diff --git a/tests/transmit.spec.ts b/tests/transmit.spec.ts new file mode 100644 index 0000000..fe0fa05 --- /dev/null +++ b/tests/transmit.spec.ts @@ -0,0 +1,316 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { randomUUID } from 'node:crypto' +import { test } from '@japa/runner' +import { Transmit } from '../src/transmit.js' +import { StreamManager } from '../src/stream_manager.js' +import { makeStream } from './fixtures/stream.js' +import { TransportMessageType } from '../src/transport_message_type.js' +import { makeTransmitWithTransport, makeTransport } from './fixtures/transmit.js' + +test.group('Transmit', () => { + test('should return the manager instance', async ({ assert }) => { + const transmit = new Transmit({ + transport: null, + }) + + assert.instanceOf(transmit.getManager(), StreamManager) + }) + + test('should emit an connect event', async ({ assert }, done) => { + assert.plan(2) + + const transmit = new Transmit({ + transport: null, + }) + + const uid = randomUUID() + let connected = false + + transmit.on('connect', (params) => { + connected = true + + assert.equal(params.uid, uid) + }) + + makeStream(transmit, uid) + + setTimeout(() => { + assert.isTrue(connected) + done() + }, 0) + }).waitForDone() + + test('should emit an subscribe event', async ({ assert }) => { + assert.plan(3) + + const transmit = new Transmit({ + transport: null, + }) + + const uid = randomUUID() + let subscribed = false + + transmit.on('subscribe', (params) => { + subscribed = true + + assert.equal(params.uid, uid) + assert.equal(params.channel, 'users/1') + }) + + await transmit.subscribe({ + uid, + channel: 'users/1', + }) + + assert.isTrue(subscribed) + }) + + test('should emit an unsubscribe event', async ({ assert }) => { + assert.plan(3) + + const transmit = new Transmit({ + transport: null, + }) + + const uid = randomUUID() + let unsubscribed = false + + transmit.on('unsubscribe', (params) => { + unsubscribed = true + + assert.equal(params.uid, uid) + assert.equal(params.channel, 'users/1') + }) + + await transmit.unsubscribe({ + uid, + channel: 'users/1', + }) + + assert.isTrue(unsubscribed) + }) + + test('should emit a broadcast event when a message is broadcasted', async ({ assert }, done) => { + assert.plan(3) + + const transmit = new Transmit({ + transport: null, + }) + + const payload = { foo: 'bar' } + + let broadcasted = false + + transmit.on('broadcast', (params) => { + broadcasted = true + + assert.equal(params.channel, 'users/1') + assert.equal(params.payload, payload) + }) + + transmit.broadcast('users/1', payload) + + setTimeout(() => { + assert.isTrue(broadcasted) + done() + }, 0) + }).waitForDone() + + test('should ping all subscribers', async ({ assert, cleanup }, done) => { + assert.plan(1) + + const transmit = new Transmit({ + transport: null, + pingInterval: 100, + }) + + cleanup(() => transmit.shutdown()) + + const stream = makeStream(transmit, randomUUID()) + + stream.on('data', (message) => { + //? Ignore the first message + if (message === '\n') return + + assert.include(message, '$$transmit/ping') + done() + }) + }).waitForDone() + + test('should broadcast a message to all listening clients', async ({ assert }) => { + assert.plan(1) + + const transmit = new Transmit({ + transport: null, + }) + + const stream = makeStream(transmit) + const stream2 = makeStream(transmit) + + await transmit.subscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + let dataReceived = false + stream.on('data', (message) => { + //? Ignore the first message + if (message === '\n') return + + dataReceived = true + }) + + stream2.on('data', () => { + assert.fail('Should not receive the broadcasted message') + }) + + transmit.broadcast('channel1', { message: 'hello' }) + + assert.isTrue(dataReceived) + }) + + test('should broadcast a message to all listening clients except the sender', async ({ + assert, + }) => { + assert.plan(1) + + const transmit = new Transmit({ + transport: null, + }) + + const stream = makeStream(transmit) + const stream2 = makeStream(transmit) + + await transmit.subscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + await transmit.subscribe({ + uid: stream2.getUid(), + channel: 'channel1', + }) + + let dataReceived = false + stream.on('data', (message) => { + //? Ignore the first message + if (message === '\n') return + + dataReceived = true + }) + + stream2.on('data', () => { + assert.fail('Should not receive the broadcasted message') + }) + + transmit.broadcastExcept('channel1', { message: 'hello' }, stream2.getUid()) + + assert.isTrue(dataReceived) + }) + + test('should not broadcast to ourself when sending to the bus', async ({ assert }) => { + const transport = makeTransport() + const transmit = makeTransmitWithTransport(transport) + + const stream = makeStream(transmit) + + await transmit.subscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + transmit.broadcast('channel1', { message: 'hello' }) + + assert.lengthOf(transport.transport.receivedMessages, 0) + }) + + test('should broadcast to the bus when a client subscribe to a channel', async ({ assert }) => { + const transport = makeTransport() + const transmit = makeTransmitWithTransport(transport) + makeTransmitWithTransport(transport) + + const stream = makeStream(transmit) + + await transmit.subscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + assert.lengthOf(transport.transport.receivedMessages, 1) + assert.equal(transport.transport.receivedMessages[0].type, TransportMessageType.Subscribe) + }) + + test('should broadcast to the bus when a client unsubscribe a channel', async ({ assert }) => { + const transport = makeTransport() + const transmit = makeTransmitWithTransport(transport) + + makeTransmitWithTransport(transport) + + const stream = makeStream(transmit) + + await transmit.subscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + await transmit.unsubscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + assert.lengthOf(transport.transport.receivedMessages, 2) + assert.equal(transport.transport.receivedMessages[1].type, TransportMessageType.Unsubscribe) + }) + + test('should broadcast to the bus when sending a message', async ({ assert }) => { + const transport = makeTransport() + const transmit = makeTransmitWithTransport(transport) + makeTransmitWithTransport(transport) + + const stream = makeStream(transmit) + + await transmit.subscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + transmit.broadcast('channel1', { message: 'hello' }) + + assert.lengthOf(transport.transport.receivedMessages, 2) + assert.equal(transport.transport.receivedMessages[1].type, TransportMessageType.Broadcast) + }) + + test('second instance should receive the broadcasted message', async ({ assert }) => { + const transport = makeTransport() + const transmit = makeTransmitWithTransport(transport) + const transmit2 = makeTransmitWithTransport(transport) + + const stream = makeStream(transmit) + const stream2 = makeStream(transmit2) + + await transmit.subscribe({ + uid: stream.getUid(), + channel: 'channel1', + }) + + await transmit2.subscribe({ + uid: stream2.getUid(), + channel: 'channel1', + }) + + let dataReceived = false + stream.on('data', () => { + dataReceived = true + }) + + transmit.broadcast('channel1', { message: 'hello' }) + + assert.isTrue(dataReceived) + }) +}) diff --git a/tests/utils.spec.ts b/tests/utils.spec.ts new file mode 100644 index 0000000..cee904c --- /dev/null +++ b/tests/utils.spec.ts @@ -0,0 +1,35 @@ +/* + * @boringnode/transmit + * + * @license MIT + * @copyright Boring Node + */ + +import { test } from '@japa/runner' +import { dataToString } from '../src/utils.js' + +test.group('Utils | dataToString', () => { + test('should transform data when it is an object', ({ assert }) => { + const value = dataToString({ name: 'Romain Lanz' }) + + assert.equal(value, 'data: {"name":"Romain Lanz"}\n') + }) + + test('should transform data when it is a number', ({ assert }) => { + const value = dataToString(42) + + assert.equal(value, 'data: 42\n') + }) + + test('should transform data when it is a boolean', ({ assert }) => { + const value = dataToString(true) + + assert.equal(value, 'data: true\n') + }) + + test('should transform data when it is a string', ({ assert }) => { + const value = dataToString('Hello world') + + assert.equal(value, 'data: Hello world\n') + }) +}) diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..ad0cc44 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "@adonisjs/tsconfig/tsconfig.package.json", + "compilerOptions": { + "rootDir": "./", + "outDir": "./build" + } +}