Skip to content

Commit

Permalink
initial passive example
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarrod Baumann committed Jun 21, 2023
1 parent 5d9bcff commit 7997a5f
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 12 deletions.
22 changes: 12 additions & 10 deletions cmd/gobmp/gobmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
)

var (
dstPort int
srcPort int
perfPort int
kafkaSrv string
natsSrv string
intercept string
splitAF string
dump string
file string
dstPort int
srcPort int
perfPort int
kafkaSrv string
natsSrv string
passiveRtr string
intercept string
splitAF string
dump string
file string
)

func init() {
Expand All @@ -39,6 +40,7 @@ func init() {
flag.IntVar(&dstPort, "destination-port", 5050, "port openBMP is listening")
flag.StringVar(&kafkaSrv, "kafka-server", "", "URL to access Kafka server")
flag.StringVar(&natsSrv, "nats-server", "", "URL to access NATS server")
flag.StringVar(&passiveRtr, "passive-router", "", "Passive BMP router to connect outbound (<host>:<port>)")
flag.StringVar(&intercept, "intercept", "false", "When intercept set \"true\", all incomming BMP messges will be copied to TCP port specified by destination-port, otherwise received BMP messages will be published to Kafka.")
flag.StringVar(&splitAF, "split-af", "true", "When set \"true\" (default) ipv4 and ipv6 will be published in separate topics. if set \"false\" the same topic will be used for both address families.")
flag.IntVar(&perfPort, "performance-port", 56767, "port used for performance debugging")
Expand Down Expand Up @@ -98,7 +100,7 @@ func main() {
glog.Errorf("failed to parse to bool the value of the intercept flag with error: %+v", err)
os.Exit(1)
}
bmpSrv, err := gobmpsrv.NewBMPServer(srcPort, dstPort, interceptFlag, publisher, splitAFFlag)
bmpSrv, err := gobmpsrv.NewBMPServer(srcPort, dstPort, interceptFlag, publisher, splitAFFlag, passiveRtr)
if err != nil {
glog.Errorf("failed to setup new gobmp server with error: %+v", err)
os.Exit(1)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ require (
github.com/go-test/deep v1.0.8
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.5.3 // indirect
github.com/nats-io/nats-server/v2 v2.9.16 // indirect
github.com/nats-io/nats-server/v2 v2.9.16
github.com/nats-io/nats.go v1.25.0
github.com/pkg/errors v0.9.1 // indirect
github.com/sbezverk/tools v0.0.0-20220706091339-17ec2f713538
google.golang.org/protobuf v1.30.0 // indirect
gotest.tools v2.2.0+incompatible
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
Expand Down Expand Up @@ -171,3 +173,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20200601152816-913338de1bd2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
15 changes: 14 additions & 1 deletion pkg/gobmpsrv/gobmpsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type bmpServer struct {
destinationPort int
incoming net.Listener
stop chan struct{}
passiveRouter string
}

func (srv *bmpServer) Start() {
Expand All @@ -43,6 +44,17 @@ func (srv *bmpServer) Stop() {
}

func (srv *bmpServer) server() {
// Establish connection to passive router if specified
if srv.passiveRouter != "" {
conn, err := net.Dial("tcp", srv.passiveRouter)
if err != nil {
glog.Errorf("failed to connect to passive router with error: %+v", err)
return
}
glog.Infof("connected to passive router %+v, calling bmpWorker", conn.RemoteAddr())
go srv.bmpWorker(conn)
}

for {
client, err := srv.incoming.Accept()
if err != nil {
Expand Down Expand Up @@ -117,7 +129,7 @@ func (srv *bmpServer) bmpWorker(client net.Conn) {
}

// NewBMPServer instantiates a new instance of BMP Server
func NewBMPServer(sPort, dPort int, intercept bool, p pub.Publisher, splitAF bool) (BMPServer, error) {
func NewBMPServer(sPort, dPort int, intercept bool, p pub.Publisher, splitAF bool, passiveRouter string) (BMPServer, error) {
incoming, err := net.Listen("tcp", fmt.Sprintf(":%d", sPort))
if err != nil {
glog.Errorf("fail to setup listener on port %d with error: %+v", sPort, err)
Expand All @@ -131,6 +143,7 @@ func NewBMPServer(sPort, dPort int, intercept bool, p pub.Publisher, splitAF boo
publisher: p,
incoming: incoming,
splitAF: splitAF,
passiveRouter: passiveRouter,
}

return &bmp, nil
Expand Down
126 changes: 126 additions & 0 deletions pkg/nats/nats-publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package nats

import (
"encoding/json"
"fmt"
"net"
"testing"
"time"

natssrv "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/sbezverk/gobmp/pkg/gobmpsrv"
"github.com/sbezverk/gobmp/pkg/message"
"github.com/sbezverk/gobmp/pkg/pub"
"gotest.tools/assert"
)

const (
streamName = "gobmp"
natsPrefix = "gobmp"
bmpPort = 5000
)

var (
p pub.Publisher
s nats.JetStreamContext
b gobmpsrv.BMPServer
)

func TestMain(m *testing.M) {
// build in-memory NATS server
natsSrv, err := natssrv.NewServer(&natssrv.Options{
Host: "127.0.0.1",
Debug: false,
Port: natssrv.RANDOM_PORT,
JetStream: true,
})
if err != nil {
panic(err)
}

defer natsSrv.Shutdown()

// Start NATS server
if err := natssrv.Run(natsSrv); err != nil {
panic(err)
}

// Create a NATS connection for subscribing
nc, err := nats.Connect(natsSrv.ClientURL())
if err != nil {
panic(err)
}
defer nc.Close()

s, err = nc.JetStream()
if err != nil {
panic(err)
}

// Create the stream
_, err = s.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamName + ".>"},
})
if err != nil {
panic(err)
}

// Create the Publisher
p, err = NewPublisher(natsSrv.ClientURL())
if err != nil {
panic(err)
}

// Start BMP
b, err = gobmpsrv.NewBMPServer(bmpPort, 0, false, p, false, "")
if err != nil {
panic(err)
}

// Starting Interceptor server
b.Start()

m.Run()
}

// TestNATSProducer tests NATS producer
func TestNATSProducer(t *testing.T) {
input := []byte{3, 0, 0, 0, 32, 4, 0, 1, 0, 10, 32, 55, 46, 50, 46, 49, 46, 50, 51, 73, 0, 2, 0, 8, 120, 114, 118, 57, 107, 45, 114, 49, 3, 0, 0, 0, 234, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 80, 103, 0, 0, 19, 206, 57, 112, 1, 254, 94, 98, 129, 171, 0, 0, 215, 126, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 192, 168, 80, 128, 0, 179, 131, 152, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 91, 1, 4, 19, 206, 0, 90, 192, 168, 8, 8, 62, 2, 6, 1, 4, 0, 1, 0, 1, 2, 6, 1, 4, 0, 1, 0, 4, 2, 6, 1, 4, 0, 1, 0, 128, 2, 2, 128, 0, 2, 2, 2, 0, 2, 6, 65, 4, 0, 0, 19, 206, 2, 20, 5, 18, 0, 1, 0, 1, 0, 2, 0, 1, 0, 2, 0, 2, 0, 1, 0, 128, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, 75, 1, 4, 19, 206, 0, 90, 57, 112, 1, 254, 46, 2, 44, 2, 0, 1, 4, 0, 1, 0, 1, 1, 4, 0, 2, 0, 1, 1, 4, 0, 1, 0, 4, 1, 4, 0, 2, 0, 4, 1, 4, 0, 1, 0, 128, 1, 4, 0, 2, 0, 128, 65, 4, 0, 0, 19, 206}

// Create a connection to the BMP server
conn, err := net.Dial("tcp", fmt.Sprintf(":%d", bmpPort))
if err != nil {
t.Fatalf("failed to connect to gobmp server with error: %+v", err)
}
defer conn.Close()

// Send message to the BMP server
_, err = conn.Write(input)
if err != nil {
t.Fatalf("failed to send message to gobmp server with error: %+v", err)
}

// Wait for message to be published
sub, err := s.SubscribeSync(streamName + ".>")
if err != nil {
t.Fatalf("failed to subscribe to stream with error: %+v", err)
}

// Wait to receive the message from the publisher
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("failed to pull message from stream with error: %+v", err)
}

// Unmarshal msg.Data (bytes) to message.PeerStateChange
var peerStateChange message.PeerStateChange
err = json.Unmarshal(msg.Data, &peerStateChange)
if err != nil {
t.Fatalf("failed to unmarshal message with error: %+v", err)
}

assert.Equal(t, peerStateChange.Action, "add")
assert.Equal(t, peerStateChange.RouterHash, "4371c52d8d4a6a67a4c438964f61700b")
}

0 comments on commit 7997a5f

Please sign in to comment.