Skip to content

Commit

Permalink
multiple logfile config
Browse files Browse the repository at this point in the history
  • Loading branch information
fstab committed Jan 2, 2020
1 parent 02d30cb commit df2f8ec
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 44 deletions.
92 changes: 83 additions & 9 deletions config/v2/configV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v2

import (
"fmt"
"github.com/fstab/grok_exporter/tailer/glob"
"github.com/fstab/grok_exporter/template"
"gopkg.in/yaml.v2"
"strconv"
Expand Down Expand Up @@ -57,8 +58,8 @@ type GlobalConfig struct {
}

type InputConfig struct {
Type string `yaml:",omitempty"`
Path string `yaml:",omitempty"`
Type string `yaml:",omitempty"`
PathsAndGlobs `yaml:",inline"`
FailOnMissingLogfileString string `yaml:"fail_on_missing_logfile,omitempty"` // cannot use bool directly, because yaml.v2 doesn't support true as default value.
FailOnMissingLogfile bool `yaml:"-"`
Readall bool `yaml:",omitempty"`
Expand All @@ -76,10 +77,17 @@ type GrokConfig struct {
AdditionalPatterns []string `yaml:"additional_patterns,omitempty"`
}

type PathsAndGlobs struct {
Path string `yaml:",omitempty"`
Paths []string `yaml:",omitempty"`
Globs []glob.Glob `yaml:"-"`
}

type MetricConfig struct {
Type string `yaml:",omitempty"`
Name string `yaml:",omitempty"`
Help string `yaml:",omitempty"`
Type string `yaml:",omitempty"`
Name string `yaml:",omitempty"`
Help string `yaml:",omitempty"`
PathsAndGlobs `yaml:",inline"`
Match string `yaml:",omitempty"`
Retention time.Duration `yaml:",omitempty"` // implicitly parsed with time.ParseDuration()
Value string `yaml:",omitempty"`
Expand Down Expand Up @@ -184,22 +192,53 @@ func (cfg *Config) validate() error {
return nil
}

func validateGlobs(p *PathsAndGlobs, optional bool, prefix string) error {
if !optional && len(p.Path) == 0 && len(p.Paths) == 0 {
return fmt.Errorf("%v: one of 'path' or 'paths' is required", prefix)
}
if len(p.Path) > 0 && len(p.Paths) > 0 {
return fmt.Errorf("%v: use either 'path' or 'paths' but not both", prefix)
}
if len(p.Path) > 0 {
parsedGlob, err := glob.Parse(p.Path)
if err != nil {
return fmt.Errorf("%v: %v", prefix, err)
}
p.Globs = []glob.Glob{parsedGlob}
}
if len(p.Paths) > 0 {
p.Globs = make([]glob.Glob, 0, len(p.Paths))
for _, path := range p.Paths {
parsedGlob, err := glob.Parse(path)
if err != nil {
return fmt.Errorf("%v: %v", prefix, err)
}
p.Globs = append(p.Globs, parsedGlob)
}
}
return nil
}

func (c *InputConfig) validate() error {
var err error
switch {
case c.Type == inputTypeStdin:
if c.Path != "" {
if len(c.Path) > 0 {
return fmt.Errorf("invalid input configuration: cannot use 'input.path' when 'input.type' is stdin")
}
if len(c.Paths) > 0 {
return fmt.Errorf("invalid input configuration: cannot use 'input.paths' when 'input.type' is stdin")
}
if c.Readall {
return fmt.Errorf("invalid input configuration: cannot use 'input.readall' when 'input.type' is stdin")
}
if c.PollIntervalSeconds != "" {
return fmt.Errorf("invalid input configuration: cannot use 'input.poll_interval_seconds' when 'input.type' is stdin")
}
case c.Type == inputTypeFile:
if c.Path == "" {
return fmt.Errorf("invalid input configuration: 'input.path' is required for input type \"file\"")
err = validateGlobs(&c.PathsAndGlobs, false, "invalid input configuration")
if err != nil {
return err
}
if len(c.PollIntervalSeconds) > 0 { // TODO: Use duration directly, as with other durations in the config file
nSeconds, err := strconv.Atoi(c.PollIntervalSeconds)
Expand All @@ -215,6 +254,18 @@ func (c *InputConfig) validate() error {
}
}
case c.Type == inputTypeWebhook:
if c.Path != "" {
return fmt.Errorf("invalid input configuration: cannot use 'input.path' when 'input.type' is %v", inputTypeWebhook)
}
if len(c.Paths) > 0 {
return fmt.Errorf("invalid input configuration: cannot use 'input.paths' when 'input.type' is %v", inputTypeWebhook)
}
if c.Readall {
return fmt.Errorf("invalid input configuration: cannot use 'input.readall' when 'input.type' is %v", inputTypeWebhook)
}
if c.PollIntervalSeconds != "" {
return fmt.Errorf("invalid input configuration: cannot use 'input.poll_interval_seconds' when 'input.type' is %v", inputTypeWebhook)
}
if c.WebhookPath == "" {
return fmt.Errorf("invalid input configuration: 'input.webhook_path' is required for input type \"webhook\"")
} else if c.WebhookPath[0] != '/' {
Expand Down Expand Up @@ -249,7 +300,8 @@ func (c *MetricsConfig) validate() error {
return fmt.Errorf("Invalid metrics configuration: 'metrics' must not be empty.")
}
metricNames := make(map[string]bool)
for _, metric := range *c {
for i, _ := range *c {
metric := &(*c)[i] // validate modifies the metric, therefore we must use it by reference here.
err := metric.validate()
if err != nil {
return err
Expand All @@ -259,6 +311,14 @@ func (c *MetricsConfig) validate() error {
return fmt.Errorf("Invalid metric configuration: metric '%v' defined twice.", metric.Name)
}
metricNames[metric.Name] = true

if len(metric.Path) > 0 && len(metric.Paths) > 0 {
return fmt.Errorf("invalid metric configuration: metric %v defines both path and paths, you should use either one or the other", metric.Name)
}
if len(metric.Path) > 0 {
metric.Paths = []string{metric.Path}
metric.Path = ""
}
}
return nil
}
Expand All @@ -274,6 +334,10 @@ func (c *MetricConfig) validate() error {
case c.Match == "":
return fmt.Errorf("Invalid metric configuration: 'metrics.match' must not be empty.")
}
err := validateGlobs(&c.PathsAndGlobs, true, fmt.Sprintf("invalid metric configuration: %v", c.Name))
if err != nil {
return err
}
var hasValue, cumulativeAllowed, bucketsAllowed, quantilesAllowed bool
switch c.Type {
case "counter":
Expand Down Expand Up @@ -411,6 +475,16 @@ func (cfg *Config) String() string {
if stripped.Server.Path == "/metrics" {
stripped.Server.Path = ""
}
if len(stripped.Input.Paths) == 1 {
stripped.Input.Path = stripped.Input.Paths[0]
stripped.Input.Paths = nil
}
for i := range stripped.Metrics {
if len(stripped.Metrics[i].Paths) == 1 {
stripped.Metrics[i].Path = stripped.Metrics[i].Paths[i]
stripped.Metrics[i].Paths = nil
}
}
return stripped.marshalToString()
}

Expand Down
73 changes: 73 additions & 0 deletions config/v2/configV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,34 @@ server:
port: 9144
`

const multiple_paths_config = `
global:
config_version: 2
input:
type: file
paths:
- /tmp/dir1/*.log
- /tmp/dir2/*.log
fail_on_missing_logfile: false
readall: true
grok:
patterns_dir: b/c
metrics:
- type: counter
name: test_count_total
help: Dummy help message.
paths:
- /tmp/dir1/*.log
- /tmp/dir2/*.log
match: Some text here, then a %{DATE}.
labels:
label_a: '{{.some_grok_field_a}}'
label_b: '{{.some_grok_field_b}}'
server:
protocol: https
port: 1111
`

func TestCounterValidConfig(t *testing.T) {
loadOrFail(t, counter_config)
}
Expand Down Expand Up @@ -255,6 +283,51 @@ func TestRetentionInvalidConfig(t *testing.T) {
}
}

func TestPathsValidConfig(t *testing.T) {
loadOrFail(t, multiple_paths_config)
}

func TestDuplicateInputPaths(t *testing.T) {
var s = `type: file
path: /some/path/file.log`
invalidCfg := strings.Replace(multiple_paths_config, "type: file", s, 1)
_, err := Unmarshal([]byte(invalidCfg))
if err == nil {
t.Fatal("Expected error, but unmarshalling was successful.")
}
// Make sure it's the right error and not an error accidentally caused by incorrect indentation of the injected 'path' field.
if !strings.Contains(err.Error(), "use either 'path' or 'paths' but not both") {
t.Fatalf("Expected error message about path and paths being mutually exclusive, but got %v", err)
}
}

func TestDuplicateMetricPaths(t *testing.T) {
var s = `help: Dummy help message.
path: /some/path/file.log`
invalidCfg := strings.Replace(multiple_paths_config, "help: Dummy help message.", s, 1)
_, err := Unmarshal([]byte(invalidCfg))
if err == nil {
t.Fatal("Expected error, but unmarshalling was successful.")
}
// Make sure it's the right error and not an error accidentally caused by incorrect indentation of the injected 'path' field.
if !strings.Contains(err.Error(), "use either 'path' or 'paths' but not both") {
t.Fatalf("Expected error message about path and paths being mutually exclusive, but got %v", err)
}
}

func TestGlobsAreGenerated(t *testing.T) {
cfg, err := Unmarshal([]byte(multiple_paths_config))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(cfg.Input.Globs) != 2 {
t.Fatalf("expected 2 Globs in input config, but found %v", len(cfg.Input.Globs))
}
if len(cfg.Metrics[0].Globs) != 2 {
t.Fatalf("expected 2 Globs in metric config, but found %v", len(cfg.Metrics[0].Globs))
}
}

func loadOrFail(t *testing.T, cfgString string) *Config {
cfg, err := Unmarshal([]byte(cfgString))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions exporter/grok.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package exporter

import (
"fmt"
"github.com/fstab/grok_exporter/config/v2"
configuration "github.com/fstab/grok_exporter/config/v2"
"github.com/fstab/grok_exporter/oniguruma"
"github.com/fstab/grok_exporter/template"
"regexp"
Expand All @@ -36,7 +36,7 @@ func Compile(pattern string, patterns *Patterns) (*oniguruma.Regex, error) {
return result, nil
}

func VerifyFieldNames(m *v2.MetricConfig, regex, deleteRegex *oniguruma.Regex, additionalFieldDefinitions map[string]string) error {
func VerifyFieldNames(m *configuration.MetricConfig, regex, deleteRegex *oniguruma.Regex, additionalFieldDefinitions map[string]string) error {
for _, template := range m.LabelTemplates {
err := verifyFieldName(m.Name, template, regex, additionalFieldDefinitions)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions exporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
configuration "github.com/fstab/grok_exporter/config/v2"
"github.com/fstab/grok_exporter/oniguruma"
"github.com/fstab/grok_exporter/tailer/glob"
"github.com/fstab/grok_exporter/template"
"github.com/prometheus/client_golang/prometheus"
"strconv"
Expand All @@ -33,6 +34,7 @@ type Metric interface {
Name() string
Collector() prometheus.Collector

PathMatches(logfilePath string) bool
// Returns the match if the line matched, and nil if the line didn't match.
ProcessMatch(line string, additionalFields map[string]string) (*Match, error)
// Returns the match if the delete pattern matched, nil otherwise.
Expand All @@ -44,6 +46,7 @@ type Metric interface {
// Common values for incMetric and observeMetric
type metric struct {
name string
globs []glob.Glob
regex *oniguruma.Regex
deleteRegex *oniguruma.Regex
retention time.Duration
Expand Down Expand Up @@ -116,6 +119,18 @@ func (m *metric) Name() string {
return m.name
}

func (m *metric) PathMatches(logfilePath string) bool {
if len(m.globs) == 0 {
return true
}
for _, g := range m.globs {
if g.Match(logfilePath) {
return true
}
}
return false
}

func (m *counterMetric) Collector() prometheus.Collector {
return m.counter
}
Expand Down Expand Up @@ -375,6 +390,7 @@ func (m *summaryVecMetric) ProcessRetention() error {
func newMetric(cfg *configuration.MetricConfig, regex, deleteRegex *oniguruma.Regex) metric {
return metric{
name: cfg.Name,
globs: cfg.Globs,
regex: regex,
deleteRegex: deleteRegex,
retention: cfg.Retention,
Expand Down
25 changes: 16 additions & 9 deletions grok_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/fstab/grok_exporter/oniguruma"
"github.com/fstab/grok_exporter/tailer"
"github.com/fstab/grok_exporter/tailer/fswatcher"
"github.com/fstab/grok_exporter/tailer/glob"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"os"
Expand Down Expand Up @@ -100,7 +99,7 @@ func main() {
case err := <-serverErrors:
exitOnError(fmt.Errorf("server error: %v", err.Error()))
case err := <-tail.Errors():
if os.IsNotExist(err.Cause()) {
if err.Type() == fswatcher.FileNotFound || os.IsNotExist(err.Cause()) {
exitOnError(fmt.Errorf("error reading log lines: %v: use 'fail_on_missing_logfile: false' in the input configuration if you want grok_exporter to start even though the logfile is missing", err))
} else {
exitOnError(fmt.Errorf("error reading log lines: %v", err.Error()))
Expand All @@ -109,6 +108,9 @@ func main() {
matched := false
for _, metric := range metrics {
start := time.Now()
if !metric.PathMatches(line.File) {
continue
}
match, err := metric.ProcessMatch(line.Line, makeAdditionalFields(line))
if err != nil {
fmt.Fprintf(os.Stderr, "WARNING: skipping log line: %v\n", err.Error())
Expand Down Expand Up @@ -306,19 +308,24 @@ func startServer(cfg v2.ServerConfig, httpHandlers []exporter.HttpServerPathHand
}

func startTailer(cfg *v2.Config) (fswatcher.FileTailer, error) {
var (
tail fswatcher.FileTailer
err error
)
logger := logrus.New()
logger.Level = logrus.WarnLevel
var tail fswatcher.FileTailer
g, err := glob.FromPath(cfg.Input.Path)
if err != nil {
return nil, err
}
switch {
case cfg.Input.Type == "file":
if cfg.Input.PollInterval == 0 {
tail, err = fswatcher.RunFileTailer([]glob.Glob{g}, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, logger)
tail, err = fswatcher.RunFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, logger)
if err != nil {
return nil, err
}
} else {
tail, err = fswatcher.RunPollingFileTailer([]glob.Glob{g}, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, logger)
tail, err = fswatcher.RunPollingFileTailer(cfg.Input.Globs, cfg.Input.Readall, cfg.Input.FailOnMissingLogfile, cfg.Input.PollInterval, logger)
if err != nil {
return nil, err
}
}
case cfg.Input.Type == "stdin":
tail = tailer.RunStdinTailer()
Expand Down
Loading

0 comments on commit df2f8ec

Please sign in to comment.