diff --git a/go.mod b/go.mod index 44f3028..9a82403 100644 --- a/go.mod +++ b/go.mod @@ -31,14 +31,17 @@ require ( ) require ( - github.com/jonboulle/clockwork v0.4.0 // indirect github.com/montanaflynn/stats v0.7.1 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/robfig/cron/v3 v3.0.1 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect +) + +require ( + github.com/jonboulle/clockwork v0.4.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect go.uber.org/atomic v1.10.0 // indirect golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect ) @@ -66,7 +69,7 @@ require ( github.com/go-faster/errors v0.6.1 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/uuid v1.6.0 github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect @@ -80,7 +83,6 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/k-sone/critbitgo v1.4.0 // indirect github.com/kaorimatz/go-mrt v0.0.0-20210326003454-aa11f3646f93 // indirect - github.com/klauspost/compress v1.15.15 github.com/libp2p/go-reuseport v0.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -108,13 +110,14 @@ require ( github.com/subosito/gotenv v1.4.2 // indirect github.com/vishvananda/netlink v1.2.1-beta.2 // indirect github.com/vishvananda/netns v0.0.4 // indirect + go.mongodb.org/mongo-driver v1.17.1 go.opentelemetry.io/otel v1.13.0 // indirect go.opentelemetry.io/otel/trace v1.13.0 // indirect - golang.org/x/crypto v0.22.0 // indirect + golang.org/x/crypto v0.26.0 // indirect golang.org/x/net v0.21.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.23.0 + golang.org/x/text v0.17.0 // indirect google.golang.org/genproto v0.0.0-20230222225845-10f96fb3dbec // indirect google.golang.org/grpc v1.53.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index ab27c23..b9876ed 100644 --- a/go.sum +++ b/go.sum @@ -391,6 +391,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -536,12 +537,16 @@ github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207 h1:nn7SOQy8xCu3i github.com/vishvananda/netns v0.0.0-20220913150850-18c4f4234207/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -550,6 +555,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mongodb.org/mongo-driver v1.11.1/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -586,8 +593,8 @@ golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= -golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -700,8 +707,8 @@ golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 h1:ZrnxWX62AgTKOSagEqxvb3ff golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -765,8 +772,8 @@ golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -782,8 +789,8 @@ golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/main.go b/main.go index a163970..ec30bfb 100644 --- a/main.go +++ b/main.go @@ -56,6 +56,7 @@ import ( _ "github.com/bwNetFlow/flowpipeline/segments/output/json" _ "github.com/bwNetFlow/flowpipeline/segments/output/kafkaproducer" _ "github.com/bwNetFlow/flowpipeline/segments/output/lumberjack" + _ "github.com/bwNetFlow/flowpipeline/segments/output/mongodb" _ "github.com/bwNetFlow/flowpipeline/segments/output/sqlite" _ "github.com/bwNetFlow/flowpipeline/segments/print/count" diff --git a/segments/output/mongodb/mongo.go b/segments/output/mongodb/mongo.go new file mode 100644 index 0000000..802a161 --- /dev/null +++ b/segments/output/mongodb/mongo.go @@ -0,0 +1,310 @@ +//go:build cgo +// +build cgo + +// Dumps all incoming flow messages to a local mongodb database using a capped collection to limit the used disk space +package mongodb + +import ( + "context" + "errors" + "fmt" + "log" + "net" + "reflect" + "strconv" + "strings" + "sync" + + "github.com/bwNetFlow/flowpipeline/pb" + "github.com/bwNetFlow/flowpipeline/segments" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type Mongodb struct { + segments.BaseSegment + mongodbUri string + dbCollection *mongo.Collection + fieldTypes []string + fieldNames []string + ringbufferSize int64 + + databaseName string // default flowdata + collectionName string // default ringbuffer + Fields string // optional comma-separated list of fields to export, default is "", meaning all fields + BatchSize int // optional how many flows to hold in memory between INSERTs, default is 1000 +} + +// Every Segment must implement a New method, even if there isn't any config +// it is interested in. +func (segment Mongodb) New(configx map[string]string) segments.Segment { + newsegment := &Mongodb{} + + newsegment, err := fillSegmentWithConfig(newsegment, configx) + if err != nil { + log.Printf("[error] '%s'", err.Error()) + return nil + } + + ctx := context.Background() + + //Test if db connection works + client, err := mongo.Connect(ctx, options.Client().ApplyURI(newsegment.mongodbUri)) + if err == nil { + //test if the connection was acutally sucessful + err = client.Ping(ctx, options.Client().ReadPreference) + } + if err != nil { + log.Printf("[error] mongoDB: Could not open DB connection due to '%s", err.Error()) + return nil + } + db := client.Database(newsegment.databaseName) + + // collection in the mongdo should be capped to limit the used disk space + convertToCappedCollection(db, newsegment) + return newsegment +} + +func (segment *Mongodb) Run(wg *sync.WaitGroup) { + ctx := context.Background() + defer func() { + close(segment.Out) + wg.Done() + }() + + client, err := mongo.Connect(ctx, options.Client().ApplyURI(segment.mongodbUri)) + if err != nil { + log.Panic(err) // this has already been checked in New + } + db := client.Database(segment.databaseName) + segment.dbCollection = db.Collection(segment.collectionName) + + defer client.Disconnect(ctx) + + var unsaved []*pb.EnrichedFlow + + for msg := range segment.In { + unsaved = append(unsaved, msg) + if len(unsaved) >= segment.BatchSize { + err := segment.bulkInsert(unsaved, ctx) + if err != nil { + log.Printf("[error] %s", err) + } + unsaved = []*pb.EnrichedFlow{} + } + segment.Out <- msg + } + segment.bulkInsert(unsaved, ctx) +} + +func fillSegmentWithConfig(newsegment *Mongodb, config map[string]string) (*Mongodb, error) { + if config == nil { + return newsegment, errors.New("missing configuration for segment mongodb") + } + + if config["mongodb_uri"] == "" { + return newsegment, errors.New("mongoDB: mongodb_uri not defined") + } + newsegment.mongodbUri = config["mongodb_uri"] + + if config["database"] == "" { + log.Println("[INFO] mongoDB: no database defined - using default value (flowdata)") + config["database"] = "flowdata" + } + newsegment.databaseName = config["database"] + + if config["collection"] == "" { + log.Println("[INFO] mongoDB: no collection defined - using default value (ringbuffer)") + config["collection"] = "ringbuffer" + } + newsegment.collectionName = config["collection"] + + var ringbufferSize int64 = 10737418240 + if config["max_disk_usage"] == "" { + log.Println("[INFO] mongoDB: no ring buffer size defined - using default value (10GB)") + } else { + size, err := sizeInBytes(config["max_disk_usage"]) + if err == nil { + log.Println("[INFO] mongoDB: setting ring buffer size to " + config["max_disk_usage"]) + ringbufferSize = size + } else { + log.Println("[Warning] mongoDB: failed setting ring buffer size to " + config["max_disk_usage"] + " - using default as fallback (10GB)") + } + } + newsegment.ringbufferSize = ringbufferSize + + newsegment.BatchSize = 1000 + if config["batchsize"] != "" { + if parsedBatchSize, err := strconv.ParseUint(config["batchsize"], 10, 32); err == nil { + if parsedBatchSize == 0 { + return newsegment, errors.New("MongoDO: Batch size 0 is not allowed. Set this in relation to the expected flows per second") + } + newsegment.BatchSize = int(parsedBatchSize) + } else { + log.Println("[error] MongoDO: Could not parse 'batchsize' parameter, using default 1000.") + } + } else { + log.Println("[info] MongoDO: 'batchsize' set to default '1000'.") + } + + // determine field set + if config["fields"] != "" { + protofields := reflect.TypeOf(pb.EnrichedFlow{}) + conffields := strings.Split(config["fields"], ",") + for _, field := range conffields { + protofield, found := protofields.FieldByName(field) + if !found { + return newsegment, errors.New("csv: Field specified in 'fields' does not exist") + } + newsegment.fieldNames = append(newsegment.fieldNames, field) + newsegment.fieldTypes = append(newsegment.fieldTypes, protofield.Type.String()) + } + } else { + protofields := reflect.TypeOf(pb.EnrichedFlow{}) + // +-3 skips over protobuf state, sizeCache and unknownFields + newsegment.fieldNames = make([]string, protofields.NumField()-3) + newsegment.fieldTypes = make([]string, protofields.NumField()-3) + for i := 3; i < protofields.NumField(); i++ { + field := protofields.Field(i) + newsegment.fieldNames[i-3] = field.Name + newsegment.fieldTypes[i-3] = field.Type.String() + } + newsegment.Fields = config["fields"] + } + + return newsegment, nil +} + +func (segment Mongodb) bulkInsert(unsavedFlows []*pb.EnrichedFlow, ctx context.Context) error { + // not using transactions due to limitations of capped collectiction + // ("You cannot write to capped collections in transactions." + // https://www.mongodb.com/docs/manual/core/capped-collections/) + if len(unsavedFlows) == 0 { + return nil + } + unsavedFlowData := bson.A{} + for _, msg := range unsavedFlows { + singleFlowData := bson.M{} + values := reflect.ValueOf(msg).Elem() + for i, fieldname := range segment.fieldNames { + protofield := values.FieldByName(fieldname) + switch segment.fieldTypes[i] { + case "[]uint8": // this is neccessary for proper formatting + ipstring := net.IP(protofield.Interface().([]uint8)).String() + if ipstring == "" { + ipstring = "" + } + singleFlowData[fieldname] = ipstring + case "string": // this is because doing nothing is also much faster than Sprint + singleFlowData[fieldname] = protofield.Interface().(string) + default: + singleFlowData[fieldname] = fmt.Sprint(protofield) + } + } + unsavedFlowData = append(unsavedFlowData, singleFlowData) + } + _, err := segment.dbCollection.InsertMany(ctx, unsavedFlowData) + if err != nil { + log.Println("[error] mongoDB: Failed to insert due to " + err.Error()) + return err + } + return nil +} + +func init() { + segment := &Mongodb{} + segments.RegisterSegment("mongodb", segment) +} + +func sizeInBytes(sizeStr string) (int64, error) { + // Split into number and unit + parts := strings.Fields(sizeStr) + if len(parts) > 2 || len(parts) < 1 { + return 0, fmt.Errorf("invalid size format") + } + + size, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return 0, err + } + + if len(parts) == 1 { + return size, nil + } + + // Calculate bytes if a size was provided + unit := strings.ToUpper(parts[1]) + switch unit { + case "B": + return size, nil + case "KB": + return size * 1024, nil + case "MB": + return size * 1024 * 1024, nil + case "GB": + return size * 1024 * 1024 * 1024, nil + case "TB": + return size * 1024 * 1024 * 1024 * 1024, nil + default: + return 0, fmt.Errorf("unknown unit: %s", unit) + } +} + +/************************************************************************************************ +** Checks if the collection segment.collectionName in the db is a capped collection +** If not it converts it to a capped collection with the size segment.ringbufferSize +*************************************************************************************************/ +func convertToCappedCollection(db *mongo.Database, segment *Mongodb) error { + ctx := context.Background() + + collStats := db.RunCommand(ctx, bson.D{{Key: "collStats", Value: segment.collectionName}}) + + var collInfo struct { + Name string `bson:"name"` + Capped bool `bson:"capped"` + MaxSize int32 `bson:"maxSize"` + Count int64 `bson:"count"` + Size int64 `bson:"size"` + } + + if collStats.Err() != nil { + log.Printf("[Error] Failed to check Collection '%s' due to: '%s'\n", segment.collectionName, collStats.Err().Error()) + return collStats.Err() + } + + if err := collStats.Decode(&collInfo); err != nil { + return fmt.Errorf("failed to decode collection info: %v", err) + } + + if collInfo.Count == 0 { + // Create a new capped collection + cappedOptions := options.CreateCollection().SetCapped(true).SetSizeInBytes(segment.ringbufferSize) + err := db.CreateCollection(ctx, segment.collectionName, cappedOptions) + if err != nil { + return fmt.Errorf("failed to create capped collection: %v", err) + } + + log.Printf("[Debug] Capped collection '%s' created successfully.\n", segment.collectionName) + return nil + } + + if !collInfo.Capped { + log.Printf("[Warning] Collection '%s' is not capped. Starting converting it...\n", segment.collectionName) + db.RunCommand(ctx, bson.D{ + {Key: "convertToCapped", Value: segment.collectionName}, + {Key: "size", Value: segment.ringbufferSize}, + }) + return nil + } + + log.Printf("[INFO] Collection '%s' is already capped.\n", segment.collectionName) + if collInfo.MaxSize != int32(segment.ringbufferSize) { + log.Printf("[Warning] Changing max size of collection '%s' from '%d' to '%d'.\n", segment.collectionName, collInfo.MaxSize, segment.ringbufferSize) + db.RunCommand(ctx, bson.D{ + {Key: "collMod", Value: segment.collectionName}, + {Key: "cappedSize", Value: segment.ringbufferSize}, + }) + } + return nil +} diff --git a/segments/output/mongodb/mongo_test.go b/segments/output/mongodb/mongo_test.go new file mode 100644 index 0000000..8f8c80f --- /dev/null +++ b/segments/output/mongodb/mongo_test.go @@ -0,0 +1,137 @@ +//go:build cgo +// +build cgo + +package mongodb + +import ( + "io" + "log" + "os" + "sync" + "testing" + + "github.com/bwNetFlow/flowpipeline/pb" + // "github.com/bwNetFlow/flowpipeline/segments" +) + +// Mongodb Segment test, passthrough test only +func TestSegment_Mongodb_passthrough(t *testing.T) { + // result := segments.TestSegment("Mongodb", map[string]string{"mongodb_uri": "mongodb://localhost:27017/" , "database":"testing"}, + // &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45}) + // if result == nil { + // t.Error("Segment Mongodb is not passing through flows.") + // } + segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing"}) + if segment == nil { + t.Skip() + } + + in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow) + segment.Rewire(in, out) + + wg := &sync.WaitGroup{} + wg.Add(1) + go segment.Run(wg) + in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 1}, DstAddr: []byte{192, 168, 88, 1}, Proto: 1} + <-out + in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 2}, DstAddr: []byte{192, 168, 88, 2}, Proto: 2} + <-out + close(in) + wg.Wait() +} + +// Mongodb Segment benchmark with 1000 samples stored in memory +func BenchmarkMongodb_1000(b *testing.B) { + log.SetOutput(io.Discard) + os.Stdout, _ = os.Open(os.DevNull) + + segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing"}) + if segment == nil { + b.Skip() + } + + in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow) + segment.Rewire(in, out) + + wg := &sync.WaitGroup{} + wg.Add(1) + go segment.Run(wg) + + for n := 0; n < b.N; n++ { + in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45} + _ = <-out + } + close(in) +} + +// Mongodb Segment benchmark with 10000 samples stored in memory +func BenchmarkMongodb_10000(b *testing.B) { + log.SetOutput(io.Discard) + os.Stdout, _ = os.Open(os.DevNull) + + segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "10000"}) + if segment == nil { + b.Skip() + } + + in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow) + segment.Rewire(in, out) + + wg := &sync.WaitGroup{} + wg.Add(1) + go segment.Run(wg) + + for n := 0; n < b.N; n++ { + in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45} + _ = <-out + } + close(in) +} + +// Mongodb Segment benchmark with 10000 samples stored in memory +func BenchmarkMongodb_100000(b *testing.B) { + log.SetOutput(io.Discard) + os.Stdout, _ = os.Open(os.DevNull) + + segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "100000"}) + if segment == nil { + b.Skip() + } + + in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow) + segment.Rewire(in, out) + + wg := &sync.WaitGroup{} + wg.Add(1) + go segment.Run(wg) + + for n := 0; n < b.N; n++ { + in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45} + _ = <-out + } + close(in) +} + +// Mongodb Segment benchmark with 10000 samples stored in memory +func BenchmarkMongodb_100000_with_storage_limit(b *testing.B) { + log.SetOutput(io.Discard) + os.Stdout, _ = os.Open(os.DevNull) + + segment := Mongodb{}.New(map[string]string{"mongodb_uri": "mongodb://localhost:27017/", "database": "testing", "batchsize": "100000", "max_disk_usage": "100 MB"}) + if segment == nil { + b.Skip() + } + + in, out := make(chan *pb.EnrichedFlow), make(chan *pb.EnrichedFlow) + segment.Rewire(in, out) + + wg := &sync.WaitGroup{} + wg.Add(1) + go segment.Run(wg) + + for n := 0; n < b.N; n++ { + in <- &pb.EnrichedFlow{SrcAddr: []byte{192, 168, 88, 142}, DstAddr: []byte{192, 168, 88, 143}, Proto: 45} + _ = <-out + } + close(in) +}