forked from pinojs/pino-elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lib.js
108 lines (94 loc) · 2.73 KB
/
lib.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
'use strict'
/* eslint no-prototype-builtins: 0 */
const split = require('split2')
const { Client, Connection } = require('@elastic/elasticsearch')
/**
 * Reports whether `obj` has `key` as an own property.
 *
 * Goes through Object.prototype so that a parsed log line carrying its own
 * "hasOwnProperty" field cannot shadow the check and make it throw.
 *
 * @param {object} obj - Object to inspect.
 * @param {string} key - Property name to look for.
 * @returns {boolean} True when `key` is an own property of `obj`.
 */
function hasOwn (obj, key) {
  return Object.prototype.hasOwnProperty.call(obj, key)
}

/**
 * Builds the ISO-8601 timestamp stored on a parsed log line.
 *
 * The line's own `time` is used when it is a non-empty string or a
 * non-negative number that yields a valid Date; in every other case the
 * current time is used instead.
 *
 * @param {*} value - Value produced by JSON.parse for one log line.
 * @returns {string} Timestamp formatted by Date#toISOString.
 */
function toISOTimestamp (value) {
  if (value !== null && typeof value === 'object' && hasOwn(value, 'time')) {
    const time = value.time
    const usable =
      (typeof time === 'string' && time.length > 0) ||
      (typeof time === 'number' && time >= 0)
    if (usable) {
      const parsed = new Date(time)
      // Date#toISOString throws a RangeError on an invalid date, so an
      // unparseable `time` falls through to the current time instead.
      if (!Number.isNaN(parsed.getTime())) {
        return parsed.toISOString()
      }
    }
  }
  return new Date().toISOString()
}

/**
 * Extracts the leading `YYYY-MM-DD` portion used to fill `%{DATE}`.
 *
 * Strings are sliced as-is. Numbers and Date instances are converted to an
 * ISO string first; anything else (or an invalid date) uses today's date.
 *
 * @param {*} time - Timestamp taken from a document.
 * @returns {string} The first ten characters of the timestamp.
 */
function toDatePart (time) {
  if (typeof time === 'string') {
    return time.substring(0, 10)
  }
  const convertible = typeof time === 'number' || time instanceof Date
  const candidate = convertible ? new Date(time) : new Date()
  const date = Number.isNaN(candidate.getTime()) ? new Date() : candidate
  return date.toISOString().substring(0, 10)
}

/**
 * Creates a line-splitting stream whose parsed lines are handed to the
 * Elasticsearch client's bulk helper as its datasource.
 *
 * Options read from `opts`: `node`, `auth`, `cloud`, `rejectUnauthorized`,
 * `Connection`, `es-version` (defaults to 7), `index` (string template or
 * function, defaults to 'pino'), `type` (defaults to 'log', only used when
 * `es-version` is below 7), `flush-bytes` (defaults to 1000) and
 * `flush-interval` (defaults to 30000). The deprecated `bulk-size` option
 * triggers a process warning and is deleted from `opts`.
 *
 * Events emitted on the returned stream:
 *  - 'unknown' (line, reason): the line is not valid JSON, or is a boolean
 *    or null value.
 *  - 'insertError' (error): the bulk helper dropped a document; the document
 *    is attached as `error.document`.
 *  - 'insert' (stats): the bulk helper's promise fulfilled.
 *  - 'error' (err): the bulk helper's promise rejected.
 *
 * @param {object} opts - Configuration as described above.
 * @returns {object} The stream created by `split`.
 */
function pinoElasticSearch (opts) {
  if (opts['bulk-size']) {
    process.emitWarning('The "bulk-size" option has been deprecated, use "flush-bytes" instead')
    delete opts['bulk-size']
  }

  const splitter = split(function (line) {
    let value
    try {
      value = JSON.parse(line)
    } catch (error) {
      this.emit('unknown', line, error)
      return
    }

    if (typeof value === 'boolean') {
      this.emit('unknown', line, 'Boolean value ignored')
      return
    }
    if (value === null) {
      this.emit('unknown', line, 'Null value ignored')
      return
    }

    // Remaining primitives (numbers, strings) are wrapped in a document.
    if (typeof value !== 'object') {
      return {
        data: value,
        time: toISOTimestamp(value)
      }
    }

    // A line that already carries '@timestamp' keeps its `time` untouched.
    if (value['@timestamp'] === undefined) {
      value.time = toISOTimestamp(value)
    }
    return value
  }, { autoDestroy: true })

  const client = new Client({
    node: opts.node,
    auth: opts.auth,
    cloud: opts.cloud,
    ssl: { rejectUnauthorized: opts.rejectUnauthorized },
    Connection: opts.Connection || Connection
  })

  const esVersion = Number(opts['es-version']) || 7
  const index = opts.index || 'pino'
  const buildIndexName = typeof index === 'function' ? index : null
  // No `_type` is sent when the configured version is 7 or above.
  const type = esVersion >= 7 ? undefined : (opts.type || 'log')

  const b = client.helpers.bulk({
    datasource: splitter,
    flushBytes: opts['flush-bytes'] || 1000,
    flushInterval: opts['flush-interval'] || 30000,
    refreshOnCompletion: getIndexName(),
    onDocument (doc) {
      return {
        index: {
          _index: getIndexName(doc.time || doc['@timestamp']),
          _type: type
        }
      }
    },
    onDrop (doc) {
      const error = new Error('Dropped document')
      error.document = doc
      splitter.emit('insertError', error)
    }
  })

  b.then(
    (stats) => splitter.emit('insert', stats),
    (err) => splitter.emit('error', err)
  )

  // The destroy callback is only invoked once the bulk helper has settled.
  splitter._destroy = function (err, cb) {
    b.then(() => cb(err), (e2) => cb(e2 || err))
  }

  /**
   * Resolves the index name for a document timestamp.
   * A function `index` receives the timestamp unchanged; a string `index`
   * has '%{DATE}' replaced by the timestamp's date portion.
   */
  function getIndexName (time = new Date().toISOString()) {
    if (buildIndexName) {
      return buildIndexName(time)
    }
    return index.replace('%{DATE}', toDatePart(time))
  }

  return splitter
}
// The stream factory is the module's only export.
module.exports = pinoElasticSearch