Skip to content

Commit

Permalink
misc: add probe metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinKolarik committed Dec 15, 2024
1 parent 7c5bbde commit eddf742
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 15 deletions.
64 changes: 50 additions & 14 deletions src/lib/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import _ from 'lodash';
import process from 'node:process';
import apmAgent from 'elastic-apm-node';
import type { Server as SocketServer } from 'socket.io';

import { getWsServer, PROBES_NAMESPACE } from './ws/server.js';
import { scopedLogger } from './logger.js';
import { fetchProbes, getWsServer, PROBES_NAMESPACE } from './ws/server.js';
import { getMeasurementRedisClient, type RedisClient } from './redis/measurement-client.js';

const logger = scopedLogger('metrics');
Expand All @@ -23,11 +22,7 @@ export class MetricsAgent {
) {}

run (): void {
this.registerAsyncSeries(`gp.probe.count.${process.pid}`, async () => {
return this.io.of(PROBES_NAMESPACE).local.fetchSockets().then(sockets => sockets.length);
});

this.registerAsyncSeries(`gp.measurement.stored.count`, async () => {
this.registerAsyncCollector(`gp.measurement.stored.count`, async () => {
const [ dbSize, awaitingSize ] = await Promise.all([
this.redis.dbSize(),
this.redis.hLen('gp:in-progress'),
Expand All @@ -38,6 +33,25 @@ export class MetricsAgent {
// 1 global key tracks the in-progress measurements
return Math.round((dbSize - awaitingSize - 1) / 2);
});

this.registerAsyncCollector(`gp.probe.count.local`, async () => {
return this.io.of(PROBES_NAMESPACE).local.fetchSockets().then(sockets => sockets.length);
});

this.registerAsyncGroupCollector('global probe stats', async () => {
const probes = await fetchProbes();
const byContinent = _.groupBy(probes, probe => probe.location.continent);

const countByContinent = _(byContinent)
.mapKeys((_probes, continent) => `gp.probe.count.continent.${continent}`)
.mapValues(probes => probes.length)
.value();

return {
...countByContinent,
'gp.probe.count.adopted': probes.filter(probe => probe.owner).length,
};
});
}

recordMeasurementTime (type: string, time: number): void {
Expand Down Expand Up @@ -105,22 +119,44 @@ export class MetricsAgent {
});
}

private registerAsyncSeries (name: string, callback: () => Promise<number>): void {
private recordAsyncDatapoint (name: string, value: number): void {
if (!this.asyncSeries[name]) {
this.registerAsyncSeries(name);
}

this.asyncSeries[name]!.push(value);
}

private registerAsyncSeries (name: string): void {
this.asyncSeries[name] = [];

registerGuardedMetric(name, () => {
const value = this.asyncSeries[name]!.at(-1);
this.asyncSeries[name] = [];
return value;
});
}

private registerAsyncCollector (name: string, callback: () => Promise<number>): void {
this.timers[name] = setInterval(() => {
callback().then((value) => {
this.asyncSeries[name]!.push(value);
this.recordAsyncDatapoint(name, value);
}).catch((error) => {
logger.error(`Failed to collect an async metric "${name}"`, error);
});
}, 10 * 1000);
}

registerGuardedMetric(name, () => {
const value = this.asyncSeries[name]!.at(-1);
this.asyncSeries[name] = [];
return value;
});
private registerAsyncGroupCollector (groupName: string, callback: () => Promise<{[k: string]: number}>): void {
this.timers[groupName] = setInterval(() => {
callback().then((group) => {
Object.entries(group).forEach(([ key, value ]) => {
this.recordAsyncDatapoint(key, value);
});
}).catch((error) => {
logger.error(`Failed to collect an async metric group "${groupName}"`, error);
});
}, 10 * 1000);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/lib/override/adopted-probes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ export class AdoptedProbes {
const hasUserTags = adoption.tags && adoption.tags.length;

if (!isCustomCity && !hasUserTags) {
return probe;
return { ...probe, owner: { id: adoption.userId } };
}

const newLocation = this.getUpdatedLocation(probe) || probe.location;
Expand All @@ -198,6 +198,7 @@ export class AdoptedProbes {
location: newLocation,
tags: newTags,
index: getIndex(newLocation, newTags),
owner: { id: adoption.userId },
};
});
}
Expand Down
1 change: 1 addition & 0 deletions src/probe/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export type Probe = {
tags: Tag[];
stats: ProbeStats;
hostInfo: HostInfo;
owner?: { id: string };
};

type Modify<T, Fields> = Omit<T, keyof Fields> & Fields;
Expand Down

0 comments on commit eddf742

Please sign in to comment.