Multiline messages support #121

Open
wants to merge 13 commits into
base: master
18 changes: 17 additions & 1 deletion CONFIG.md
@@ -114,8 +114,24 @@ input:
poll_interval: 5s # should NOT be needed in most cases, see below
```

Example 3:

```yaml
input:
type: file
line_delimiter: "[0-9]{2}.[0-9]{2}.[0-9]{4}"
path:
- /var/logdir2/*.log
readall: false
fail_on_missing_logfile: true
poll_interval: 5s # should NOT be needed in most cases, see below
```

The `path` is the path to the log file. `path` is used if you want to monitor a single path. If you want to monitor a list of paths, use `paths` instead, as in example 2 above. [Glob] patterns are supported on the file level, but not on the directory level. If you want to monitor multiple logfiles, see also [restricting a metric to specific log files](#restricting-a-metric-to-specific-log-files) and [pre-defined label variables](#pre-defined-label-variables) below.

Use `line_delimiter` to handle multiline messages. It is a regular expression that marks the boundary between messages. Some services, such as Oracle DB,
write logs in which each message starts with a specific delimiter (for example, a date). If `line_delimiter` is not set, it defaults to the line break character `\n`.
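
To illustrate the idea, here is a minimal sketch of how a date-based delimiter groups physical lines into one logical message. The helper `splitMessages` and the variable `dateDelimiter` are hypothetical names for this example only; they are not part of `grok_exporter`, and the actual reader in this PR works incrementally on a byte buffer and does not split a whole string at once.

```go
package main

import (
	"fmt"
	"regexp"
)

// dateDelimiter uses the same expression as example 3 (e.g. 11.08.2020).
// Hypothetical name, for illustration only.
var dateDelimiter = regexp.MustCompile(`[0-9]{2}.[0-9]{2}.[0-9]{4}`)

// splitMessages cuts text into messages and starts a new message at every
// match of the delimiter. Hypothetical helper, not part of grok_exporter.
func splitMessages(text string, delimiter *regexp.Regexp) []string {
	var messages []string
	start := 0
	for _, loc := range delimiter.FindAllStringIndex(text, -1) {
		if loc[0] > start {
			// Everything up to the next delimiter belongs to the current message.
			messages = append(messages, text[start:loc[0]])
		}
		start = loc[0]
	}
	if start < len(text) {
		messages = append(messages, text[start:])
	}
	return messages
}

func main() {
	log := "11.08.2020 ORA-00001: unique constraint violated\n  at line 3\n12.08.2020 session closed\n"
	for _, m := range splitMessages(log, dateDelimiter) {
		fmt.Printf("%q\n", m)
	}
}
```

With the default `\n` delimiter the continuation line `  at line 3` would be a message of its own; with the date expression it stays attached to the message that precedes it.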

The `readall` flag defines if `grok_exporter` starts reading from the beginning or the end of the file.
True means we read the whole file, false means we start at the end of the file and read only new lines.
True is good for debugging, because we process all available log lines.
@@ -375,7 +391,7 @@ labels:
With the incoming log object being:

```json
{"message": "Login occured", "user": "Skeen", "ip": "1.1.1.1"}'
{"message": "Login occured", "user": "Skeen", "ip": "1.1.1.1"}
```

### Label Template Functions
2 changes: 2 additions & 0 deletions config/config_test.go
@@ -24,6 +24,7 @@ global:
PLACEHOLDER
input:
type: file
line_delimiter: \n
path: x/x/x
readall: true
grok:
@@ -41,6 +42,7 @@ server:
const globalMissing = `
input:
type: file
line_delimiter: \n
path: x/x/x
readall: true
grok:
7 changes: 7 additions & 0 deletions config/v3/configV3.go
@@ -92,6 +92,7 @@ type GlobalConfig struct {

type InputConfig struct {
Type string `yaml:",omitempty"`
LineDelimiter string `yaml:"line_delimiter"`
PathsAndGlobs `yaml:",inline"`
FailOnMissingLogfileString string `yaml:"fail_on_missing_logfile,omitempty"` // cannot use bool directly, because yaml.v2 doesn't support true as default value.
FailOnMissingLogfile bool `yaml:"-"`
@@ -249,6 +250,11 @@ func (c *InputConfig) addDefaults() {
if c.Type == inputTypeFile && len(c.FailOnMissingLogfileString) == 0 {
c.FailOnMissingLogfileString = "true"
}
if c.Type == inputTypeFile || c.Type == inputTypeStdin {
if len(c.LineDelimiter) == 0 {
c.LineDelimiter = "\n"
}
}
if c.Type == inputTypeWebhook {
if len(c.WebhookPath) == 0 {
c.WebhookPath = "/webhook"
@@ -680,6 +686,7 @@ func (cfg *Config) copy() *Config {
func (cfg *Config) marshalToString() string {
var newlineEscape = "___GROK_EXPORTER_NEWLINE_ESCAPE___"
cfg.Input.WebhookTextBulkSeparator = strings.Replace(cfg.Input.WebhookTextBulkSeparator, "\n", newlineEscape, -1)
cfg.Input.LineDelimiter = strings.Replace(cfg.Input.LineDelimiter, "\n", newlineEscape, -1)
out, err := yaml.Marshal(cfg)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "unexpected fatal error: failed to marshal config: %v", err)
23 changes: 23 additions & 0 deletions config/v3/configV3_test.go
@@ -16,6 +16,7 @@ package v3

import (
"fmt"
"regexp"
"strings"
"testing"
"time"
@@ -26,6 +27,7 @@ global:
config_version: 3
input:
type: file
line_delimiter: \n
path: x/x/x
fail_on_missing_logfile: false
readall: true
@@ -47,6 +49,7 @@ global:
config_version: 3
input:
type: file
line_delimiter: \n
path: x/x/x
metrics:
- type: gauge
@@ -66,6 +69,7 @@ global:
config_version: 3
input:
type: stdin
line_delimiter: \n
metrics:
- type: histogram
name: test_histogram
@@ -83,6 +87,7 @@ global:
config_version: 3
input:
type: stdin
line_delimiter: \n
metrics:
- type: summary
name: test_summary
@@ -100,6 +105,7 @@ global:
config_version: 3
input:
type: stdin
line_delimiter: \n
metrics:
- type: counter
name: test_count_total
@@ -121,6 +127,7 @@ global:
config_version: 3
input:
type: stdin
line_delimiter: \n
metrics:
- type: counter
name: test_count_total
@@ -139,6 +146,7 @@ global:
config_version: 3
input:
type: file
line_delimiter: \n
paths:
- /tmp/dir1/*.log
- /tmp/dir2/*.log
@@ -165,6 +173,7 @@ global:
config_version: 3
input:
type: file
line_delimiter: \n
path: /tmp/test/*.log
metrics:
- type: counter
@@ -181,6 +190,7 @@ global:
config_version: 3
input:
type: stdin
line_delimiter: \n
imports:
- type: metrics
file: /etc/grok/metrics.d/*.yaml
@@ -432,6 +442,19 @@ func TestImportSuccess(t *testing.T) {
expectMetric(t, cfg.AllMetrics[4], "test_summary_2", []string{"/var/log/syslog/*"}, 0, 4, 2*time.Hour+30*time.Minute)
}

func TestLineDelimiterGeneration(t *testing.T) {
re := regexp.MustCompile("[\\s]*line_delimiter:[^\\n]*")
cfgWithoutDelimiter := re.ReplaceAllString(counter_config, "")
cfgWithDelimiter, err := Unmarshal([]byte(cfgWithoutDelimiter))
if err != nil {
t.Fatalf("unexpected unmarshalling error: %v", err)
}
err = equalsIgnoreIndentation(cfgWithDelimiter.String(), counter_config)
if err != nil {
t.Fatalf("Expected:\n%v\nActual:\n%v\n%v", counter_config, cfgWithDelimiter, err)
}
}

func expectMetric(t *testing.T, metric MetricConfig, name string, paths []string, bucketLen, quantilesLen int, retention time.Duration) {
if metric.Name != name {
t.Fatalf("expected metric %v but found %v", name, metric.Name)
1 change: 1 addition & 0 deletions config/v3/converter.go
@@ -43,6 +43,7 @@ func convertGlobal(v2cfg *v2.Config) GlobalConfig {
func convertInput(v2cfg *v2.Config) InputConfig {
return InputConfig{
Type: v2cfg.Input.Type,
LineDelimiter: "\n",
PathsAndGlobs: convertPathsAndGlobs(v2cfg.Input.PathsAndGlobs),
FailOnMissingLogfileString: v2cfg.Input.FailOnMissingLogfileString,
FailOnMissingLogfile: v2cfg.Input.FailOnMissingLogfile,
4 changes: 4 additions & 0 deletions config/v3/converter_test.go
@@ -25,6 +25,8 @@ const empty_v2 = ``
const empty_v3 = `
global:
config_version: 3
input:
line_delimiter: \n
`

const full_v2 = `
@@ -33,6 +35,7 @@ global:
retention_check_interval: 3s
input:
type: file
line_delimiter: \n
paths:
- /path/to/file1.log
- /dir/with/*.log
@@ -89,6 +92,7 @@ global:
retention_check_interval: 3s
input:
type: file
line_delimiter: \n
paths:
- /path/to/file1.log
- /dir/with/*.log
6 changes: 3 additions & 3 deletions grok_exporter.go
@@ -338,18 +338,18 @@ func startTailer(cfg *v3.Config, registry prometheus.Registerer) (fswatcher.File
switch {
case cfg.Input.Type == "file":
if cfg.Input.PollInterval == 0 {
tail, err = fswatcher.RunFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, logger)
tail, err = fswatcher.RunFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.LineDelimiter, logger)
if err != nil {
return nil, err
}
} else {
tail, err = fswatcher.RunPollingFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, logger)
tail, err = fswatcher.RunPollingFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.LineDelimiter, cfg.Input.PollInterval, logger)
if err != nil {
return nil, err
}
}
case cfg.Input.Type == "stdin":
tail = tailer.RunStdinTailer()
tail = tailer.RunStdinTailer(cfg.Input.LineDelimiter)
case cfg.Input.Type == "webhook":
tail = tailer.InitWebhookTailer(&cfg.Input)
default:
38 changes: 20 additions & 18 deletions tailer/fswatcher/fswatcher.go
@@ -48,13 +48,14 @@ type Line struct {
// Moreover, we should provide vars {{.filename}} and {{.filepath}} for labels.

type fileTailer struct {
globs []glob.Glob
watchedDirs []*Dir
watchedFiles map[string]*fileWithReader // path -> fileWithReader
osSpecific fswatcher
lines chan *Line
errors chan Error
done chan struct{}
lineDelimiter string
globs []glob.Glob
watchedDirs []*Dir
watchedFiles map[string]*fileWithReader // path -> fileWithReader
osSpecific fswatcher
lines chan *Line
errors chan Error
done chan struct{}
}

type fswatcher interface {
@@ -96,30 +97,31 @@ func (t *fileTailer) Close() {
close(t.done)
}

func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FileTailer, error) {
return runFileTailer(initWatcher, globs, readall, failOnMissingFile, log)
func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, lineDelimiter string, log logrus.FieldLogger) (FileTailer, error) {
return runFileTailer(initWatcher, globs, readall, failOnMissingFile, lineDelimiter, log)
}

func RunPollingFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, pollInterval time.Duration, log logrus.FieldLogger) (FileTailer, error) {
func RunPollingFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, lineDelimiter string, pollInterval time.Duration, log logrus.FieldLogger) (FileTailer, error) {
initFunc := func() (fswatcher, Error) {
return initPollingWatcher(pollInterval)
}
return runFileTailer(initFunc, globs, readall, failOnMissingFile, log)
return runFileTailer(initFunc, globs, readall, failOnMissingFile, lineDelimiter, log)
}

func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FileTailer, error) {
func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readall bool, failOnMissingFile bool, lineDelimiter string, log logrus.FieldLogger) (FileTailer, error) {

var (
t *fileTailer
Err Error
)

t = &fileTailer{
globs: globs,
watchedFiles: make(map[string]*fileWithReader),
lines: make(chan *Line),
errors: make(chan Error),
done: make(chan struct{}),
lineDelimiter: lineDelimiter,
globs: globs,
watchedFiles: make(map[string]*fileWithReader),
lines: make(chan *Line),
errors: make(chan Error),
done: make(chan struct{}),
}

t.osSpecific, Err = initFunc()
@@ -326,7 +328,7 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg
return Err
}

newFileWithReader := &fileWithReader{file: newFile, reader: NewLineReader()}
newFileWithReader := &fileWithReader{file: newFile, reader: NewLineReader(t.lineDelimiter)}
Err = t.readNewLines(newFileWithReader, fileLogger)
if Err != nil {
newFile.Close()
39 changes: 27 additions & 12 deletions tailer/fswatcher/linereader.go
@@ -15,16 +15,19 @@
package fswatcher

import (
"bytes"
"io"
"regexp"
"strings"
)

type lineReader struct {
lineDelimiter string
remainingBytesFromLastRead []byte
}

func NewLineReader() *lineReader {
func NewLineReader(lineDelimiter string) *lineReader {
return &lineReader{
lineDelimiter: lineDelimiter,
remainingBytesFromLastRead: []byte{},
}
}
@@ -41,19 +44,32 @@ func (r *lineReader) ReadLine(file io.Reader) (string, bool, error) {
err error
buf = make([]byte, 512)
n = 0
reg = regexp.MustCompile(r.lineDelimiter)
Owner commented:

If the user has an invalid regular expression in `line_delimiter`, `grok_exporter` should fail on startup with a reasonable error message. Please add a check for this to `func (c *InputConfig) validate()` in `configV3.go`.

Owner commented:

If you compile the regular expression when loading the config, you might as well store the result so that you don't need to compile it again here.
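
A minimal sketch of what the two comments above ask for, assuming a reduced stand-in for `InputConfig`. The type `inputConfig`, the field `lineDelimiterRegexp`, and the error wording are hypothetical; only `regexp.Compile` and `fmt.Errorf` are real standard library calls. The expression is compiled once while the config is validated, so an invalid value is reported at startup and the compiled result can be reused by the reader.

```go
package main

import (
	"fmt"
	"regexp"
)

// inputConfig is a reduced, hypothetical stand-in for InputConfig.
type inputConfig struct {
	LineDelimiter       string
	lineDelimiterRegexp *regexp.Regexp // compiled once in validate()
}

// validate compiles the delimiter and keeps the result. An invalid
// expression is returned as an error instead of panicking on the first read.
func (c *inputConfig) validate() error {
	re, err := regexp.Compile(c.LineDelimiter)
	if err != nil {
		return fmt.Errorf("invalid input configuration: line_delimiter %q is not a valid regular expression: %v", c.LineDelimiter, err)
	}
	c.lineDelimiterRegexp = re
	return nil
}

func main() {
	// The second value is deliberately broken (unterminated character class).
	for _, delimiter := range []string{`[0-9]{2}\.[0-9]{2}\.[0-9]{4}`, `[0-9`} {
		cfg := &inputConfig{LineDelimiter: delimiter}
		if err := cfg.validate(); err != nil {
			fmt.Println("startup error:", err)
			continue
		}
		fmt.Println("ok:", cfg.lineDelimiterRegexp.String())
	}
}
```

Using `regexp.Compile` here instead of `regexp.MustCompile` is the relevant design choice: `MustCompile` panics on an invalid expression, while `Compile` returns an error that can be turned into a readable startup message.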

)
for {
newlinePos := bytes.IndexByte(r.remainingBytesFromLastRead, '\n')
if newlinePos >= 0 {
newlinesLoc := reg.FindAllIndex(r.remainingBytesFromLastRead, 2)
Owner commented:

In almost all cases the line delimiter will be `\n` or `\r\n`, and in these cases it's overkill to evaluate regular expressions all the time. Please keep the original `bytes.IndexByte()` for performance reasons if the line delimiter is `\n`, and use the regular expression only if a different value for line delimiter is configured.
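
One possible shape for this fast path, as a sketch under stated assumptions: the search strategy is chosen once when the reader is created, not on every call. `delimiterFinder` and `newDelimiterFinder` are hypothetical names, and the sketch only locates the first delimiter; it does not reproduce the PR's handling of a match at position 0.

```go
package main

import (
	"bytes"
	"fmt"
	"regexp"
)

// delimiterFinder returns the start and end offset of the first delimiter in
// buf, or (-1, -1) if there is none. Hypothetical type for illustration.
type delimiterFinder func(buf []byte) (start, end int)

// newDelimiterFinder picks the search strategy once: a plain byte search for
// the default "\n", and a regular expression only for other delimiters.
func newDelimiterFinder(lineDelimiter string) (delimiterFinder, error) {
	if lineDelimiter == "\n" {
		return func(buf []byte) (int, int) {
			pos := bytes.IndexByte(buf, '\n')
			if pos < 0 {
				return -1, -1
			}
			return pos, pos + 1
		}, nil
	}
	re, err := regexp.Compile(lineDelimiter)
	if err != nil {
		return nil, err
	}
	return func(buf []byte) (int, int) {
		loc := re.FindIndex(buf)
		if loc == nil {
			return -1, -1
		}
		return loc[0], loc[1]
	}, nil
}

func main() {
	for _, delimiter := range []string{"\n", `[0-9]{2}\.[0-9]{2}\.[0-9]{4}`} {
		find, err := newDelimiterFinder(delimiter)
		if err != nil {
			fmt.Println("invalid delimiter:", err)
			continue
		}
		start, end := find([]byte("first\n11.08.2020 second"))
		fmt.Printf("delimiter %q found at [%d, %d)\n", delimiter, start, end)
	}
}
```

Returning both offsets also gives the caller the length of the match, which a regular expression delimiter needs because it is not always one byte long.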

var newlinePos int
if newlinesLoc != nil {
// if first found match is not the first symbol(s)
newlinePos = newlinesLoc[0][0]
if len(newlinesLoc) == 2 && newlinePos == 0 {
// if first found match is message start symbol(s)
newlinePos = newlinesLoc[1][0]
}
}
if newlinePos != 0 {
l := len(r.remainingBytesFromLastRead)
result := make([]byte, newlinePos)
copy(result, r.remainingBytesFromLastRead[:newlinePos])
copy(r.remainingBytesFromLastRead, r.remainingBytesFromLastRead[newlinePos+1:])
r.remainingBytesFromLastRead = r.remainingBytesFromLastRead[:l-(newlinePos+1)]
return string(stripWindowsLineEnding(result)), false, nil
return stripLineEnding(string(result)), false, nil
} else if err != nil {
if err == io.EOF {
return "", true, nil
result := make([]byte, len(r.remainingBytesFromLastRead))
Copy link
Owner

@fstab fstab Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The application might be in the middle of writing a log line when we hit EOF. If we return what we have read, we will return part of the line. As soon as the application finishes writing the line, we will return the other part of the line. That means we cut the line in two. This should not happen. Please leave

`return "", true, nil`

so that we wait until the line is complete.
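
The behaviour described in this comment can be sketched as follows. `partialLineReader` and its `feed` helper are hypothetical, simplified stand-ins for `lineReader` that only handle the default `\n` delimiter; each call to `feed` simulates the tailer reading whatever the application has written so far.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// partialLineReader is a simplified, hypothetical stand-in for lineReader.
type partialLineReader struct {
	remaining []byte // bytes read so far that do not yet form a complete line
}

// ReadLine returns (line, false, nil) for a complete line and ("", true, nil)
// on EOF. Bytes of an unfinished line stay buffered for the next call.
func (r *partialLineReader) ReadLine(file io.Reader) (string, bool, error) {
	var (
		err error
		n   int
		buf = make([]byte, 512)
	)
	for {
		pos := bytes.IndexByte(r.remaining, '\n')
		switch {
		case pos >= 0:
			line := string(r.remaining[:pos])
			r.remaining = r.remaining[pos+1:]
			return strings.TrimSuffix(line, "\r"), false, nil
		case err == io.EOF:
			// Keep the partial line buffered and wait for the writer.
			return "", true, nil
		case err != nil:
			return "", false, err
		default:
			n, err = file.Read(buf)
			r.remaining = append(r.remaining, buf[:n]...)
		}
	}
}

// feed simulates one read attempt on a file that currently ends after chunk.
// Demo helper only.
func (r *partialLineReader) feed(chunk string) (string, bool, error) {
	return r.ReadLine(strings.NewReader(chunk))
}

func main() {
	r := &partialLineReader{}
	// The application has flushed only the first half of a line.
	line, eof, _ := r.feed("user logged")
	fmt.Printf("line=%q eof=%v\n", line, eof)
	// The application finishes the line; the reader returns it in one piece.
	line, eof, _ = r.feed(" in\n")
	fmt.Printf("line=%q eof=%v\n", line, eof)
}
```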

copy(result, r.remainingBytesFromLastRead)
r.remainingBytesFromLastRead = []byte{}
return stripLineEnding(string(result)), true, nil
} else {
return "", false, err
}
@@ -67,12 +83,11 @@ func (r *lineReader) ReadLine(file io.Reader) (string, bool, error) {
}
}

func stripWindowsLineEnding(s []byte) []byte {
if len(s) > 0 && s[len(s)-1] == '\r' {
return s[:len(s)-1]
} else {
return s
}
func stripLineEnding(s string) string {
// in standard delimiter case
s = strings.TrimPrefix(s, "\n")
s = strings.TrimSuffix(s, "\r")
return s
}

func (r *lineReader) Clear() {