diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c8703c15..fc2ac989 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -48,3 +48,4 @@ Most IDEs have plugins integrating the used linter (eslint), including support f - `SYSTEM_API_KEY={value}` used for integration with the dashboard - `SERVER_SESSION_COOKIE_SECRET={value}` used to read the shared session cookie - `DB_CONNECTION_HOST`, `DB_CONNECTION_USER`, `DB_CONNECTION_PASSWORD`, and `DB_CONNECTION_DATABASE` database connection details +- `REDIS_STANDALONE_PERSISTENT_URL`, `STANDALONE_NON_PERSISTENT_URL`, `CLUSTER_MEASUREMENTS_ROOT_NODES_0_URL`, `CLUSTER_MEASUREMENTS_ROOT_NODES_1_URL`, `CLUSTER_MEASUREMENTS_ROOT_NODES_2_URL`, and `REDIS_SHARED_PASSWORD` - redis connection details diff --git a/config/default.cjs b/config/default.cjs index 7aea435c..258a7d91 100644 --- a/config/default.cjs +++ b/config/default.cjs @@ -17,9 +17,29 @@ module.exports = { }, }, redis: { - url: 'redis://localhost:6379', - socket: { - tls: false, + standalonePersistent: { + url: 'redis://localhost:7001', + }, + standaloneNonPersistent: { + url: 'redis://localhost:7002', + }, + clusterMeasurements: { + // listing three nodes here is enough, the rest will be discovered automatically + rootNodes: [{ + url: 'redis://localhost:7101', + }, + { + url: 'redis://localhost:7102', + }, + { + url: 'redis://localhost:7103', + }], + }, + shared: { + password: 'PASSWORD', + socket: { + tls: false, + }, }, }, db: { diff --git a/config/development.cjs b/config/development.cjs index a6916a27..f6c8bc74 100644 --- a/config/development.cjs +++ b/config/development.cjs @@ -4,12 +4,6 @@ module.exports = { cookieSecret: 'xxx', }, }, - redis: { - url: 'redis://localhost:16379', - socket: { - tls: false, - }, - }, db: { connection: { port: 13306, diff --git a/config/test.cjs b/config/test.cjs index 036a1e17..e836c2be 100644 --- a/config/test.cjs +++ b/config/test.cjs @@ -4,12 +4,6 @@ module.exports = { cookieSecret: 'xxx', }, }, - redis: { - url: 'redis://localhost:16379', - socket: { - tls: false, - }, - }, db: { connection: { port: 13306, diff --git a/src/lib/metrics.ts b/src/lib/metrics.ts index dc9fdaf2..ddda157d 100644 --- a/src/lib/metrics.ts +++ b/src/lib/metrics.ts @@ -5,7 +5,7 @@ import type { Knex } from 'knex'; import { scopedLogger } from './logger.js'; import { fetchProbes, getWsServer, PROBES_NAMESPACE } from './ws/server.js'; -import { getMeasurementRedisClient, type RedisClient } from './redis/measurement-client.js'; +import { getMeasurementRedisClient, type RedisCluster } from './redis/measurement-client.js'; import { USERS_TABLE } from './http/auth.js'; import { client } from './sql/client.js'; @@ -21,14 +21,14 @@ export class MetricsAgent { constructor ( private readonly io: SocketServer, - private readonly redis: RedisClient, + private readonly redis: RedisCluster, private readonly sql: Knex, ) {} run (): void { this.registerAsyncCollector(`gp.measurement.stored.count`, async () => { const [ dbSize, awaitingSize ] = await Promise.all([ - this.redis.dbSize(), + this.redis.reduceMasters(async (accumulator, client) => accumulator + await client.dbSize(), 0), this.redis.hLen('gp:in-progress'), ]); diff --git a/src/lib/redis/client.ts b/src/lib/redis/client.ts index 1586b6db..76c8acd5 100644 --- a/src/lib/redis/client.ts +++ b/src/lib/redis/client.ts @@ -1,26 +1,41 @@ +import config from 'config'; import type { RedisClientOptions } from 'redis'; -import { createRedisClientInternal, type RedisClient } from './shared.js'; +import { createRedisClientInternal, type RedisClient, type RedisClientInternal } from './shared.js'; export type { RedisClient } from './shared.js'; let redis: RedisClient; +let redisConnectPromise: Promise; export const initRedisClient = async () => { - redis = createRedisClient(); + if (redis) { + await redisConnectPromise; + return redis; + } + + const { client, connectPromise } = createRedisClient(); + + redis = client; + redisConnectPromise = connectPromise; + + await redisConnectPromise; return redis; }; -const createRedisClient = (options?: RedisClientOptions): RedisClient => { +const createRedisClient = (options?: RedisClientOptions): RedisClientInternal => { return createRedisClientInternal({ + ...config.util.toObject(config.get('redis.shared')) as RedisClientOptions, + ...config.util.toObject(config.get('redis.standaloneNonPersistent')) as RedisClientOptions, ...options, - database: 2, name: 'non-persistent', }); }; export const getRedisClient = (): RedisClient => { if (!redis) { - redis = createRedisClient(); + const { client, connectPromise } = createRedisClient(); + redis = client; + redisConnectPromise = connectPromise; } return redis; diff --git a/src/lib/redis/measurement-client.ts b/src/lib/redis/measurement-client.ts index cc7434e0..75e433c2 100644 --- a/src/lib/redis/measurement-client.ts +++ b/src/lib/redis/measurement-client.ts @@ -1,26 +1,40 @@ -import type { RedisClientOptions } from 'redis'; -import { createRedisClientInternal, type RedisClient } from './shared.js'; +import config from 'config'; +import type { RedisClientOptions, RedisClusterOptions } from 'redis'; +import { createRedisClusterInternal, type RedisCluster, type RedisClusterInternal } from './shared.js'; -export type { RedisClient } from './shared.js'; +export type { RedisCluster } from './shared.js'; -let redis: RedisClient; +let redis: RedisCluster; +let redisConnectPromise: Promise; export const initMeasurementRedisClient = async () => { - redis = createMeasurementRedisClient(); + if (redis) { + await redisConnectPromise; + return redis; + } + + const { client, connectPromise } = createMeasurementRedisClient(); + + redis = client; + redisConnectPromise = connectPromise; + + await redisConnectPromise; return redis; }; -export const createMeasurementRedisClient = (options?: RedisClientOptions): RedisClient => { - return createRedisClientInternal({ +export const createMeasurementRedisClient = (options?: RedisClusterOptions): RedisClusterInternal => { + return createRedisClusterInternal({ + defaults: { ...config.util.toObject(config.get('redis.shared')) as RedisClientOptions }, + ...config.util.toObject(config.get('redis.clusterMeasurements')) as RedisClusterOptions, ...options, - database: 0, - name: 'measurement', }); }; -export const getMeasurementRedisClient = (): RedisClient => { +export const getMeasurementRedisClient = (): RedisCluster => { if (!redis) { - redis = createMeasurementRedisClient(); + const { client, connectPromise } = createMeasurementRedisClient(); + redis = client; + redisConnectPromise = connectPromise; } return redis; diff --git a/src/lib/redis/persistent-client.ts b/src/lib/redis/persistent-client.ts index 081d76e9..4a956ebb 100644 --- a/src/lib/redis/persistent-client.ts +++ b/src/lib/redis/persistent-client.ts @@ -1,26 +1,41 @@ +import config from 'config'; import type { RedisClientOptions } from 'redis'; -import { createRedisClientInternal, type RedisClient } from './shared.js'; +import { createRedisClientInternal, type RedisClient, type RedisClientInternal } from './shared.js'; export type { RedisClient } from './shared.js'; let redis: RedisClient; +let redisConnectPromise: Promise; export const initPersistentRedisClient = async () => { - redis = createPersistentRedisClient(); + if (redis) { + await redisConnectPromise; + return redis; + } + + const { client, connectPromise } = createPersistentRedisClient(); + + redis = client; + redisConnectPromise = connectPromise; + + await redisConnectPromise; return redis; }; -export const createPersistentRedisClient = (options?: RedisClientOptions): RedisClient => { +export const createPersistentRedisClient = (options?: RedisClientOptions): RedisClientInternal => { return createRedisClientInternal({ + ...config.util.toObject(config.get('redis.shared')) as RedisClientOptions, + ...config.util.toObject(config.get('redis.standalonePersistent')) as RedisClientOptions, ...options, - database: 1, name: 'persistent', }); }; export const getPersistentRedisClient = (): RedisClient => { if (!redis) { - redis = createPersistentRedisClient(); + const { client, connectPromise } = createPersistentRedisClient(); + redis = client; + redisConnectPromise = connectPromise; } return redis; diff --git a/src/lib/redis/scripts.ts b/src/lib/redis/scripts.ts index bb232b40..1732146e 100644 --- a/src/lib/redis/scripts.ts +++ b/src/lib/redis/scripts.ts @@ -25,6 +25,7 @@ export type RedisScripts = { }; const recordResult: RecordResultScript = defineScript({ + FIRST_KEY_INDEX: 0, // Needed in clusters: https://github.com/redis/node-redis/issues/2521 NUMBER_OF_KEYS: 2, SCRIPT: ` local keyMeasurementResults = KEYS[1] @@ -51,8 +52,8 @@ const recordResult: RecordResultScript = defineScript({ transformArguments (measurementId, testId, data) { return [ // keys - `gp:m:${measurementId}:results`, - `gp:m:${measurementId}:probes_awaiting`, + `gp:m:{${measurementId}}:results`, + `gp:m:{${measurementId}}:probes_awaiting`, // values testId, JSON.stringify(data), @@ -65,25 +66,20 @@ const recordResult: RecordResultScript = defineScript({ }); const markFinished: MarkFinishedScript = defineScript({ - NUMBER_OF_KEYS: 3, + FIRST_KEY_INDEX: 0, // Needed in clusters: https://github.com/redis/node-redis/issues/2521 + NUMBER_OF_KEYS: 2, SCRIPT: ` - local keyInProgress = KEYS[1] - local keyMeasurementResults = KEYS[2] - local keyMeasurementAwaiting = KEYS[3] - local measurementId = ARGV[1] + local keyMeasurementResults = KEYS[1] + local keyMeasurementAwaiting = KEYS[2] - redis.call('HDEL', keyInProgress, measurementId) redis.call('DEL', keyMeasurementAwaiting) redis.call('JSON.SET', keyMeasurementResults, '$.status', '"finished"') `, transformArguments (measurementId) { return [ // keys - 'gp:in-progress', - `gp:m:${measurementId}:results`, - `gp:m:${measurementId}:probes_awaiting`, - // values - measurementId, + `gp:m:{${measurementId}}:results`, + `gp:m:{${measurementId}}:probes_awaiting`, ]; }, transformReply () { diff --git a/src/lib/redis/shared.ts b/src/lib/redis/shared.ts index 97baa145..33f419e3 100644 --- a/src/lib/redis/shared.ts +++ b/src/lib/redis/shared.ts @@ -1,30 +1,72 @@ -import config from 'config'; import { createClient, + createCluster, type RedisClientOptions, type RedisClientType, + type RedisClusterOptions, + type RedisClusterType, type RedisDefaultModules, type RedisFunctions, } from 'redis'; +import Bluebird from 'bluebird'; import { type RedisScripts, scripts } from './scripts.js'; import { scopedLogger } from '../logger.js'; const logger = scopedLogger('redis-client'); +type ClusterExtensions = { + mapMasters: typeof mapMasters, + reduceMasters: typeof reduceMasters, +}; + export type RedisClient = RedisClientType; +export type RedisCluster = RedisClusterType & ClusterExtensions; +export type RedisClientInternal = { connectPromise: Promise, client: RedisClient }; +export type RedisClusterInternal = { connectPromise: Promise, client: RedisCluster }; -export const createRedisClientInternal = (options?: RedisClientOptions): RedisClient => { +export const createRedisClientInternal = (options: RedisClientOptions): RedisClientInternal => { const client = createClient({ - ...config.util.toObject(config.get('redis')) as RedisClientOptions, ...options, scripts, }); - client + const connectPromise = client .on('error', (error: Error) => logger.error('Redis connection error:', error)) .on('ready', () => logger.info('Redis connection ready.')) .on('reconnecting', () => logger.info('Redis reconnecting.')) .connect().catch((error: Error) => logger.error('Redis connection error:', error)); - return client; + return { client, connectPromise }; }; + +export const createRedisClusterInternal = (options: RedisClusterOptions): RedisClusterInternal => { + const cluster = createCluster({ + ...options, + scripts, + }); + + const client = Object.assign(cluster, { + mapMasters, + reduceMasters, + }); + + const connectPromise = client + .on('error', (error: Error) => logger.error('Redis connection error:', error)) + .on('ready', () => logger.info('Redis connection ready.')) + .on('reconnecting', () => logger.info('Redis reconnecting.')) + .connect().catch((error: Error) => logger.error('Redis connection error:', error)); + + return { client, connectPromise }; +}; + +function mapMasters (this: RedisCluster, mapper: (client: RedisClient) => Promise) { + return Bluebird.map(this.masters, (node) => { + return this.nodeClient(node); + }).map(mapper); +} + +function reduceMasters (this: RedisCluster, reducer: (accumulator: Result, client: RedisClient) => Promise, initialValue: Result) { + return Bluebird.map(this.masters, (node) => { + return this.nodeClient(node); + }).reduce(reducer, initialValue); +} diff --git a/src/lib/redis/subscription-client.ts b/src/lib/redis/subscription-client.ts index 06cf92a7..9575c299 100644 --- a/src/lib/redis/subscription-client.ts +++ b/src/lib/redis/subscription-client.ts @@ -1,26 +1,20 @@ +import config from 'config'; import type { RedisClientOptions } from 'redis'; -import { createRedisClientInternal, type RedisClient } from './shared.js'; +import { createRedisClientInternal, type RedisClientInternal } from './shared.js'; export type { RedisClient } from './shared.js'; -let redis: RedisClient; - export const initSubscriptionRedisClient = async () => { - redis = createSubscriptionRedisClient(); - return redis; + const { connectPromise, client } = createSubscriptionRedisClient(); + await connectPromise; + return client; }; -export const createSubscriptionRedisClient = (options?: RedisClientOptions): RedisClient => { +export const createSubscriptionRedisClient = (options?: RedisClientOptions): RedisClientInternal => { return createRedisClientInternal({ + ...config.util.toObject(config.get('redis.shared')) as RedisClientOptions, + ...config.util.toObject(config.get('redis.standaloneNonPersistent')) as RedisClientOptions, ...options, name: 'subscription', }); }; - -export const getSubscriptionRedisClient = (): RedisClient => { - if (!redis) { - redis = createSubscriptionRedisClient(); - } - - return redis; -}; diff --git a/src/lib/ws/server.ts b/src/lib/ws/server.ts index a86baac9..bd14ddfd 100644 --- a/src/lib/ws/server.ts +++ b/src/lib/ws/server.ts @@ -8,7 +8,7 @@ import { ProbeOverride } from '../override/probe-override.js'; import { ProbeIpLimit } from './helper/probe-ip-limit.js'; import { AdoptedProbes } from '../override/adopted-probes.js'; import { AdminData } from '../override/admin-data.js'; -import { getSubscriptionRedisClient } from '../redis/subscription-client.js'; +import { initSubscriptionRedisClient } from '../redis/subscription-client.js'; export interface DefaultEventsMap { // TODO: maybe create type definitions for the events? @@ -35,11 +35,7 @@ let syncedProbeList: SyncedProbeList; export const initWsServer = async () => { const redis = getRedisClient(); - const redisSubClient = getSubscriptionRedisClient(); - const pubClient = redis.duplicate(); - const subClient = redis.duplicate(); - - await Promise.all([ pubClient.connect(), subClient.connect() ]); + const [ subClient1, subClient2 ] = await Promise.all([ initSubscriptionRedisClient(), initSubscriptionRedisClient() ]); io = new Server({ transports: [ 'websocket' ], @@ -48,11 +44,11 @@ export const initWsServer = async () => { pingTimeout: 3000, }); - io.adapter(createShardedAdapter(pubClient, subClient, { + io.adapter(createShardedAdapter(redis, subClient1, { subscriptionMode: 'dynamic-private', })); - syncedProbeList = new SyncedProbeList(redis, redisSubClient, io.of(PROBES_NAMESPACE), probeOverride); + syncedProbeList = new SyncedProbeList(redis, subClient2, io.of(PROBES_NAMESPACE), probeOverride); await syncedProbeList.sync(); syncedProbeList.scheduleSync(); diff --git a/src/measurement/store.ts b/src/measurement/store.ts index 9c16c9de..d4f2a7de 100644 --- a/src/measurement/store.ts +++ b/src/measurement/store.ts @@ -5,12 +5,12 @@ import type { OfflineProbe, Probe } from '../probe/types.js'; import { scopedLogger } from '../lib/logger.js'; import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType, MeasurementResultMessage } from './types.js'; import { getDefaults } from './schema/utils.js'; -import { getMeasurementRedisClient, type RedisClient } from '../lib/redis/measurement-client.js'; +import { getMeasurementRedisClient, type RedisCluster } from '../lib/redis/measurement-client.js'; const logger = scopedLogger('store'); export const getMeasurementKey = (id: string, suffix: string = 'results'): string => { - return `gp:m:${id}:${suffix}`; + return `gp:m:{${id}}:${suffix}`; }; const subtractObjects = (obj1: Record, obj2: Record = {}) => { @@ -35,10 +35,11 @@ const subtractObjects = (obj1: Record, obj2: Record { - return this.redis.sendCommand([ 'JSON.GET', getMeasurementKey(id) ]); + const key = getMeasurementKey(id); + return this.redis.sendCommand(key, true, [ 'JSON.GET', key ]); } async getMeasurement (id: string) { @@ -113,7 +114,10 @@ export class MeasurementStore { } async markFinished (id: string) { - await this.redis.markFinished(id); + await Promise.all([ + this.redis.markFinished(id), + this.redis.hDel('gp:in-progress', id), + ]); } async markFinishedByTimeout (ids: string[]): Promise {