Skip to content

Commit a5139c3

Browse files
authored
feat: send probe logs to the API (#314)
1 parent 91e9dfe commit a5139c3

File tree

6 files changed

+447
-4
lines changed

6 files changed

+447
-4
lines changed

package-lock.json

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
"socket.io-client": "^4.8.1",
2727
"throng": "^5.0.0",
2828
"tldts": "^7.0.8",
29-
"winston": "^3.17.0"
29+
"winston": "^3.17.0",
30+
"winston-transport": "^4.9.0"
3031
},
3132
"devDependencies": {
3233
"@commitlint/cli": "^19.8.1",

src/lib/api-logs-transport.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import Transport from 'winston-transport';
2+
import { Socket } from 'socket.io-client';
3+
import { Logger } from 'winston';
4+
5+
export type ApiTransportSettings = {
6+
isActive?: boolean;
7+
sendInterval?: number;
8+
maxBufferSize?: number;
9+
};
10+
11+
export type ApiTransportOptions = Transport.TransportStreamOptions & ApiTransportSettings & { socket?: Socket };
12+
13+
type Info = {
14+
message: string;
15+
timestamp: string;
16+
level: string;
17+
scope: string;
18+
};
19+
20+
class ApiLogsTransport extends Transport {
21+
private logger: Logger | undefined;
22+
private socket: Socket | undefined;
23+
private isActive: boolean;
24+
private sendInterval: number;
25+
private maxBufferSize: number;
26+
private logBuffer: Info[] = [];
27+
private droppedLogs: number = 0;
28+
private timer: NodeJS.Timeout | undefined = undefined;
29+
30+
constructor (opts?: ApiTransportOptions) {
31+
super(opts);
32+
this.isActive = opts?.isActive ?? false;
33+
this.sendInterval = opts?.sendInterval ?? 10000;
34+
this.maxBufferSize = opts?.maxBufferSize ?? 100;
35+
this.socket = opts?.socket;
36+
this.scheduleSend();
37+
}
38+
39+
override log (info: Info, callback?: () => void) {
40+
setImmediate(() => this.emit('logged', info));
41+
42+
this.logBuffer.push(info);
43+
const bufferLength = this.logBuffer.length;
44+
const bufferOverflow = bufferLength - this.maxBufferSize;
45+
46+
if (bufferOverflow > 0) {
47+
this.logBuffer.splice(0, bufferOverflow);
48+
this.droppedLogs += bufferOverflow;
49+
}
50+
51+
callback && callback();
52+
}
53+
54+
setSocket (socket: Socket) {
55+
this.socket = socket;
56+
}
57+
58+
setLogger (logger: Logger) {
59+
this.logger = logger;
60+
}
61+
62+
getCurrentSettings () {
63+
return {
64+
isActive: this.isActive,
65+
sendInterval: this.sendInterval,
66+
maxBufferSize: this.maxBufferSize,
67+
};
68+
}
69+
70+
updateSettings (settings: ApiTransportSettings) {
71+
this.isActive = settings.isActive ?? this.isActive;
72+
this.sendInterval = settings.sendInterval ?? this.sendInterval;
73+
this.maxBufferSize = settings.maxBufferSize ?? this.maxBufferSize;
74+
this.scheduleSend();
75+
}
76+
77+
private scheduleSend () {
78+
clearTimeout(this.timer);
79+
80+
if (this.isActive) {
81+
this.timer = setTimeout(() => {
82+
void this.sendLogs();
83+
}, this.sendInterval);
84+
}
85+
}
86+
87+
private async sendLogs () {
88+
if (!this.isActive || !this.socket?.connected || !this.logBuffer.length) {
89+
return this.scheduleSend();
90+
}
91+
92+
const payload = {
93+
logs: this.logBuffer.slice(),
94+
skipped: this.droppedLogs,
95+
};
96+
97+
const droppedInPayload = payload.skipped;
98+
const presentInPayload = payload.logs.length;
99+
100+
try {
101+
const response: unknown = await this.socket.emitWithAck('probe:logs', payload);
102+
103+
if (response === 'success') {
104+
const droppedWhileAwaiting = this.droppedLogs - droppedInPayload;
105+
const oldLogsRemaining = presentInPayload - droppedWhileAwaiting;
106+
107+
if (oldLogsRemaining >= 0) {
108+
this.logBuffer.splice(0, oldLogsRemaining);
109+
this.droppedLogs = 0;
110+
} else {
111+
this.droppedLogs = -oldLogsRemaining; // === droppedWhileAwaiting - presentInPayload
112+
}
113+
}
114+
} catch (e) {
115+
this.logger?.error('Failed to send logs to the API.', e);
116+
} finally {
117+
this.scheduleSend();
118+
}
119+
}
120+
}
121+
122+
export default ApiLogsTransport;

src/lib/logger.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
import process from 'node:process';
33
import { inspect } from 'node:util';
44
import * as winston from 'winston';
5+
import ApiLogsTransport from './api-logs-transport.js';
6+
7+
export const apiLogsTransport = new ApiLogsTransport();
58

69
const objectFormatter = (object: Record<string, any>) => {
710
const entries = Object.entries(object).map(([ key, value ]) => {
@@ -40,7 +43,10 @@ const logger = winston.createLogger({
4043
),
4144
transports: [
4245
new winston.transports.Console(),
46+
apiLogsTransport,
4347
],
4448
});
4549

4650
export const scopedLogger = (scope: string): winston.Logger => logger.child({ scope });
51+
52+
apiLogsTransport.setLogger(scopedLogger('api-logs-transport'));

src/probe.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import physicalCpuCount from 'physical-cpu-count';
1010
import { getFakeIp } from './lib/fake-ip.js';
1111
import type { CommandInterface, MeasurementRequest } from './types.js';
1212
import { loadAll as loadAllDeps } from './lib/dependencies.js';
13-
import { scopedLogger } from './lib/logger.js';
13+
import { apiLogsTransport, scopedLogger } from './lib/logger.js';
14+
import { ApiTransportSettings } from './lib/api-logs-transport.js';
1415
import { initErrorHandler } from './helper/api-error-handler.js';
1516
import { handleTestError } from './helper/test-error-handler.js';
1617
import { apiConnectLocationHandler } from './helper/api-connect-handler.js';
@@ -103,6 +104,8 @@ function connect (workerId?: number) {
103104
});
104105

105106
runStatsAgent(socket, worker);
107+
apiLogsTransport.setSocket(socket);
108+
106109
const statusManager = initStatusManager(socket, pingCmd);
107110
const errorHandler = initErrorHandler(socket);
108111

@@ -150,7 +153,8 @@ function connect (workerId?: number) {
150153
}
151154
});
152155
})
153-
.on('probe:adoption:code', logAdoptionCode);
156+
.on('probe:adoption:code', logAdoptionCode)
157+
.on('api:logs-transport:set', (data: ApiTransportSettings) => apiLogsTransport.updateSettings(data));
154158

155159
process.on('SIGTERM', () => {
156160
logger.debug('SIGTERM received.');

0 commit comments

Comments
 (0)