Skip to content

Commit

Permalink
refactor: support redis cluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinKolarik committed Jan 6, 2025
1 parent 96c4dcd commit 5599b32
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 84 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ Most IDEs have plugins integrating the used linter (eslint), including support f
- `SYSTEM_API_KEY={value}` used for integration with the dashboard
- `SERVER_SESSION_COOKIE_SECRET={value}` used to read the shared session cookie
- `DB_CONNECTION_HOST`, `DB_CONNECTION_USER`, `DB_CONNECTION_PASSWORD`, and `DB_CONNECTION_DATABASE` database connection details
- `REDIS_STANDALONE_PERSISTENT_URL`, `STANDALONE_NON_PERSISTENT_URL`, `CLUSTER_MEASUREMENTS_ROOT_NODES_0_URL`, `CLUSTER_MEASUREMENTS_ROOT_NODES_1_URL`, `CLUSTER_MEASUREMENTS_ROOT_NODES_2_URL`, and `REDIS_SHARED_PASSWORD` - redis connection details
26 changes: 23 additions & 3 deletions config/default.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,29 @@ module.exports = {
},
},
redis: {
url: 'redis://localhost:6379',
socket: {
tls: false,
standalonePersistent: {
url: 'redis://localhost:7001',
},
standaloneNonPersistent: {
url: 'redis://localhost:7002',
},
clusterMeasurements: {
// listing three nodes here is enough, the rest will be discovered automatically
rootNodes: [{
url: 'redis://localhost:7101',
},
{
url: 'redis://localhost:7102',
},
{
url: 'redis://localhost:7103',
}],
},
shared: {
password: 'PASSWORD',
socket: {
tls: false,
},
},
},
db: {
Expand Down
6 changes: 0 additions & 6 deletions config/development.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ module.exports = {
cookieSecret: 'xxx',
},
},
redis: {
url: 'redis://localhost:16379',
socket: {
tls: false,
},
},
db: {
connection: {
port: 13306,
Expand Down
6 changes: 0 additions & 6 deletions config/test.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ module.exports = {
cookieSecret: 'xxx',
},
},
redis: {
url: 'redis://localhost:16379',
socket: {
tls: false,
},
},
db: {
connection: {
port: 13306,
Expand Down
6 changes: 3 additions & 3 deletions src/lib/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { Knex } from 'knex';

import { scopedLogger } from './logger.js';
import { fetchProbes, getWsServer, PROBES_NAMESPACE } from './ws/server.js';
import { getMeasurementRedisClient, type RedisClient } from './redis/measurement-client.js';
import { getMeasurementRedisClient, type RedisCluster } from './redis/measurement-client.js';
import { USERS_TABLE } from './http/auth.js';
import { client } from './sql/client.js';

Expand All @@ -21,14 +21,14 @@ export class MetricsAgent {

constructor (
private readonly io: SocketServer,
private readonly redis: RedisClient,
private readonly redis: RedisCluster,
private readonly sql: Knex,
) {}

run (): void {
this.registerAsyncCollector(`gp.measurement.stored.count`, async () => {
const [ dbSize, awaitingSize ] = await Promise.all([
this.redis.dbSize(),
this.redis.reduceMasters<number>(async (accumulator, client) => accumulator + await client.dbSize(), 0),
this.redis.hLen('gp:in-progress'),
]);

Expand Down
25 changes: 20 additions & 5 deletions src/lib/redis/client.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
import config from 'config';
import type { RedisClientOptions } from 'redis';
import { createRedisClientInternal, type RedisClient } from './shared.js';
import { createRedisClientInternal, type RedisClient, type RedisClientInternal } from './shared.js';

export type { RedisClient } from './shared.js';

let redis: RedisClient;
let redisConnectPromise: Promise<unknown>;

export const initRedisClient = async () => {
redis = createRedisClient();
if (redis) {
await redisConnectPromise;
return redis;
}

const { client, connectPromise } = createRedisClient();

redis = client;
redisConnectPromise = connectPromise;

await redisConnectPromise;
return redis;
};

const createRedisClient = (options?: RedisClientOptions): RedisClient => {
const createRedisClient = (options?: RedisClientOptions): RedisClientInternal => {
return createRedisClientInternal({
...config.util.toObject(config.get('redis.shared')) as RedisClientOptions,
...config.util.toObject(config.get('redis.standaloneNonPersistent')) as RedisClientOptions,
...options,
database: 2,
name: 'non-persistent',
});
};

export const getRedisClient = (): RedisClient => {
if (!redis) {
redis = createRedisClient();
const { client, connectPromise } = createRedisClient();
redis = client;
redisConnectPromise = connectPromise;
}

return redis;
Expand Down
36 changes: 25 additions & 11 deletions src/lib/redis/measurement-client.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
import type { RedisClientOptions } from 'redis';
import { createRedisClientInternal, type RedisClient } from './shared.js';
import config from 'config';
import type { RedisClientOptions, RedisClusterOptions } from 'redis';
import { createRedisClusterInternal, type RedisCluster, type RedisClusterInternal } from './shared.js';

export type { RedisClient } from './shared.js';
export type { RedisCluster } from './shared.js';

let redis: RedisClient;
let redis: RedisCluster;
let redisConnectPromise: Promise<unknown>;

export const initMeasurementRedisClient = async () => {
redis = createMeasurementRedisClient();
if (redis) {
await redisConnectPromise;
return redis;
}

const { client, connectPromise } = createMeasurementRedisClient();

redis = client;
redisConnectPromise = connectPromise;

await redisConnectPromise;
return redis;
};

export const createMeasurementRedisClient = (options?: RedisClientOptions): RedisClient => {
return createRedisClientInternal({
export const createMeasurementRedisClient = (options?: RedisClusterOptions): RedisClusterInternal => {
return createRedisClusterInternal({
defaults: { ...config.util.toObject(config.get('redis.shared')) as RedisClientOptions },
...config.util.toObject(config.get('redis.clusterMeasurements')) as RedisClusterOptions,
...options,
database: 0,
name: 'measurement',
});
};

export const getMeasurementRedisClient = (): RedisClient => {
export const getMeasurementRedisClient = (): RedisCluster => {
if (!redis) {
redis = createMeasurementRedisClient();
const { client, connectPromise } = createMeasurementRedisClient();
redis = client;
redisConnectPromise = connectPromise;
}

return redis;
Expand Down
25 changes: 20 additions & 5 deletions src/lib/redis/persistent-client.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
import config from 'config';
import type { RedisClientOptions } from 'redis';
import { createRedisClientInternal, type RedisClient } from './shared.js';
import { createRedisClientInternal, type RedisClient, type RedisClientInternal } from './shared.js';

export type { RedisClient } from './shared.js';

let redis: RedisClient;
let redisConnectPromise: Promise<unknown>;

export const initPersistentRedisClient = async () => {
redis = createPersistentRedisClient();
if (redis) {
await redisConnectPromise;
return redis;
}

const { client, connectPromise } = createPersistentRedisClient();

redis = client;
redisConnectPromise = connectPromise;

await redisConnectPromise;
return redis;
};

export const createPersistentRedisClient = (options?: RedisClientOptions): RedisClient => {
export const createPersistentRedisClient = (options?: RedisClientOptions): RedisClientInternal => {
return createRedisClientInternal({
...config.util.toObject(config.get('redis.shared')) as RedisClientOptions,
...config.util.toObject(config.get('redis.standalonePersistent')) as RedisClientOptions,
...options,
database: 1,
name: 'persistent',
});
};

export const getPersistentRedisClient = (): RedisClient => {
if (!redis) {
redis = createPersistentRedisClient();
const { client, connectPromise } = createPersistentRedisClient();
redis = client;
redisConnectPromise = connectPromise;
}

return redis;
Expand Down
22 changes: 9 additions & 13 deletions src/lib/redis/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export type RedisScripts = {
};

const recordResult: RecordResultScript = defineScript({
FIRST_KEY_INDEX: 0, // Needed in clusters: https://github.com/redis/node-redis/issues/2521
NUMBER_OF_KEYS: 2,
SCRIPT: `
local keyMeasurementResults = KEYS[1]
Expand All @@ -51,8 +52,8 @@ const recordResult: RecordResultScript = defineScript({
transformArguments (measurementId, testId, data) {
return [
// keys
`gp:m:${measurementId}:results`,
`gp:m:${measurementId}:probes_awaiting`,
`gp:m:{${measurementId}}:results`,
`gp:m:{${measurementId}}:probes_awaiting`,
// values
testId,
JSON.stringify(data),
Expand All @@ -65,25 +66,20 @@ const recordResult: RecordResultScript = defineScript({
});

const markFinished: MarkFinishedScript = defineScript({
NUMBER_OF_KEYS: 3,
FIRST_KEY_INDEX: 0, // Needed in clusters: https://github.com/redis/node-redis/issues/2521
NUMBER_OF_KEYS: 2,
SCRIPT: `
local keyInProgress = KEYS[1]
local keyMeasurementResults = KEYS[2]
local keyMeasurementAwaiting = KEYS[3]
local measurementId = ARGV[1]
local keyMeasurementResults = KEYS[1]
local keyMeasurementAwaiting = KEYS[2]
redis.call('HDEL', keyInProgress, measurementId)
redis.call('DEL', keyMeasurementAwaiting)
redis.call('JSON.SET', keyMeasurementResults, '$.status', '"finished"')
`,
transformArguments (measurementId) {
return [
// keys
'gp:in-progress',
`gp:m:${measurementId}:results`,
`gp:m:${measurementId}:probes_awaiting`,
// values
measurementId,
`gp:m:{${measurementId}}:results`,
`gp:m:{${measurementId}}:probes_awaiting`,
];
},
transformReply () {
Expand Down
52 changes: 47 additions & 5 deletions src/lib/redis/shared.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,72 @@
import config from 'config';
import {
createClient,
createCluster,
type RedisClientOptions,
type RedisClientType,
type RedisClusterOptions,
type RedisClusterType,
type RedisDefaultModules,
type RedisFunctions,
} from 'redis';
import Bluebird from 'bluebird';
import { type RedisScripts, scripts } from './scripts.js';
import { scopedLogger } from '../logger.js';

const logger = scopedLogger('redis-client');

type ClusterExtensions = {
mapMasters: typeof mapMasters,
reduceMasters: typeof reduceMasters,
};

export type RedisClient = RedisClientType<RedisDefaultModules, RedisFunctions, RedisScripts>;
export type RedisCluster = RedisClusterType<RedisDefaultModules, RedisFunctions, RedisScripts> & ClusterExtensions;
export type RedisClientInternal = { connectPromise: Promise<unknown>, client: RedisClient };
export type RedisClusterInternal = { connectPromise: Promise<unknown>, client: RedisCluster };

export const createRedisClientInternal = (options?: RedisClientOptions): RedisClient => {
export const createRedisClientInternal = (options: RedisClientOptions): RedisClientInternal => {
const client = createClient({
...config.util.toObject(config.get('redis')) as RedisClientOptions,
...options,
scripts,
});

client
const connectPromise = client
.on('error', (error: Error) => logger.error('Redis connection error:', error))
.on('ready', () => logger.info('Redis connection ready.'))
.on('reconnecting', () => logger.info('Redis reconnecting.'))
.connect().catch((error: Error) => logger.error('Redis connection error:', error));

return client;
return { client, connectPromise };
};

export const createRedisClusterInternal = (options: RedisClusterOptions): RedisClusterInternal => {
const cluster = createCluster({
...options,
scripts,
});

const client = Object.assign(cluster, {
mapMasters,
reduceMasters,
});

const connectPromise = client
.on('error', (error: Error) => logger.error('Redis connection error:', error))
.on('ready', () => logger.info('Redis connection ready.'))
.on('reconnecting', () => logger.info('Redis reconnecting.'))
.connect().catch((error: Error) => logger.error('Redis connection error:', error));

return { client, connectPromise };
};

function mapMasters<Result> (this: RedisCluster, mapper: (client: RedisClient) => Promise<Result>) {
return Bluebird.map(this.masters, (node) => {
return this.nodeClient(node);
}).map(mapper);
}

function reduceMasters<Result> (this: RedisCluster, reducer: (accumulator: Result, client: RedisClient) => Promise<Result>, initialValue: Result) {
return Bluebird.map(this.masters, (node) => {
return this.nodeClient(node);
}).reduce(reducer, initialValue);
}
22 changes: 8 additions & 14 deletions src/lib/redis/subscription-client.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
import config from 'config';
import type { RedisClientOptions } from 'redis';
import { createRedisClientInternal, type RedisClient } from './shared.js';
import { createRedisClientInternal, type RedisClientInternal } from './shared.js';

export type { RedisClient } from './shared.js';

let redis: RedisClient;

export const initSubscriptionRedisClient = async () => {
redis = createSubscriptionRedisClient();
return redis;
const { connectPromise, client } = createSubscriptionRedisClient();
await connectPromise;
return client;
};

export const createSubscriptionRedisClient = (options?: RedisClientOptions): RedisClient => {
export const createSubscriptionRedisClient = (options?: RedisClientOptions): RedisClientInternal => {
return createRedisClientInternal({
...config.util.toObject(config.get('redis.shared')) as RedisClientOptions,
...config.util.toObject(config.get('redis.standaloneNonPersistent')) as RedisClientOptions,
...options,
name: 'subscription',
});
};

export const getSubscriptionRedisClient = (): RedisClient => {
if (!redis) {
redis = createSubscriptionRedisClient();
}

return redis;
};
Loading

0 comments on commit 5599b32

Please sign in to comment.