Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow reusing measurement IDs for selecting same group of probes #453

Merged
merged 25 commits into from
Dec 11, 2023
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b1c186a
refactor: move suffix before the id
alexey-yarmosh Nov 28, 2023
3ce4207
feat: initially implement locations as id
alexey-yarmosh Nov 28, 2023
9360811
feat: make measurement cloning work
alexey-yarmosh Nov 29, 2023
28a7db5
fix: rate limits
alexey-yarmosh Nov 29, 2023
dddd576
refactor: store empty location fields as nulls
alexey-yarmosh Nov 29, 2023
3548037
fix: get rid of undefined in probe location object
alexey-yarmosh Nov 29, 2023
33cd6ad
feat: update offline probe fields values
alexey-yarmosh Nov 29, 2023
ad15903
refactor: refactor
alexey-yarmosh Nov 30, 2023
4fdd99d
feat: mark measurement as finished if all filtered probes are offline
alexey-yarmosh Nov 30, 2023
9637454
feat: small fixes
alexey-yarmosh Nov 30, 2023
1616cf0
test: fix part of the tests
alexey-yarmosh Nov 30, 2023
1674097
test: fix existing tests
alexey-yarmosh Dec 1, 2023
1a55b6c
feat: user prev implementation of measurement key
alexey-yarmosh Dec 4, 2023
7e3c07a
refactor: rename store methods
alexey-yarmosh Dec 4, 2023
5235ab1
test: add unit tests
alexey-yarmosh Dec 4, 2023
2bd1351
docs: update openapi docs
alexey-yarmosh Dec 5, 2023
2879a12
feat: move isHosting to the location object
alexey-yarmosh Dec 5, 2023
e30da84
fix: remove redundant commands from redis script
alexey-yarmosh Dec 5, 2023
bc5599c
docs: update descriptions
MartinKolarik Dec 6, 2023
d08d7d7
feat: copy prev mes location and limit if measurement is created by id
alexey-yarmosh Dec 7, 2023
0ff18fc
feat: add "offline" test status
alexey-yarmosh Dec 7, 2023
e649720
feat: allow to pass measurement id in the magic field
alexey-yarmosh Dec 7, 2023
66b57da
feat: replace magic with id in locations field with actual location
alexey-yarmosh Dec 8, 2023
75d90b7
test: add offline test result int test
alexey-yarmosh Dec 8, 2023
4917982
docs: update descriptions
MartinKolarik Dec 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: small fixes
alexey-yarmosh committed Dec 5, 2023
commit 9637454c46881e2ae19cb195442e42f5b735f514
2 changes: 1 addition & 1 deletion src/lib/ratelimiter.ts
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ const setRateLimitHeaders = (ctx: Context, result: RateLimiterRes) => {
ctx.set('X-RateLimit-Remaining', `${result.remainingPoints}`);
};

export const checkRateLimits = async (ctx: Context, numberOfProbes: number) => {
export const checkRateLimit = async (ctx: Context, numberOfProbes: number) => {
if (ctx['isAdmin']) {
return;
}
20 changes: 10 additions & 10 deletions src/measurement/runner.ts
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js';
import type { MeasurementStore } from './store.js';
import { getMeasurementStore } from './store.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage } from './types.js';
import { checkRateLimits } from '../lib/ratelimiter.js';
import { checkRateLimit } from '../lib/ratelimiter.js';

export class MeasurementRunner {
constructor (
@@ -21,26 +21,26 @@ export class MeasurementRunner {

async run (ctx: Context): Promise<{measurementId: string; probesCount: number;}> {
const request = ctx.request.body as MeasurementRequest;
const { probesMap, probesAndOfflineProbes } = await this.router.findMatchingProbes(request.locations, request.limit);
const { onlineProbesMap, allProbes } = await this.router.findMatchingProbes(request.locations, request.limit);

if (probesAndOfflineProbes.length === 0) {
if (allProbes.length === 0) {
throw createHttpError(422, 'No suitable probes found.', { type: 'no_probes_found' });
}

await checkRateLimits(ctx, probesMap.size);
await checkRateLimit(ctx, onlineProbesMap.size);

const measurementId = await this.store.createMeasurement(request, probesMap, probesAndOfflineProbes);
const measurementId = await this.store.createMeasurement(request, onlineProbesMap, allProbes);

if (probesMap.size) {
this.sendToProbes(measurementId, probesMap, request);
if (onlineProbesMap.size) {
this.sendToProbes(measurementId, onlineProbesMap, request);
// If all selected probes are offline, immediately mark measurement as finished
} else {
await this.store.markFinished(measurementId);
}

this.metrics.recordMeasurement(request.type);

return { measurementId, probesCount: probesAndOfflineProbes.length };
return { measurementId, probesCount: allProbes.length };
}

async recordProgress (data: MeasurementProgressMessage): Promise<void> {
@@ -55,10 +55,10 @@ export class MeasurementRunner {
}
}

private sendToProbes (measurementId: string, probesMap: Map<number, Probe>, request: MeasurementRequest) {
private sendToProbes (measurementId: string, onlineProbesMap: Map<number, Probe>, request: MeasurementRequest) {
let inProgressProbes = 0;
const maxInProgressProbes = config.get<number>('measurement.maxInProgressProbes');
probesMap.forEach((probe, index) => {
onlineProbesMap.forEach((probe, index) => {
const inProgressUpdates = request.inProgressUpdates && inProgressProbes++ < maxInProgressProbes;
this.io.of('probes').to(probe.client).emit('probe:measurement:request', {
measurementId,
8 changes: 4 additions & 4 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
@@ -52,11 +52,11 @@ export class MeasurementStore {
return ips || [];
}

async createMeasurement (request: MeasurementRequest, probes: Map<number, Probe>, probesAndOfflineProbes: (Probe | OfflineProbe)[]): Promise<string> {
async createMeasurement (request: MeasurementRequest, probes: Map<number, Probe>, allProbes: (Probe | OfflineProbe)[]): Promise<string> {
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
const key = getMeasurementKey(id);

const results = this.probesToResults(probesAndOfflineProbes, request.type);
const results = this.probesToResults(allProbes, request.type);
const probesAwaitingTtl = config.get<number>('measurement.timeout') + 5;
const startTime = new Date();
const measurement: MeasurementRecord = {
@@ -67,7 +67,7 @@ export class MeasurementStore {
updatedAt: startTime.toISOString(),
target: request.target,
limit: request.limit,
probesCount: probesAndOfflineProbes.length,
probesCount: allProbes.length,
locations: request.locations,
measurementOptions: request.measurementOptions,
results,
@@ -77,7 +77,7 @@ export class MeasurementStore {
await Promise.all([
this.redis.hSet('gp:in-progress', id, startTime.getTime()),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), probes.size, { EX: probesAwaitingTtl }),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', probesAndOfflineProbes.map(probe => probe.ipAddress)),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', allProbes.map(probe => probe.ipAddress)),
this.redis.json.set(key, '$', measurementWithoutDefaults),
this.redis.expire(getMeasurementKey(id, 'ips'), config.get<number>('measurement.resultTTL')),
this.redis.expire(key, config.get<number>('measurement.resultTTL')),
40 changes: 20 additions & 20 deletions src/probe/router.ts
Original file line number Diff line number Diff line change
@@ -19,26 +19,26 @@ export class ProbeRouter {
globalLimit = 1,
) {
const connectedProbes = await this.fetchProbes();
let probesMap: Map<number, Probe>;
let probesAndOfflineProbes: (Probe | OfflineProbe)[] = [];
let onlineProbesMap: Map<number, Probe>;
let allProbes: (Probe | OfflineProbe)[] = [];

if (typeof locations === 'string') {
({ probesMap, probesAndOfflineProbes } = await this.findWithMeasurementId(connectedProbes, locations));
({ onlineProbesMap, allProbes } = await this.findWithMeasurementId(connectedProbes, locations));
} else if (locations.some(l => l.limit)) {
const filtered = this.findWithLocationLimit(connectedProbes, locations);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
allProbes = filtered;
onlineProbesMap = new Map(filtered.entries());
} else if (locations.length > 0) {
const filtered = this.findWithGlobalLimit(connectedProbes, locations, globalLimit);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
allProbes = filtered;
onlineProbesMap = new Map(filtered.entries());
} else {
const filtered = this.findGloballyDistributed(connectedProbes, globalLimit);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
allProbes = filtered;
onlineProbesMap = new Map(filtered.entries());
}

return { probesMap, probesAndOfflineProbes };
return { onlineProbesMap, allProbes };
}

private async fetchProbes (): Promise<Probe[]> {
@@ -85,21 +85,21 @@ export class ProbeRouter {
let prevMeasurement: MeasurementRecord | undefined;
const prevIps = await this.store.getIpsByMeasurementId(measurementId);

const probesMap: Map<number, Probe> = new Map();
const probesAndOfflineProbes: (Probe | OfflineProbe)[] = [];
const onlineProbesMap: Map<number, Probe> = new Map();
const allProbes: (Probe | OfflineProbe)[] = [];

const emptyResult = { probesMap: new Map(), probesAndOfflineProbes: [] } as {
probesMap: Map<number, Probe>;
probesAndOfflineProbes: (Probe | OfflineProbe)[];
const emptyResult = { onlineProbesMap: new Map(), allProbes: [] } as {
onlineProbesMap: Map<number, Probe>;
allProbes: (Probe | OfflineProbe)[];
};

for (let i = 0; i < prevIps.length; i++) {
const ip = prevIps[i]!;
const connectedProbe = ipToConnectedProbe.get(ip);

if (connectedProbe) {
probesMap.set(i, connectedProbe);
probesAndOfflineProbes.push(connectedProbe);
onlineProbesMap.set(i, connectedProbe);
allProbes.push(connectedProbe);
} else {
if (!prevMeasurement) {
prevMeasurement = await this.store.getMeasurementJson(measurementId);
@@ -116,11 +116,11 @@ export class ProbeRouter {
}

const offlineProbe = this.testToOfflineProbe(prevTest, ip);
probesAndOfflineProbes.push(offlineProbe);
allProbes.push(offlineProbe);
}
}

return { probesMap, probesAndOfflineProbes };
return { onlineProbesMap, allProbes };
}

private testToOfflineProbe = (test: MeasurementResult, ip: string): OfflineProbe => ({
@@ -156,7 +156,7 @@ export class ProbeRouter {
},
jobs: { count: 0 },
},
});
} as OfflineProbe);
}

// Factory
3 changes: 2 additions & 1 deletion src/probe/types.ts
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ export type ProbeStats = {
};

export type Tag = {
type: 'system' | 'admin' | 'user' | 'offline';
type: 'system' | 'admin' | 'user';
value: string;
};

@@ -61,6 +61,7 @@ export type OfflineProbe = Modify<Probe, {
host: null;
index: [];
tags: {
type: 'offline';
MartinKolarik marked this conversation as resolved.
Show resolved Hide resolved
value: string;
}[];
stats: {