Skip to content

Commit

Permalink
feat(main): Add possibility to concurrently spawn pipelines using -j
Browse files Browse the repository at this point in the history
  • Loading branch information
debugloop committed Nov 28, 2024
1 parent 02e9b61 commit a617f3f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 39 deletions.
45 changes: 24 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,26 @@ import (
"os"
"os/signal"
"plugin"
"runtime"
"strings"

"github.com/bwNetFlow/flowpipeline/pipeline"
"github.com/hashicorp/logutils"

"github.com/bwNetFlow/flowpipeline/pipeline"
_ "github.com/bwNetFlow/flowpipeline/segments/alert/http"

_ "github.com/bwNetFlow/flowpipeline/segments/analysis/toptalkers_metrics"
_ "github.com/bwNetFlow/flowpipeline/segments/controlflow/branch"

_ "github.com/bwNetFlow/flowpipeline/segments/export/clickhouse"
_ "github.com/bwNetFlow/flowpipeline/segments/export/influx"
_ "github.com/bwNetFlow/flowpipeline/segments/export/prometheus"

_ "github.com/bwNetFlow/flowpipeline/segments/filter/drop"
_ "github.com/bwNetFlow/flowpipeline/segments/filter/elephant"

_ "github.com/bwNetFlow/flowpipeline/segments/filter/flowfilter"

_ "github.com/bwNetFlow/flowpipeline/segments/input/bpf"
_ "github.com/bwNetFlow/flowpipeline/segments/input/goflow"
_ "github.com/bwNetFlow/flowpipeline/segments/input/kafkaconsumer"
_ "github.com/bwNetFlow/flowpipeline/segments/input/packet"
_ "github.com/bwNetFlow/flowpipeline/segments/input/stdin"

_ "github.com/bwNetFlow/flowpipeline/segments/modify/addcid"
_ "github.com/bwNetFlow/flowpipeline/segments/modify/addrstrings"
_ "github.com/bwNetFlow/flowpipeline/segments/modify/anonymize"
Expand All @@ -48,21 +44,16 @@ import (
_ "github.com/bwNetFlow/flowpipeline/segments/modify/remoteaddress"
_ "github.com/bwNetFlow/flowpipeline/segments/modify/reversedns"
_ "github.com/bwNetFlow/flowpipeline/segments/modify/snmp"

_ "github.com/bwNetFlow/flowpipeline/segments/pass"

_ "github.com/bwNetFlow/flowpipeline/segments/output/csv"
_ "github.com/bwNetFlow/flowpipeline/segments/output/json"
_ "github.com/bwNetFlow/flowpipeline/segments/output/kafkaproducer"
_ "github.com/bwNetFlow/flowpipeline/segments/output/lumberjack"
_ "github.com/bwNetFlow/flowpipeline/segments/output/sqlite"

_ "github.com/bwNetFlow/flowpipeline/segments/pass"
_ "github.com/bwNetFlow/flowpipeline/segments/print/count"
_ "github.com/bwNetFlow/flowpipeline/segments/print/printdots"
_ "github.com/bwNetFlow/flowpipeline/segments/print/printflowdump"
_ "github.com/bwNetFlow/flowpipeline/segments/print/toptalkers"

_ "github.com/bwNetFlow/flowpipeline/segments/analysis/toptalkers_metrics"
)

var Version string
Expand All @@ -80,10 +71,11 @@ func (i *flagArray) Set(value string) error {

func main() {
var pluginPaths flagArray
flag.Var(&pluginPaths, "p", "path to load segment plugins from, can be specified multiple times")
loglevel := flag.String("l", "warning", "loglevel: one of 'debug', 'info', 'warning' or 'error'")
flag.Var(&pluginPaths, "p", "Path to load segment plugins from, can be specified multiple times.")
loglevel := flag.String("l", "warning", "Loglevel: one of 'debug', 'info', 'warning' or 'error'.")
version := flag.Bool("v", false, "print version")
configfile := flag.String("c", "config.yml", "location of the config file in yml format")
concurrency := flag.Uint("j", 1, "How many concurrent pipelines to spawn. Set to 0 to enable automatic setting according to GOMAXPROCS. Only the default value 1 guarantees a stable order of the flows in and out of flowpipeline.")
configfile := flag.String("c", "config.yml", "Location of the config file in yml format.")
flag.Parse()

if *version {
Expand Down Expand Up @@ -116,13 +108,24 @@ func main() {
log.Printf("[error] reading config file: %s", err)
return
}
pipe := pipeline.NewFromConfig(config)
pipe.Start()
pipe.AutoDrain()

pipelineCount := 1
if *concurrency == 0 {
pipelineCount = runtime.GOMAXPROCS(0)
} else {
pipelineCount = int(*concurrency)
}

segmentReprs := pipeline.SegmentReprsFromConfig(config)
for i := 0; i < pipelineCount; i++ {
segments := pipeline.SegmentsFromRepr(segmentReprs)
pipe := pipeline.New(segments...)
pipe.Start()
pipe.AutoDrain()
defer pipe.Close()
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
<-sigs

pipe.Close()
}
41 changes: 23 additions & 18 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import (
"os"
"strconv"

"gopkg.in/yaml.v2"

"github.com/bwNetFlow/flowpipeline/segments"
"github.com/bwNetFlow/flowpipeline/segments/controlflow/branch"
"gopkg.in/yaml.v2"
)

// A config representation of a segment. It is intended to look like this:
// - segment: pass
// config:
// key: value
// foo: bar
// This struct has the appropriate yaml tags inline.
// A config representation of a segment.
type SegmentRepr struct {
Name string `yaml:"segment"` // to be looked up with a registry
Config map[string]string `yaml:"config"` // to be expanded by our instance
Expand Down Expand Up @@ -51,33 +47,42 @@ func (s *SegmentRepr) ExpandedConfig() map[string]string {
// initializes a Pipeline with them.
func NewFromConfig(config []byte) *Pipeline {
// parse a list of SegmentReprs from yaml
segmentReprs := new([]SegmentRepr)
segmentReprs := SegmentReprsFromConfig(config)

// build segments from it
segments := SegmentsFromRepr(segmentReprs)

// we have Segments parsed and ready, instantiate them as actual pipeline
return New(segments...)
}

// SegmentReprsFromConfig returns a list of segment representation objects from a config.
func SegmentReprsFromConfig(config []byte) []SegmentRepr {
// parse a list of SegmentReprs from yaml
segmentReprs := []SegmentRepr{}

err := yaml.Unmarshal(config, &segmentReprs)
if err != nil {
log.Fatalf("[error] Error parsing configuration YAML: %v", err)
}

segments := SegmentsFromRepr(segmentReprs)

// we have SegmentReprs parsed, instanciate them as actual Segments
return New(segments...)
return segmentReprs
}

// Creates a list of Segments from their config representations. Handles
// recursive definitions found in Segments.
func SegmentsFromRepr(segmentReprs *[]SegmentRepr) []segments.Segment {
segmentList := make([]segments.Segment, len(*segmentReprs))
for i, segmentrepr := range *segmentReprs {
func SegmentsFromRepr(segmentReprs []SegmentRepr) []segments.Segment {
segmentList := make([]segments.Segment, len(segmentReprs))
for i, segmentrepr := range segmentReprs {
segmentTemplate := segments.LookupSegment(segmentrepr.Name) // a typed nil instance
// the Segment's New method knows how to handle our config
segment := segmentTemplate.New(segmentrepr.ExpandedConfig())
switch segment := segment.(type) { // handle special segments
case *branch.Branch:
segment.ImportBranches(
New(SegmentsFromRepr(&segmentrepr.If)...),
New(SegmentsFromRepr(&segmentrepr.Then)...),
New(SegmentsFromRepr(&segmentrepr.Else)...),
New(SegmentsFromRepr(segmentrepr.If)...),
New(SegmentsFromRepr(segmentrepr.Then)...),
New(SegmentsFromRepr(segmentrepr.Else)...),
)
}
if segment != nil {
Expand Down

0 comments on commit a617f3f

Please sign in to comment.