Skip to content

Commit 9309993

Browse files
feat: make measurement cloning work
1 parent 2bf0cfb commit 9309993

File tree

6 files changed

+253
-185
lines changed

6 files changed

+253
-185
lines changed

Diff for: src/measurement/route/get-measurement.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const handle = async (ctx: ParameterizedContext<DefaultState, DefaultContext & R
1212
return;
1313
}
1414

15-
const result = await store.getMeasurementResults(id);
15+
const result = await store.getMeasurement(id);
1616

1717
if (!result) {
1818
ctx.status = 404;

Diff for: src/measurement/store.ts

+17-6
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { Probe } from '../probe/types.js';
55
import type { RedisClient } from '../lib/redis/client.js';
66
import { getRedisClient } from '../lib/redis/client.js';
77
import { scopedLogger } from '../lib/logger.js';
8-
import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage } from './types.js';
8+
import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType } from './types.js';
99
import { getDefaults } from './schema/utils.js';
1010

1111
const logger = scopedLogger('store');
@@ -38,10 +38,14 @@ const substractObjects = (obj1: Record<string, unknown>, obj2: Record<string, un
3838
export class MeasurementStore {
3939
constructor (private readonly redis: RedisClient) {}
4040

41-
async getMeasurementResults (id: string): Promise<string> {
41+
async getMeasurement (id: string): Promise<string> {
4242
return this.redis.sendCommand([ 'JSON.GET', getMeasurementKey(id) ]);
4343
}
4444

45+
async getMeasurementJson (id: string): Promise<MeasurementRecord> {
46+
return await this.redis.json.get(getMeasurementKey(id)) as MeasurementRecord;
47+
}
48+
4549
async createMeasurement (request: MeasurementRequest, probes: Probe[]): Promise<string> {
4650
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
4751
const key = getMeasurementKey(id);
@@ -66,7 +70,7 @@ export class MeasurementStore {
6670

6771
await Promise.all([
6872
this.redis.hSet('gp:in-progress', id, startTime.getTime()),
69-
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), probes.length, { EX: probesAwaitingTtl }),
73+
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), probes.filter(probe => probe.status !== 'offline').length, { EX: probesAwaitingTtl }),
7074
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', probes.map(probe => probe.ipAddress)),
7175
this.redis.json.set(key, '$', measurementWithoutDefaults),
7276
this.redis.expire(getMeasurementKey(id, 'ips'), config.get<number>('measurement.resultTTL')),
@@ -166,7 +170,7 @@ export class MeasurementStore {
166170
return substractObjects(measurement, defaults) as Partial<MeasurementRecord>;
167171
}
168172

169-
probesToResults (probes: Probe[], type: string) {
173+
probesToResults (probes: Probe[], type: RequestType) {
170174
const results = probes.map(probe => ({
171175
probe: {
172176
continent: probe.location.continent,
@@ -181,13 +185,20 @@ export class MeasurementStore {
181185
tags: probe.tags.map(({ value }) => value),
182186
resolvers: probe.resolvers,
183187
},
184-
result: this.getInitialResult(type),
188+
result: this.getInitialResult(type, probe.status),
185189
} as MeasurementResult));
186190

187191
return results;
188192
}
189193

190-
getInitialResult (type: string) {
194+
getInitialResult (type: RequestType, status: Probe['status']) {
195+
if (status === 'offline') {
196+
return {
197+
status: 'failed',
198+
rawOutput: 'This probe is currently offline. Please try again later.',
199+
};
200+
}
201+
191202
if (type === 'http') {
192203
return {
193204
status: 'in-progress',

Diff for: src/probe/probes-location-filter.ts

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import config from 'config';
2+
import _ from 'lodash';
3+
import type { Location } from '../lib/location/types.js';
4+
import type { Probe, ProbeLocation } from './types.js';
5+
6+
/*
7+
* [
8+
* [ public key, internal key]
9+
* ]
10+
*
11+
* */
12+
const locationKeyMap = [
13+
[ 'network', 'normalizedNetwork' ],
14+
[ 'city', 'normalizedCity' ],
15+
];
16+
17+
export class ProbesLocationFilter {
18+
static magicFilter (probes: Probe[], magicLocation: string) {
19+
let filteredProbes = probes;
20+
const keywords = magicLocation.split('+');
21+
22+
for (const keyword of keywords) {
23+
const closestExactMatchPosition = probes.reduce((smallestExactMatchPosition, probe) => {
24+
const exactMatchPosition = ProbesLocationFilter.getExactIndexPosition(probe, keyword);
25+
26+
if (exactMatchPosition === -1) {
27+
return smallestExactMatchPosition;
28+
}
29+
30+
return exactMatchPosition < smallestExactMatchPosition ? exactMatchPosition : smallestExactMatchPosition;
31+
}, Number.POSITIVE_INFINITY);
32+
const noExactMatches = closestExactMatchPosition === Number.POSITIVE_INFINITY;
33+
34+
if (noExactMatches) {
35+
filteredProbes = filteredProbes.filter(probe => ProbesLocationFilter.getIndexPosition(probe, keyword) !== -1);
36+
} else {
37+
filteredProbes = filteredProbes.filter(probe => ProbesLocationFilter.getExactIndexPosition(probe, keyword) === closestExactMatchPosition);
38+
}
39+
}
40+
41+
return filteredProbes;
42+
}
43+
44+
static getExactIndexPosition (probe: Probe, value: string) {
45+
return probe.index.findIndex(category => category.some(index => index === value.replaceAll('-', ' ').trim()));
46+
}
47+
48+
static getIndexPosition (probe: Probe, value: string) {
49+
return probe.index.findIndex(category => category.some(index => index.includes(value.replaceAll('-', ' ').trim())));
50+
}
51+
52+
static hasTag (probe: Probe, tag: string) {
53+
return probe.tags.some(({ value }) => value.toLowerCase() === tag);
54+
}
55+
56+
public filterGloballyDistibuted (probes: Probe[], limit: number): Probe[] {
57+
const distribution = this.getDistibutionConfig();
58+
return this.filterByLocationAndWeight(probes, distribution, limit);
59+
}
60+
61+
public filterByLocation (probes: Probe[], location: Location): Probe[] {
62+
if (location.magic === 'world') {
63+
return _.shuffle(this.filterGloballyDistibuted(probes, probes.length));
64+
}
65+
66+
let filteredProbes = probes;
67+
68+
Object.keys(location).forEach((key) => {
69+
if (key === 'tags') {
70+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
71+
probes = probes.filter(probe => location.tags!.every(tag => ProbesLocationFilter.hasTag(probe, tag)));
72+
} else if (key === 'magic') {
73+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
74+
filteredProbes = ProbesLocationFilter.magicFilter(filteredProbes, location.magic!);
75+
} else {
76+
const probeKey = locationKeyMap.find(m => m.includes(key))?.[1] ?? key;
77+
filteredProbes = filteredProbes.filter(probe => location[key as keyof Location] === probe.location[probeKey as keyof ProbeLocation]);
78+
}
79+
});
80+
81+
const isMagicSorting = Object.keys(location).includes('magic');
82+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
83+
return isMagicSorting ? this.magicSort(filteredProbes, location.magic!) : _.shuffle(filteredProbes);
84+
}
85+
86+
public filterByLocationAndWeight (probes: Probe[], distribution: Map<Location, number>, limit: number): Probe[] {
87+
const groupedByLocation = new Map<Location, Probe[]>();
88+
89+
for (const [ location ] of distribution) {
90+
const foundProbes = this.filterByLocation(probes, location);
91+
92+
if (foundProbes.length > 0) {
93+
groupedByLocation.set(location, foundProbes);
94+
}
95+
}
96+
97+
const pickedProbes = new Set<Probe>();
98+
99+
while (groupedByLocation.size > 0 && pickedProbes.size < limit) {
100+
const selectedCount = pickedProbes.size;
101+
102+
for (const [ location, locationProbes ] of groupedByLocation) {
103+
if (pickedProbes.size === limit) {
104+
break;
105+
}
106+
107+
const locationWeight = distribution.get(location);
108+
109+
if (!locationWeight) {
110+
continue;
111+
}
112+
113+
let count = Math.ceil((limit - selectedCount) * locationWeight / 100);
114+
const remainingSpace = limit - pickedProbes.size;
115+
count = count > remainingSpace ? remainingSpace : count;
116+
117+
for (const s of locationProbes.splice(0, count)) {
118+
pickedProbes.add(s);
119+
}
120+
121+
if (locationProbes.length === 0) {
122+
groupedByLocation.delete(location);
123+
}
124+
}
125+
}
126+
127+
return [ ...pickedProbes ];
128+
}
129+
130+
private magicSort (probes: Probe[], magicString: string): Probe[] {
131+
const getClosestIndexPosition = (probe: Probe) => {
132+
const keywords = magicString.split('+');
133+
const closestIndexPosition = keywords.reduce((smallesIndex, keyword) => {
134+
const indexPosition = ProbesLocationFilter.getIndexPosition(probe, keyword);
135+
return indexPosition < smallesIndex ? indexPosition : smallesIndex;
136+
}, Number.POSITIVE_INFINITY);
137+
return closestIndexPosition;
138+
};
139+
140+
const probesGroupedByIndexPosition = _.groupBy(probes, getClosestIndexPosition);
141+
const groupsSortedByIndexPosition = Object.values(probesGroupedByIndexPosition); // Object.values sorts values by key
142+
const groupsWithShuffledItems = groupsSortedByIndexPosition.map(group => _.shuffle(group));
143+
const resultProbes = groupsWithShuffledItems.flat();
144+
145+
return resultProbes;
146+
}
147+
148+
private getDistibutionConfig () {
149+
return new Map<Location, number>(_.shuffle(Object.entries(config.get<Record<string, number>>('measurement.globalDistribution')))
150+
.map(([ value, weight ]) => [{ continent: value }, weight ]));
151+
}
152+
}

Diff for: src/probe/router.ts

+82-24
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import _ from 'lodash';
2-
import type { RemoteProbeSocket } from '../lib/ws/server.js';
32
import { fetchSockets } from '../lib/ws/fetch-sockets.js';
4-
import type { LocationWithLimit } from '../measurement/types.js';
3+
import type { LocationWithLimit, MeasurementRecord } from '../measurement/types.js';
54
import type { Location } from '../lib/location/types.js';
65
import type { Probe } from './types.js';
7-
import { SocketsLocationFilter } from './sockets-location-filter.js';
6+
import { ProbesLocationFilter } from './probes-location-filter.js';
87
import { getMeasurementStore } from '../measurement/store.js';
98

109
export class ProbeRouter {
11-
private readonly socketsFilter = new SocketsLocationFilter();
10+
private readonly probesFilter = new ProbesLocationFilter();
1211

1312
private readonly store = getMeasurementStore();
1413

@@ -18,51 +17,51 @@ export class ProbeRouter {
1817
locations: LocationWithLimit[] | string = [],
1918
globalLimit = 1,
2019
): Promise<Probe[]> {
21-
const sockets = await this.fetchSockets();
22-
let filtered: RemoteProbeSocket[] = [];
20+
const probes = await this.fetchProbes();
21+
let filtered: Probe[] = [];
2322

2423
if (typeof locations === 'string') {
25-
filtered = await this.findWithMeasurementId(sockets, locations);
24+
filtered = await this.findWithMeasurementId(probes, locations);
2625
} else if (locations.some(l => l.limit)) {
27-
filtered = this.findWithLocationLimit(sockets, locations);
26+
filtered = this.findWithLocationLimit(probes, locations);
2827
} else if (locations.length > 0) {
29-
filtered = this.findWithGlobalLimit(sockets, locations, globalLimit);
28+
filtered = this.findWithGlobalLimit(probes, locations, globalLimit);
3029
} else {
31-
filtered = this.findGloballyDistributed(sockets, globalLimit);
30+
filtered = this.findGloballyDistributed(probes, globalLimit);
3231
}
3332

34-
return filtered.map(s => s.data.probe);
33+
return filtered;
3534
}
3635

37-
private async fetchSockets (): Promise<RemoteProbeSocket[]> {
36+
private async fetchProbes (): Promise<Probe[]> {
3837
const sockets = await this.fetchWsSockets();
39-
return sockets.filter(s => s.data.probe.status === 'ready');
38+
return sockets.filter(s => s.data.probe.status === 'ready').map(s => s.data.probe);
4039
}
4140

42-
private findGloballyDistributed (sockets: RemoteProbeSocket[], limit: number): RemoteProbeSocket[] {
43-
return this.socketsFilter.filterGloballyDistibuted(sockets, limit);
41+
private findGloballyDistributed (probes: Probe[], limit: number): Probe[] {
42+
return this.probesFilter.filterGloballyDistibuted(probes, limit);
4443
}
4544

46-
private findWithGlobalLimit (sockets: RemoteProbeSocket[], locations: Location[], limit: number): RemoteProbeSocket[] {
45+
private findWithGlobalLimit (probes: Probe[], locations: Location[], limit: number): Probe[] {
4746
const weight = Math.floor(100 / locations.length);
4847
const distribution = new Map(locations.map(l => [ l, weight ]));
4948

50-
return this.socketsFilter.filterByLocationAndWeight(sockets, distribution, limit);
49+
return this.probesFilter.filterByLocationAndWeight(probes, distribution, limit);
5150
}
5251

53-
private findWithLocationLimit (sockets: RemoteProbeSocket[], locations: LocationWithLimit[]): RemoteProbeSocket[] {
54-
const grouped = new Map<LocationWithLimit, RemoteProbeSocket[]>();
52+
private findWithLocationLimit (probes: Probe[], locations: LocationWithLimit[]): Probe[] {
53+
const grouped = new Map<LocationWithLimit, Probe[]>();
5554

5655
for (const location of locations) {
5756
const { limit, ...l } = location;
58-
const found = this.socketsFilter.filterByLocation(sockets, l);
57+
const found = this.probesFilter.filterByLocation(probes, l);
5958

6059
if (found.length > 0) {
6160
grouped.set(location, found);
6261
}
6362
}
6463

65-
const picked = new Set<RemoteProbeSocket>();
64+
const picked = new Set<Probe>();
6665

6766
for (const [ loc, soc ] of grouped) {
6867
for (const s of _.take(soc, loc.limit)) {
@@ -73,9 +72,68 @@ export class ProbeRouter {
7372
return [ ...picked ];
7473
}
7574

76-
private async findWithMeasurementId (sockets: RemoteProbeSocket[], measurementId: string): Promise<RemoteProbeSocket[]> {
77-
const ips = await this.store.getIpsByMeasurementId(measurementId);
78-
return sockets.filter(socket => ips.includes(socket.data.probe.ipAddress));
75+
private async findWithMeasurementId (probes: Probe[], measurementId: string): Promise<Probe[]> {
76+
let prevMeasurement: MeasurementRecord | undefined;
77+
const prevIps = await this.store.getIpsByMeasurementId(measurementId);
78+
const ipToProbe = new Map(probes.map(probe => [ probe.ipAddress, probe ]));
79+
const result: Probe[] = [];
80+
81+
for (let i = 0; i < prevIps.length; i++) {
82+
const ip = prevIps[i]!;
83+
const probe = ipToProbe.get(ip);
84+
85+
if (probe) {
86+
result.push(probe);
87+
} else {
88+
if (!prevMeasurement) {
89+
prevMeasurement = await this.store.getMeasurementJson(measurementId);
90+
91+
if (!prevMeasurement) { return []; }
92+
}
93+
94+
const prevTest = prevMeasurement.results[i];
95+
96+
if (!prevTest) { return []; }
97+
98+
const offlineProbe: Probe = {
99+
status: 'offline',
100+
client: '',
101+
version: '',
102+
nodeVersion: '',
103+
uuid: '',
104+
isHardware: false,
105+
hardwareDevice: null,
106+
ipAddress: ip,
107+
host: '',
108+
location: {
109+
continent: prevTest.probe.continent,
110+
region: prevTest.probe.region,
111+
country: prevTest.probe.country,
112+
city: prevTest.probe.city,
113+
normalizedCity: prevTest.probe.city.toLowerCase(),
114+
asn: prevTest.probe.asn,
115+
latitude: prevTest.probe.latitude,
116+
longitude: prevTest.probe.longitude,
117+
state: prevTest.probe.state ?? undefined,
118+
network: prevTest.probe.network,
119+
normalizedNetwork: prevTest.probe.network.toLowerCase(),
120+
},
121+
index: [],
122+
resolvers: prevTest.probe.resolvers,
123+
tags: prevTest.probe.tags.map(tag => ({ value: tag, type: 'system' })),
124+
stats: {
125+
cpu: {
126+
count: 0,
127+
load: [],
128+
},
129+
jobs: { count: 0 },
130+
},
131+
};
132+
result.push(offlineProbe);
133+
}
134+
}
135+
136+
return result;
79137
}
80138
}
81139

0 commit comments

Comments
 (0)