Skip to content

Commit 6b9ed84

Browse files
committed
Added custom rate limits
Added unit tests for netflow parsing
1 parent 3b7add6 commit 6b9ed84

9 files changed

+395
-2
lines changed

common/common.go

+39
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,27 @@ import (
66
"encoding/binary"
77
"fmt"
88
"github.com/mostlygeek/arp"
9+
"gopkg.in/yaml.v2"
910
"log"
1011
"math/big"
1112
"net"
13+
"os"
1214
"os/exec"
1315
"runtime"
1416
"strings"
1517
)
1618

19+
type CustomDeviceRateLimits struct {
20+
RateLimits []CustomDeviceRateLimit `yaml:"rateLimits"`
21+
}
22+
23+
type CustomDeviceRateLimit struct {
24+
DeviceIp string `yaml:"deviceIp"`
25+
RateLimit int `yaml:"rateLimit"`
26+
}
27+
28+
var customDeviceRateLimits CustomDeviceRateLimits
29+
1730
func CryptoRandomNumber(max int64) int64 {
1831
n, err := crand.Int(crand.Reader, big.NewInt(max))
1932
if err != nil {
@@ -109,3 +122,29 @@ func DarwinMACFormat(macString string) string {
109122
builder.WriteString(macString[len(macString)-group:])
110123
return builder.String()
111124
}
125+
126+
func InitCustomRateLimits(rateLimitPath string) {
127+
var customRateLimits CustomDeviceRateLimits
128+
configFileName := rateLimitPath
129+
source, err := os.ReadFile(configFileName)
130+
if err != nil {
131+
fmt.Println("failed reading custom device rate limits")
132+
os.Exit(1)
133+
}
134+
err = yaml.Unmarshal(source, &customRateLimits)
135+
if err != nil {
136+
log.Fatalf("error: %v", err)
137+
fmt.Println("failed unmarshal the custom rate limits")
138+
os.Exit(1)
139+
}
140+
customDeviceRateLimits = customRateLimits
141+
}
142+
143+
func HasCustomRateLimit(ip string) (bool, int) {
144+
for _, customRate := range customDeviceRateLimits.RateLimits {
145+
if customRate.DeviceIp == ip {
146+
return true, customRate.RateLimit
147+
}
148+
}
149+
return false, 0
150+
}

flowproxy.go

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"github.com/myENA/flowproxy/common"
67
"github.com/myENA/flowproxy/proxy"
78
"github.com/myENA/flowproxy/tproxy"
89
"os"
@@ -75,6 +76,8 @@ func main() {
7576
tproxyVerbose := tproxyCmd.Bool("verbose", false, "Whether to log every flow received. "+
7677
"Warning can be a lot")
7778

79+
common.InitCustomRateLimits("rateLimits.yaml")
80+
7881
// Start parsing command line args
7982
if len(os.Args) < 2 {
8083
printHelpHeader()

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ go 1.21.3
55
require (
66
github.com/google/gopacket v1.1.19
77
golang.org/x/time v0.5.0
8+
gopkg.in/yaml.v2 v2.4.0
89
)
910

11+
require golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
12+
1013
require (
1114
github.com/mostlygeek/arp v0.0.0-20170424181311-541a2129847a
1215
golang.org/x/sys v0.1.0 // indirect

rateLimits.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rateLimits:
2+
- {deviceIp: "192.168.56.106", rateLimit: 20}
Binary file not shown.
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rateLimits:
2+
- {deviceIp: "192.168.56.106", rateLimit: 20}

tproxy/tproxy.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ func parseNetflow(ctx context.Context, wg *sync.WaitGroup, proxyChan <-chan gopa
264264
srcIP = ip.SrcIP
265265
}
266266
}
267+
268+
hasCustomRateLimit, customRateLimit := common.HasCustomRateLimit(srcIP.String())
269+
267270
udpLayer := packet.Layer(layers.LayerTypeUDP)
268271
payload := udpLayer.LayerPayload()
269272
ok9, err9 := netflow.IsValidNetFlow(payload, 9)
@@ -275,7 +278,9 @@ func parseNetflow(ctx context.Context, wg *sync.WaitGroup, proxyChan <-chan gopa
275278
deviceManager.SeenDevice(srcIP.String())
276279
} else {
277280
deviceManager.AddDevice(srcIP.String())
278-
if rateLimit {
281+
if hasCustomRateLimit {
282+
deviceManager.SetSampleRate(srcIP.String(), customRateLimit)
283+
} else if rateLimit {
279284
deviceManager.SetSampleRate(srcIP.String(), rate)
280285
}
281286
}
@@ -300,7 +305,12 @@ func parseNetflow(ctx context.Context, wg *sync.WaitGroup, proxyChan <-chan gopa
300305
dataChan <- packet
301306
continue
302307
}
303-
if rateLimit {
308+
if hasCustomRateLimit {
309+
if deviceManager.CheckSampleRate(srcIP.String(), int(dataCount)) {
310+
rStats.Netflow9.DataSent = rStats.Netflow9.DataSent + dataCount
311+
dataChan <- packet
312+
}
313+
} else if rateLimit {
304314
if deviceManager.CheckSampleRate(srcIP.String(), int(dataCount)) {
305315
rStats.Netflow9.DataSent = rStats.Netflow9.DataSent + dataCount
306316
dataChan <- packet

0 commit comments

Comments
 (0)