Skip to content

Commit

Permalink
feat: mark measurement as finished if all filtered probes are offline
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Nov 30, 2023
1 parent 3139ab0 commit 38b6812
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/lib/redis/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
type RedisFunctions,
} from 'redis';
import { scopedLogger } from '../logger.js';
import { count, recordResult, type RedisScripts } from './scripts.js';
import { count, recordResult, markFinished, type RedisScripts } from './scripts.js';

const logger = scopedLogger('redis-client');

Expand All @@ -23,7 +23,7 @@ export const createRedisClient = async (options?: RedisClientOptions): Promise<R
const client = createClient({
...config.util.toObject(config.get('redis')) as RedisClientOptions,
...options,
scripts: { count, recordResult },
scripts: { count, recordResult, markFinished },
});

client
Expand Down
31 changes: 30 additions & 1 deletion src/lib/redis/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { MeasurementRecord, MeasurementResultMessage } from '../../measurem
type CountScript = {
NUMBER_OF_KEYS: number;
SCRIPT: string;
transformArguments (key: string): Array<string>;
transformArguments (key: string): string[];
transformReply (reply: number): number;
} & {
SHA1: string;
Expand All @@ -19,9 +19,19 @@ export type RecordResultScript = {
SHA1: string;
};

export type MarkFinishedScript = {
NUMBER_OF_KEYS: number;
SCRIPT: string;
transformArguments (measurementId: string): string[];
transformReply (): null;
} & {
SHA1: string;
};

export type RedisScripts = {
count: CountScript;
recordResult: RecordResultScript;
markFinished: MarkFinishedScript;
};

export const count: CountScript = defineScript({
Expand Down Expand Up @@ -81,3 +91,22 @@ export const recordResult: RecordResultScript = defineScript({
return JSON.parse(reply) as MeasurementRecord | null;
},
});

export const markFinished: MarkFinishedScript = defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT: `
local measurementId = KEYS[1]
local key = 'gp:measurement:'..measurementId
local awaitingKey = 'gp:measurement:probes_awaiting:'..measurementId
redis.call('HDEL', 'gp:in-progress', measurementId)
redis.call('DEL', awaitingKey)
redis.call('JSON.SET', key, '$.status', '"finished"')
`,
transformArguments (measurementId) {
return [ measurementId ];
},
transformReply () {
return null;
},
});
15 changes: 9 additions & 6 deletions src/measurement/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import config from 'config';
import type { Server } from 'socket.io';
import createHttpError from 'http-errors';
import { getWsServer } from '../lib/ws/server.js';
import type { RedisClient } from '../lib/redis/client.js';
import { getRedisClient } from '../lib/redis/client.js';
import { getProbeRouter, type ProbeRouter } from '../probe/router.js';
import type { Probe } from '../probe/types.js';
import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js';
Expand All @@ -16,7 +14,6 @@ import { checkRateLimits } from '../lib/ratelimiter.js';
export class MeasurementRunner {
constructor (
private readonly io: Server,
private readonly redis: RedisClient,
private readonly store: MeasurementStore,
private readonly router: ProbeRouter,
private readonly metrics: MetricsAgent,
Expand All @@ -34,7 +31,13 @@ export class MeasurementRunner {

const measurementId = await this.store.createMeasurement(request, probesMap, probesAndOfflineProbes);

this.sendToProbes(measurementId, probesMap, request);
if (probesMap.size) {
this.sendToProbes(measurementId, probesMap, request);
// If all selected probes are offline, immediately mark measurement as finished
} else {
await this.store.markFinished(measurementId);
}

this.metrics.recordMeasurement(request.type);

return { measurementId, probesCount: probesAndOfflineProbes.length };
Expand All @@ -45,7 +48,7 @@ export class MeasurementRunner {
}

async recordResult (data: MeasurementResultMessage): Promise<void> {
const record = await this.redis.recordResult(data.measurementId, data.testId, data.result);
const record = await this.store.storeMeasurementResult(data);

if (record) {
this.metrics.recordMeasurementTime(record.type, Date.now() - new Date(record.createdAt).getTime());
Expand Down Expand Up @@ -77,7 +80,7 @@ let runner: MeasurementRunner;

export const getMeasurementRunner = () => {
if (!runner) {
runner = new MeasurementRunner(getWsServer(), getRedisClient(), getMeasurementStore(), getProbeRouter(), getMetricsAgent());
runner = new MeasurementRunner(getWsServer(), getMeasurementStore(), getProbeRouter(), getMetricsAgent());
}

return runner;
Expand Down
24 changes: 19 additions & 5 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { OfflineProbe, Probe } from '../probe/types.js';
import type { RedisClient } from '../lib/redis/client.js';
import { getRedisClient } from '../lib/redis/client.js';
import { scopedLogger } from '../lib/logger.js';
import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType } from './types.js';
import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType, MeasurementResultMessage } from './types.js';
import { getDefaults } from './schema/utils.js';

const logger = scopedLogger('store');
Expand Down Expand Up @@ -46,6 +46,12 @@ export class MeasurementStore {
return await this.redis.json.get(getMeasurementKey(id)) as MeasurementRecord;
}

async getIpsByMeasurementId (id: string): Promise<string[]> {
const key = getMeasurementKey(id, 'ips');
const ips = await this.redis.json.get(key) as string[] | null;
return ips || [];
}

async createMeasurement (request: MeasurementRequest, probes: Map<number, Probe>, probesAndOfflineProbes: (Probe | OfflineProbe)[]): Promise<string> {
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
const key = getMeasurementKey(id);
Expand Down Expand Up @@ -97,10 +103,18 @@ export class MeasurementStore {
]);
}

async getIpsByMeasurementId (id: string): Promise<string[]> {
const key = getMeasurementKey(id, 'ips');
const ips = await this.redis.json.get(key) as string[] | null;
return ips || [];
async storeMeasurementResult (data: MeasurementResultMessage) {
const record = await this.redis.recordResult(data.measurementId, data.testId, data.result);

if (record) {
await this.markFinished(data.measurementId);
}

return record;
}

async markFinished (id: string) {
await this.redis.markFinished(id);
}

async markFinishedByTimeout (ids: string[]): Promise<void> {
Expand Down

0 comments on commit 38b6812

Please sign in to comment.