From 38b68128b0af3d183f7f06373ab203f9cdd1c189 Mon Sep 17 00:00:00 2001 From: Alexey Yarmosh Date: Thu, 30 Nov 2023 16:11:16 +0100 Subject: [PATCH] feat: mark measurement as finished if all filtered probes are offline --- src/lib/redis/client.ts | 4 ++-- src/lib/redis/scripts.ts | 31 ++++++++++++++++++++++++++++++- src/measurement/runner.ts | 15 +++++++++------ src/measurement/store.ts | 24 +++++++++++++++++++----- 4 files changed, 60 insertions(+), 14 deletions(-) diff --git a/src/lib/redis/client.ts b/src/lib/redis/client.ts index 5b3c4305..57fc91d5 100644 --- a/src/lib/redis/client.ts +++ b/src/lib/redis/client.ts @@ -7,7 +7,7 @@ import { type RedisFunctions, } from 'redis'; import { scopedLogger } from '../logger.js'; -import { count, recordResult, type RedisScripts } from './scripts.js'; +import { count, recordResult, markFinished, type RedisScripts } from './scripts.js'; const logger = scopedLogger('redis-client'); @@ -23,7 +23,7 @@ export const createRedisClient = async (options?: RedisClientOptions): Promise; + transformArguments (key: string): string[]; transformReply (reply: number): number; } & { SHA1: string; @@ -19,9 +19,19 @@ export type RecordResultScript = { SHA1: string; }; +export type MarkFinishedScript = { + NUMBER_OF_KEYS: number; + SCRIPT: string; + transformArguments (measurementId: string): string[]; + transformReply (): null; +} & { + SHA1: string; +}; + export type RedisScripts = { count: CountScript; recordResult: RecordResultScript; + markFinished: MarkFinishedScript; }; export const count: CountScript = defineScript({ @@ -81,3 +91,22 @@ export const recordResult: RecordResultScript = defineScript({ return JSON.parse(reply) as MeasurementRecord | null; }, }); + +export const markFinished: MarkFinishedScript = defineScript({ + NUMBER_OF_KEYS: 1, + SCRIPT: ` + local measurementId = KEYS[1] + local key = 'gp:measurement:'..measurementId + local awaitingKey = 'gp:measurement:probes_awaiting:'..measurementId + + redis.call('HDEL', 'gp:in-progress', measurementId) + redis.call('DEL', awaitingKey) + redis.call('JSON.SET', key, '$.status', '"finished"') + `, + transformArguments (measurementId) { + return [ measurementId ]; + }, + transformReply () { + return null; + }, +}); diff --git a/src/measurement/runner.ts b/src/measurement/runner.ts index d7746306..8080fef4 100644 --- a/src/measurement/runner.ts +++ b/src/measurement/runner.ts @@ -3,8 +3,6 @@ import config from 'config'; import type { Server } from 'socket.io'; import createHttpError from 'http-errors'; import { getWsServer } from '../lib/ws/server.js'; -import type { RedisClient } from '../lib/redis/client.js'; -import { getRedisClient } from '../lib/redis/client.js'; import { getProbeRouter, type ProbeRouter } from '../probe/router.js'; import type { Probe } from '../probe/types.js'; import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js'; @@ -16,7 +14,6 @@ import { checkRateLimits } from '../lib/ratelimiter.js'; export class MeasurementRunner { constructor ( private readonly io: Server, - private readonly redis: RedisClient, private readonly store: MeasurementStore, private readonly router: ProbeRouter, private readonly metrics: MetricsAgent, @@ -34,7 +31,13 @@ export class MeasurementRunner { const measurementId = await this.store.createMeasurement(request, probesMap, probesAndOfflineProbes); - this.sendToProbes(measurementId, probesMap, request); + if (probesMap.size) { + this.sendToProbes(measurementId, probesMap, request); + // If all selected probes are offline, immediately mark measurement as finished + } else { + await this.store.markFinished(measurementId); + } + this.metrics.recordMeasurement(request.type); return { measurementId, probesCount: probesAndOfflineProbes.length }; @@ -45,7 +48,7 @@ export class MeasurementRunner { } async recordResult (data: MeasurementResultMessage): Promise { - const record = await this.redis.recordResult(data.measurementId, data.testId, data.result); + const record = await this.store.storeMeasurementResult(data); if (record) { this.metrics.recordMeasurementTime(record.type, Date.now() - new Date(record.createdAt).getTime()); @@ -77,7 +80,7 @@ let runner: MeasurementRunner; export const getMeasurementRunner = () => { if (!runner) { - runner = new MeasurementRunner(getWsServer(), getRedisClient(), getMeasurementStore(), getProbeRouter(), getMetricsAgent()); + runner = new MeasurementRunner(getWsServer(), getMeasurementStore(), getProbeRouter(), getMetricsAgent()); } return runner; diff --git a/src/measurement/store.ts b/src/measurement/store.ts index 58adddf0..c5a5bcae 100644 --- a/src/measurement/store.ts +++ b/src/measurement/store.ts @@ -5,7 +5,7 @@ import type { OfflineProbe, Probe } from '../probe/types.js'; import type { RedisClient } from '../lib/redis/client.js'; import { getRedisClient } from '../lib/redis/client.js'; import { scopedLogger } from '../lib/logger.js'; -import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType } from './types.js'; +import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType, MeasurementResultMessage } from './types.js'; import { getDefaults } from './schema/utils.js'; const logger = scopedLogger('store'); @@ -46,6 +46,12 @@ export class MeasurementStore { return await this.redis.json.get(getMeasurementKey(id)) as MeasurementRecord; } + async getIpsByMeasurementId (id: string): Promise { + const key = getMeasurementKey(id, 'ips'); + const ips = await this.redis.json.get(key) as string[] | null; + return ips || []; + } + async createMeasurement (request: MeasurementRequest, probes: Map, probesAndOfflineProbes: (Probe | OfflineProbe)[]): Promise { const id = cryptoRandomString({ length: 16, type: 'alphanumeric' }); const key = getMeasurementKey(id); @@ -97,10 +103,18 @@ export class MeasurementStore { ]); } - async getIpsByMeasurementId (id: string): Promise { - const key = getMeasurementKey(id, 'ips'); - const ips = await this.redis.json.get(key) as string[] | null; - return ips || []; + async storeMeasurementResult (data: MeasurementResultMessage) { + const record = await this.redis.recordResult(data.measurementId, data.testId, data.result); + + if (record) { + await this.markFinished(data.measurementId); + } + + return record; + } + + async markFinished (id: string) { + await this.redis.markFinished(id); } async markFinishedByTimeout (ids: string[]): Promise {