From a1a4dc84f07bd9d6971578d9bebbf48ceaa99b9e Mon Sep 17 00:00:00 2001 From: Karim Radhouani Date: Mon, 16 Oct 2023 18:49:37 -0700 Subject: [PATCH] autodetect if kafka input is receiving an event or a list of events --- inputs/kafka_input/kafka_input.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/inputs/kafka_input/kafka_input.go b/inputs/kafka_input/kafka_input.go index 8f06c2a8..5d717870 100644 --- a/inputs/kafka_input/kafka_input.go +++ b/inputs/kafka_input/kafka_input.go @@ -9,6 +9,7 @@ package kafka_input import ( + "bytes" "context" "encoding/json" "errors" @@ -22,12 +23,13 @@ import ( "github.com/Shopify/sarama" "github.com/damiannolan/sasl/oauthbearer" "github.com/google/uuid" + "google.golang.org/protobuf/proto" + "github.com/openconfig/gnmic/formatters" "github.com/openconfig/gnmic/inputs" "github.com/openconfig/gnmic/outputs" "github.com/openconfig/gnmic/types" "github.com/openconfig/gnmic/utils" - "google.golang.org/protobuf/proto" ) const ( @@ -44,6 +46,9 @@ const ( var defaultVersion = sarama.V2_5_0_0 +var openSquareBracket = []byte("[") +var openCurlyBrace = []byte("{") + func init() { inputs.Register("kafka", func() inputs.Input { return &KafkaInput{ @@ -162,8 +167,16 @@ START: } switch k.Cfg.Format { case "event": + m.Value = bytes.TrimSpace(m.Value) evMsgs := make([]*formatters.EventMsg, 1) - err = json.Unmarshal(m.Value, &evMsgs) + switch { + case len(m.Value) == 0: + continue + case m.Value[0] == openSquareBracket[0]: + err = json.Unmarshal(m.Value, &evMsgs) + case m.Value[0] == openCurlyBrace[0]: + err = json.Unmarshal(m.Value, evMsgs[0]) + } if err != nil { if k.Cfg.Debug { k.logger.Printf("%s failed to unmarshal event msg: %v", workerLogPrefix, err)