Skip to content

Commit

Permalink
refactor: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Nov 30, 2023
1 parent eb8c181 commit 3139ab0
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 75 deletions.
16 changes: 8 additions & 8 deletions src/measurement/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ export class MeasurementRunner {

async run (ctx: Context): Promise<{measurementId: string; probesCount: number;}> {
const request = ctx.request.body as MeasurementRequest;
const probes = await this.router.findMatchingProbes(request.locations, request.limit);
const { probesMap, probesAndOfflineProbes } = await this.router.findMatchingProbes(request.locations, request.limit);

if (probes.length === 0) {
if (probesAndOfflineProbes.length === 0) {
throw createHttpError(422, 'No suitable probes found.', { type: 'no_probes_found' });
}

await checkRateLimits(ctx, probes.filter(probe => probe.status !== 'offline').length);
await checkRateLimits(ctx, probesMap.size);

const measurementId = await this.store.createMeasurement(request, probes);
const measurementId = await this.store.createMeasurement(request, probesMap, probesAndOfflineProbes);

this.sendToProbes(measurementId, probes, request);
this.sendToProbes(measurementId, probesMap, request);
this.metrics.recordMeasurement(request.type);

return { measurementId, probesCount: probes.length };
return { measurementId, probesCount: probesAndOfflineProbes.length };
}

async recordProgress (data: MeasurementProgressMessage): Promise<void> {
Expand All @@ -52,10 +52,10 @@ export class MeasurementRunner {
}
}

private sendToProbes (measurementId: string, probes: Probe[], request: MeasurementRequest) {
private sendToProbes (measurementId: string, probesMap: Map<number, Probe>, request: MeasurementRequest) {
let inProgressProbes = 0;
const maxInProgressProbes = config.get<number>('measurement.maxInProgressProbes');
probes.forEach((probe, index) => {
probesMap.forEach((probe, index) => {
const inProgressUpdates = request.inProgressUpdates && inProgressProbes++ < maxInProgressProbes;
this.io.of('probes').to(probe.client).emit('probe:measurement:request', {
measurementId,
Expand Down
16 changes: 8 additions & 8 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import config from 'config';
import _ from 'lodash';
import cryptoRandomString from 'crypto-random-string';
import type { Probe } from '../probe/types.js';
import type { OfflineProbe, Probe } from '../probe/types.js';
import type { RedisClient } from '../lib/redis/client.js';
import { getRedisClient } from '../lib/redis/client.js';
import { scopedLogger } from '../lib/logger.js';
Expand Down Expand Up @@ -46,11 +46,11 @@ export class MeasurementStore {
return await this.redis.json.get(getMeasurementKey(id)) as MeasurementRecord;
}

async createMeasurement (request: MeasurementRequest, probes: Probe[]): Promise<string> {
async createMeasurement (request: MeasurementRequest, probes: Map<number, Probe>, probesAndOfflineProbes: (Probe | OfflineProbe)[]): Promise<string> {
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
const key = getMeasurementKey(id);

const results = this.probesToResults(probes, request.type);
const results = this.probesToResults(probesAndOfflineProbes, request.type);
const probesAwaitingTtl = config.get<number>('measurement.timeout') + 5;
const startTime = new Date();
const measurement: MeasurementRecord = {
Expand All @@ -61,7 +61,7 @@ export class MeasurementStore {
updatedAt: startTime.toISOString(),
target: request.target,
limit: request.limit,
probesCount: probes.length,
probesCount: probesAndOfflineProbes.length,
locations: request.locations,
measurementOptions: request.measurementOptions,
results,
Expand All @@ -70,8 +70,8 @@ export class MeasurementStore {

await Promise.all([
this.redis.hSet('gp:in-progress', id, startTime.getTime()),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), probes.filter(probe => probe.status !== 'offline').length, { EX: probesAwaitingTtl }),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', probes.map(probe => probe.ipAddress)),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), probes.size, { EX: probesAwaitingTtl }),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', probesAndOfflineProbes.map(probe => probe.ipAddress)),
this.redis.json.set(key, '$', measurementWithoutDefaults),
this.redis.expire(getMeasurementKey(id, 'ips'), config.get<number>('measurement.resultTTL')),
this.redis.expire(key, config.get<number>('measurement.resultTTL')),
Expand Down Expand Up @@ -170,7 +170,7 @@ export class MeasurementStore {
return substractObjects(measurement, defaults) as Partial<MeasurementRecord>;
}

probesToResults (probes: Probe[], type: RequestType) {
probesToResults (probes: (Probe | OfflineProbe)[], type: RequestType) {
const results = probes.map(probe => ({
probe: {
continent: probe.location.continent,
Expand All @@ -191,7 +191,7 @@ export class MeasurementStore {
return results;
}

getInitialResult (type: RequestType, status: Probe['status']) {
getInitialResult (type: RequestType, status: Probe['status'] | OfflineProbe['status']) {
if (status === 'offline') {
return {
status: 'failed',
Expand Down
4 changes: 1 addition & 3 deletions src/measurement/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ export type LocationWithLimit = Location & {limit?: number};
* Measurement Objects
*/

type MeasurementStatus = 'in-progress' | 'finished';

export type MeasurementRequest = {
type: 'ping' | 'traceroute' | 'dns' | 'http' | 'mtr';
target: string;
Expand Down Expand Up @@ -201,7 +199,7 @@ export type MeasurementResult = {
export type MeasurementRecord = {
id: string;
type: MeasurementRequest['type'];
status: MeasurementStatus;
status: 'in-progress' | 'finished';
createdAt: string;
updatedAt: string;
target: string;
Expand Down
131 changes: 76 additions & 55 deletions src/probe/router.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import _ from 'lodash';
import { fetchSockets } from '../lib/ws/fetch-sockets.js';
import type { LocationWithLimit, MeasurementRecord } from '../measurement/types.js';
import type { LocationWithLimit, MeasurementRecord, MeasurementResult } from '../measurement/types.js';
import type { Location } from '../lib/location/types.js';
import type { Probe } from './types.js';
import type { OfflineProbe, Probe } from './types.js';
import { ProbesLocationFilter } from './probes-location-filter.js';
import { getMeasurementStore } from '../measurement/store.js';
import { normalizeFromPublicName, normalizeNetworkName } from '../lib/geoip/utils.js';
Expand All @@ -17,21 +17,28 @@ export class ProbeRouter {
public async findMatchingProbes (
locations: LocationWithLimit[] | string = [],
globalLimit = 1,
): Promise<Probe[]> {
const probes = await this.fetchProbes();
let filtered: Probe[] = [];
) {
const connectedProbes = await this.fetchProbes();
let probesMap: Map<number, Probe>;
let probesAndOfflineProbes: (Probe | OfflineProbe)[] = [];

if (typeof locations === 'string') {
filtered = await this.findWithMeasurementId(probes, locations);
({ probesMap, probesAndOfflineProbes } = await this.findWithMeasurementId(connectedProbes, locations));
} else if (locations.some(l => l.limit)) {
filtered = this.findWithLocationLimit(probes, locations);
const filtered = this.findWithLocationLimit(connectedProbes, locations);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
} else if (locations.length > 0) {
filtered = this.findWithGlobalLimit(probes, locations, globalLimit);
const filtered = this.findWithGlobalLimit(connectedProbes, locations, globalLimit);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
} else {
filtered = this.findGloballyDistributed(probes, globalLimit);
const filtered = this.findGloballyDistributed(connectedProbes, globalLimit);
probesAndOfflineProbes = filtered;
probesMap = new Map(filtered.entries());
}

return filtered;
return { probesMap, probesAndOfflineProbes };
}

private async fetchProbes (): Promise<Probe[]> {
Expand Down Expand Up @@ -73,69 +80,83 @@ export class ProbeRouter {
return [ ...picked ];
}

private async findWithMeasurementId (probes: Probe[], measurementId: string): Promise<Probe[]> {
private async findWithMeasurementId (connectedProbes: Probe[], measurementId: string) {
const ipToConnectedProbe = new Map(connectedProbes.map(probe => [ probe.ipAddress, probe ]));
let prevMeasurement: MeasurementRecord | undefined;
const prevIps = await this.store.getIpsByMeasurementId(measurementId);
const ipToProbe = new Map(probes.map(probe => [ probe.ipAddress, probe ]));
const result: Probe[] = [];

const probesMap: Map<number, Probe> = new Map();
const probesAndOfflineProbes: (Probe | OfflineProbe)[] = [];

const emptyResult = { probesMap: new Map(), probesAndOfflineProbes: [] } as {
probesMap: Map<number, Probe>;
probesAndOfflineProbes: (Probe | OfflineProbe)[];
};

for (let i = 0; i < prevIps.length; i++) {
const ip = prevIps[i]!;
const probe = ipToProbe.get(ip);
const connectedProbe = ipToConnectedProbe.get(ip);

if (probe) {
result.push(probe);
if (connectedProbe) {
probesMap.set(i, connectedProbe);
probesAndOfflineProbes.push(connectedProbe);
} else {
if (!prevMeasurement) {
prevMeasurement = await this.store.getMeasurementJson(measurementId);

if (!prevMeasurement) { return []; }
if (!prevMeasurement) {
return emptyResult;
}
}

const prevTest = prevMeasurement.results[i];

if (!prevTest) { return []; }

const offlineProbe: Probe = {
status: 'offline',
client: '',
version: '',
nodeVersion: '',
uuid: '',
isHardware: false,
hardwareDevice: null,
ipAddress: ip,
host: '',
location: {
continent: prevTest.probe.continent,
region: prevTest.probe.region,
country: prevTest.probe.country,
city: prevTest.probe.city,
normalizedCity: normalizeFromPublicName(prevTest.probe.city),
asn: prevTest.probe.asn,
latitude: prevTest.probe.latitude,
longitude: prevTest.probe.longitude,
state: prevTest.probe.state,
network: prevTest.probe.network,
normalizedNetwork: normalizeNetworkName(prevTest.probe.network),
},
index: [],
resolvers: prevTest.probe.resolvers,
tags: prevTest.probe.tags.map(tag => ({ value: tag, type: 'offline' })),
stats: {
cpu: {
count: 0,
load: [],
},
jobs: { count: 0 },
},
};
result.push(offlineProbe);
if (!prevTest) {
return emptyResult;
}

const offlineProbe = this.testToOfflineProbe(prevTest, ip);
probesAndOfflineProbes.push(offlineProbe);
}
}

return result;
return { probesMap, probesAndOfflineProbes };
}

private testToOfflineProbe = (test: MeasurementResult, ip: string): OfflineProbe => ({
status: 'offline',
client: null,
version: null,
nodeVersion: null,
uuid: null,
isHardware: false,
hardwareDevice: null,
ipAddress: ip,
host: null,
location: {
continent: test.probe.continent,
region: test.probe.region,
country: test.probe.country,
city: test.probe.city,
normalizedCity: normalizeFromPublicName(test.probe.city),
asn: test.probe.asn,
latitude: test.probe.latitude,
longitude: test.probe.longitude,
state: test.probe.state,
network: test.probe.network,
normalizedNetwork: normalizeNetworkName(test.probe.network),
},
index: [],
resolvers: test.probe.resolvers,
tags: test.probe.tags.map(tag => ({ value: tag, type: 'offline' })),
stats: {
cpu: {
count: 0,
load: [],
},
jobs: { count: 0 },
},
});
}

// Factory
Expand Down
29 changes: 28 additions & 1 deletion src/probe/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export type Tag = {
};

export type Probe = {
status: 'initializing' | 'ready' | 'unbuffer-missing' | 'ping-test-failed' | 'sigterm' | 'offline';
status: 'initializing' | 'ready' | 'unbuffer-missing' | 'ping-test-failed' | 'sigterm';
client: string;
version: string;
nodeVersion: string;
Expand All @@ -46,3 +46,30 @@ export type Probe = {
tags: Tag[];
stats: ProbeStats;
};

type Modify<T, Fields> = Omit<T, keyof Fields> & Fields;

export type OfflineProbe = Modify<Probe, {
status: 'offline';
client: null;
version: null;
nodeVersion: null;
uuid: null;
isHardware: false;
hardwareDevice: null;
ipAddress: string;
host: null;
index: [];
tags: {
value: string;
}[];
stats: {
cpu: {
count: 0;
load: [];
};
jobs: {
count: 0
};
};
}>

0 comments on commit 3139ab0

Please sign in to comment.