diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7c0383cc..e0c5e4b7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: services: redis: - image: redislabs/rejson:latest + image: redis/redis-stack-server:latest ports: - 6379:6379 options: >- diff --git a/package-lock.json b/package-lock.json index aa76cd2a..d3a9d735 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@koa/router": "^12.0.1", "@maxmind/geoip2-node": "^4.2.0", "@redocly/openapi-core": "^1.6.0", - "@socket.io/redis-adapter": "^8.2.1", + "@socket.io/redis-adapter": "github:MartinKolarik/socket.io-redis-adapter#dist", "adm-zip": "^0.5.10", "any-ascii": "^0.3.2", "bluebird": "^3.7.2", @@ -5322,8 +5322,8 @@ }, "node_modules/@socket.io/redis-adapter": { "version": "8.2.1", - "resolved": "https://registry.npmjs.org/@socket.io/redis-adapter/-/redis-adapter-8.2.1.tgz", - "integrity": "sha512-6Dt7EZgGSBP0qvXeOKGx7NnSr2tPMbVDfDyL97zerZo+v69hMfL99skMCL3RKZlWVqLyRme2T0wcy3udHhtOsg==", + "resolved": "git+ssh://git@github.com/MartinKolarik/socket.io-redis-adapter.git#1f6dfc58b8e94593c6f4193b8e3702b9c733d1d0", + "license": "MIT", "dependencies": { "debug": "~4.3.1", "notepack.io": "~3.0.1", diff --git a/package.json b/package.json index 96d24d6b..bb886220 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "@koa/router": "^12.0.1", "@maxmind/geoip2-node": "^4.2.0", "@redocly/openapi-core": "^1.6.0", - "@socket.io/redis-adapter": "^8.2.1", + "@socket.io/redis-adapter": "github:MartinKolarik/socket.io-redis-adapter#dist", "adm-zip": "^0.5.10", "any-ascii": "^0.3.2", "bluebird": "^3.7.2", diff --git a/src/lib/ws/server.ts b/src/lib/ws/server.ts index 1dc82402..f212d0db 100644 --- a/src/lib/ws/server.ts +++ b/src/lib/ws/server.ts @@ -1,5 +1,5 @@ import { type RemoteSocket, Server, Socket } from 'socket.io'; -import { createAdapter } from '@socket.io/redis-adapter'; +import { createShardedAdapter } from '@socket.io/redis-adapter'; // eslint-disable-next-line n/no-missing-import import type { DefaultEventsMap } from 'socket.io/dist/typed-events.js'; import type { Probe } from '../../probe/types.js'; @@ -32,8 +32,8 @@ export const initWsServer = async () => { pingTimeout: 3000, }); - io.adapter(createAdapter(pubClient, subClient, { - publishOnSpecificResponseChannel: true, + io.adapter(createShardedAdapter(pubClient, subClient, { + subscriptionMode: 'static', })); }; diff --git a/src/measurement/runner.ts b/src/measurement/runner.ts index 4c134679..e05e44e6 100644 --- a/src/measurement/runner.ts +++ b/src/measurement/runner.ts @@ -1,7 +1,7 @@ import config from 'config'; import type { Server } from 'socket.io'; import createHttpError from 'http-errors'; -import { getWsServer } from '../lib/ws/server.js'; +import { getWsServer, PROBES_NAMESPACE } from '../lib/ws/server.js'; import { getProbeRouter, type ProbeRouter } from '../probe/router.js'; import type { Probe } from '../probe/types.js'; import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js'; @@ -61,7 +61,7 @@ export class MeasurementRunner { const maxInProgressProbes = config.get('measurement.maxInProgressProbes'); onlineProbesMap.forEach((probe, index) => { const inProgressUpdates = request.inProgressUpdates && inProgressProbes++ < maxInProgressProbes; - this.io.of('probes').to(probe.client).emit('probe:measurement:request', { + this.io.of(PROBES_NAMESPACE).to(probe.client).emit('probe:measurement:request', { measurementId, testId: index.toString(), measurement: { diff --git a/test/tests/unit/measurement/runner.test.ts b/test/tests/unit/measurement/runner.test.ts index 910481d8..06e73a2b 100644 --- a/test/tests/unit/measurement/runner.test.ts +++ b/test/tests/unit/measurement/runner.test.ts @@ -40,7 +40,7 @@ describe('MeasurementRunner', () => { beforeEach(() => { sinon.resetHistory(); to.returns({ emit }); - io.of.returns({ to } as any); + io.of.withArgs('/probes').returns({ to } as any); store.createMeasurement.resolves('measurementid'); testId = 0; });