Skip to content

Commit

Permalink
Handle multiline processors (#2063)
Browse files Browse the repository at this point in the history
  • Loading branch information
haetamoudi authored Sep 20, 2024
1 parent 1801fd1 commit ee40f69
Show file tree
Hide file tree
Showing 3 changed files with 563 additions and 62 deletions.
91 changes: 80 additions & 11 deletions internal/elasticsearch/ingest/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package ingest

import (
"bufio"
"bytes"
"fmt"

"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -50,14 +52,15 @@ func (p Pipeline) OriginalProcessors() (procs []Processor, err error) {
return procs, nil
}

// extract a list of processors from a pipeline definition in YAML format.
// processorsFromYAML extracts a list of processors from a pipeline definition in YAML format.
func processorsFromYAML(content []byte) (procs []Processor, err error) {
var p struct {
Processors []yaml.Node
}
if err = yaml.Unmarshal(content, &p); err != nil {
return nil, err
}

for idx, entry := range p.Processors {
if entry.Kind != yaml.MappingNode || len(entry.Content) != 2 {
return nil, fmt.Errorf("processor#%d is not a single-key map (kind:%v content:%d)", idx, entry.Kind, len(entry.Content))
Expand All @@ -70,22 +73,88 @@ func processorsFromYAML(content []byte) (procs []Processor, err error) {
return nil, fmt.Errorf("error decoding processor#%d type: %w", idx, err)
}
proc.FirstLine = entry.Line
proc.LastLine = lastLine(&entry)
lastLine, err := getProcessorLastLine(idx, p.Processors, proc, content)
if err != nil {
return nil, err
}
proc.LastLine = lastLine

procs = append(procs, proc)
}
return procs, nil
return procs, err
}

// returns the last (greater) line number used by a yaml.Node.
func lastLine(node *yaml.Node) int {
// getProcessorLastLine determines the last line number for the given processor.
func getProcessorLastLine(idx int, processors []yaml.Node, currentProcessor Processor, content []byte) (int, error) {
if idx < len(processors)-1 {
var endProcessor = processors[idx+1].Line - 1
if endProcessor < currentProcessor.FirstLine {
return currentProcessor.FirstLine, nil
} else {
return processors[idx+1].Line - 1, nil
}
}

return nextProcessorOrEndOfPipeline(content)
}

// nextProcessorOrEndOfPipeline get the line before the node after the processors node. If there is none, it returns the end of file line
func nextProcessorOrEndOfPipeline(content []byte) (int, error) {
var root yaml.Node
if err := yaml.Unmarshal(content, &root); err != nil {
return 0, fmt.Errorf("error unmarshaling YAML: %v", err)
}

var nodes []*yaml.Node
extractNodesFromMapping(&root, &nodes)
for i, node := range nodes {

if node.Value == "processors" {
if i < len(nodes)-1 {

return nodes[i+1].Line - 1, nil
}
}

}
return countLinesInBytes(content)
}

// extractNodesFromMapping recursively extracts all nodes from MappingNodes within DocumentNodes.
func extractNodesFromMapping(node *yaml.Node, nodes *[]*yaml.Node) {
if node == nil {
return 0
return
}
last := node.Line
for _, inner := range node.Content {
if line := lastLine(inner); line > last {
last = line

if node.Kind == yaml.DocumentNode {
for _, child := range node.Content {
extractNodesFromMapping(child, nodes)
}
return
}

if node.Kind == yaml.MappingNode {
for _, child := range node.Content {
if child.Kind == yaml.MappingNode || child.Kind == yaml.ScalarNode {
*nodes = append(*nodes, child)
}
extractNodesFromMapping(child, nodes)
}
}
return last
}

// countLinesInBytes counts the number of lines in the given byte slice.
func countLinesInBytes(data []byte) (int, error) {
scanner := bufio.NewScanner(bytes.NewReader(data))
lineCount := 0

for scanner.Scan() {
lineCount++
}

if err := scanner.Err(); err != nil {
return 0, fmt.Errorf("error reading data: %w", err)
}

return lineCount, nil
}
Loading

0 comments on commit ee40f69

Please sign in to comment.