Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Main (unreleased)

- Add htpasswd file based authentication for `otelcol.auth.basic` (@pkarakal)

- Add a `regex` argument to the `logfmt` and `json` stages in `loki.process` to extract log fields matching a regular expression. (@timonegk)

### Enhancements

- update promtail converter to use `file_match` block for `loki.source.file` instead of going through `local.file_match`. (@kalleep)
Expand Down
27 changes: 15 additions & 12 deletions docs/sources/reference/components/loki/loki.process.md
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,15 @@ The following arguments are supported:

| Name | Type | Description | Default | Required |
| ---------------- | ------------- | ----------------------------------------------------- | ------- | -------- |
| `expressions` | `map(string)` | Key-value pairs of JMESPath expressions. | | yes |
| `expressions` | `map(string)` | Key-value pairs of JMESPath expressions. | | no |
| `regex` | `string` | Regular expression matched against JSON keys. | | no |
| `drop_malformed` | `bool` | Drop lines whose input can't be parsed as valid JSON. | `false` | no |
| `source` | `string` | Source of the data to parse as JSON. | `""` | no |

The `expressions` field is the set of key-value pairs of JMESPath expressions to run.
The map key defines the name with which the data is extracted, while the map value is the expression used to populate the value.
The map key defines the name used to extract the data, while the map value is the expression used to populate the value.

The `regex` field is a regular expression. All keys in the JSON source matching the regular expression are extracted.

When configuring a JSON stage, the `source` field defines the source of data to parse as JSON.
By default, this is the log line itself, but it can also be a previously extracted value.
Expand Down Expand Up @@ -653,19 +656,19 @@ The `stage.logfmt` inner block configures a processing stage that reads incoming

The following arguments are supported:

| Name | Type | Description | Default | Required |
| --------- | ------------- | ---------------------------------------------- | ------- | -------- |
| `mapping` | `map(string)` | Key-value pairs of `logmft` fields to extract. | | yes |
| `source` | `string` | Source of the data to parse as `logfmt`. | `""` | no |
| Name | Type | Description | Default | Required |
| --------- | ------------- | ----------------------------------------------- | ------- | -------- |
| `mapping` | `map(string)` | Key-value pairs of `logmft` fields to extract. | | no |
| `regex` | `string` | Regular expression matched against logfmt keys. | | no |
| `source` | `string` | Source of the data to parse as `logfmt`. | `""` | no |

The `source` field defines the source of data to parse as `logfmt`.
When `source` is missing or empty, the stage parses the log line itself, but it can also be used to parse a previously extracted value.
The `mapping` field is the set of key-value pairs.
The map key defines the name used to extract the data, while the map value is the logfmt field used to populate the value.

This stage uses the [go-logfmt][] unmarshaler, so that numeric or boolean types are unmarshalled into their correct form.
The stage doesn't perform any other type conversions.
If the extracted value is a complex type, it's treated as a string.
The `regex` field is a regular expression. All logfmt fields matching the regular expression are extracted.

[go-logfmt]: https://github.com/go-logfmt/logfmt
The `source` field defines the source of data to parse as `logfmt`.
When `source` is missing or empty, the stage parses the log line itself, but it can also be used to parse a previously extracted value.

The following log line and stages demonstrates how this works.

Expand Down
63 changes: 49 additions & 14 deletions internal/component/loki/process/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"reflect"
"regexp"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
Expand All @@ -13,32 +14,33 @@ import (

// Config Errors
const (
ErrExpressionsRequired = "JMES expression is required"
ErrCouldNotCompileJMES = "could not compile JMES expression"
ErrEmptyJSONStageConfig = "empty json stage configuration"
ErrEmptyJSONStageSource = "empty source"
ErrMalformedJSON = "malformed json"
ErrExpressionsOrRegexRequired = "JMES expressions or regex is required"
ErrCouldNotCompileJMES = "could not compile JMES expression"
ErrEmptyJSONStageConfig = "empty json stage configuration"
ErrEmptyJSONStageSource = "empty source"
ErrMalformedJSON = "malformed json"
)

// JSONConfig represents a JSON Stage configuration
type JSONConfig struct {
Expressions map[string]string `alloy:"expressions,attr"`
Expressions map[string]string `alloy:"expressions,attr,optional"`
Regex string `alloy:"regex,attr,optional"`
Source *string `alloy:"source,attr,optional"`
DropMalformed bool `alloy:"drop_malformed,attr,optional"`
}

// validateJSONConfig validates a json config and returns a map of necessary jmespath expressions.
func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, error) {
func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, *regexp.Regexp, error) {
if c == nil {
return nil, errors.New(ErrEmptyJSONStageConfig)
return nil, nil, errors.New(ErrEmptyJSONStageConfig)
}

if len(c.Expressions) == 0 {
return nil, errors.New(ErrExpressionsRequired)
if len(c.Expressions) == 0 && len(c.Regex) == 0 {
return nil, nil, errors.New(ErrExpressionsOrRegexRequired)
}

if c.Source != nil && *c.Source == "" {
return nil, errors.New(ErrEmptyJSONStageSource)
return nil, nil, errors.New(ErrEmptyJSONStageSource)
}

expressions := map[string]jmespath.JMESPath{}
Expand All @@ -52,28 +54,36 @@ func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, error) {
}
expressions[n], err = jmespath.Compile(jmes)
if err != nil {
return nil, fmt.Errorf("%s: %w", ErrCouldNotCompileJMES, err)
return nil, nil, fmt.Errorf("%s: %w", ErrCouldNotCompileJMES, err)
}
}
return expressions, nil

re, err := regexp.Compile(c.Regex)
if err != nil {
return nil, nil, err
}

return expressions, re, nil
}

// jsonStage sets extracted data using JMESPath expressions
type jsonStage struct {
cfg *JSONConfig
expressions map[string]jmespath.JMESPath
regex regexp.Regexp
logger log.Logger
}

// newJSONStage creates a new json pipeline stage from a config.
func newJSONStage(logger log.Logger, cfg JSONConfig) (Stage, error) {
expressions, err := validateJSONConfig(&cfg)
expressions, regex, err := validateJSONConfig(&cfg)
if err != nil {
return nil, err
}
return &jsonStage{
cfg: &cfg,
expressions: expressions,
regex: *regex,
logger: log.With(logger, "component", "stage", "type", "json"),
}, nil
}
Expand Down Expand Up @@ -164,6 +174,31 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
extracted[n] = string(jm)
}
}
if j.regex.String() != "" {
for key, value := range data {
if j.regex.MatchString(key) {
switch value.(type) {
case float64:
extracted[key] = value
case string:
extracted[key] = value
case bool:
extracted[key] = value
case nil:
extracted[key] = nil
default:
jm, err := json.Marshal(value)
if err != nil {
if Debug {
level.Debug(j.logger).Log("msg", "failed to marshal complex type back to string", "err", err)
}
continue
}
extracted[key] = string(jm)
}
}
}
}
if Debug {
level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted_data", fmt.Sprintf("%v", extracted))
}
Expand Down
58 changes: 54 additions & 4 deletions internal/component/loki/process/stages/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,36 @@ stage.json {

stage.json {
expressions = { "user" = "" }
source = "extra"
source = "extra"
}`

var testJSONAlloyRegex = `
stage.json {
regex = "pod_.*"
}
`

var testJSONAlloyRegexAll = `
stage.json {
regex = ".*"
}
`

var testJSONAlloyExpressionsAndRegex = `
stage.json {
expressions = {"out" = "message", "app" = ""}
regex = "(app|duration)"
}
`

var testJSONLogLine = `
{
"time":"2012-11-01T22:08:41+00:00",
"app":"loki",
"component": ["parser","type"],
"level" : "WARN",
"nested" : {"child":"value"},
"duration" : 125,
"duration" : 125,
"message" : "this is a log line",
"extra": "{\"user\":\"marco\"}"
}
Expand Down Expand Up @@ -72,6 +91,37 @@ func TestPipeline_JSON(t *testing.T) {
"user": "marco",
},
},
"successfully extract regex values from json": {
testJSONAlloyRegex,
`{"time":"2012-11-01T22:08:41+00:00", "pod_name": "my-pod-123", "pod_label": "my-label"}`,
map[string]interface{}{
"pod_name": "my-pod-123",
"pod_label": "my-label",
},
},
"successfully extract all values from json via regex": {
testJSONAlloyRegexAll,
testJSONLogLine,
map[string]interface{}{
"time": "2012-11-01T22:08:41+00:00",
"app": "loki",
"component": `["parser","type"]`,
"level": "WARN",
"nested": `{"child":"value"}`,
"duration": float64(125),
"message": "this is a log line",
"extra": "{\"user\":\"marco\"}",
},
},
"successfully extract values with expressions and regex from json": {
testJSONAlloyExpressionsAndRegex,
testJSONLogLine,
map[string]interface{}{
"out": "this is a log line",
"app": "loki",
"duration": float64(125),
},
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -132,7 +182,7 @@ func TestJSONConfig_validate(t *testing.T) {
"no expressions": {
&JSONConfig{},
0,
errors.New(ErrExpressionsRequired),
errors.New(ErrExpressionsOrRegexRequired),
},
"invalid expression": {
&JSONConfig{
Expand Down Expand Up @@ -180,7 +230,7 @@ func TestJSONConfig_validate(t *testing.T) {
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
got, err := validateJSONConfig(tt.config)
got, _, err := validateJSONConfig(tt.config)
if tt.err != nil {
assert.NotNil(t, err, "JSONConfig.validate() expected error = %v, but got nil", tt.err)
}
Expand Down
45 changes: 33 additions & 12 deletions internal/component/loki/process/stages/logfmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"reflect"
"regexp"
"strings"
"time"

Expand All @@ -15,26 +16,27 @@ import (

// Config Errors
var (
ErrMappingRequired = errors.New("logfmt mapping is required")
ErrMappingOrRegexRequired = errors.New("logfmt mapping or regex is required")
ErrEmptyLogfmtStageConfig = errors.New("empty logfmt stage configuration")
)

// LogfmtConfig represents a logfmt Stage configuration
type LogfmtConfig struct {
Mapping map[string]string `alloy:"mapping,attr"`
Mapping map[string]string `alloy:"mapping,attr,optional"`
Source string `alloy:"source,attr,optional"`
Regex string `alloy:"regex,attr,optional"`
}

// validateLogfmtConfig validates a logfmt stage config and returns an inverse mapping of configured mapping.
// Mapping inverse is done to make lookup easier. The key would be the key from parsed logfmt and
// value would be the key with which the data in extracted map would be set.
func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) {
func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, *regexp.Regexp, error) {
if c == nil {
return nil, ErrEmptyLogfmtStageConfig
return nil, nil, ErrEmptyLogfmtStageConfig
}

if len(c.Mapping) == 0 {
return nil, ErrMappingRequired
if len(c.Mapping) == 0 && len(c.Regex) == 0 {
return nil, nil, ErrMappingOrRegexRequired
}

inverseMapping := make(map[string]string)
Expand All @@ -46,28 +48,35 @@ func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) {
inverseMapping[v] = k
}

return inverseMapping, nil
re, err := regexp.Compile(c.Regex)
if err != nil {
return nil, nil, err
}

return inverseMapping, re, nil
}

// logfmtStage sets extracted data using logfmt parser
type logfmtStage struct {
cfg *LogfmtConfig
inverseMapping map[string]string
regex regexp.Regexp
logger log.Logger
}

// newLogfmtStage creates a new logfmt pipeline stage from a config.
func newLogfmtStage(logger log.Logger, config LogfmtConfig) (Stage, error) {
// inverseMapping would hold the mapping in inverse which would make lookup easier.
// To explain it simply, the key would be the key from parsed logfmt and value would be the key with which the data in extracted map would be set.
inverseMapping, err := validateLogfmtConfig(&config)
inverseMapping, regex, err := validateLogfmtConfig(&config)
if err != nil {
return nil, err
}

return toStage(&logfmtStage{
cfg: &config,
inverseMapping: inverseMapping,
regex: *regex,
logger: log.With(logger, "component", "stage", "type", "logfmt"),
}), nil
}
Expand Down Expand Up @@ -98,13 +107,22 @@ func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interf
return
}
decoder := logfmt.NewDecoder(strings.NewReader(*input))
extractedEntriesCount := 0
mappingExtractedEntriesCount := 0
regexExtractedEntriesCount := 0
for decoder.ScanRecord() {
for decoder.ScanKeyval() {
// handle "mapping"
mapKey, ok := j.inverseMapping[string(decoder.Key())]
if ok {
extracted[mapKey] = string(decoder.Value())
extractedEntriesCount++
mappingExtractedEntriesCount++
} else if j.regex.String() != "" {
// handle "regex"
fmt.Println(j.regex.String(), string(decoder.Key()))
if j.regex.MatchString(string(decoder.Key())) {
extracted[string(decoder.Key())] = string(decoder.Value())
regexExtractedEntriesCount++
}
}
}
}
Expand All @@ -114,8 +132,11 @@ func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interf
return
}

if extractedEntriesCount != len(j.inverseMapping) {
level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", extractedEntriesCount, len(j.inverseMapping)))
if mappingExtractedEntriesCount != len(j.inverseMapping) {
level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", mappingExtractedEntriesCount, len(j.inverseMapping)))
}
if regexExtractedEntriesCount > 0 {
level.Debug(j.logger).Log("msg", fmt.Sprintf("found %d mappings via regex in logfmt stage", regexExtractedEntriesCount))
}
level.Debug(j.logger).Log("msg", "extracted data debug in logfmt stage", "extracted data", fmt.Sprintf("%v", extracted))
}
Expand Down
Loading