From 66562c2e5f518cb6b763d182ee09bacda42c9e36 Mon Sep 17 00:00:00 2001 From: 9er Date: Fri, 24 Feb 2023 11:57:18 +0100 Subject: [PATCH] new segment: analysis/toptalkers-metrics (#66) * added first working version of toptalkers_metrics segment * moved ticker from records to DB, records are now tracking if they are elegible for export, cleanup runs as separate goroutine, bucket duration is now configurable * forward flows based on traffic levels * added a metric for the DB size after cleanup * defined the segment as a filter segment, so drops can be used in the pipeline * added documentation and example for toptalkers-metrics * go mod tidy * go mod tidy * go version bumped from 1.18 to 1.20 * fuck yaml * updated Dockerfile to go 1.20 * updated dependencies --------- Co-authored-by: Sebastian Neuner --- .github/workflows/master.yml | 2 +- .github/workflows/release.yml | 2 +- CONFIGURATION.md | 49 ++ Dockerfile | 2 +- examples/analysis/toptalkers-metrics.yml | 35 ++ go.mod | 70 +-- go.sum | 89 +++ main.go | 2 + .../toptalkers_metrics/toptalkers_metrics.go | 519 ++++++++++++++++++ 9 files changed, 734 insertions(+), 36 deletions(-) create mode 100644 examples/analysis/toptalkers-metrics.yml create mode 100644 segments/analysis/toptalkers_metrics/toptalkers_metrics.go diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index bd2b1e9..b2fceef 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -27,7 +27,7 @@ jobs: - name: setup go uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: '1.20' - name: test run: go test ./... diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fc0aed3..1804152 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,7 +25,7 @@ jobs: - name: setup go uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: '1.20' - name: test before release run: go test ./... diff --git a/CONFIGURATION.md b/CONFIGURATION.md index d034ee4..180fe6e 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -70,6 +70,55 @@ conditional, limiting payload data, and multiple receivers. [godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/alert/http) [examples using this segment](https://github.com/search?q=%22segment%3A+http%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code) +### Analysis Group +Segments in this group do higher level analysis on flow data. They usually +export or print results in some way, but might also filter given flows. + +#### toptalkers-metrics +The `toptalkers-metrics` segment calculates statistics about traffic levels +per IP address and exports them in OpenMetrics format via HTTP. + +Traffic is counted in bits per second and packets per second, categorized into +forwarded and dropped traffic. By default, only the destination IP addresses +are accounted, but the configuration allows using the source IP address or +both addresses. For the latter, a flows number of bytes and packets are +ccounted for both addresses. + +Thresholds for bits per second or packets per second can be configured. Only +metrics for addresses that exceeded this threshold during the last window size +are exported. This can be used for detection of unusual or unwanted traffic +levels. This can also be used as a flow filter: While the average traffic for +an address is above threshold, flows are passed, other flows are dropped. + +The averages are calculated with a sliding window. The window size (in number +of buckets) and the bucket duration can be configured. By default, it uses +60 buckets of 1 second each (1 minute of sliding window). Optionally, the +window size for the exported metrics calculation and for the threshold check +can be configured differently. + +The parameter "traffictype" is passed as OpenMetrics label, so this segment +can be used multiple times in one pipeline without metrics getting mixed up. + +``` +- segment: toptalkers-metrics + config: + # the lines below are optional and set to default + traffictype: "" + buckets: 60 + BucketDuration: 1 + Thresholdbuckets: 60 + reportbuckets: 60 + thresholdbps: 0 + thresholdpps: 0 + endpoint: ":8080" + metricspath: "/metrics" + flowdatapath: "/flowdata" + relevantaddress: "destination" +``` + +[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/analysis/toptalkers-metrics) +[examples using this segment](https://github.com/search?q=%22segment%3A+toptalkers-metrics%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code) + ### Controlflow Group Segments in this group have the ability to change the sequence of segments any given flow traverses. diff --git a/Dockerfile b/Dockerfile index afe47fe..53d6754 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.18 AS builder +FROM golang:1.20 AS builder RUN apt-get update # add local repo into the builder diff --git a/examples/analysis/toptalkers-metrics.yml b/examples/analysis/toptalkers-metrics.yml new file mode 100644 index 0000000..55e749a --- /dev/null +++ b/examples/analysis/toptalkers-metrics.yml @@ -0,0 +1,35 @@ +--- +############################################################################### +# Consume flow messages, it's best to use an enriched topic as flowdump +# printing involves interface descriptions. +- segment: kafkaconsumer + config: + server: kafka01.example.com:9093 + topic: flow-messages-enriched + group: myuser-flowdump + user: myuser + pass: $KAFKA_SASL_PASS + +############################################################################### +# filter for some interesting traffic, in this case something +# that is likely used for DDoS attacks (UDP with source port 123 +# is seen during NTP amplification attacks) +- segment: flowfilter + config: + filter: "proto udp and src port 123" + +############################################################################### +# creates OpenMetrics endpoints for traffic data +# default endpoints are: +# :8080/flowdata +# :8080/metrics +# the given labels in this example are the default ones. +# They are also applied if the labels field is omitted. +- segment: prometheus + config: + endpoint: ":8080" + # 12 buckets at 5 seconds each -> 1 minute of sliding window + buckets: 12 + bucketduration: 5 + # set some thresholds (here 1 Gbps) + thresholdbps: 1000000000 diff --git a/go.mod b/go.mod index c7077a2..2e9b8b7 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module github.com/bwNetFlow/flowpipeline -go 1.18 +go 1.20 require ( github.com/BelWue/bgp_routeinfo v0.0.0-20221004100427-d8095fc566dd - github.com/ClickHouse/clickhouse-go/v2 v2.6.1 - github.com/Shopify/sarama v1.37.2 + github.com/ClickHouse/clickhouse-go/v2 v2.6.3 + github.com/Shopify/sarama v1.38.1 github.com/Yawning/cryptopan v0.0.0-20170504040949-65bca51288fe github.com/alouca/gosnmp v0.0.0-20170620005048-04d83944c9ab github.com/asecurityteam/rolling v2.0.4+incompatible @@ -16,31 +16,32 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/google/gopacket v1.1.19 github.com/hashicorp/logutils v1.0.0 - github.com/influxdata/influxdb-client-go/v2 v2.11.0 - github.com/mattn/go-sqlite3 v1.14.15 + github.com/influxdata/influxdb-client-go/v2 v2.12.2 + github.com/mattn/go-sqlite3 v1.14.16 github.com/netsampler/goflow2 v1.1.1 github.com/oschwald/maxminddb-golang v1.10.0 github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/prometheus/client_golang v1.13.0 + github.com/prometheus/client_golang v1.14.0 github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v2 v2.4.0 ) require ( - github.com/ClickHouse/ch-go v0.51.2 // indirect + github.com/ClickHouse/ch-go v0.53.0 // indirect github.com/alecthomas/participle/v2 v2.0.0-beta.1 // indirect github.com/alouca/gologger v0.0.0-20120904114645-7d4b7291de9c // indirect - github.com/andybalholm/brotli v1.0.4 // indirect + github.com/andybalholm/brotli v1.0.5 // indirect + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/cilium/ebpf v0.9.3 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cilium/ebpf v0.10.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/deepmap/oapi-codegen v1.11.0 // indirect + github.com/deepmap/oapi-codegen v1.12.4 // indirect github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/eapache/channels v1.1.0 // indirect github.com/eapache/go-resiliency v1.3.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-faster/city v1.0.1 // indirect @@ -58,42 +59,45 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/k-sone/critbitgo v1.4.0 // indirect - github.com/klauspost/compress v1.15.14 // indirect + github.com/klauspost/compress v1.15.15 // indirect github.com/libp2p/go-reuseport v0.2.0 // indirect - github.com/magiconair/properties v1.8.6 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect + github.com/magiconair/properties v1.8.7 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/osrg/gobgp/v3 v3.7.0 // indirect - github.com/paulmach/orb v0.8.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/osrg/gobgp/v3 v3.11.0 // indirect + github.com/paulmach/orb v0.9.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect - github.com/pelletier/go-toml/v2 v2.0.5 // indirect + github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.37.0 // indirect - github.com/prometheus/procfs v0.8.0 // indirect + github.com/prometheus/common v0.40.0 // indirect + github.com/prometheus/procfs v0.9.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/sirupsen/logrus v1.9.0 // indirect - github.com/spf13/afero v1.9.2 // indirect + github.com/spf13/afero v1.9.4 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.13.0 // indirect - github.com/subosito/gotenv v1.4.1 // indirect + github.com/spf13/viper v1.15.0 // indirect + github.com/subosito/gotenv v1.4.2 // indirect github.com/vishvananda/netlink v1.2.1-beta.2 // indirect - github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207 // indirect - go.opentelemetry.io/otel v1.11.2 // indirect - go.opentelemetry.io/otel/trace v1.11.2 // indirect - golang.org/x/crypto v0.1.0 // indirect - golang.org/x/net v0.1.0 // indirect - golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect - golang.org/x/sys v0.1.0 // indirect - golang.org/x/text v0.4.0 // indirect - google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e // indirect - google.golang.org/grpc v1.50.1 // indirect + github.com/vishvananda/netns v0.0.4 // indirect + go.opentelemetry.io/otel v1.13.0 // indirect + go.opentelemetry.io/otel/trace v1.13.0 // indirect + golang.org/x/crypto v0.6.0 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.7.0 // indirect + google.golang.org/genproto v0.0.0-20230222225845-10f96fb3dbec // indirect + google.golang.org/grpc v1.53.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 22c649f..2c054fe 100644 --- a/go.sum +++ b/go.sum @@ -42,16 +42,26 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/ch-go v0.51.2 h1:PesdqjUImi21U61yPKsDhfer8wiQ3geTsjdjZzXd/3s= github.com/ClickHouse/ch-go v0.51.2/go.mod h1:z+/hEezvvHvRMV/I00CaXBnxOx+td4zRe7HJpBYLwGU= +github.com/ClickHouse/ch-go v0.53.0 h1:gD9oP15FW+1oTTYyVzmuVfM+bk5cB5wqdscBIIw/mRA= +github.com/ClickHouse/ch-go v0.53.0/go.mod h1:B9htMJ0hii/zrC2hljUKdnagRBuLqtRG/GrU3jqCwRk= +github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= github.com/ClickHouse/clickhouse-go/v2 v2.6.1 h1:82UzCrD8cYEb/Bs/LOO3dlBZZyL+SlvvH/xwZF25BIU= github.com/ClickHouse/clickhouse-go/v2 v2.6.1/go.mod h1:SvXuWqDsiHJE3VAn2+3+nz9W9exOSigyskcs4DAcxJQ= +github.com/ClickHouse/clickhouse-go/v2 v2.6.3 h1:T2iboZmzaO3NHF5QRMi7kgUhM/WEhZM5d5P2r+VCsK4= +github.com/ClickHouse/clickhouse-go/v2 v2.6.3/go.mod h1:GcNAg9SniIu+BqzOxRsTmXAGvhlSaUm/Y9GFdWUCbX8= +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4= github.com/Shopify/sarama v1.37.2/go.mod h1:Nxye/E+YPru//Bpaorfhc3JsSGYwCaDDj+R4bK52U5o= +github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= +github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= github.com/Yawning/cryptopan v0.0.0-20170504040949-65bca51288fe h1:SKdmPMOww/faIbffys2UgnZHlQJETCw7N18AaYUYf2M= github.com/Yawning/cryptopan v0.0.0-20170504040949-65bca51288fe/go.mod h1:tGK+sH41V0mnyFBVWQoRyj7neHPwQwPM1KJ3PfS6dTI= github.com/alecthomas/assert/v2 v2.0.3 h1:WKqJODfOiQG0nEJKFKzDIG3E29CN2/4zR9XGJzKIkbg= github.com/alecthomas/participle/v2 v2.0.0-beta.1 h1:qA1IALv09wFD4fQ2anV6MCl1BIInd+Dm+ksI6ES4kfs= github.com/alecthomas/participle/v2 v2.0.0-beta.1/go.mod h1:RC764t6n4L8D8ITAJv0qdokritYSNR3wV5cVwmIEaMM= +github.com/alecthomas/participle/v2 v2.0.0-beta.5 h1:y6dsSYVb1G5eK6mgmy+BgI3Mw35a3WghArZ/Hbebrjo= +github.com/alecthomas/participle/v2 v2.0.0-beta.5/go.mod h1:RC764t6n4L8D8ITAJv0qdokritYSNR3wV5cVwmIEaMM= github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -64,12 +74,17 @@ github.com/alouca/gosnmp v0.0.0-20170620005048-04d83944c9ab h1:pfx9N/EMDxIwVzGu9 github.com/alouca/gosnmp v0.0.0-20170620005048-04d83944c9ab/go.mod h1:kEcj+iUROrUCr7AIrul5NutI2kWv0ns9BL0ezVp1h/Y= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/asecurityteam/rolling v2.0.4+incompatible h1:WOSeokINZT0IDzYGc5BVcjLlR9vPol08RvI2GAsmB0s= github.com/asecurityteam/rolling v2.0.4+incompatible/go.mod h1:2D4ba5ZfYCWrIMleUgTvc8pmLExEuvu3PDwl+vnG58Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bwNetFlow/bpf_flowexport v0.0.0-20220515112212-cd8128615c05 h1:/H/qIQ0bo3uVWV9ILwv7dIR6r4MsHmqk+Nc7zGT4/0s= github.com/bwNetFlow/bpf_flowexport v0.0.0-20220515112212-cd8128615c05/go.mod h1:DuJLqsHWPJZpyo1yfuHEMjSlQlPjXWntTjGAHJCt4ns= github.com/bwNetFlow/flowfilter v0.0.0-20221025122858-60746fa15915 h1:RN9oNbfTMlAlVznyp0pI5LOZnB+rjnB/Lmzd23UVyQY= @@ -82,11 +97,15 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/cilium/ebpf v0.9.3 h1:5KtxXZU+scyERvkJMEm16TbScVvuuMrlhPly78ZMbSc= github.com/cilium/ebpf v0.9.3/go.mod h1:w27N4UjpaQ9X/DGrSugxUG+H+NhgntDuPb5lCzxCn8A= +github.com/cilium/ebpf v0.10.0 h1:nk5HPMeoBXtOzbkZBWym+ZWq1GIiHUsBFXxwewXAHLQ= +github.com/cilium/ebpf v0.10.0/go.mod h1:DPiVdY/kT534dgc9ERmvP8mWA+9gvwgKfRvk4nNWnoE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -101,6 +120,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d/go. github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= github.com/deepmap/oapi-codegen v1.11.0 h1:f/X2NdIkaBKsSdpeuwLnY/vDI0AtPUrmB5LMgc7YD+A= github.com/deepmap/oapi-codegen v1.11.0/go.mod h1:k+ujhoQGxmQYBZBbxhOZNZf4j08qv5mC+OH+fFTnKxM= +github.com/deepmap/oapi-codegen v1.12.4 h1:pPmn6qI9MuOtCz82WY2Xaw46EQjgvxednXXrP7g5Q2s= +github.com/deepmap/oapi-codegen v1.12.4/go.mod h1:3lgHGMu6myQ2vqbbTXH2H1o4eXFTGnFiDaOaKKl5yas= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= @@ -111,6 +132,8 @@ github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6H github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -187,6 +210,7 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= @@ -248,6 +272,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/influxdata/influxdb-client-go/v2 v2.11.0 h1:BrHYv38rWkAnp22gIaHFp5LpOCazOqRMRvVE1yW3ym8= github.com/influxdata/influxdb-client-go/v2 v2.11.0/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU= +github.com/influxdata/influxdb-client-go/v2 v2.12.2 h1:uYABKdrEKlYm+++qfKdbgaHKBPmoWR5wpbmj6MBB/2g= +github.com/influxdata/influxdb-client-go/v2 v2.12.2/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU= github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU= github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -268,17 +294,22 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/k-sone/critbitgo v1.4.0 h1:l71cTyBGeh6X5ATh6Fibgw3+rtNT80BA0uNNWgkPrbE= github.com/k-sone/critbitgo v1.4.0/go.mod h1:7E6pyoyADnFxlUBEKcnfS49b7SUAQGMK+OAp/UQvo0s= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= +github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -307,6 +338,8 @@ github.com/libp2p/go-reuseport v0.2.0 h1:18PRvIMlpY6ZK85nIAicSBuXXvrYoSw3dsBAR7z github.com/libp2p/go-reuseport v0.2.0/go.mod h1:bvVho6eLMm6Bz5hmU0LYN3ixd3nPPvtIlaURZZgOY4k= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -318,16 +351,23 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM= github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/netsampler/goflow2 v1.1.1 h1:GpVlvPq4yRbyzoiz0Vp3XilNr5js/0UhHcQI7Ol/MDk= @@ -337,15 +377,22 @@ github.com/oschwald/maxminddb-golang v1.10.0 h1:Xp1u0ZhqkSuopaKmk1WwHtjF0H9Hd918 github.com/oschwald/maxminddb-golang v1.10.0/go.mod h1:Y2ELenReaLAZ0b400URyGwvYxHV1dLIxBuyOsyYjHK0= github.com/osrg/gobgp/v3 v3.7.0 h1:h+Liq90TsxNKTB/443V8b1o/pwOm94yIsm+gP0RHwOo= github.com/osrg/gobgp/v3 v3.7.0/go.mod h1:fKQPuk7+4qMiDT5viZTXT/aSEn8yYDkEs5p3NjmU2bw= +github.com/osrg/gobgp/v3 v3.11.0 h1:HismNqoeZJ96WoDjxg5A4mAvFmOcuct5QYktFc8euSo= +github.com/osrg/gobgp/v3 v3.11.0/go.mod h1:/q/mr+dOuPf5hDlLiatVll1P7BX4pmiXQxd4ZZpUbkg= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/paulmach/orb v0.8.0 h1:W5XAt5yNPNnhaMNEf0xNSkBMJ1LzOzdk2MRlB6EN0Vs= github.com/paulmach/orb v0.8.0/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A= +github.com/paulmach/orb v0.9.0 h1:MwA1DqOKtvCgm7u9RZ/pnYejTeDJPnr0+0oFajBbJqk= +github.com/paulmach/orb v0.9.0/go.mod h1:SudmOk85SXtmXAB3sLGyJ6tZy/8pdfrV0o6ef98Xc30= github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwbMiyQg= github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas= +github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= +github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -363,6 +410,8 @@ github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqr github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU= github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= +github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= +github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -375,6 +424,8 @@ github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9 github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE= github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= +github.com/prometheus/common v0.40.0 h1:Afz7EVRqGg2Mqqf4JuF9vdvp1pi220m55Pi9T2JnO4Q= +github.com/prometheus/common v0.40.0/go.mod h1:L65ZJPSmfn/UBWLQIHV7dBrKFidB/wPlF1y5TlSt9OE= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= @@ -382,6 +433,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= +github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -401,6 +454,8 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0 github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw= github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= +github.com/spf13/afero v1.9.4 h1:Sd43wM1IWz/s1aVXdOBkjJvuP8UdyqioeE4AmM0QsBs= +github.com/spf13/afero v1.9.4/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= @@ -409,9 +464,13 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.13.0 h1:BWSJ/M+f+3nmdz9bxB+bWX28kkALN2ok11D0rSo8EJU= github.com/spf13/viper v1.13.0/go.mod h1:Icm2xNL3/8uyh/wFuB1jI7TiTNKp8632Nwegu+zgdYw= +github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= +github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -421,8 +480,12 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= +github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= @@ -434,11 +497,18 @@ github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhg github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207 h1:nn7SOQy8xCu3iXNv7oiBhhEQtbWdnEOMnuKBlHvrqIM= github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -447,8 +517,12 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0= go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI= +go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y= +go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg= go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0= go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA= +go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY= +go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -462,9 +536,12 @@ golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -541,6 +618,8 @@ golang.org/x/net v0.0.0-20220513224357-95641704303c/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -565,6 +644,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ffipvEDX2pl7E1TdqLqIc= golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -624,6 +705,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -636,6 +719,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -760,6 +845,8 @@ google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e h1:S9GbmC1iCgvbLyAokVCwiO6tVIrU9Y7c5oMx1V/ki/Y= google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e/go.mod h1:9qHF0xnpdSfF6knlcsnpzUu5y+rpwgbvsyGAZPBMg4s= +google.golang.org/genproto v0.0.0-20230222225845-10f96fb3dbec h1:6rwgChOSUfpzJF2/KnLgo+gMaxGpujStSkPWrbhXArU= +google.golang.org/genproto v0.0.0-20230222225845-10f96fb3dbec/go.mod h1:3Dl5ZL0q0isWJt+FVcfpQyirqemEuLAK/iFvg1UP1Hw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -778,6 +865,8 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.50.1 h1:DS/BukOZWp8s6p4Dt/tOaJaTQyPyOoCcrjroHuCeLzY= google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= +google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= +google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/main.go b/main.go index 8c4c8b4..bdf81ad 100644 --- a/main.go +++ b/main.go @@ -59,6 +59,8 @@ import ( _ "github.com/bwNetFlow/flowpipeline/segments/print/printdots" _ "github.com/bwNetFlow/flowpipeline/segments/print/printflowdump" _ "github.com/bwNetFlow/flowpipeline/segments/print/toptalkers" + + _ "github.com/bwNetFlow/flowpipeline/segments/analysis/toptalkers_metrics" ) var Version string diff --git a/segments/analysis/toptalkers_metrics/toptalkers_metrics.go b/segments/analysis/toptalkers_metrics/toptalkers_metrics.go new file mode 100644 index 0000000..084036e --- /dev/null +++ b/segments/analysis/toptalkers_metrics/toptalkers_metrics.go @@ -0,0 +1,519 @@ +package flowpipeline + +import ( + "bufio" + "log" + "net/http" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/bwNetFlow/flowpipeline/segments" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const cleanupWindowSizes = 5 + +type Record struct { + FwdBytes []uint64 + FwdPackets []uint64 + DropBytes []uint64 + DropPackets []uint64 + capacity int + pointer int + aboveThreshold atomic.Bool + sync.RWMutex +} + +type ToptalkersMetrics struct { + segments.BaseFilterSegment + writer *bufio.Writer + database *Database + + TrafficType string // optional, default is "", name for the traffic type (included as label) + Buckets int // optional, default is 60, sets the number of seconds used as a sliding window size + ThresholdBuckets int // optional, use the last N buckets for calculation of averages, default: $Buckets + ReportBuckets int // optional, use the last N buckets to calculate averages that are reported as result, default: $Buckets + BucketDuration int // optional, duration of a bucket, default is 1 second + ThresholdBps uint64 // optional, default is 0, only log talkers with an average bits per second rate higher than this value + ThresholdPps uint64 // optional, default is 0, only log talkers with an average packets per second rate higher than this value + Endpoint string // optional, default value is ":8080" + MetricsPath string // optional, default is "/metrics" + FlowdataPath string // optional, default is "/flowdata" + RelevantAddress string // optional, default is "destination", options are "destination", "source", "both" +} + +type Database struct { + database map[string]*Record + thresholdBps uint64 + thresholdPps uint64 + buckets int // seconds + bucketDuration int // seconds + thresholdBuckets int + cleanupCounter int + promExporter PrometheusExporter + stopOnce sync.Once + stopCleanupC chan struct{} + stopClockC chan struct{} + sync.RWMutex +} + +func (db *Database) GetRecord(key string) *Record { + db.Lock() + defer db.Unlock() + record, found := db.database[key] + if !found || record == nil { + record = NewRecord(db.buckets) + db.database[key] = record + } + return record +} + +func NewRecord(windowSize int) *Record { + record := &Record{ + FwdBytes: make([]uint64, windowSize), + FwdPackets: make([]uint64, windowSize), + DropBytes: make([]uint64, windowSize), + DropPackets: make([]uint64, windowSize), + capacity: windowSize, + pointer: 0, + } + return record +} + +func (record *Record) Append(bytes uint64, packets uint64, statusFwd bool) { + record.Lock() + defer record.Unlock() + if statusFwd == true { + record.FwdBytes[record.pointer] += bytes + record.FwdPackets[record.pointer] += packets + } else { + record.DropBytes[record.pointer] += bytes + record.DropPackets[record.pointer] += packets + } +} + +func (record *Record) isEmpty() bool { + record.RLock() + defer record.RUnlock() + for i := 0; i < record.capacity; i++ { + if record.FwdPackets[i] > 0 || record.DropPackets[i] > 0 { + return false + } + } + return true +} + +func (record *Record) GetMetrics(buckets int, bucketDuration int) (float64, float64, float64, float64) { + // buckets == 0 means "look at the whole window" + if buckets == 0 { + buckets = record.capacity + } + sumFwdBytes := uint64(0) + sumFwdPackets := uint64(0) + sumDropBytes := uint64(0) + sumDropPackets := uint64(0) + record.RLock() + defer record.RUnlock() + pos := record.pointer + for i := 0; i < buckets; i++ { + if pos <= 0 { + pos = record.capacity - 1 + } else { + pos-- + } + sumFwdBytes += record.FwdBytes[pos] + sumFwdPackets += record.FwdPackets[pos] + sumDropBytes += record.DropBytes[pos] + sumDropPackets += record.DropPackets[pos] + } + sumFwdBps := float64(sumFwdBytes*8) / float64(buckets*bucketDuration) + sumFwdPps := float64(sumFwdPackets) / float64(buckets*bucketDuration) + sumDropBps := float64(sumDropBytes*8) / float64(buckets*bucketDuration) + sumDropPps := float64(sumDropPackets) / float64(buckets*bucketDuration) + return sumFwdBps, sumFwdPps, sumDropBps, sumDropPps +} + +func (record *Record) tick(thresholdBuckets int, bucketDuration int, thresholdBps uint64, thresholdPps uint64) { + record.Lock() + defer record.Unlock() + // advance pointer to the next position + record.pointer++ + if record.pointer >= record.capacity { + record.pointer = 0 + } + // calculate averages and check thresholds + if thresholdBuckets == 0 { + // thresholdBuckets == 0 means "look at the whole window" + thresholdBuckets = record.capacity + } + var sumBytes uint64 + var sumPackets uint64 + pos := record.pointer + for i := 0; i < thresholdBuckets; i++ { + if pos <= 0 { + pos = record.capacity - 1 + } else { + pos-- + } + sumBytes = sumBytes + record.FwdBytes[pos] + record.DropBytes[pos] + sumPackets = sumPackets + record.FwdPackets[pos] + record.DropPackets[pos] + } + bps := uint64(float64(sumBytes*8) / float64(bucketDuration*thresholdBuckets)) + pps := uint64(float64(sumPackets) / float64(bucketDuration*thresholdBuckets)) + if (bps > thresholdBps) || (pps > thresholdPps) { + record.aboveThreshold.Store(true) + } else { + record.aboveThreshold.Store(false) + } + // clear the current bucket + record.FwdBytes[record.pointer] = 0 + record.FwdPackets[record.pointer] = 0 + record.DropBytes[record.pointer] = 0 + record.DropPackets[record.pointer] = 0 +} + +func (db *Database) clock() { + ticker := time.NewTicker(time.Duration(db.bucketDuration) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + db.Lock() + for _, record := range db.database { + record.tick(db.thresholdBuckets, db.bucketDuration, db.thresholdBps, db.thresholdPps) + } + db.Unlock() + case <-db.stopClockC: + return + } + } +} + +func (db *Database) cleanup() { + ticker := time.NewTicker(time.Duration(db.bucketDuration*db.buckets) * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + db.Lock() + db.cleanupCounter-- + if db.cleanupCounter <= 0 { + db.cleanupCounter = db.buckets * cleanupWindowSizes + for key, record := range db.database { + if record.isEmpty() == true { + delete(db.database, key) + } + } + } + db.promExporter.dbSize.Set(float64(len(db.database))) + db.Unlock() + case <-db.stopCleanupC: + return + } + } +} + +func (db *Database) GetAllRecords() <-chan struct { + key string + record *Record +} { + out := make(chan struct { + key string + record *Record + }) + go func() { + db.Lock() + defer func() { + db.Unlock() + close(out) + }() + for key, record := range db.database { + out <- struct { + key string + record *Record + }{key, record} + } + }() + return out +} + +// Exporter provides export features to Prometheus +type PrometheusExporter struct { + MetaReg *prometheus.Registry + FlowReg *prometheus.Registry + + kafkaMessageCount prometheus.Counter + dbSize prometheus.Gauge +} + +// Initialize Prometheus Exporter +func (e *PrometheusExporter) Initialize(collector *PrometheusCollector) { + // The Kafka metrics are added to the global registry. + e.kafkaMessageCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "kafka_messages_total", + Help: "Number of Kafka messages", + }) + e.dbSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "toptalkers_db_size", + Help: "Number of Keys in the current toptalkers database", + }) + e.MetaReg = prometheus.NewRegistry() + e.MetaReg.MustRegister(e.kafkaMessageCount) + e.MetaReg.MustRegister(e.dbSize) + + e.FlowReg = prometheus.NewRegistry() + e.FlowReg.MustRegister(collector) +} + +// listen on given endpoint addr with Handler for metricPath and flowdataPath +func (e *PrometheusExporter) ServeEndpoints(segment *ToptalkersMetrics) { + mux := http.NewServeMux() + mux.Handle(segment.MetricsPath, promhttp.HandlerFor(e.MetaReg, promhttp.HandlerOpts{})) + mux.Handle(segment.FlowdataPath, promhttp.HandlerFor(e.FlowReg, promhttp.HandlerOpts{})) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(` + Flow Exporter + +

Flow Exporter

+

Metrics

+

Flow Data

+ + `)) + }) + go func() { + http.ListenAndServe(segment.Endpoint, mux) + }() + log.Printf("Enabled metrics on %s and %s, listening at %s.", segment.MetricsPath, segment.FlowdataPath, segment.Endpoint) +} + +func (segment ToptalkersMetrics) New(config map[string]string) segments.Segment { + newsegment := &ToptalkersMetrics{ + Buckets: 60, + ThresholdBuckets: 60, + ReportBuckets: 60, + BucketDuration: 1, + TrafficType: "", + Endpoint: ":8080", + MetricsPath: "/metrics", + FlowdataPath: "/flowdata", + RelevantAddress: "destination", + } + + if config["buckets"] != "" { + if parsedBuckets, err := strconv.ParseInt(config["buckets"], 10, 64); err == nil { + newsegment.Buckets = int(parsedBuckets) + if newsegment.Buckets <= 0 { + log.Println("[error] : Buckets has to be >0.") + return nil + } + } else { + log.Println("[error] ToptalkersMetrics: Could not parse 'buckets' parameter, using default 60.") + } + } else { + log.Println("[info] ToptalkersMetrics: 'buckets' set to default 60.") + } + + if config["thresholdbuckets"] != "" { + if parsedThresholdBuckets, err := strconv.ParseInt(config["thresholdbuckets"], 10, 64); err == nil { + newsegment.ThresholdBuckets = int(parsedThresholdBuckets) + if newsegment.ThresholdBuckets <= 0 { + log.Println("[error] : thresholdbuckets has to be >0.") + return nil + } + } else { + log.Println("[error] ToptalkersMetrics: Could not parse 'thresholdbuckets' parameter, using default (1 buckets).") + } + } else { + log.Println("[info] ToptalkersMetrics: 'thresholdbuckets' set to default (1 buckets).") + } + + if config["reportbuckets"] != "" { + if parsedReportBuckets, err := strconv.ParseInt(config["reportbuckets"], 10, 64); err == nil { + newsegment.ReportBuckets = int(parsedReportBuckets) + if newsegment.ReportBuckets <= 0 { + log.Println("[error] : reportbuckets has to be >0.") + return nil + } + } else { + log.Println("[error] ReportPrometheus: Could not parse 'reportbuckets' parameter, using default (1 buckets).") + } + } else { + log.Println("[info] ReportPrometheus: 'reportbuckets' set to default (1 buckets).") + } + + if config["traffictype"] != "" { + newsegment.TrafficType = config["traffictype"] + } else { + log.Println("[info] ToptalkersMetrics: 'traffictype' is empty.") + } + + if config["thresholdbps"] != "" { + if parsedThresholdBps, err := strconv.ParseUint(config["thresholdbps"], 10, 32); err == nil { + newsegment.ThresholdBps = parsedThresholdBps + } else { + log.Println("[error] ToptalkersMetrics: Could not parse 'thresholdbps' parameter, using default 0.") + } + } else { + log.Println("[info] ToptalkersMetrics: 'thresholdbps' set to default '0'.") + } + + if config["thresholdpps"] != "" { + if parsedThresholdPps, err := strconv.ParseUint(config["thresholdpps"], 10, 32); err == nil { + newsegment.ThresholdPps = parsedThresholdPps + } else { + log.Println("[error] ToptalkersMetrics: Could not parse 'thresholdpps' parameter, using default 0.") + } + } else { + log.Println("[info] ToptalkersMetrics: 'thresholdpps' set to default '0'.") + } + + if config["endpoint"] == "" { + log.Println("[info] ToptalkersMetrics Missing configuration parameter 'endpoint'. Using default port \":8080\"") + } else { + newsegment.Endpoint = config["endpoint"] + } + + if config["metricspath"] == "" { + log.Println("[info] ToptalkersMetrics: Missing configuration parameter 'metricspath'. Using default path \"/metrics\"") + } else { + newsegment.FlowdataPath = config["metricspath"] + } + if config["flowdatapath"] == "" { + log.Println("[info] ThresholdToptalkersMetrics: Missing configuration parameter 'flowdatapath'. Using default path \"/flowdata\"") + } else { + newsegment.FlowdataPath = config["flowdatapath"] + } + + switch config["relevantaddress"] { + case + "destination", + "source", + "both": + newsegment.RelevantAddress = config["relevantaddress"] + case "": + log.Println("[info] ToptalkersMetrics: 'relevantaddress' set to default 'destination'.") + default: + log.Println("[error] ToptalkersMetrics: Could not parse 'relevantaddress', using default value 'destination'.") + } + return newsegment +} + +func (segment *ToptalkersMetrics) Run(wg *sync.WaitGroup) { + defer func() { + close(segment.Out) + wg.Done() + }() + + var promExporter = PrometheusExporter{} + collector := &PrometheusCollector{segment} + promExporter.Initialize(collector) + promExporter.ServeEndpoints(segment) + + segment.database = &Database{ + database: map[string]*Record{}, + thresholdBps: segment.ThresholdBps, + thresholdPps: segment.ThresholdPps, + thresholdBuckets: segment.ThresholdBuckets, + cleanupCounter: segment.Buckets * cleanupWindowSizes, // cleanup every N windows + promExporter: promExporter, + buckets: segment.Buckets, + bucketDuration: segment.BucketDuration, + stopCleanupC: make(chan struct{}), + stopClockC: make(chan struct{}), + } + go segment.database.clock() + go segment.database.cleanup() + + for msg := range segment.In { + promExporter.kafkaMessageCount.Inc() + var keys []string + if segment.RelevantAddress == "source" { + keys = []string{msg.SrcAddrObj().String()} + } else if segment.RelevantAddress == "destination" { + keys = []string{msg.DstAddrObj().String()} + } else if segment.RelevantAddress == "both" { + keys = []string{msg.SrcAddrObj().String(), msg.DstAddrObj().String()} + } + forward := false + for _, key := range keys { + record := segment.database.GetRecord(key) + record.Append(msg.Bytes, msg.Packets, msg.IsForwarded()) + if record.aboveThreshold.Load() == true { + forward = true + } + } + if forward == true { + segment.Out <- msg + } else if segment.Drops != nil { + segment.Drops <- msg + } + } +} + +var ( + trafficBpsDesc = prometheus.NewDesc( + "traffic_bps", + "Traffic volume in bits per second for a given address.", + []string{"traffic_type", "address", "forwarding_status"}, nil, + ) + trafficPpsDesc = prometheus.NewDesc( + "traffic_pps", + "Traffic in packets per second for a given address.", + []string{"traffic_type", "address", "forwarding_status"}, nil, + ) +) + +type PrometheusCollector struct { + segment *ToptalkersMetrics +} + +func (collector *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- trafficBpsDesc + ch <- trafficPpsDesc +} +func (collector *PrometheusCollector) Collect(ch chan<- prometheus.Metric) { + for entry := range collector.segment.database.GetAllRecords() { + key := entry.key + record := entry.record + // check if thresholds are exceeded + buckets := collector.segment.ReportBuckets + bucketDuration := collector.segment.BucketDuration + if record.aboveThreshold.Load() == true { + sumFwdBps, sumFwdPps, sumDropBps, sumDropPps := record.GetMetrics(buckets, bucketDuration) + ch <- prometheus.MustNewConstMetric( + trafficBpsDesc, + prometheus.GaugeValue, + sumFwdBps, + collector.segment.TrafficType, key, "forwarded", + ) + ch <- prometheus.MustNewConstMetric( + trafficBpsDesc, + prometheus.GaugeValue, + sumDropBps, + collector.segment.TrafficType, key, "dropped", + ) + ch <- prometheus.MustNewConstMetric( + trafficPpsDesc, + prometheus.GaugeValue, + sumFwdPps, + collector.segment.TrafficType, key, "forwarded", + ) + ch <- prometheus.MustNewConstMetric( + trafficPpsDesc, + prometheus.GaugeValue, + sumDropPps, + collector.segment.TrafficType, key, "dropped", + ) + } + } +} + +func init() { + segment := &ToptalkersMetrics{} + segments.RegisterSegment("toptalkers_metrics", segment) +}