-
Notifications
You must be signed in to change notification settings - Fork 1
/
aws-reporter.ts
113 lines (95 loc) · 3.07 KB
/
aws-reporter.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import { CloudWatch, MetricDatum, StandardUnit } from "@aws-sdk/client-cloudwatch";
import leoLogger from "leo-logger";
import { Metric, MetricReporter, ReporterConfigs } from './types';
import async from 'async';
import { removeColon } from "./utils";
// Module-level logger obtained by calling leoLogger with the name "aws-reporter";
// AwsReporter.send() uses it to emit one debug entry per metric it converts.
const logger = leoLogger("aws-reporter");
/**
 * Options accepted by the AwsReporter constructor.
 */
export interface AWSConfig {
/**
 * Region passed to the CloudWatch client. When omitted, the reporter falls
 * back to `process.env.AWS_REGION` and then to "us-east-1".
 */
region?: string;
/**
 * When exactly `true`, the reporter still converts metrics but skips the
 * CloudWatch call. Any other value (including undefined) leaves sending enabled.
 */
dontSendMetrics?: boolean;
}
/**
 * Metric reporter that buffers metrics and publishes them to AWS CloudWatch
 * under the "rstreams" namespace.
 *
 * Lifecycle: `log()` buffers metrics and starts a background send whenever a
 * full batch of `MetricsSendLimit` is available; `end()` sends whatever is left
 * and waits for every outstanding send, rejecting if any of them failed.
 */
export class AwsReporter implements MetricReporter {
  /**
   * Builds a reporter from the shared reporter configuration.
   *
   * @param configs Reporter configuration; its `AWS` entry enables this reporter.
   * @returns A new reporter, or `null` when `configs.AWS` is not set.
   */
  static GetStaticReporter(configs: ReporterConfigs): AwsReporter | null {
    const awsConfig = configs.AWS;
    if (!awsConfig) {
      return null;
    }
    // Pass a copy instead of assigning onto configs.AWS so the caller's object
    // is not mutated. The top-level flag always overrides the AWS-level one
    // (even when it is undefined), exactly as the previous assignment did.
    return new AwsReporter({ ...awsConfig, dontSendMetrics: configs.dontSendMetrics });
  }

  /** Reporter name. */
  public name = "AWS";

  /**
   * Maximum number of metrics placed in a single putMetricData call, and the
   * buffer size at which `log()` triggers a background send.
   * NOTE(review): the original comment attributes the value 20 to an AWS
   * per-request limit — confirm against the current CloudWatch quota.
   */
  static MetricsSendLimit = 20;

  private cloudwatch: CloudWatch;

  /** Metrics logged but not yet handed to `send()`. */
  metrics: Metric[] = [];

  /** Sends started by `log()`/`end()` that `end()` has not awaited yet. */
  metricPromises: Promise<void>[] = [];

  /** False only when the config set `dontSendMetrics` to exactly `true`. */
  sendMetrics: boolean;

  /**
   * @param config Optional settings. The region falls back to
   *   `process.env.AWS_REGION` and then "us-east-1".
   */
  constructor(config?: Partial<AWSConfig>) {
    this.cloudwatch = new CloudWatch({
      region: config?.region || process.env.AWS_REGION || "us-east-1"
    });
    this.sendMetrics = config?.dontSendMetrics !== true;
  }

  /** No start-up work is required for this reporter. */
  async start(): Promise<void> {
    // Nothing to do
  }

  /**
   * Buffers a metric and, once a full batch is buffered, starts sending that
   * batch without waiting for it to finish.
   *
   * @param metric Metric to record.
   */
  log(metric: Metric): void {
    this.metrics.push(metric);
    if (this.metrics.length >= AwsReporter.MetricsSendLimit) {
      const batch = this.metrics.splice(0, AwsReporter.MetricsSendLimit);
      this.metricPromises.push(this.track(this.send(batch)));
    }
  }

  /**
   * Sends any buffered metrics and waits for all outstanding sends.
   *
   * @throws Rejects with the first send error, including errors from batches
   *   that were started earlier by `log()`.
   */
  async end(): Promise<void> {
    const remaining = this.metrics.splice(0, this.metrics.length);
    this.metricPromises.push(this.track(this.send(remaining)));
    await Promise.all(this.metricPromises.splice(0, this.metricPromises.length));
  }

  /**
   * Converts metrics to CloudWatch datums, splits them into chunks of at most
   * `MetricsSendLimit`, and publishes each chunk to the "rstreams" namespace
   * (up to 10 requests in flight). When sending is disabled the metrics are
   * still converted and debug-logged, but nothing is published.
   *
   * @param metrics Metrics to publish.
   * @throws Rejects with the error reported by the first failing request.
   */
  async send(metrics: Metric[]): Promise<void> {
    const datums = metrics.map((metric) => this.toDatum(metric));

    // Guard against a non-positive limit, which would otherwise never advance.
    const chunkSize = Math.max(1, AwsReporter.MetricsSendLimit);
    const awsMetrics: MetricDatum[][] = [];
    for (let offset = 0; offset < datums.length; offset += chunkSize) {
      awsMetrics.push(datums.slice(offset, offset + chunkSize));
    }

    if (!this.sendMetrics || awsMetrics.length === 0) {
      return;
    }

    // Send it all to AWS
    await new Promise((resolve, reject) => {
      async.eachOfLimit(awsMetrics, 10, (value: MetricDatum[], _key: string, cb) => {
        this.cloudwatch.putMetricData({
          Namespace: "rstreams",
          MetricData: value
        }, (err) => cb(err));
      }, (err, data) => err ? reject(err) : resolve(data));
    });
  }

  /**
   * Marks a background send as handled so that a failure occurring before
   * `end()` is called does not surface as an unhandled promise rejection.
   * The same promise is returned, so `end()` still observes the rejection.
   */
  private track(pending: Promise<void>): Promise<void> {
    pending.catch(() => undefined);
    return pending;
  }

  /** Builds the CloudWatch datum for one metric and debug-logs it. */
  private toDatum(metric: Metric): MetricDatum {
    const metricId = metric.id;
    const tags = this.toDimensions(metric);
    logger.debug(metricId, metric.value, ...tags);

    // Default to "Count" and capitalise the first letter of the unit name.
    const units = metric.units || "Count";
    return {
      MetricName: metricId,
      Value: metric.value,
      Unit: units.charAt(0).toUpperCase() + units.substring(1) as StandardUnit,
      Dimensions: tags,
      Timestamp: metric.timestamp
    };
  }

  /**
   * Maps a metric's tag object to CloudWatch dimensions. Tags whose value is
   * null/undefined and the "service" tag are skipped; an array value yields
   * one dimension per element. Keys and values go through `removeColon`
   * because, per the original note, colons separate key:value in the tags.
   */
  private toDimensions(metric: Metric): NonNullable<MetricDatum["Dimensions"]> {
    const dimensions: NonNullable<MetricDatum["Dimensions"]> = [];
    for (const [key, value] of Object.entries(metric.tags || {})) {
      if (value == null || key === "service") {
        continue;
      }
      const cleanKey = removeColon(key);
      if (Array.isArray(value)) {
        for (const item of value) {
          dimensions.push({ Name: cleanKey, Value: removeColon(item) });
        }
      } else {
        dimensions.push({ Name: cleanKey, Value: removeColon(value) });
      }
    }
    return dimensions;
  }
}