-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbufferedTailer.go
168 lines (155 loc) · 5.11 KB
/
bufferedTailer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Copyright 2016-2020 The grok_exporter Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package go_tailer
import (
"github.com/jdrews/go-tailer/fswatcher"
"github.com/sirupsen/logrus"
)
// implements fswatcher.FileTailer
type bufferedTailer struct {
out chan *fswatcher.Line
orig fswatcher.FileTailer
done chan struct{}
}
func (b *bufferedTailer) Lines() chan *fswatcher.Line {
return b.out
}
func (b *bufferedTailer) Errors() chan fswatcher.Error {
return b.orig.Errors()
}
func (b *bufferedTailer) Close() {
b.orig.Close()
close(b.done)
}
func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
return BufferedTailerWithMetrics(orig, &noopMetric{}, logrus.New(), 0)
}
// Wrapper around a tailer that consumes the lines channel quickly.
// The idea is that the original tailer can continue reading lines from the logfile,
// and does not need to wait until the lines are processed.
// The number of buffered lines are exposed as a Prometheus metric, if lines are constantly
// produced faster than they are consumed, we will eventually run out of memory.
//
// ---
// The buffered tailer prevents the following error (this can be reproduced on Windows,
// where we don't keep the logfile open):
//
// Example test actions
// --------------------
//
// Sequence of actions simulated in fileTailer_test:
//
// 1) write line a
// 2) write line b
// 3) move the old logfile away and create a new logfile
// 4) write line c
//
// Good case event processing
// --------------------------
//
// How Events.Process() should process the file system events triggered by the actions above:
//
// 1) MODIFIED : Process() reads line a
// 2) MODIFIED : Process() reads line b
// 3) MOVED_FROM, CREATED : Process() resets the line reader and seeks the file to position 0
// 4) MODIFIED : Process() reads line c
//
// Bad case event processing
// -------------------------
//
// When Events.Process() receives a MODIFIED event, it does not know how many lines have been written.
// Therefore, it reads all new lines until EOF is reached.
// If line processing is slow (writing to the lines channel blocks until all grok patterns are processed),
// we might read 'line b' while we are still processing the first MODIFIED event:
//
// 1) MODIFIED : Process() reads 'line a' and 'line b'
//
// Meanwhile, the test continues with steps 3 and 4, moving the logfile away, creating a new logfile,
// and writing 'line c'. When the tailer receives the second MODIFIED event, it learns that the file
// has been truncated, seeks to position 0, and reads 'line c'.
//
// 2) MODIFIED : Process() detects the truncated file, seeks to position 0, reads 'line c'
//
// The tailer now receives MOVED_FROM, which makes it close the logfile, CREATED, which makes
// it open the logfile and start reading from position 0:
//
// 3) MOVED_FROM, CREATED : seek to position 0, read line c again !!!
//
// When the last MODIFIED event is processed, there are no more changes in the file:
//
// 4) MODIFIED : no changes in file
//
// As a result, we read 'line c' two times.
//
// To minimize the risk, use the buffered tailer to make sure file system events are handled
// as quickly as possible without waiting for the grok patterns to be processed.
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric, log logrus.FieldLogger, maxLinesInBuffer int) fswatcher.FileTailer {
buffer := NewLineBuffer()
out := make(chan *fswatcher.Line)
done := make(chan struct{})
// producer
go func() {
bufferLoadMetric.Start()
for {
line, ok := <-orig.Lines()
if ok {
if maxLinesInBuffer > 0 && buffer.Len() > maxLinesInBuffer-1 {
log.Warnf("Line buffer reached limit of %v lines. Dropping lines in buffer.", maxLinesInBuffer)
buffer.Clear()
bufferLoadMetric.Set(0)
}
buffer.Push(line)
bufferLoadMetric.Inc()
} else {
buffer.Close()
bufferLoadMetric.Stop()
return
}
}
}()
// consumer
go func() {
for {
line := buffer.BlockingPop()
if line == nil {
// buffer closed
close(out)
return
}
bufferLoadMetric.Dec()
select {
case out <- line:
case <-done:
}
}
}()
return &bufferedTailer{
out: out,
orig: orig,
done: done,
}
}
type BufferLoadMetric interface {
Start()
Inc() // put a log line into the buffer
Dec() // take a log line from the buffer
Set(value int64) // set the current number of lines in the buffer
Stop()
}
type noopMetric struct{}
func (m *noopMetric) Start() {}
func (m *noopMetric) Inc() {}
func (m *noopMetric) Dec() {}
func (m *noopMetric) Set(value int64) {}
func (m *noopMetric) Stop() {}