diff --git a/CHANGELOG.md b/CHANGELOG.md index ce20392061..b66e225bae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ Main (unreleased) - Add htpasswd file based authentication for `otelcol.auth.basic` (@pkarakal) +- Add a `regex` argument to the `logfmt` and `json` stages in `loki.process` to extract log fields matching a regular expression. (@timonegk) + ### Enhancements - update promtail converter to use `file_match` block for `loki.source.file` instead of going through `local.file_match`. (@kalleep) diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index 226670e9fe..fc276c555b 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -496,12 +496,15 @@ The following arguments are supported: | Name | Type | Description | Default | Required | | ---------------- | ------------- | ----------------------------------------------------- | ------- | -------- | -| `expressions` | `map(string)` | Key-value pairs of JMESPath expressions. | | yes | +| `expressions` | `map(string)` | Key-value pairs of JMESPath expressions. | | no | +| `regex` | `string` | Regular expression matched against JSON keys. | | no | | `drop_malformed` | `bool` | Drop lines whose input can't be parsed as valid JSON. | `false` | no | | `source` | `string` | Source of the data to parse as JSON. | `""` | no | The `expressions` field is the set of key-value pairs of JMESPath expressions to run. -The map key defines the name with which the data is extracted, while the map value is the expression used to populate the value. +The map key defines the name used to extract the data, while the map value is the expression used to populate the value. + +The `regex` field is a regular expression. All keys in the JSON source matching the regular expression are extracted. When configuring a JSON stage, the `source` field defines the source of data to parse as JSON. By default, this is the log line itself, but it can also be a previously extracted value. @@ -653,19 +656,19 @@ The `stage.logfmt` inner block configures a processing stage that reads incoming The following arguments are supported: -| Name | Type | Description | Default | Required | -| --------- | ------------- | ---------------------------------------------- | ------- | -------- | -| `mapping` | `map(string)` | Key-value pairs of `logmft` fields to extract. | | yes | -| `source` | `string` | Source of the data to parse as `logfmt`. | `""` | no | +| Name | Type | Description | Default | Required | +| --------- | ------------- | ----------------------------------------------- | ------- | -------- | +| `mapping` | `map(string)` | Key-value pairs of `logmft` fields to extract. | | no | +| `regex` | `string` | Regular expression matched against logfmt keys. | | no | +| `source` | `string` | Source of the data to parse as `logfmt`. | `""` | no | -The `source` field defines the source of data to parse as `logfmt`. -When `source` is missing or empty, the stage parses the log line itself, but it can also be used to parse a previously extracted value. +The `mapping` field is the set of key-value pairs. +The map key defines the name used to extract the data, while the map value is the logfmt field used to populate the value. -This stage uses the [go-logfmt][] unmarshaler, so that numeric or boolean types are unmarshalled into their correct form. -The stage doesn't perform any other type conversions. -If the extracted value is a complex type, it's treated as a string. +The `regex` field is a regular expression. All logfmt fields matching the regular expression are extracted. -[go-logfmt]: https://github.com/go-logfmt/logfmt +The `source` field defines the source of data to parse as `logfmt`. +When `source` is missing or empty, the stage parses the log line itself, but it can also be used to parse a previously extracted value. The following log line and stages demonstrates how this works. diff --git a/internal/component/loki/process/stages/json.go b/internal/component/loki/process/stages/json.go index 9e8bb65db3..3a758f4703 100644 --- a/internal/component/loki/process/stages/json.go +++ b/internal/component/loki/process/stages/json.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "reflect" + "regexp" "github.com/go-kit/log" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -13,32 +14,33 @@ import ( // Config Errors const ( - ErrExpressionsRequired = "JMES expression is required" - ErrCouldNotCompileJMES = "could not compile JMES expression" - ErrEmptyJSONStageConfig = "empty json stage configuration" - ErrEmptyJSONStageSource = "empty source" - ErrMalformedJSON = "malformed json" + ErrExpressionsOrRegexRequired = "JMES expressions or regex is required" + ErrCouldNotCompileJMES = "could not compile JMES expression" + ErrEmptyJSONStageConfig = "empty json stage configuration" + ErrEmptyJSONStageSource = "empty source" + ErrMalformedJSON = "malformed json" ) // JSONConfig represents a JSON Stage configuration type JSONConfig struct { - Expressions map[string]string `alloy:"expressions,attr"` + Expressions map[string]string `alloy:"expressions,attr,optional"` + Regex string `alloy:"regex,attr,optional"` Source *string `alloy:"source,attr,optional"` DropMalformed bool `alloy:"drop_malformed,attr,optional"` } // validateJSONConfig validates a json config and returns a map of necessary jmespath expressions. -func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, error) { +func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, *regexp.Regexp, error) { if c == nil { - return nil, errors.New(ErrEmptyJSONStageConfig) + return nil, nil, errors.New(ErrEmptyJSONStageConfig) } - if len(c.Expressions) == 0 { - return nil, errors.New(ErrExpressionsRequired) + if len(c.Expressions) == 0 && len(c.Regex) == 0 { + return nil, nil, errors.New(ErrExpressionsOrRegexRequired) } if c.Source != nil && *c.Source == "" { - return nil, errors.New(ErrEmptyJSONStageSource) + return nil, nil, errors.New(ErrEmptyJSONStageSource) } expressions := map[string]jmespath.JMESPath{} @@ -52,28 +54,36 @@ func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, error) { } expressions[n], err = jmespath.Compile(jmes) if err != nil { - return nil, fmt.Errorf("%s: %w", ErrCouldNotCompileJMES, err) + return nil, nil, fmt.Errorf("%s: %w", ErrCouldNotCompileJMES, err) } } - return expressions, nil + + re, err := regexp.Compile(c.Regex) + if err != nil { + return nil, nil, err + } + + return expressions, re, nil } // jsonStage sets extracted data using JMESPath expressions type jsonStage struct { cfg *JSONConfig expressions map[string]jmespath.JMESPath + regex regexp.Regexp logger log.Logger } // newJSONStage creates a new json pipeline stage from a config. func newJSONStage(logger log.Logger, cfg JSONConfig) (Stage, error) { - expressions, err := validateJSONConfig(&cfg) + expressions, regex, err := validateJSONConfig(&cfg) if err != nil { return nil, err } return &jsonStage{ cfg: &cfg, expressions: expressions, + regex: *regex, logger: log.With(logger, "component", "stage", "type", "json"), }, nil } @@ -164,6 +174,31 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string extracted[n] = string(jm) } } + if j.regex.String() != "" { + for key, value := range data { + if j.regex.MatchString(key) { + switch value.(type) { + case float64: + extracted[key] = value + case string: + extracted[key] = value + case bool: + extracted[key] = value + case nil: + extracted[key] = nil + default: + jm, err := json.Marshal(value) + if err != nil { + if Debug { + level.Debug(j.logger).Log("msg", "failed to marshal complex type back to string", "err", err) + } + continue + } + extracted[key] = string(jm) + } + } + } + } if Debug { level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted_data", fmt.Sprintf("%v", extracted)) } diff --git a/internal/component/loki/process/stages/json_test.go b/internal/component/loki/process/stages/json_test.go index b8063ee536..bc470cb0b9 100644 --- a/internal/component/loki/process/stages/json_test.go +++ b/internal/component/loki/process/stages/json_test.go @@ -28,9 +28,28 @@ stage.json { stage.json { expressions = { "user" = "" } - source = "extra" + source = "extra" }` +var testJSONAlloyRegex = ` +stage.json { + regex = "pod_.*" +} +` + +var testJSONAlloyRegexAll = ` +stage.json { + regex = ".*" +} +` + +var testJSONAlloyExpressionsAndRegex = ` +stage.json { + expressions = {"out" = "message", "app" = ""} + regex = "(app|duration)" +} +` + var testJSONLogLine = ` { "time":"2012-11-01T22:08:41+00:00", @@ -38,7 +57,7 @@ var testJSONLogLine = ` "component": ["parser","type"], "level" : "WARN", "nested" : {"child":"value"}, - "duration" : 125, + "duration" : 125, "message" : "this is a log line", "extra": "{\"user\":\"marco\"}" } @@ -72,6 +91,37 @@ func TestPipeline_JSON(t *testing.T) { "user": "marco", }, }, + "successfully extract regex values from json": { + testJSONAlloyRegex, + `{"time":"2012-11-01T22:08:41+00:00", "pod_name": "my-pod-123", "pod_label": "my-label"}`, + map[string]interface{}{ + "pod_name": "my-pod-123", + "pod_label": "my-label", + }, + }, + "successfully extract all values from json via regex": { + testJSONAlloyRegexAll, + testJSONLogLine, + map[string]interface{}{ + "time": "2012-11-01T22:08:41+00:00", + "app": "loki", + "component": `["parser","type"]`, + "level": "WARN", + "nested": `{"child":"value"}`, + "duration": float64(125), + "message": "this is a log line", + "extra": "{\"user\":\"marco\"}", + }, + }, + "successfully extract values with expressions and regex from json": { + testJSONAlloyExpressionsAndRegex, + testJSONLogLine, + map[string]interface{}{ + "out": "this is a log line", + "app": "loki", + "duration": float64(125), + }, + }, } for testName, testData := range tests { @@ -132,7 +182,7 @@ func TestJSONConfig_validate(t *testing.T) { "no expressions": { &JSONConfig{}, 0, - errors.New(ErrExpressionsRequired), + errors.New(ErrExpressionsOrRegexRequired), }, "invalid expression": { &JSONConfig{ @@ -180,7 +230,7 @@ func TestJSONConfig_validate(t *testing.T) { for tName, tt := range tests { tt := tt t.Run(tName, func(t *testing.T) { - got, err := validateJSONConfig(tt.config) + got, _, err := validateJSONConfig(tt.config) if tt.err != nil { assert.NotNil(t, err, "JSONConfig.validate() expected error = %v, but got nil", tt.err) } diff --git a/internal/component/loki/process/stages/logfmt.go b/internal/component/loki/process/stages/logfmt.go index 082ced5042..e7cb3f493a 100644 --- a/internal/component/loki/process/stages/logfmt.go +++ b/internal/component/loki/process/stages/logfmt.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "reflect" + "regexp" "strings" "time" @@ -15,26 +16,27 @@ import ( // Config Errors var ( - ErrMappingRequired = errors.New("logfmt mapping is required") + ErrMappingOrRegexRequired = errors.New("logfmt mapping or regex is required") ErrEmptyLogfmtStageConfig = errors.New("empty logfmt stage configuration") ) // LogfmtConfig represents a logfmt Stage configuration type LogfmtConfig struct { - Mapping map[string]string `alloy:"mapping,attr"` + Mapping map[string]string `alloy:"mapping,attr,optional"` Source string `alloy:"source,attr,optional"` + Regex string `alloy:"regex,attr,optional"` } // validateLogfmtConfig validates a logfmt stage config and returns an inverse mapping of configured mapping. // Mapping inverse is done to make lookup easier. The key would be the key from parsed logfmt and // value would be the key with which the data in extracted map would be set. -func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) { +func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, *regexp.Regexp, error) { if c == nil { - return nil, ErrEmptyLogfmtStageConfig + return nil, nil, ErrEmptyLogfmtStageConfig } - if len(c.Mapping) == 0 { - return nil, ErrMappingRequired + if len(c.Mapping) == 0 && len(c.Regex) == 0 { + return nil, nil, ErrMappingOrRegexRequired } inverseMapping := make(map[string]string) @@ -46,13 +48,19 @@ func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) { inverseMapping[v] = k } - return inverseMapping, nil + re, err := regexp.Compile(c.Regex) + if err != nil { + return nil, nil, err + } + + return inverseMapping, re, nil } // logfmtStage sets extracted data using logfmt parser type logfmtStage struct { cfg *LogfmtConfig inverseMapping map[string]string + regex regexp.Regexp logger log.Logger } @@ -60,7 +68,7 @@ type logfmtStage struct { func newLogfmtStage(logger log.Logger, config LogfmtConfig) (Stage, error) { // inverseMapping would hold the mapping in inverse which would make lookup easier. // To explain it simply, the key would be the key from parsed logfmt and value would be the key with which the data in extracted map would be set. - inverseMapping, err := validateLogfmtConfig(&config) + inverseMapping, regex, err := validateLogfmtConfig(&config) if err != nil { return nil, err } @@ -68,6 +76,7 @@ func newLogfmtStage(logger log.Logger, config LogfmtConfig) (Stage, error) { return toStage(&logfmtStage{ cfg: &config, inverseMapping: inverseMapping, + regex: *regex, logger: log.With(logger, "component", "stage", "type", "logfmt"), }), nil } @@ -98,13 +107,22 @@ func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interf return } decoder := logfmt.NewDecoder(strings.NewReader(*input)) - extractedEntriesCount := 0 + mappingExtractedEntriesCount := 0 + regexExtractedEntriesCount := 0 for decoder.ScanRecord() { for decoder.ScanKeyval() { + // handle "mapping" mapKey, ok := j.inverseMapping[string(decoder.Key())] if ok { extracted[mapKey] = string(decoder.Value()) - extractedEntriesCount++ + mappingExtractedEntriesCount++ + } else if j.regex.String() != "" { + // handle "regex" + fmt.Println(j.regex.String(), string(decoder.Key())) + if j.regex.MatchString(string(decoder.Key())) { + extracted[string(decoder.Key())] = string(decoder.Value()) + regexExtractedEntriesCount++ + } } } } @@ -114,8 +132,11 @@ func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interf return } - if extractedEntriesCount != len(j.inverseMapping) { - level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", extractedEntriesCount, len(j.inverseMapping))) + if mappingExtractedEntriesCount != len(j.inverseMapping) { + level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", mappingExtractedEntriesCount, len(j.inverseMapping))) + } + if regexExtractedEntriesCount > 0 { + level.Debug(j.logger).Log("msg", fmt.Sprintf("found %d mappings via regex in logfmt stage", regexExtractedEntriesCount)) } level.Debug(j.logger).Log("msg", "extracted data debug in logfmt stage", "extracted data", fmt.Sprintf("%v", extracted)) } diff --git a/internal/component/loki/process/stages/logfmt_test.go b/internal/component/loki/process/stages/logfmt_test.go index d18c38036f..2d9c25c09c 100644 --- a/internal/component/loki/process/stages/logfmt_test.go +++ b/internal/component/loki/process/stages/logfmt_test.go @@ -26,6 +26,25 @@ stage.logfmt { source = "extra" }` +var testLogfmtAlloyRegex = ` +stage.logfmt { + regex = "pod_.*" +} +` + +var testLogfmtAlloyRegexAll = ` +stage.logfmt { + regex = ".*" +} +` + +var testLogfmtAlloyRegexAndMapping = ` +stage.logfmt { + mapping = { "out" = "message", "app" = ""} + regex = "(app|duration)" +} +` + func TestLogfmt(t *testing.T) { var testLogfmtLogLine = ` time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="this is a log line" extra="user=foo"" @@ -54,6 +73,35 @@ func TestLogfmt(t *testing.T) { "user": "foo", }, }, + "successfully extract regex values from logfmt": { + testLogfmtAlloyRegex, + `time=2012-11-01T22:08:41+00:00 pod_name=my-pod-123 pod_label=my-label`, + map[string]interface{}{ + "pod_name": "my-pod-123", + "pod_label": "my-label", + }, + }, + "successfully extract all values via regex from logfmt": { + testLogfmtAlloyRegexAll, + testLogfmtLogLine, + map[string]interface{}{ + "time": "2012-11-01T22:08:41+00:00", + "app": "loki", + "level": "WARN", + "duration": "125", + "message": "this is a log line", + "extra": "user=foo", + }, + }, + "successfully extract values with expressions and regex from logfmt": { + testLogfmtAlloyRegexAndMapping, + testLogfmtLogLine, + map[string]interface{}{ + "out": "this is a log line", + "app": "loki", + "duration": "125", + }, + }, } for testName, testData := range tests { @@ -81,7 +129,7 @@ func TestLogfmtConfigValidation(t *testing.T) { "no mapping": { LogfmtConfig{}, 0, - ErrMappingRequired, + ErrMappingOrRegexRequired, }, "valid without source": { LogfmtConfig{ @@ -108,7 +156,7 @@ func TestLogfmtConfigValidation(t *testing.T) { for tName, tt := range tests { tt := tt t.Run(tName, func(t *testing.T) { - got, err := validateLogfmtConfig(&tt.config) + got, _, err := validateLogfmtConfig(&tt.config) if tt.err != nil { assert.EqualError(t, err, tt.err.Error()) } else {