Skip to content

Commit

Permalink
feat: small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Nov 30, 2023
1 parent 38b6812 commit 20997b1
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/lib/ratelimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const setRateLimitHeaders = (ctx: Context, result: RateLimiterRes) => {
ctx.set('X-RateLimit-Remaining', `${result.remainingPoints}`);
};

export const checkRateLimits = async (ctx: Context, numberOfProbes: number) => {
export const checkRateLimit = async (ctx: Context, numberOfProbes: number) => {
if (ctx['isAdmin']) {
return;
}
Expand Down
20 changes: 10 additions & 10 deletions src/measurement/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js';
import type { MeasurementStore } from './store.js';
import { getMeasurementStore } from './store.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage } from './types.js';
import { checkRateLimits } from '../lib/ratelimiter.js';
import { checkRateLimit } from '../lib/ratelimiter.js';

export class MeasurementRunner {
constructor (
Expand All @@ -21,26 +21,26 @@ export class MeasurementRunner {

async run (ctx: Context): Promise<{measurementId: string; probesCount: number;}> {
const request = ctx.request.body as MeasurementRequest;
const { probesMap, probesAndOfflineProbes } = await this.router.findMatchingProbes(request.locations, request.limit);
const { onlineProbesMap, allProbes } = await this.router.findMatchingProbes(request.locations, request.limit);

if (probesAndOfflineProbes.length === 0) {
if (allProbes.length === 0) {
throw createHttpError(422, 'No suitable probes found.', { type: 'no_probes_found' });
}

await checkRateLimits(ctx, probesMap.size);
await checkRateLimit(ctx, onlineProbesMap.size);

const measurementId = await this.store.createMeasurement(request, probesMap, probesAndOfflineProbes);
const measurementId = await this.store.createMeasurement(request, onlineProbesMap, allProbes);

if (probesMap.size) {
this.sendToProbes(measurementId, probesMap, request);
if (onlineProbesMap.size) {
this.sendToProbes(measurementId, onlineProbesMap, request);
// If all selected probes are offline, immediately mark measurement as finished
} else {
await this.store.markFinished(measurementId);
}

this.metrics.recordMeasurement(request.type);

return { measurementId, probesCount: probesAndOfflineProbes.length };
return { measurementId, probesCount: allProbes.length };
}

async recordProgress (data: MeasurementProgressMessage): Promise<void> {
Expand All @@ -55,10 +55,10 @@ export class MeasurementRunner {
}
}

private sendToProbes (measurementId: string, probesMap: Map<number, Probe>, request: MeasurementRequest) {
private sendToProbes (measurementId: string, onlineProbesMap: Map<number, Probe>, request: MeasurementRequest) {
let inProgressProbes = 0;
const maxInProgressProbes = config.get<number>('measurement.maxInProgressProbes');
probesMap.forEach((probe, index) => {
onlineProbesMap.forEach((probe, index) => {
const inProgressUpdates = request.inProgressUpdates && inProgressProbes++ < maxInProgressProbes;
this.io.of('probes').to(probe.client).emit('probe:measurement:request', {
measurementId,
Expand Down
8 changes: 4 additions & 4 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ export class MeasurementStore {
return ips || [];
}

async createMeasurement (request: MeasurementRequest, probes: Map<number, Probe>, probesAndOfflineProbes: (Probe | OfflineProbe)[]): Promise<string> {
async createMeasurement (request: MeasurementRequest, probes: Map<number, Probe>, allProbes: (Probe | OfflineProbe)[]): Promise<string> {
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
const key = getMeasurementKey(id);

const results = this.probesToResults(probesAndOfflineProbes, request.type);
const results = this.probesToResults(allProbes, request.type);
const probesAwaitingTtl = config.get<number>('measurement.timeout') + 5;
const startTime = new Date();
const measurement: MeasurementRecord = {
Expand All @@ -67,7 +67,7 @@ export class MeasurementStore {
updatedAt: startTime.toISOString(),
target: request.target,
limit: request.limit,
probesCount: probesAndOfflineProbes.length,
probesCount: allProbes.length,
locations: request.locations,
measurementOptions: request.measurementOptions,
results,
Expand All @@ -77,7 +77,7 @@ export class MeasurementStore {
await Promise.all([
this.redis.hSet('gp:in-progress', id, startTime.getTime()),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), probes.size, { EX: probesAwaitingTtl }),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', probesAndOfflineProbes.map(probe => probe.ipAddress)),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', allProbes.map(probe => probe.ipAddress)),
this.redis.json.set(key, '$', measurementWithoutDefaults),
this.redis.expire(getMeasurementKey(id, 'ips'), config.get<number>('measurement.resultTTL')),
this.redis.expire(key, config.get<number>('measurement.resultTTL')),
Expand Down
40 changes: 20 additions & 20 deletions src/probe/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,26 @@ export class ProbeRouter {
globalLimit = 1,
) {
const connectedProbes = await this.fetchProbes();
let probesMap: Map<number, Probe>;
let probesAndOfflineProbes: (Probe | OfflineProbe)[] = [];
let onlineProbesMap: Map<number, Probe>;
let allProbes: (Probe | OfflineProbe)[] = [];

if (typeof locations === 'string') {
({ probesMap, probesAndOfflineProbes } = await this.findWithMeasurementId(connectedProbes, locations));
({ onlineProbesMap, allProbes } = await this.findWithMeasurementId(connectedProbes, locations));
} else if (locations.some(l => l.limit)) {
const filtered = this.findWithLocationLimit(connectedProbes, locations);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
allProbes = filtered;
onlineProbesMap = new Map(filtered.entries());
} else if (locations.length > 0) {
const filtered = this.findWithGlobalLimit(connectedProbes, locations, globalLimit);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
allProbes = filtered;
onlineProbesMap = new Map(filtered.entries());
} else {
const filtered = this.findGloballyDistributed(connectedProbes, globalLimit);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
allProbes = filtered;
onlineProbesMap = new Map(filtered.entries());
}

return { probesMap, probesAndOfflineProbes };
return { onlineProbesMap, allProbes };
}

private async fetchProbes (): Promise<Probe[]> {
Expand Down Expand Up @@ -85,21 +85,21 @@ export class ProbeRouter {
let prevMeasurement: MeasurementRecord | undefined;
const prevIps = await this.store.getIpsByMeasurementId(measurementId);

const probesMap: Map<number, Probe> = new Map();
const probesAndOfflineProbes: (Probe | OfflineProbe)[] = [];
const onlineProbesMap: Map<number, Probe> = new Map();
const allProbes: (Probe | OfflineProbe)[] = [];

const emptyResult = { probesMap: new Map(), probesAndOfflineProbes: [] } as {
probesMap: Map<number, Probe>;
probesAndOfflineProbes: (Probe | OfflineProbe)[];
const emptyResult = { onlineProbesMap: new Map(), allProbes: [] } as {
onlineProbesMap: Map<number, Probe>;
allProbes: (Probe | OfflineProbe)[];
};

for (let i = 0; i < prevIps.length; i++) {
const ip = prevIps[i]!;
const connectedProbe = ipToConnectedProbe.get(ip);

if (connectedProbe) {
probesMap.set(i, connectedProbe);
probesAndOfflineProbes.push(connectedProbe);
onlineProbesMap.set(i, connectedProbe);
allProbes.push(connectedProbe);
} else {
if (!prevMeasurement) {
prevMeasurement = await this.store.getMeasurementJson(measurementId);
Expand All @@ -116,11 +116,11 @@ export class ProbeRouter {
}

const offlineProbe = this.testToOfflineProbe(prevTest, ip);
probesAndOfflineProbes.push(offlineProbe);
allProbes.push(offlineProbe);
}
}

return { probesMap, probesAndOfflineProbes };
return { onlineProbesMap, allProbes };
}

private testToOfflineProbe = (test: MeasurementResult, ip: string): OfflineProbe => ({
Expand Down Expand Up @@ -156,7 +156,7 @@ export class ProbeRouter {
},
jobs: { count: 0 },
},
});
} as OfflineProbe);
}

// Factory
Expand Down
3 changes: 2 additions & 1 deletion src/probe/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export type ProbeStats = {
};

export type Tag = {
type: 'system' | 'admin' | 'user' | 'offline';
type: 'system' | 'admin' | 'user';
value: string;
};

Expand Down Expand Up @@ -61,6 +61,7 @@ export type OfflineProbe = Modify<Probe, {
host: null;
index: [];
tags: {
type: 'offline';
value: string;
}[];
stats: {
Expand Down

0 comments on commit 20997b1

Please sign in to comment.