Skip to content

Commit

Permalink
Add reuse port to have multiple threads consumming from socket
Browse files Browse the repository at this point in the history
* Expands reuseport to have multiple threads consuming within GoFlow
(it was only possible for multiple processes and would work on sFlow)
  • Loading branch information
lspgn committed Mar 29, 2020
1 parent 8b0a2cc commit 9d60a46
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 77 deletions.
4 changes: 2 additions & 2 deletions cmd/cnetflow/cnetflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (

Addr = flag.String("addr", "", "NetFlow/IPFIX listening address")
Port = flag.Int("port", 2055, "NetFlow/IPFIX listening port")
Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow/IPFIX listening port")
Reuse = flag.Int("reuse", 0, "Enable so_reuseport for NetFlow/IPFIX listening port")

Workers = flag.Int("workers", 1, "Number of NetFlow workers")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -86,7 +86,7 @@ func main() {
}
log.WithFields(log.Fields{
"Type": "NetFlow"}).
Infof("Listening on UDP %v:%v", *Addr, *Port)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers)

err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/cnflegacy/cnflegacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (

Addr = flag.String("addr", "", "NetFlow v5 listening address")
Port = flag.Int("port", 2055, "NetFlow v5 listening port")
Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow v5 listening port")
Reuse = flag.Int("reuse", 0, "Enable so_reuseport for NetFlow v5 listening port")

Workers = flag.Int("workers", 1, "Number of NetFlow v5 workers")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -84,7 +84,7 @@ func main() {
}
log.WithFields(log.Fields{
"Type": "NetFlowLegacy"}).
Infof("Listening on UDP %v:%v", *Addr, *Port)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers)

err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/csflow/csflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (

Addr = flag.String("addr", "", "sFlow listening address")
Port = flag.Int("port", 6343, "sFlow listening port")
Reuse = flag.Bool("reuse", false, "Enable so_reuseport for sFlow listening port")
Reuse = flag.Int("reuse", 0, "Enable so_reuseport for sFlow listening port")

Workers = flag.Int("workers", 1, "Number of sFlow workers")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -84,7 +84,7 @@ func main() {
}
log.WithFields(log.Fields{
"Type": "sFlow"}).
Infof("Listening on UDP %v:%v", *Addr, *Port)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers)

err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions cmd/goflow/goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ var (
SFlowEnable = flag.Bool("sflow", true, "Enable sFlow")
SFlowAddr = flag.String("sflow.addr", "", "sFlow listening address")
SFlowPort = flag.Int("sflow.port", 6343, "sFlow listening port")
SFlowReuse = flag.Bool("sflow.reuserport", false, "Enable so_reuseport for sFlow")
SFlowReuse = flag.Int("sflow.reuse", 0, "Enable so_reuseport for sFlow")

NFLEnable = flag.Bool("nfl", true, "Enable NetFlow v5")
NFLAddr = flag.String("nfl.addr", "", "NetFlow v5 listening address")
NFLPort = flag.Int("nfl.port", 2056, "NetFlow v5 listening port")
NFLReuse = flag.Bool("nfl.reuserport", false, "Enable so_reuseport for NetFlow v5")
NFLReuse = flag.Int("nfl.reuse", 0, "Enable so_reuseport for NetFlow v5")

NFEnable = flag.Bool("nf", true, "Enable NetFlow/IPFIX")
NFAddr = flag.String("nf.addr", "", "NetFlow/IPFIX listening address")
NFPort = flag.Int("nf.port", 2055, "NetFlow/IPFIX listening port")
NFReuse = flag.Bool("nf.reuserport", false, "Enable so_reuseport for NetFlow/IPFIX")
NFReuse = flag.Int("nf.reuse", 0, "Enable so_reuseport for NetFlow/IPFIX")

Workers = flag.Int("workers", 1, "Number of workers per collector")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -115,7 +115,7 @@ func main() {
go func() {
log.WithFields(log.Fields{
"Type": "sFlow"}).
Infof("Listening on UDP %v:%v", *SFlowAddr, *SFlowPort)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *SFlowAddr, *SFlowPort, *SFlowReuse, *Workers)

err := sSFlow.FlowRoutine(*Workers, *SFlowAddr, *SFlowPort, *SFlowReuse)
if err != nil {
Expand All @@ -129,7 +129,7 @@ func main() {
go func() {
log.WithFields(log.Fields{
"Type": "NetFlow"}).
Infof("Listening on UDP %v:%v", *NFAddr, *NFPort)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *NFAddr, *NFPort, *NFReuse, *Workers)

err := sNF.FlowRoutine(*Workers, *NFAddr, *NFPort, *NFReuse)
if err != nil {
Expand All @@ -143,7 +143,7 @@ func main() {
go func() {
log.WithFields(log.Fields{
"Type": "NetFlowLegacy"}).
Infof("Listening on UDP %v:%v", *NFLAddr, *NFLPort)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *NFLAddr, *NFLPort, *NFLReuse, *Workers)

err := sNFL.FlowRoutine(*Workers, *NFLAddr, *NFLPort, *NFLReuse)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions utils/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ var (
Name: "flow_traffic_bytes",
Help: "Bytes received by the application.",
},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type"},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"},
)
MetricTrafficPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_traffic_packets",
Help: "Packets received by the application.",
},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type"},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"},
)
MetricPacketSizeSum = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "flow_traffic_summary_size_bytes",
Help: "Summary of packet size.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type"},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"},
)
DecoderStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
2 changes: 1 addition & 1 deletion utils/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (s *StateNetFlow) InitTemplates() {
s.samplinglock = &sync.RWMutex{}
}

func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport int) error {
s.InitTemplates()
return UDPRoutine("NetFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
2 changes: 1 addition & 1 deletion utils/nflegacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
return nil
}

func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport int) error {
return UDPRoutine("NetFlowV5", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
2 changes: 1 addition & 1 deletion utils/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,6 @@ func (s *StateSFlow) DecodeFlow(msg interface{}) error {
return nil
}

func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport int) error {
return UDPRoutine("sFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
136 changes: 77 additions & 59 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"strconv"
"sync"
"time"

decoder "github.com/cloudflare/goflow/v3/decoders"
Expand Down Expand Up @@ -146,7 +147,7 @@ func FlowMessageToJSON(fmsg *flowmessage.FlowMessage) string {
return s
}

func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error {
func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse int, logger Logger) error {
ecb := DefaultErrorCallback{
Logger: logger,
}
Expand All @@ -165,73 +166,90 @@ func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr s
Port: port,
}

var udpconn *net.UDPConn
var err error

if sockReuse {
pconn, err := reuseport.ListenPacket("udp", addrUDP.String())
defer pconn.Close()
if err != nil {
return err
}
var ok bool
udpconn, ok = pconn.(*net.UDPConn)
if !ok {
return err
udpconnL := make([]*net.UDPConn, 0)
if sockReuse > 0 {
for i := 0; i < sockReuse; i++ {
pconn, err := reuseport.ListenPacket("udp", addrUDP.String())
defer pconn.Close()
if err != nil {
return err
}
udpconn, ok := pconn.(*net.UDPConn)
if !ok {
return err
}
udpconnL = append(udpconnL, udpconn)
}
} else {
udpconn, err = net.ListenUDP("udp", &addrUDP)
defer udpconn.Close()
udpconn, err := net.ListenUDP("udp", &addrUDP)
if err != nil {
return err
}
udpconnL = append(udpconnL, udpconn)
}

payload := make([]byte, 9000)
routine := func(lane int, udpconn *net.UDPConn) {
payload := make([]byte, 9000)
localIP := addrUDP.IP.String()
if addrUDP.IP == nil {
localIP = ""
}

localIP := addrUDP.IP.String()
if addrUDP.IP == nil {
localIP = ""
for {
size, pktAddr, _ := udpconn.ReadFromUDP(payload)
payloadCut := make([]byte, size)
copy(payloadCut, payload[0:size])

baseMessage := BaseMessage{
Src: pktAddr.IP,
Port: pktAddr.Port,
Payload: payloadCut,
}
processor.ProcessMessage(baseMessage)

MetricTrafficBytes.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
"lane": strconv.Itoa(lane),
}).
Add(float64(size))
MetricTrafficPackets.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
"lane": strconv.Itoa(lane),
}).
Inc()
MetricPacketSizeSum.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
"lane": strconv.Itoa(lane),
}).
Observe(float64(size))
}
}

for {
size, pktAddr, _ := udpconn.ReadFromUDP(payload)
payloadCut := make([]byte, size)
copy(payloadCut, payload[0:size])

baseMessage := BaseMessage{
Src: pktAddr.IP,
Port: pktAddr.Port,
Payload: payloadCut,
}
processor.ProcessMessage(baseMessage)

MetricTrafficBytes.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Add(float64(size))
MetricTrafficPackets.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Inc()
MetricPacketSizeSum.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Observe(float64(size))
wg := &sync.WaitGroup{}
for i, udpconn := range udpconnL {
wg.Add(1)
go func() {
defer wg.Done()
routine(i, udpconn)
udpconn.Close()
}()
}

wg.Wait()
return nil
}

0 comments on commit 9d60a46

Please sign in to comment.