Skip to content

Commit ef9e9f7

Browse files
author
Rafal Augustyniak
committed
feat: support aggregating metrics across workers in a Node.js worker_threads
1 parent 96f7495 commit ef9e9f7

File tree

2 files changed

+143
-21
lines changed

2 files changed

+143
-21
lines changed

example/workerThreads.js

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
'use strict';
2+
3+
const express = require('express');
4+
const metricsServer = express();
5+
const AggregatorRegistry = require('../').AggregatorRegistry;
6+
/* eslint-disable node/no-unsupported-features/node-builtins */
7+
const { Worker, isMainThread } = require('worker_threads');
8+
9+
const aggregatorRegistry = new AggregatorRegistry();
10+
11+
if (isMainThread) {
12+
metricsServer.get('/metrics', async (req, res) => {
13+
try {
14+
const metrics = await aggregatorRegistry.clusterMetrics();
15+
res.set('Content-Type', aggregatorRegistry.contentType);
16+
res.send(metrics);
17+
} catch (ex) {
18+
res.statusCode = 500;
19+
res.send(ex.message);
20+
}
21+
});
22+
23+
metricsServer.listen(3000);
24+
25+
const worker = new Worker(__filename);
26+
27+
aggregatorRegistry.attachWorkers([worker]);
28+
} else {
29+
const Histogram = require('../').Histogram;
30+
const h = new Histogram({
31+
name: 'test_histogram',
32+
help: 'Example of a histogram',
33+
labelNames: ['code'],
34+
});
35+
36+
h.labels('200').observe(Math.random());
37+
}

lib/cluster.js

+106-21
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,20 @@
1111
const Registry = require('./registry');
1212
const { Grouper } = require('./util');
1313
const { aggregators } = require('./metricAggregators');
14+
15+
let parentPort, MessageChannel, isMainThread, Worker;
16+
try {
17+
/* eslint-disable node/no-unsupported-features/node-builtins */
18+
const worker_threads = require('worker_threads');
19+
20+
parentPort = worker_threads.parentPort;
21+
MessageChannel = worker_threads.MessageChannel;
22+
isMainThread = worker_threads.isMainThread;
23+
Worker = worker_threads.Worker;
24+
} catch {
25+
// node version is too old
26+
}
27+
1428
// We need to lazy-load the 'cluster' module as some application servers -
1529
// namely Passenger - crash when it is imported.
1630
let cluster = () => {
@@ -25,6 +39,7 @@ const GET_METRICS_RES = 'prom-client:getMetricsRes';
2539
let registries = [Registry.globalRegistry];
2640
let requestCtr = 0; // Concurrency control
2741
let listenersAdded = false;
42+
const workersQueue = [];
2843
const requests = new Map(); // Pending requests for workers' local metrics.
2944

3045
class AggregatorRegistry extends Registry {
@@ -33,6 +48,14 @@ class AggregatorRegistry extends Registry {
3348
addListeners();
3449
}
3550

51+
attachWorkers(workers = []) {
52+
for (const worker in workers) {
53+
if (worker instanceof Worker) {
54+
workersQueue.push(worker);
55+
}
56+
}
57+
}
58+
3659
/**
3760
* Gets aggregated metrics for all workers. The optional callback and
3861
* returned Promise resolve with the same value; either may be used.
@@ -76,6 +99,9 @@ class AggregatorRegistry extends Registry {
7699
}
77100
}
78101

102+
getWorkerThreadsMetrics(message);
103+
request.pending += workersQueue.length || 0;
104+
79105
if (request.pending === 0) {
80106
// No workers were up
81107
clearTimeout(request.errorTimeout);
@@ -145,6 +171,31 @@ class AggregatorRegistry extends Registry {
145171
}
146172
}
147173

174+
function handleWorkerResponse(worker, message) {
175+
if (message.type === GET_METRICS_RES) {
176+
const request = requests.get(message.requestId);
177+
request.pending += message.workerRequests || 0;
178+
179+
if (message.error) {
180+
request.done(new Error(message.error));
181+
return;
182+
}
183+
184+
message.metrics.forEach(registry => request.responses.push(registry));
185+
request.pending--;
186+
187+
if (request.pending === 0) {
188+
// finalize
189+
requests.delete(message.requestId);
190+
clearTimeout(request.errorTimeout);
191+
192+
const registry = AggregatorRegistry.aggregate(request.responses);
193+
const promString = registry.metrics();
194+
request.done(null, promString);
195+
}
196+
}
197+
}
198+
148199
/**
149200
* Adds event listeners for cluster aggregation. Idempotent (safe to call more
150201
* than once).
@@ -155,42 +206,53 @@ function addListeners() {
155206
listenersAdded = true;
156207

157208
if (cluster().isMaster) {
158-
// Listen for worker responses to requests for local metrics
159-
cluster().on('message', (worker, message) => {
160-
if (message.type === GET_METRICS_RES) {
161-
const request = requests.get(message.requestId);
162-
163-
if (message.error) {
164-
request.done(new Error(message.error));
165-
return;
166-
}
209+
// Listen for cluster responses to requests for local metrics
210+
cluster().on('message', handleWorkerResponse);
211+
}
212+
}
167213

168-
message.metrics.forEach(registry => request.responses.push(registry));
169-
request.pending--;
214+
function getWorkerThreadsMetrics(message) {
215+
workersQueue.forEach(worker => {
216+
if (worker && worker instanceof Worker) {
217+
const metricsChannel = new MessageChannel();
170218

171-
if (request.pending === 0) {
172-
// finalize
173-
requests.delete(message.requestId);
174-
clearTimeout(request.errorTimeout);
219+
worker.postMessage(
220+
{
221+
...message,
222+
port: metricsChannel.port1,
223+
},
224+
[metricsChannel.port1],
225+
);
175226

176-
const registry = AggregatorRegistry.aggregate(request.responses);
177-
const promString = registry.metrics();
178-
request.done(null, promString);
227+
metricsChannel.port2.on('message', response => {
228+
if (response.type === GET_METRICS_RES) {
229+
if (cluster().isWorker) {
230+
process.send(response);
231+
}
232+
233+
if (cluster().isMaster) {
234+
handleWorkerResponse(worker, response);
235+
}
236+
237+
metricsChannel.port2.close();
179238
}
180-
}
181-
});
182-
}
239+
});
240+
}
241+
});
183242
}
184243

185244
// Respond to master's requests for worker's local metrics.
186245
process.on('message', message => {
187246
if (cluster().isWorker && message.type === GET_METRICS_REQ) {
247+
getWorkerThreadsMetrics(message);
248+
188249
Promise.all(registries.map(r => r.getMetricsAsJSON()))
189250
.then(metrics => {
190251
process.send({
191252
type: GET_METRICS_RES,
192253
requestId: message.requestId,
193254
metrics,
255+
workerRequests: workersQueue.length,
194256
});
195257
})
196258
.catch(error => {
@@ -203,4 +265,27 @@ process.on('message', message => {
203265
}
204266
});
205267

268+
// Respond to master's request for worker_threads worker local metrics
269+
if (!isMainThread) {
270+
parentPort.on('message', ({ type, requestId, port } = {}) => {
271+
if (type === GET_METRICS_REQ) {
272+
Promise.all(registries.map(r => r.getMetricsAsJSON()))
273+
.then(metrics => {
274+
port.postMessage({
275+
type: GET_METRICS_RES,
276+
requestId,
277+
metrics,
278+
});
279+
})
280+
.catch(error => {
281+
port.postMessage({
282+
type: GET_METRICS_RES,
283+
requestId,
284+
error: error.message,
285+
});
286+
});
287+
}
288+
});
289+
}
290+
206291
module.exports = AggregatorRegistry;

0 commit comments

Comments
 (0)