Skip to content

Commit

Permalink
feat: initially separate on pers and non-pers redis dbs
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Dec 19, 2023
1 parent 6c42d17 commit b5217d2
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 21 deletions.
4 changes: 2 additions & 2 deletions probes-stats/known.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import fs from 'node:fs';
import { getRedisClient, initRedis } from '../src/lib/redis/client.js';
import { getRedisClient, initRedisClient } from '../src/lib/redis/client.js';
import { LocationInfo, createGeoipClient } from '../src/lib/geoip/client.js';
import { normalizeCityName } from '../src/lib/geoip/utils.js';
import { fastlyLookup } from '../src/lib/geoip/providers/fastly.js';
Expand All @@ -19,7 +19,7 @@ import sheet from './known-probes.json' assert { type: 'json' };
import { populateCitiesList } from '../src/lib/geoip/city-approximation.js';

await populateIpWhiteList();
await initRedis();
await initRedisClient();
await populateCitiesList();
const geoIpClient = createGeoipClient();

Expand Down
6 changes: 3 additions & 3 deletions src/lib/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import process from 'node:process';
import type { Server as SocketServer } from 'socket.io';
import newrelic from 'newrelic';

import { getRedisClient, type RedisClient } from './redis/client.js';
import { getWsServer, PROBES_NAMESPACE } from './ws/server.js';
import { scopedLogger } from './logger.js';
import { getPersistentRedisClient, PersistentRedisClient } from './redis/persistent-client.js';

const logger = scopedLogger('metrics');

Expand All @@ -13,7 +13,7 @@ export class MetricsAgent {

constructor (
private readonly io: SocketServer,
private readonly redis: RedisClient,
private readonly redis: PersistentRedisClient,
) {}

run (): void {
Expand Down Expand Up @@ -62,7 +62,7 @@ let agent: MetricsAgent;

export const getMetricsAgent = () => {
if (!agent) {
agent = new MetricsAgent(getWsServer(), getRedisClient());
agent = new MetricsAgent(getWsServer(), getPersistentRedisClient());
}

return agent;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/ratelimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import type { Context } from 'koa';
import { RateLimiterRedis } from 'rate-limiter-flexible';
import requestIp from 'request-ip';
import type { RateLimiterRes } from 'rate-limiter-flexible';
import { createRedisClient } from './redis/client.js';
import { createPersistentRedisClient } from './redis/persistent-client.js';
import createHttpError from 'http-errors';

const redisClient = await createRedisClient({ legacyMode: true });
const redisClient = await createPersistentRedisClient({ legacyMode: true });

const rateLimiter = new RateLimiterRedis({
storeClient: redisClient,
Expand Down
9 changes: 6 additions & 3 deletions src/lib/redis/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ export type RedisClient = RedisClientType<RedisDefaultModules, RedisFunctions, R

let redis: RedisClient;

export const initRedis = async () => {
export const initRedisClient = async () => {
redis = await createRedisClient();
await redis.flushDb();
};

export const createRedisClient = async (options?: RedisClientOptions): Promise<RedisClient> => {
const createRedisClient = async (options?: RedisClientOptions): Promise<RedisClient> => {
const client = createClient({
...config.util.toObject(config.get('redis')) as RedisClientOptions,
...options,
database: 0,
name: 'non-persistent',
scripts: { count, recordResult, markFinished },
});

Expand All @@ -38,7 +41,7 @@ export const createRedisClient = async (options?: RedisClientOptions): Promise<R

export const getRedisClient = (): RedisClient => {
if (!redis) {
throw new Error('redis connection is not initialize yet');
throw new Error('redis connection is not initialized yet');
}

return redis;
Expand Down
47 changes: 47 additions & 0 deletions src/lib/redis/persistent-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import config from 'config';
import {
createClient,
type RedisClientType,
type RedisDefaultModules,
type RedisClientOptions,
type RedisFunctions,
} from 'redis';
import { scopedLogger } from '../logger.js';
import { count, recordResult, markFinished, type RedisScripts } from './scripts.js';

const logger = scopedLogger('persistent-redis-client');

export type PersistentRedisClient = RedisClientType<RedisDefaultModules, RedisFunctions, RedisScripts>;

let redis: PersistentRedisClient;

export const initPersistentRedisClient = async () => {
redis = await createPersistentRedisClient();
};

export const createPersistentRedisClient = async (options?: RedisClientOptions): Promise<PersistentRedisClient> => {
const client = createClient({
...config.util.toObject(config.get('redis')) as RedisClientOptions,
...options,
database: 1,
name: 'persistent',
scripts: { count, recordResult, markFinished },
});

client
.on('error', (error: Error) => logger.error('connection error', error))
.on('ready', () => logger.info('connection ready'))
.on('reconnecting', () => logger.info('reconnecting'));

await client.connect();

return client;
};

export const getPersistentRedisClient = (): PersistentRedisClient => {
if (!redis) {
throw new Error('redis connection to persistent db is not initialized yet');
}

return redis;
};
6 changes: 4 additions & 2 deletions src/lib/server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Server } from 'node:http';
import { initRedis } from './redis/client.js';
import { initRedisClient } from './redis/client.js';
import { initWsServer } from './ws/server.js';
import { getMetricsAgent } from './metrics.js';
import { populateMemList as populateMemMalwareList } from './malware/client.js';
Expand All @@ -8,9 +8,11 @@ import { populateMemList as populateIpWhiteList } from './geoip/whitelist.js';
import { populateCitiesList } from './geoip/city-approximation.js';
import { adoptedProbes } from './adopted-probes.js';
import { reconnectProbes } from './ws/helper/reconnect-probes.js';
import { initPersistentRedisClient } from './redis/persistent-client.js';

export const createServer = async (): Promise<Server> => {
await initRedis();
await initRedisClient();
await initPersistentRedisClient();

// Populate memory malware list
await populateMemMalwareList();
Expand Down
7 changes: 3 additions & 4 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ import config from 'config';
import _ from 'lodash';
import cryptoRandomString from 'crypto-random-string';
import type { OfflineProbe, Probe } from '../probe/types.js';
import type { RedisClient } from '../lib/redis/client.js';
import { getRedisClient } from '../lib/redis/client.js';
import { scopedLogger } from '../lib/logger.js';
import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType, MeasurementResultMessage } from './types.js';
import { getDefaults } from './schema/utils.js';
import { getPersistentRedisClient, PersistentRedisClient } from '../lib/redis/persistent-client.js';

const logger = scopedLogger('store');

Expand Down Expand Up @@ -42,7 +41,7 @@ const substractObjects = (obj1: Record<string, unknown>, obj2: Record<string, un
};

export class MeasurementStore {
constructor (private readonly redis: RedisClient) {}
constructor (private readonly redis: PersistentRedisClient) {}

async getMeasurementString (id: string): Promise<string> {
return this.redis.sendCommand([ 'JSON.GET', getMeasurementKey(id) ]);
Expand Down Expand Up @@ -239,7 +238,7 @@ let store: MeasurementStore;

export const getMeasurementStore = () => {
if (!store) {
store = new MeasurementStore(getRedisClient());
store = new MeasurementStore(getPersistentRedisClient());
store.scheduleCleanup();
}

Expand Down
10 changes: 6 additions & 4 deletions test/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
populateNockCitiesList,
} from './utils/populate-static-files.js';
import chaiOas from './plugins/oas/index.js';
import { getRedisClient, initRedis } from '../src/lib/redis/client.js';
import { initRedisClient } from '../src/lib/redis/client.js';
import { getPersistentRedisClient, initPersistentRedisClient } from '../src/lib/redis/persistent-client.js';
import { client as sql } from '../src/lib/sql/client.js';

const dbConfig = config.get<{ connection: { database: string, host: string } }>('db');
Expand All @@ -26,9 +27,10 @@ if (!dbConfig.connection.database.endsWith('-test') && dbConfig.connection.host
before(async () => {
chai.use(await chaiOas({ specPath: path.join(fileURLToPath(new URL('.', import.meta.url)), '../public/v1/spec.yaml') }));

await initRedis();
const redis = getRedisClient();
await redis.flushDb();
await initRedisClient();
await initPersistentRedisClient();
const persistentRedisClient = getPersistentRedisClient();
await persistentRedisClient.flushDb();

await dropAllTables(sql);
await sql.migrate.latest();
Expand Down
2 changes: 1 addition & 1 deletion test/tests/unit/measurement/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe('measurement store', () => {

before(async () => {
td.replaceEsm('crypto-random-string', null, () => 'measurementid');
await td.replaceEsm('../../../../src/lib/redis/client.ts', { getRedisClient: () => redisMock });
await td.replaceEsm('../../../../src/lib/redis/persistent-client.ts', { getPersistentRedisClient: () => redisMock });
getMeasurementStore = (await import('../../../../src/measurement/store.js')).getMeasurementStore;
});

Expand Down

0 comments on commit b5217d2

Please sign in to comment.