diff --git a/CONFIG.md b/CONFIG.md index 579c8629..745b59a2 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -114,8 +114,24 @@ input: poll_interval: 5s # should NOT be needed in most cases, see below ``` +Example 3: + +```yaml +input: + type: file + line_delimiter: "[0-9]{2}.[0-9]{2}.[0-9]{4}" + path: + - /var/logdir2/*.log + readall: false + fail_on_missing_logfile: true + poll_interval: 5s # should NOT be needed in most cases, see below +``` + The `path` is the path to the log file. `path` is used if you want to monitor a single path. If you want to monitor a list of paths, use `paths` instead, as in example 2 above. [Glob] patterns are supported on the file level, but not on the directory level. If you want to monitor multiple logfiles, see also [restricting a metric to specific log files](#restricting-a-metric-to-specific-log-files) and [pre-defined label variables](#pre-defined-label-variables) below. +The `line_delimiter` you may use to handle multiline messages. Some services, such as Oracle DB, produce logs in a format +with specific message delimiter (for example, date). If not set, will be line break symbol by default. + The `readall` flag defines if `grok_exporter` starts reading from the beginning or the end of the file. True means we read the whole file, false means we start at the end of the file and read only new lines. True is good for debugging, because we process all available log lines. @@ -375,7 +391,7 @@ labels: With the incoming log object being: ```json -{"message": "Login occured", "user": "Skeen", "ip": "1.1.1.1"}' +{"message": "Login occured", "user": "Skeen", "ip": "1.1.1.1"} ``` ### Label Template Functions diff --git a/config/config_test.go b/config/config_test.go index 38654694..a49760aa 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -24,6 +24,7 @@ global: PLACEHOLDER input: type: file + line_delimiter: \n path: x/x/x readall: true grok: @@ -41,6 +42,7 @@ server: const globalMissing = ` input: type: file + line_delimiter: \n path: x/x/x readall: true grok: diff --git a/config/v3/configV3.go b/config/v3/configV3.go index 9479205c..8561e88e 100644 --- a/config/v3/configV3.go +++ b/config/v3/configV3.go @@ -92,6 +92,7 @@ type GlobalConfig struct { type InputConfig struct { Type string `yaml:",omitempty"` + LineDelimiter string `yaml:"line_delimiter"` PathsAndGlobs `yaml:",inline"` FailOnMissingLogfileString string `yaml:"fail_on_missing_logfile,omitempty"` // cannot use bool directly, because yaml.v2 doesn't support true as default value. FailOnMissingLogfile bool `yaml:"-"` @@ -249,6 +250,11 @@ func (c *InputConfig) addDefaults() { if c.Type == inputTypeFile && len(c.FailOnMissingLogfileString) == 0 { c.FailOnMissingLogfileString = "true" } + if c.Type == inputTypeFile || c.Type == inputTypeStdin { + if len(c.LineDelimiter) == 0 { + c.LineDelimiter = "\n" + } + } if c.Type == inputTypeWebhook { if len(c.WebhookPath) == 0 { c.WebhookPath = "/webhook" @@ -680,6 +686,7 @@ func (cfg *Config) copy() *Config { func (cfg *Config) marshalToString() string { var newlineEscape = "___GROK_EXPORTER_NEWLINE_ESCAPE___" cfg.Input.WebhookTextBulkSeparator = strings.Replace(cfg.Input.WebhookTextBulkSeparator, "\n", newlineEscape, -1) + cfg.Input.LineDelimiter = strings.Replace(cfg.Input.LineDelimiter, "\n", newlineEscape, -1) out, err := yaml.Marshal(cfg) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "unexpected fatal error: failed to marshal config: %v", err) diff --git a/config/v3/configV3_test.go b/config/v3/configV3_test.go index 66de53ef..e85d80ec 100644 --- a/config/v3/configV3_test.go +++ b/config/v3/configV3_test.go @@ -16,6 +16,7 @@ package v3 import ( "fmt" + "regexp" "strings" "testing" "time" @@ -26,6 +27,7 @@ global: config_version: 3 input: type: file + line_delimiter: \n path: x/x/x fail_on_missing_logfile: false readall: true @@ -47,6 +49,7 @@ global: config_version: 3 input: type: file + line_delimiter: \n path: x/x/x metrics: - type: gauge @@ -66,6 +69,7 @@ global: config_version: 3 input: type: stdin + line_delimiter: \n metrics: - type: histogram name: test_histogram @@ -83,6 +87,7 @@ global: config_version: 3 input: type: stdin + line_delimiter: \n metrics: - type: summary name: test_summary @@ -100,6 +105,7 @@ global: config_version: 3 input: type: stdin + line_delimiter: \n metrics: - type: counter name: test_count_total @@ -121,6 +127,7 @@ global: config_version: 3 input: type: stdin + line_delimiter: \n metrics: - type: counter name: test_count_total @@ -139,6 +146,7 @@ global: config_version: 3 input: type: file + line_delimiter: \n paths: - /tmp/dir1/*.log - /tmp/dir2/*.log @@ -165,6 +173,7 @@ global: config_version: 3 input: type: file + line_delimiter: \n path: /tmp/test/*.log metrics: - type: counter @@ -181,6 +190,7 @@ global: config_version: 3 input: type: stdin + line_delimiter: \n imports: - type: metrics file: /etc/grok/metrics.d/*.yaml @@ -432,6 +442,19 @@ func TestImportSuccess(t *testing.T) { expectMetric(t, cfg.AllMetrics[4], "test_summary_2", []string{"/var/log/syslog/*"}, 0, 4, 2*time.Hour+30*time.Minute) } +func TestLineDelimiterGeneration(t *testing.T) { + re := regexp.MustCompile("[\\s]*line_delimiter:[^\\n]*") + cfgWithoutDelimiter := re.ReplaceAllString(counter_config, "") + cfgWithDelimiter, err := Unmarshal([]byte(cfgWithoutDelimiter)) + if err != nil { + t.Fatalf("unexpected unmarshalling error: %v", err) + } + err = equalsIgnoreIndentation(cfgWithDelimiter.String(), counter_config) + if err != nil { + t.Fatalf("Expected:\n%v\nActual:\n%v\n%v", counter_config, cfgWithDelimiter, err) + } +} + func expectMetric(t *testing.T, metric MetricConfig, name string, paths []string, bucketLen, quantilesLen int, retention time.Duration) { if metric.Name != name { t.Fatalf("expected metric %v but found %v", name, metric.Name) diff --git a/config/v3/converter.go b/config/v3/converter.go index 37be496e..8c029b95 100644 --- a/config/v3/converter.go +++ b/config/v3/converter.go @@ -43,6 +43,7 @@ func convertGlobal(v2cfg *v2.Config) GlobalConfig { func convertInput(v2cfg *v2.Config) InputConfig { return InputConfig{ Type: v2cfg.Input.Type, + LineDelimiter: "\n", PathsAndGlobs: convertPathsAndGlobs(v2cfg.Input.PathsAndGlobs), FailOnMissingLogfileString: v2cfg.Input.FailOnMissingLogfileString, FailOnMissingLogfile: v2cfg.Input.FailOnMissingLogfile, diff --git a/config/v3/converter_test.go b/config/v3/converter_test.go index 31f7f08e..c2dc8d7a 100644 --- a/config/v3/converter_test.go +++ b/config/v3/converter_test.go @@ -25,6 +25,8 @@ const empty_v2 = `` const empty_v3 = ` global: config_version: 3 +input: + line_delimiter: \n ` const full_v2 = ` @@ -33,6 +35,7 @@ global: retention_check_interval: 3s input: type: file + line_delimiter: \n paths: - /path/to/file1.log - /dir/with/*.log @@ -89,6 +92,7 @@ global: retention_check_interval: 3s input: type: file + line_delimiter: \n paths: - /path/to/file1.log - /dir/with/*.log diff --git a/grok_exporter.go b/grok_exporter.go index c42b87e1..a24c800a 100644 --- a/grok_exporter.go +++ b/grok_exporter.go @@ -338,18 +338,18 @@ func startTailer(cfg *v3.Config, registry prometheus.Registerer) (fswatcher.File switch { case cfg.Input.Type == "file": if cfg.Input.PollInterval == 0 { - tail, err = fswatcher.RunFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, logger) + tail, err = fswatcher.RunFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.LineDelimiter, logger) if err != nil { return nil, err } } else { - tail, err = fswatcher.RunPollingFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, logger) + tail, err = fswatcher.RunPollingFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.LineDelimiter, cfg.Input.PollInterval, logger) if err != nil { return nil, err } } case cfg.Input.Type == "stdin": - tail = tailer.RunStdinTailer() + tail = tailer.RunStdinTailer(cfg.Input.LineDelimiter) case cfg.Input.Type == "webhook": tail = tailer.InitWebhookTailer(&cfg.Input) default: diff --git a/tailer/fswatcher/fswatcher.go b/tailer/fswatcher/fswatcher.go index a9ffa797..d2172a51 100644 --- a/tailer/fswatcher/fswatcher.go +++ b/tailer/fswatcher/fswatcher.go @@ -48,13 +48,14 @@ type Line struct { // Moreover, we should provide vars {{.filename}} and {{.filepath}} for labels. type fileTailer struct { - globs []glob.Glob - watchedDirs []*Dir - watchedFiles map[string]*fileWithReader // path -> fileWithReader - osSpecific fswatcher - lines chan *Line - errors chan Error - done chan struct{} + lineDelimiter string + globs []glob.Glob + watchedDirs []*Dir + watchedFiles map[string]*fileWithReader // path -> fileWithReader + osSpecific fswatcher + lines chan *Line + errors chan Error + done chan struct{} } type fswatcher interface { @@ -96,18 +97,18 @@ func (t *fileTailer) Close() { close(t.done) } -func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FileTailer, error) { - return runFileTailer(initWatcher, globs, readall, failOnMissingFile, log) +func RunFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, lineDelimiter string, log logrus.FieldLogger) (FileTailer, error) { + return runFileTailer(initWatcher, globs, readall, failOnMissingFile, lineDelimiter, log) } -func RunPollingFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, pollInterval time.Duration, log logrus.FieldLogger) (FileTailer, error) { +func RunPollingFileTailer(globs []glob.Glob, readall bool, failOnMissingFile bool, lineDelimiter string, pollInterval time.Duration, log logrus.FieldLogger) (FileTailer, error) { initFunc := func() (fswatcher, Error) { return initPollingWatcher(pollInterval) } - return runFileTailer(initFunc, globs, readall, failOnMissingFile, log) + return runFileTailer(initFunc, globs, readall, failOnMissingFile, lineDelimiter, log) } -func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readall bool, failOnMissingFile bool, log logrus.FieldLogger) (FileTailer, error) { +func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readall bool, failOnMissingFile bool, lineDelimiter string, log logrus.FieldLogger) (FileTailer, error) { var ( t *fileTailer @@ -115,11 +116,12 @@ func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readal ) t = &fileTailer{ - globs: globs, - watchedFiles: make(map[string]*fileWithReader), - lines: make(chan *Line), - errors: make(chan Error), - done: make(chan struct{}), + lineDelimiter: lineDelimiter, + globs: globs, + watchedFiles: make(map[string]*fileWithReader), + lines: make(chan *Line), + errors: make(chan Error), + done: make(chan struct{}), } t.osSpecific, Err = initFunc() @@ -326,7 +328,7 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg return Err } - newFileWithReader := &fileWithReader{file: newFile, reader: NewLineReader()} + newFileWithReader := &fileWithReader{file: newFile, reader: NewLineReader(t.lineDelimiter)} Err = t.readNewLines(newFileWithReader, fileLogger) if Err != nil { newFile.Close() diff --git a/tailer/fswatcher/linereader.go b/tailer/fswatcher/linereader.go index 5dc74584..422ebdf6 100644 --- a/tailer/fswatcher/linereader.go +++ b/tailer/fswatcher/linereader.go @@ -15,16 +15,19 @@ package fswatcher import ( - "bytes" "io" + "regexp" + "strings" ) type lineReader struct { + lineDelimiter string remainingBytesFromLastRead []byte } -func NewLineReader() *lineReader { +func NewLineReader(lineDelimiter string) *lineReader { return &lineReader{ + lineDelimiter: lineDelimiter, remainingBytesFromLastRead: []byte{}, } } @@ -41,19 +44,32 @@ func (r *lineReader) ReadLine(file io.Reader) (string, bool, error) { err error buf = make([]byte, 512) n = 0 + reg = regexp.MustCompile(r.lineDelimiter) ) for { - newlinePos := bytes.IndexByte(r.remainingBytesFromLastRead, '\n') - if newlinePos >= 0 { + newlinesLoc := reg.FindAllIndex(r.remainingBytesFromLastRead, 2) + var newlinePos int + if newlinesLoc != nil { + // if first found match is not the first symbol(s) + newlinePos = newlinesLoc[0][0] + if len(newlinesLoc) == 2 && newlinePos == 0 { + // if first found match is message start symbol(s) + newlinePos = newlinesLoc[1][0] + } + } + if newlinePos != 0 { l := len(r.remainingBytesFromLastRead) result := make([]byte, newlinePos) copy(result, r.remainingBytesFromLastRead[:newlinePos]) copy(r.remainingBytesFromLastRead, r.remainingBytesFromLastRead[newlinePos+1:]) r.remainingBytesFromLastRead = r.remainingBytesFromLastRead[:l-(newlinePos+1)] - return string(stripWindowsLineEnding(result)), false, nil + return stripLineEnding(string(result)), false, nil } else if err != nil { if err == io.EOF { - return "", true, nil + result := make([]byte, len(r.remainingBytesFromLastRead)) + copy(result, r.remainingBytesFromLastRead) + r.remainingBytesFromLastRead = []byte{} + return stripLineEnding(string(result)), true, nil } else { return "", false, err } @@ -67,12 +83,11 @@ func (r *lineReader) ReadLine(file io.Reader) (string, bool, error) { } } -func stripWindowsLineEnding(s []byte) []byte { - if len(s) > 0 && s[len(s)-1] == '\r' { - return s[:len(s)-1] - } else { - return s - } +func stripLineEnding(s string) string { + // in standard delimiter case + s = strings.TrimPrefix(s, "\n") + s = strings.TrimSuffix(s, "\r") + return s } func (r *lineReader) Clear() { diff --git a/tailer/fswatcher_test.go b/tailer/fswatcher_test.go index 97455f86..2d094207 100644 --- a/tailer/fswatcher_test.go +++ b/tailer/fswatcher_test.go @@ -600,6 +600,7 @@ func startFileTailer(t *testing.T, ctx *context, params []string) { tailer fswatcher.FileTailer readall = false failOnMissingFile = true + lineDelimiter = "\n" globs []string err error ) @@ -625,9 +626,9 @@ func startFileTailer(t *testing.T, ctx *context, params []string) { parsedGlobs = append(parsedGlobs, parsedGlob) } if ctx.tailerCfg == fseventTailer { - tailer, err = fswatcher.RunFileTailer(parsedGlobs, readall, failOnMissingFile, ctx.log) + tailer, err = fswatcher.RunFileTailer(parsedGlobs, readall, failOnMissingFile, lineDelimiter, ctx.log) } else { - tailer, err = fswatcher.RunPollingFileTailer(parsedGlobs, readall, failOnMissingFile, 10*time.Millisecond, ctx.log) + tailer, err = fswatcher.RunPollingFileTailer(parsedGlobs, readall, failOnMissingFile, lineDelimiter, 10*time.Millisecond, ctx.log) } if err != nil { fatalf(t, ctx, "%v", err) @@ -944,7 +945,7 @@ func runTestShutdown(t *testing.T, mode string) { if err != nil { fatalf(t, ctx, "%q: failed to parse glob: %q", parsedGlob, err) } - tailer, err := fswatcher.RunFileTailer([]glob.Glob{parsedGlob}, false, true, ctx.log) + tailer, err := fswatcher.RunFileTailer([]glob.Glob{parsedGlob}, false, true, "\n", ctx.log) if err != nil { fatalf(t, ctx, "failed to start tailer: %v", err) } diff --git a/tailer/stdinTailer.go b/tailer/stdinTailer.go index 49376f36..73cfe7e2 100644 --- a/tailer/stdinTailer.go +++ b/tailer/stdinTailer.go @@ -15,7 +15,6 @@ package tailer import ( - "bufio" "github.com/fstab/grok_exporter/tailer/fswatcher" "os" "strings" @@ -38,13 +37,14 @@ func (t *stdinTailer) Close() { // TODO: How to stop the go-routine reading on stdin? } -func RunStdinTailer() fswatcher.FileTailer { +func RunStdinTailer(lineDelimiter string) fswatcher.FileTailer { lineChan := make(chan *fswatcher.Line) errorChan := make(chan fswatcher.Error) go func() { - reader := bufio.NewReader(os.Stdin) + reader := fswatcher.NewLineReader(lineDelimiter) for { - line, err := reader.ReadString('\n') + // ignoring eof + line, _, err := reader.ReadLine(os.Stdin) if err != nil { errorChan <- fswatcher.NewError(fswatcher.NotSpecified, err, "") return