Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add diskbuffer segment. #78

Merged
merged 2 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,32 @@ The `eofcloses` parameter can therefore be used to gracefully terminate the pipe
[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/input/stdin)
[examples using this segment](https://github.com/search?q=%22segment%3A+stdin%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code)

#### diskbuffer

The `diskbuffer` segment buffers flows in memory and on-demand on disk.
Writing to disk is done in the JSON representation of the flows, compressed using `zstd`.
The flows are written to disk, when the MemoryBuffer reaches the percentual fill level HighMemoryMark,
until the LowMemoryMark is reached again.
Files are read from disk if the fill level reaches ReadingMemoryMark.
The maximum file size and the maximum size on disk are configurable via the `filesize` and `maxcachesize`
parameter.
If QueueStatusInterval is greater 0s, the fill level is printed.
BatchSize specifies how many flows will be at least written to disk

```yaml
- segment: diskbuffer
config:
bufferdir: "" # must be specified, rest is optional
batchsize: 128
queuestatusinterval: 0s
filesize: 50 MB
highmemorymark: 70
lowmemorymark: 30
readingmemorymark: 5
maxcachesize: 1 GB
queuesize: 65536
```

### Modify Group
Segments in this group modify flows in some way. Generally, these segments do
not drop flows unless specifically instructed and only change fields within
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
_ "github.com/bwNetFlow/flowpipeline/segments/input/kafkaconsumer"
_ "github.com/bwNetFlow/flowpipeline/segments/input/packet"
_ "github.com/bwNetFlow/flowpipeline/segments/input/stdin"
_ "github.com/bwNetFlow/flowpipeline/segments/input/diskbuffer"

_ "github.com/bwNetFlow/flowpipeline/segments/modify/addcid"
_ "github.com/bwNetFlow/flowpipeline/segments/modify/addrstrings"
Expand Down Expand Up @@ -63,6 +64,8 @@ import (
_ "github.com/bwNetFlow/flowpipeline/segments/print/toptalkers"

_ "github.com/bwNetFlow/flowpipeline/segments/analysis/toptalkers_metrics"

_ "github.com/bwNetFlow/flowpipeline/segments/dev/filegate"
)

var Version string
Expand Down
62 changes: 62 additions & 0 deletions segments/dev/filegate/filegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Serves as a template for new segments and forwards flows, otherwise does
// nothing.
package filegate

import (
"sync"
"log"
"errors"
"os"
"time"
"github.com/bwNetFlow/flowpipeline/segments"
)

type Filegate struct {
segments.BaseSegment // always embed this, no need to repeat I/O chan code
filename string
}

// Every Segment must implement a New method, even if there isn't any config
// it is interested in.
func (segment *Filegate) New(config map[string]string) segments.Segment {
var()
if config["filename"] != "" {
segment.filename = config["filename"]
log.Printf("[info] Filegate: gate file is %s", segment.filename)
} else {
log.Fatalf("[error] Filegate: No filename config option")
}
// do config stuff here, add it to fields maybe
return segment
}

func checkFileExists(filename string) bool {
log.Printf("[debug] Filegate: check if filename %s exists", filename)
_, err := os.Stat(filename)
if errors.Is(err, os.ErrNotExist) {
return false
}
return true
}

func (segment *Filegate) Run(wg *sync.WaitGroup) {
defer func() {
// This defer clause is important and needs to be present in
// any Segment.Run method in some form, but with at least the
// following two statements.
close(segment.Out)
wg.Done()
}()
for msg := range segment.In {
for checkFileExists(segment.filename) {
log.Printf("[info] Filegate: gate file %s exists", segment.filename)
time.Sleep(2 * time.Second)
}
segment.Out <- msg
}
}

func init() {
segment := &Filegate{}
segments.RegisterSegment("filegate", segment)
}
Loading