diff --git a/.changeset/yellow-pugs-melt.md b/.changeset/yellow-pugs-melt.md new file mode 100644 index 0000000..fb63cc0 --- /dev/null +++ b/.changeset/yellow-pugs-melt.md @@ -0,0 +1,5 @@ +--- +"next-ws": minor +--- + +Add WebSocket adapter support for multi-instance deployments diff --git a/.github/workflows/redis-adapter-tests.yml b/.github/workflows/redis-adapter-tests.yml new file mode 100644 index 0000000..7b595ef --- /dev/null +++ b/.github/workflows/redis-adapter-tests.yml @@ -0,0 +1,57 @@ +name: Redis Adapter Tests + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test-redis-adapter: + runs-on: ubuntu-latest + + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - uses: actions/checkout@v4 + + - uses: pnpm/action-setup@v4 + with: + version: 10.15.0 + + - uses: actions/setup-node@v4 + with: + node-version: "22" + cache: "pnpm" + + - name: Install dependencies + run: pnpm install --frozen-lockfile + + - name: Build project + run: pnpm build + + - name: Install Playwright browsers + run: pnpm exec playwright install --with-deps chromium + + - name: Run Redis adapter tests + run: pnpm test tests/redis-adapter.test.ts + env: + REDIS_URL: redis://localhost:6379 + CI: true + + - name: Upload test results + if: failure() + uses: actions/upload-artifact@v4 + with: + name: playwright-report + path: tests/.report/ + retention-days: 7 diff --git a/examples/redis-adapter/app/(simple)/api/ws/route.ts b/examples/redis-adapter/app/(simple)/api/ws/route.ts new file mode 100644 index 0000000..c53e6d0 --- /dev/null +++ b/examples/redis-adapter/app/(simple)/api/ws/route.ts @@ -0,0 +1,19 @@ +import { headers } from 'next/headers'; + +export function GET() { + const headers = new Headers(); + headers.set('Connection', 'Upgrade'); + headers.set('Upgrade', 'websocket'); + return new Response('Upgrade Required', { status: 426, headers }); +} + +export async function UPGRADE(client: import('ws').WebSocket) { + await headers(); + + client.send( + JSON.stringify({ + author: 'System', + content: `Connected to instance ${process.env.INSTANCE_ID || 'unknown'}`, + }), + ); +} diff --git a/examples/redis-adapter/app/(simple)/page.tsx b/examples/redis-adapter/app/(simple)/page.tsx new file mode 100644 index 0000000..764a394 --- /dev/null +++ b/examples/redis-adapter/app/(simple)/page.tsx @@ -0,0 +1,16 @@ +'use client'; + +import { MessageList, MessageSubmit, useMessaging } from 'shared/chat-room'; + +export default function Page() { + const [messages, sendMessage] = useMessaging( + () => `ws://${window.location.host}/api/ws`, + ); + + return ( +
+ + +
+ ); +} diff --git a/examples/redis-adapter/app/layout.tsx b/examples/redis-adapter/app/layout.tsx new file mode 100644 index 0000000..6fb2199 --- /dev/null +++ b/examples/redis-adapter/app/layout.tsx @@ -0,0 +1,17 @@ +export default function Layout({ children }: React.PropsWithChildren) { + return ( + + + {children} + + + ); +} diff --git a/examples/redis-adapter/global.js b/examples/redis-adapter/global.js new file mode 100644 index 0000000..5950105 --- /dev/null +++ b/examples/redis-adapter/global.js @@ -0,0 +1 @@ +globalThis.AsyncLocalStorage = require('node:async_hooks').AsyncLocalStorage; diff --git a/examples/redis-adapter/next-env.d.ts b/examples/redis-adapter/next-env.d.ts new file mode 100644 index 0000000..830fb59 --- /dev/null +++ b/examples/redis-adapter/next-env.d.ts @@ -0,0 +1,6 @@ +/// +/// +/// + +// NOTE: This file should not be edited +// see https://nextjs.org/docs/app/api-reference/config/typescript for more information. diff --git a/examples/redis-adapter/package.json b/examples/redis-adapter/package.json new file mode 100644 index 0000000..f6f8df0 --- /dev/null +++ b/examples/redis-adapter/package.json @@ -0,0 +1,20 @@ +{ + "name": "@examples/redis-adapter", + "private": true, + "scripts": { + "dev": "tsx --require=\"./global.js\" server.ts", + "prepare": "next-ws patch" + }, + "dependencies": { + "ioredis": "catalog:", + "next": "catalog:", + "next-ws": "workspace:^", + "react": "catalog:", + "react-dom": "catalog:", + "shared": "workspace:^" + }, + "devDependencies": { + "@types/react": "catalog:", + "tsx": "^4.20.4" + } +} diff --git a/examples/redis-adapter/server.ts b/examples/redis-adapter/server.ts new file mode 100644 index 0000000..8c19dfc --- /dev/null +++ b/examples/redis-adapter/server.ts @@ -0,0 +1,63 @@ +import { Server } from 'node:http'; +import { parse } from 'node:url'; +import Redis from 'ioredis'; +import next from 'next'; +import { + type Adapter, + setAdapter, + setHttpServer, + setWebSocketServer, +} from 'next-ws/server'; +import { WebSocketServer } from 'ws'; + +class RedisAdapter implements Adapter { + private pub = new Redis(process.env.REDIS_URL || 'redis://localhost:6379'); + private sub = new Redis(process.env.REDIS_URL || 'redis://localhost:6379'); + + async broadcast(room: string, message: unknown): Promise { + const messageStr = + typeof message === 'string' + ? message + : Buffer.isBuffer(message) + ? message.toString('utf-8') + : JSON.stringify(message); + await this.pub.publish(room, messageStr); + } + + onMessage(room: string, handler: (message: unknown) => void): void { + this.sub.subscribe(room); + this.sub.on('message', (channel: string, msg: string) => { + if (channel === room) handler(msg); + }); + } + + async close(): Promise { + await Promise.all([this.pub.quit(), this.sub.quit()]); + } +} + +const httpServer = new Server(); +setHttpServer(httpServer); +const webSocketServer = new WebSocketServer({ noServer: true }); +setWebSocketServer(webSocketServer); +setAdapter(new RedisAdapter()); + +const dev = process.env.NODE_ENV !== 'production'; +const hostname = 'localhost'; +const port = Number.parseInt(process.env.PORT ?? '3000', 10); +const app = next({ dev, hostname, port, customServer: true }); +const handle = app.getRequestHandler(); + +app.prepare().then(() => { + httpServer + .on('request', async (req, res) => { + if (!req.url) return; + const parsedUrl = parse(req.url, true); + await handle(req, res, parsedUrl); + }) + .listen(port, () => { + console.log( + ` ▲ Ready on http://${hostname}:${port} (Instance ${process.env.INSTANCE_ID || 'unknown'})`, + ); + }); +}); diff --git a/examples/redis-adapter/tsconfig.json b/examples/redis-adapter/tsconfig.json new file mode 100644 index 0000000..8e78cb6 --- /dev/null +++ b/examples/redis-adapter/tsconfig.json @@ -0,0 +1,55 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig.json", + + "include": ["app/**/*", ".next/types/**/*.ts"], + "exclude": ["node_modules", ".next"], + + "compileOnSave": true, + "compilerOptions": { + "allowUnreachableCode": false, + "allowUnusedLabels": false, + "exactOptionalPropertyTypes": false, + "noFallthroughCasesInSwitch": true, + "noImplicitOverride": true, + "noImplicitReturns": true, + "noPropertyAccessFromIndexSignature": false, + "noUncheckedIndexedAccess": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "strict": true, + + "allowArbitraryExtensions": false, + "allowImportingTsExtensions": false, + "allowJs": true, + "module": "ESNext", + "moduleResolution": "Bundler", + "resolveJsonModule": true, + "resolvePackageJsonExports": true, + "resolvePackageJsonImports": true, + + "declaration": true, + "declarationMap": true, + "importHelpers": false, + "newLine": "lf", + "noEmit": true, + "noEmitHelpers": true, + "removeComments": false, + "sourceMap": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + + "experimentalDecorators": true, + "lib": ["DOM", "DOM.Iterable", "ESNext"], + "target": "ES2022", + "useDefineForClassFields": true, + "skipLibCheck": true, + + "jsx": "preserve", + "outDir": "./dist", + + "plugins": [{ "name": "next" }], + "incremental": true + } +} diff --git a/package.json b/package.json index 1a8cb54..da48a32 100644 --- a/package.json +++ b/package.json @@ -69,6 +69,7 @@ "@types/ws": "^8.18.1", "chalk": "^5.6.0", "husky": "^9.1.7", + "ioredis": "^5.8.1", "next": "catalog:", "pinst": "^3.0.0", "react": "catalog:", diff --git a/playwright.config.ts b/playwright.config.ts index 206ad24..6443519 100644 --- a/playwright.config.ts +++ b/playwright.config.ts @@ -30,5 +30,6 @@ export default defineConfig({ port: 3002, reuseExistingServer: !process.env.CI, }, + // Note: redis-adapter tests spawn their own instances ], }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ca22a1c..6beff2f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -9,6 +9,9 @@ catalogs: '@types/react': specifier: 19.1.10 version: 19.1.10 + ioredis: + specifier: 5.8.1 + version: 5.8.1 next: specifier: 15.5.4 version: 15.5.4 @@ -75,6 +78,9 @@ importers: husky: specifier: ^9.1.7 version: 9.1.7 + ioredis: + specifier: ^5.8.1 + version: 5.8.1 next: specifier: 'catalog:' version: 15.5.4(@babel/core@7.28.3)(@playwright/test@1.55.0)(react-dom@19.1.1(react@19.1.1))(react@19.1.1) @@ -173,6 +179,34 @@ importers: specifier: ^4.20.4 version: 4.20.4 + examples/redis-adapter: + dependencies: + ioredis: + specifier: 'catalog:' + version: 5.8.1 + next: + specifier: 'catalog:' + version: 15.5.4(@babel/core@7.28.3)(@playwright/test@1.55.0)(react-dom@19.1.1(react@19.1.1))(react@19.1.1) + next-ws: + specifier: workspace:^ + version: link:../.. + react: + specifier: 'catalog:' + version: 19.1.1 + react-dom: + specifier: 'catalog:' + version: 19.1.1(react@19.1.1) + shared: + specifier: workspace:^ + version: link:../_shared + devDependencies: + '@types/react': + specifier: 'catalog:' + version: 19.1.10 + tsx: + specifier: ^4.20.4 + version: 4.20.4 + packages: '@ampproject/remapping@2.3.0': @@ -763,6 +797,9 @@ packages: '@types/node': optional: true + '@ioredis/commands@1.4.0': + resolution: {integrity: sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==} + '@isaacs/cliui@8.0.2': resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} engines: {node: '>=12'} @@ -1120,6 +1157,10 @@ packages: resolution: {integrity: sha512-neHB9xuzh/wk0dIHweyAXv2aPGZIVk3pLMe+/RNzINf17fe0OG96QroktYAUm7SM1PBnzTabaLboqqxDyMU+SQ==} engines: {node: '>=6'} + cluster-key-slot@1.1.2: + resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} + engines: {node: '>=0.10.0'} + color-convert@2.0.1: resolution: {integrity: sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==} engines: {node: '>=7.0.0'} @@ -1174,6 +1215,10 @@ packages: supports-color: optional: true + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + detect-indent@6.1.0: resolution: {integrity: sha512-reYkTUJAZb9gUuZ2RvVCNhVHdg62RHnJ7WJl8ftMi4diZ6NWlciOzQN88pUhSELEwflJht4oQDv0F0BMlwaYtA==} engines: {node: '>=8'} @@ -1348,6 +1393,10 @@ packages: resolution: {integrity: sha512-JmXMZ6wuvDmLiHEml9ykzqO6lwFbof0GG4IkcGaENdCRDDmMVnny7s5HsIgHCbaq0w2MyPhDqkhTUgS2LU2PHA==} engines: {node: '>=0.8.19'} + ioredis@5.8.1: + resolution: {integrity: sha512-Qho8TgIamqEPdgiMadJwzRMW3TudIg6vpg4YONokGDudy4eqRIJtDbVX72pfLBcWxvbn3qm/40TyGUObdW4tLQ==} + engines: {node: '>=12.22.0'} + ip-address@10.0.1: resolution: {integrity: sha512-NWv9YLW4PoW2B7xtzaS3NCot75m6nK7Icdv0o3lfMceJVRfSoQwqD4wEH5rLwoKJwUiZ/rfpiVBhnaF0FK4HoA==} engines: {node: '>= 12'} @@ -1458,6 +1507,12 @@ packages: resolution: {integrity: sha512-t7hw9pI+WvuwNJXwk5zVHpyhIqzg2qTlklJOf0mVxGSbe3Fp2VieZcduNYjaLDoy6p9uGpQEGWG87WpMKlNq8g==} engines: {node: '>=8'} + lodash.defaults@4.2.0: + resolution: {integrity: sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==} + + lodash.isarguments@3.1.0: + resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + lodash.sortby@4.7.0: resolution: {integrity: sha512-HDWXG8isMntAyRF5vZ7xKuEvOhT4AhlRt/3czTSjvGUxjYCBVRQY48ViDHyfYz9VIoBkW4TMGQNapx+l3RUwdA==} @@ -1761,6 +1816,14 @@ packages: resolution: {integrity: sha512-YTUo+Flmw4ZXiWfQKGcwwc11KnoRAYgzAE2E7mXKCjSviTKShtxBsN6YUUBB2gtaBzKzeKunxhUwNHQuRryhWA==} engines: {node: '>= 4'} + redis-errors@1.2.0: + resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} + engines: {node: '>=4'} + + redis-parser@3.0.0: + resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} + engines: {node: '>=4'} + resolve-from@5.0.0: resolution: {integrity: sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==} engines: {node: '>=8'} @@ -1868,6 +1931,9 @@ packages: resolution: {integrity: sha512-S7iGNosepx9RadX82oimUkvr0Ct7IjJbEbs4mJcTxst8um95J3sDYU1RBEOvdu6oL1Wek2ODI5i4MAw+dZ6cAQ==} engines: {node: ^18.17.0 || >=20.5.0} + standard-as-callback@2.1.0: + resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + string-width@4.2.3: resolution: {integrity: sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==} engines: {node: '>=8'} @@ -2673,6 +2739,8 @@ snapshots: optionalDependencies: '@types/node': 24.3.0 + '@ioredis/commands@1.4.0': {} + '@isaacs/cliui@8.0.2': dependencies: string-width: 5.1.2 @@ -2967,6 +3035,8 @@ snapshots: kind-of: 6.0.3 shallow-clone: 3.0.1 + cluster-key-slot@1.1.2: {} + color-convert@2.0.1: dependencies: color-name: 1.1.4 @@ -3011,6 +3081,8 @@ snapshots: dependencies: ms: 2.1.3 + denque@2.1.0: {} + detect-indent@6.1.0: {} detect-libc@2.0.4: @@ -3207,6 +3279,20 @@ snapshots: imurmurhash@0.1.4: {} + ioredis@5.8.1: + dependencies: + '@ioredis/commands': 1.4.0 + cluster-key-slot: 1.1.2 + debug: 4.4.1 + denque: 2.1.0 + lodash.defaults: 4.2.0 + lodash.isarguments: 3.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + ip-address@10.0.1: {} is-arrayish@0.3.2: @@ -3305,6 +3391,10 @@ snapshots: dependencies: p-locate: 4.1.0 + lodash.defaults@4.2.0: {} + + lodash.isarguments@3.1.0: {} + lodash.sortby@4.7.0: {} lodash.startcase@4.4.0: {} @@ -3587,6 +3677,12 @@ snapshots: tiny-invariant: 1.3.3 tslib: 2.8.1 + redis-errors@1.2.0: {} + + redis-parser@3.0.0: + dependencies: + redis-errors: 1.2.0 + resolve-from@5.0.0: {} resolve-pkg-maps@1.0.0: {} @@ -3723,6 +3819,8 @@ snapshots: dependencies: minipass: 7.1.2 + standard-as-callback@2.1.0: {} + string-width@4.2.3: dependencies: emoji-regex: 8.0.0 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index bffc6a5..d75a5e1 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -7,3 +7,4 @@ catalog: react: "19.1.1" react-dom: "19.1.1" "@types/react": "19.1.10" + ioredis: "5.8.1" diff --git a/src/patches/patch-1.ts b/src/patches/patch-1.ts index 4330261..42c4044 100644 --- a/src/patches/patch-1.ts +++ b/src/patches/patch-1.ts @@ -21,7 +21,8 @@ export const patchNextNodeServer = definePatchStep({ try { nextWs ??= require('next-ws/server') } catch {} try { nextWs ??= require(require.resolve('next-ws/server', { paths: [process.cwd()] }) )} catch {} try { nextWs ??= require('${resolveNextWsDirectory().replaceAll(sep, '/').replaceAll("'", "\\'")}/dist/server/index.cjs') } catch {} - nextWs?.setupWebSocketServer(this); + const adapter = nextWs?.getAdapter?.(); + nextWs?.setupWebSocketServer(this, adapter ? { adapter } : undefined); `); const block = $.blockStatement(snippet.nodes()[0].program.body); diff --git a/src/server/helpers/adapter.ts b/src/server/helpers/adapter.ts new file mode 100644 index 0000000..84cadd9 --- /dev/null +++ b/src/server/helpers/adapter.ts @@ -0,0 +1,47 @@ +/** + * Adapter interface for multi-instance WebSocket deployments. + * Enables message broadcasting across multiple server instances via pub/sub. + * + * @example + * ```typescript + * class RedisAdapter implements Adapter { + * private pub = new Redis(process.env.REDIS_URL); + * private sub = new Redis(process.env.REDIS_URL); + * + * async broadcast(room: string, message: unknown): Promise { + * await this.pub.publish(room, JSON.stringify(message)); + * } + * + * onMessage(room: string, handler: (message: unknown) => void): void { + * this.sub.subscribe(room); + * this.sub.on('message', (channel, msg) => { + * if (channel === room) handler(JSON.parse(msg)); + * }); + * } + * + * async close(): Promise { + * await Promise.all([this.pub.quit(), this.sub.quit()]); + * } + * } + * ``` + */ +export interface Adapter { + /** + * Broadcast a message to all instances subscribed to the specified room. + * @param room Room identifier (typically the route pathname) + * @param message Message to broadcast (will be JSON stringified if needed) + */ + broadcast(room: string, message: unknown): Promise; + + /** + * Subscribe to messages for a specific room. + * @param room Room identifier to subscribe to + * @param handler Callback invoked when messages arrive for this room + */ + onMessage(room: string, handler: (message: unknown) => void): void; + + /** + * Clean up adapter resources (close connections, unsubscribe, etc.) + */ + close(): Promise; +} diff --git a/src/server/index.ts b/src/server/index.ts index 89e8b43..03912c3 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -1,9 +1,13 @@ +export type { Adapter } from './helpers/adapter.js'; export type { RouteContext } from './helpers/module.js'; export type { SocketHandler, UpgradeHandler } from './helpers/socket.js'; export { + getAdapter, getHttpServer, getWebSocketServer, + setAdapter, setHttpServer, setWebSocketServer, } from './persistent.js'; +export type { SetupOptions } from './setup.js'; export * from './setup.js'; diff --git a/src/server/persistent.ts b/src/server/persistent.ts index 1479e68..f920e8e 100644 --- a/src/server/persistent.ts +++ b/src/server/persistent.ts @@ -92,3 +92,29 @@ export { */ useRequestStorage, }; + +// ===== Adapter ===== // + +const [getAdapter, setAdapter, useAdapter] = // + useGlobal( + Symbol.for('next-ws.adapter'), // + ); + +export { + /** + * Get the adapter instance. + * @returns Existing adapter instance if set + */ + getAdapter, + /** + * Set the adapter instance. + * @param value Adapter instance + */ + setAdapter, + /** + * Get or set the adapter instance. + * @param getter Function to get the adapter instance + * @returns Existing or created adapter instance + */ + useAdapter, +}; diff --git a/src/server/setup.ts b/src/server/setup.ts index 736ffbe..8d5e4bc 100644 --- a/src/server/setup.ts +++ b/src/server/setup.ts @@ -1,7 +1,8 @@ import { AsyncLocalStorage } from 'node:async_hooks'; import * as logger from 'next/dist/build/output/log.js'; import type NextNodeServer from 'next/dist/server/next-server.js'; -import { WebSocketServer } from 'ws'; +import { WebSocketServer, WebSocket } from 'ws'; +import type { Adapter } from './helpers/adapter.js'; import { findMatchingRoute } from './helpers/match.js'; import { importRouteModule } from './helpers/module.js'; import { toNextRequest } from './helpers/request.js'; @@ -12,11 +13,19 @@ import { useWebSocketServer, } from './persistent.js'; +export interface SetupOptions { + adapter?: Adapter; +} + /** * Attach the WebSocket server to the HTTP server. * @param nextServer Next.js Node server instance + * @param options Setup options including optional adapter */ -export function setupWebSocketServer(nextServer: NextNodeServer) { +export function setupWebSocketServer( + nextServer: NextNodeServer, + options?: SetupOptions, +) { const httpServer = // // @ts-expect-error - serverOptions is protected useHttpServer(() => nextServer.serverOptions?.httpServer); @@ -26,14 +35,22 @@ export function setupWebSocketServer(nextServer: NextNodeServer) { useWebSocketServer(() => new WebSocketServer({ noServer: true })); const requestStorage = // useRequestStorage(() => new AsyncLocalStorage()); + const adapter = options?.adapter; - logger.ready('[next-ws] has started the WebSocket server'); + if (adapter) { + logger.ready('[next-ws] adapter configured for multi-instance support'); + } else { + logger.ready('[next-ws] has started the WebSocket server'); + } // Prevent double-attaching const kAttached = Symbol.for('next-ws.http-server.attached'); if (Reflect.has(httpServer, kAttached)) return; Reflect.set(httpServer, kAttached, true); + // Store route -> clients mapping for adapter broadcasts + const routeClients = new Map>(); + httpServer.on('upgrade', async (message, socket, head) => { const request = toNextRequest(message); @@ -69,6 +86,30 @@ export function setupWebSocketServer(nextServer: NextNodeServer) { wsServer.handleUpgrade(message, socket, head, async (client) => { wsServer.emit('connection', client, message); + // Track client by route for adapter + if (adapter) { + if (!routeClients.has(pathname)) { + routeClients.set(pathname, new Set()); + + // Subscribe to adapter messages for this route + adapter.onMessage(pathname, (message: unknown) => { + const clients = routeClients.get(pathname); + if (!clients) return; + + for (const localClient of clients) { + if (localClient.readyState === WebSocket.OPEN) { + localClient.send( + typeof message === 'string' + ? message + : JSON.stringify(message), + ); + } + } + }); + } + routeClients.get(pathname)?.add(client); + } + try { const context = { params: route.params }; if (handleUpgrade) { @@ -84,6 +125,15 @@ export function setupWebSocketServer(nextServer: NextNodeServer) { if (typeof handleClose === 'function') client.once('close', () => handleClose()); } + + // Intercept client messages to broadcast via adapter + if (adapter) { + client.on('message', (data) => { + adapter.broadcast(pathname, data).catch((err: unknown) => { + logger.error('[next-ws] adapter broadcast failed:', err); + }); + }); + } } catch (cause) { logger.error( `[next-ws] error in socket handler for '${pathname}'`, @@ -93,8 +143,23 @@ export function setupWebSocketServer(nextServer: NextNodeServer) { client.close(1011, 'Internal Server Error'); } catch {} } + + client.once('close', () => { + if (adapter) { + routeClients.get(pathname)?.delete(client); + } + }); }); return; }); + + // Cleanup adapter on server close + if (adapter) { + httpServer.once('close', async () => { + await adapter.close().catch((err: unknown) => { + logger.error('[next-ws] adapter cleanup failed:', err); + }); + }); + } } diff --git a/tests/redis-adapter.test.ts b/tests/redis-adapter.test.ts new file mode 100644 index 0000000..520b996 --- /dev/null +++ b/tests/redis-adapter.test.ts @@ -0,0 +1,280 @@ +import type { ChildProcess } from 'node:child_process'; +import { spawn } from 'node:child_process'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { expect, test } from '@playwright/test'; + +let instance1: ChildProcess; +let instance2: ChildProcess; +const PORT_1 = 3004; +const PORT_2 = 3005; + +async function isRedisAvailable(): Promise { + try { + const Redis = (await import('ioredis')).default; + const testRedis = new Redis({ + host: 'localhost', + port: 6379, + retryStrategy: () => null, + lazyConnect: true, + }); + await testRedis.connect(); + await testRedis.ping(); + await testRedis.quit(); + return true; + } catch { + return false; + } +} + +test.describe('Redis Adapter Multi-Instance', () => { + test.beforeAll(async () => { + const redisAvailable = await isRedisAvailable(); + if (!redisAvailable) { + test.skip(); + return; + } + + instance1 = spawn('npm', ['run', 'dev'], { + cwd: 'examples/redis-adapter', + env: { + ...process.env, + PORT: String(PORT_1), + INSTANCE_ID: 'instance-1', + REDIS_URL: 'redis://localhost:6379', + }, + stdio: 'pipe', + }); + + instance2 = spawn('npm', ['run', 'dev'], { + cwd: 'examples/redis-adapter', + env: { + ...process.env, + PORT: String(PORT_2), + INSTANCE_ID: 'instance-2', + REDIS_URL: 'redis://localhost:6379', + }, + stdio: 'pipe', + }); + + await Promise.all([ + waitForServer(PORT_1, instance1), + waitForServer(PORT_2, instance2), + ]); + + await sleep(1000); + }); + + test.afterAll(async () => { + if (instance1) { + try { + instance1.kill('SIGTERM'); + } catch {} + await waitForProcessExit(instance1); + } + if (instance2) { + try { + instance2.kill('SIGTERM'); + } catch {} + await waitForProcessExit(instance2); + } + }); + + test('messages broadcast across instances via Redis adapter', async ({ + browser, + }) => { + const page1 = await browser.newPage(); + const page2 = await browser.newPage(); + + try { + await page1.goto(`http://localhost:${PORT_1}`); + await page1.waitForLoadState('networkidle'); + + await page2.goto(`http://localhost:${PORT_2}`); + await page2.waitForLoadState('networkidle'); + + await sleep(1000); + + const welcomeMessage1 = await page1.textContent('li:first-child'); + expect(welcomeMessage1).toContain('instance-1'); + + const welcomeMessage2 = await page2.textContent('li:first-child'); + expect(welcomeMessage2).toContain('instance-2'); + + await page1.fill('input[name=author]', 'Alice'); + await page1.fill('input[name=content]', 'Hello from instance 1!'); + await page1.click('button[type=submit]'); + + await sleep(1500); + + const page1LastMessage = await page1.textContent('li:last-child'); + expect(page1LastMessage).toContain('Alice'); + expect(page1LastMessage).toContain('Hello from instance 1!'); + + const page2LastMessage = await page2.textContent('li:last-child'); + expect(page2LastMessage).toContain('Alice'); + expect(page2LastMessage).toContain('Hello from instance 1!'); + + await page2.fill('input[name=author]', 'Bob'); + await page2.fill('input[name=content]', 'Hello from instance 2!'); + await page2.click('button[type=submit]'); + + await sleep(1500); + + const page2NewMessage = await page2.textContent('li:last-child'); + expect(page2NewMessage).toContain('Bob'); + expect(page2NewMessage).toContain('Hello from instance 2!'); + + const page1NewMessage = await page1.textContent('li:last-child'); + expect(page1NewMessage).toContain('Bob'); + expect(page1NewMessage).toContain('Hello from instance 2!'); + } finally { + await page1.close(); + await page2.close(); + } + }); + + test('multiple clients on same instance receive broadcasts from other instances', async ({ + browser, + }) => { + const page1a = await browser.newPage(); + const page1b = await browser.newPage(); + const page2 = await browser.newPage(); + + try { + await page1a.goto(`http://localhost:${PORT_1}`); + await page1b.goto(`http://localhost:${PORT_1}`); + await page2.goto(`http://localhost:${PORT_2}`); + + await page1a.waitForLoadState('networkidle'); + await page1b.waitForLoadState('networkidle'); + await page2.waitForLoadState('networkidle'); + + await sleep(1000); + + await page2.fill('input[name=author]', 'Charlie'); + await page2.fill('input[name=content]', 'Broadcasting to all!'); + await page2.click('button[type=submit]'); + + await sleep(1500); + + const message1a = await page1a.textContent('li:last-child'); + expect(message1a).toContain('Charlie'); + expect(message1a).toContain('Broadcasting to all!'); + + const message1b = await page1b.textContent('li:last-child'); + expect(message1b).toContain('Charlie'); + expect(message1b).toContain('Broadcasting to all!'); + + const message2 = await page2.textContent('li:last-child'); + expect(message2).toContain('Charlie'); + expect(message2).toContain('Broadcasting to all!'); + } finally { + await page1a.close(); + await page1b.close(); + await page2.close(); + } + }); + + test('adapter handles client disconnection and reconnection', async ({ + browser, + }) => { + const page1 = await browser.newPage(); + const page2 = await browser.newPage(); + + try { + await page1.goto(`http://localhost:${PORT_1}`); + await page2.goto(`http://localhost:${PORT_2}`); + + await page1.waitForLoadState('networkidle'); + await page2.waitForLoadState('networkidle'); + await sleep(1000); + + await page1.fill('input[name=author]', 'Dave'); + await page1.fill('input[name=content]', 'Initial message'); + await page1.click('button[type=submit]'); + await sleep(1500); + + let message = await page2.textContent('li:last-child'); + expect(message).toContain('Initial message'); + + await page2.reload(); + await page2.waitForLoadState('networkidle'); + await sleep(1000); + + await page1.fill('input[name=author]', 'Dave'); + await page1.fill('input[name=content]', 'After reconnect'); + await page1.click('button[type=submit]'); + await sleep(1500); + + message = await page2.textContent('li:last-child'); + expect(message).toContain('After reconnect'); + } finally { + await page1.close(); + await page2.close(); + } + }); +}); + +async function waitForServer( + port: number, + process: ChildProcess, + timeout = 30000, +): Promise { + const readyPromise = new Promise((resolve, reject) => { + let timer: NodeJS.Timeout; + + const onData = (data: Buffer) => { + const output = data.toString(); + if (output.includes('Ready on')) { + cleanup(); + resolve(); + } + }; + + const onExit = (code: number | null) => { + cleanup(); + reject(new Error(`Server process exited with code ${code}`)); + }; + + const cleanup = () => { + clearTimeout(timer); + process.stdout?.off('data', onData); + process.off('exit', onExit); + }; + + process.stdout?.on('data', onData); + process.on('exit', onExit); + + timer = setTimeout(() => { + cleanup(); + reject( + new Error(`Server on port ${port} did not start within ${timeout}ms`), + ); + }, timeout); + }); + + await readyPromise; +} + +async function waitForProcessExit( + proc: ChildProcess, + timeout = 5000, +): Promise { + return new Promise((resolve) => { + // Already exited + if (proc.exitCode !== null || proc.signalCode) return resolve(); + + const timer = setTimeout(() => { + try { + // Only attempt if still running + if (proc.exitCode === null && !proc.killed) proc.kill('SIGKILL'); + } catch {} + resolve(); + }, timeout); + + proc.once('exit', () => { + clearTimeout(timer); + resolve(); + }); + }); +}