From f25d26ee30e6dfe22e4bebd05022faf164545efe Mon Sep 17 00:00:00 2001 From: prdpx7 Date: Wed, 25 Dec 2024 17:47:16 +0530 Subject: [PATCH 1/2] Fix: Set current time instead of epoch 0(1970-01-01T00:00:00) in kafka message metadata --- output_kafka.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/output_kafka.go b/output_kafka.go index faa50519..0e1d4f82 100644 --- a/output_kafka.go +++ b/output_kafka.go @@ -2,12 +2,13 @@ package goreplay import ( "encoding/json" - "github.com/buger/goreplay/internal/byteutils" - "github.com/buger/goreplay/proto" "log" "strings" "time" + "github.com/buger/goreplay/internal/byteutils" + "github.com/buger/goreplay/proto" + "github.com/Shopify/sarama" "github.com/Shopify/sarama/mocks" ) @@ -91,8 +92,9 @@ func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) { } o.producer.Input() <- &sarama.ProducerMessage{ - Topic: o.config.Topic, - Value: message, + Topic: o.config.Topic, + Value: message, + Timestamp: time.Now(), } return len(message), nil From f41e041ac6ad1f93a8c2617f4d9bb26d960b88ee Mon Sep 17 00:00:00 2001 From: prdpx7 Date: Mon, 6 Jan 2025 16:03:57 +0530 Subject: [PATCH 2/2] Added ReqID as Kafka's record key --- kafka.go | 2 +- output_kafka.go | 8 +++++--- output_kafka_test.go | 12 ++++++++++++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/kafka.go b/kafka.go index ca0e83ba..9bd5a408 100644 --- a/kafka.go +++ b/kafka.go @@ -31,7 +31,7 @@ type InputKafkaConfig struct { Host string `json:"input-kafka-host"` Topic string `json:"input-kafka-topic"` UseJSON bool `json:"input-kafka-json-format"` - Offset string `json:"input-kafka-offset"` + Offset string `json:"input-kafka-offset"` SASLConfig SASLKafkaConfig } diff --git a/output_kafka.go b/output_kafka.go index 0e1d4f82..86ef27c1 100644 --- a/output_kafka.go +++ b/output_kafka.go @@ -65,6 +65,8 @@ func (o *KafkaOutput) ErrorHandler() { // PluginWrite writes a message to this plugin func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) { var message sarama.StringEncoder + meta := payloadMeta(msg.Meta) + recordKey := byteutils.SliceToString(meta[1]) if !o.config.UseJSON { message = sarama.StringEncoder(byteutils.SliceToString(msg.Meta) + byteutils.SliceToString(msg.Data)) @@ -74,10 +76,7 @@ func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) { for k, v := range mimeHeader { header[k] = strings.Join(v, ", ") } - - meta := payloadMeta(msg.Meta) req := msg.Data - kafkaMessage := KafkaMessage{ ReqURL: byteutils.SliceToString(proto.Path(req)), ReqType: byteutils.SliceToString(meta[0]), @@ -95,6 +94,9 @@ func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) { Topic: o.config.Topic, Value: message, Timestamp: time.Now(), + // + // Making ReqID as Kafka record's key + Key: sarama.StringEncoder(recordKey), } return len(message), nil diff --git a/output_kafka_test.go b/output_kafka_test.go index b4481611..306663ad 100644 --- a/output_kafka_test.go +++ b/output_kafka_test.go @@ -23,6 +23,12 @@ func TestOutputKafkaRAW(t *testing.T) { resp := <-producer.Successes() + key, _ := resp.Key.Encode() + + if string(key) != "2" { + t.Errorf("Key not properly encoded: %q", key) + } + data, _ := resp.Value.Encode() if string(data) != "1 2 3\nGET / HTTP1.1\r\nHeader: 1\r\n\r\n" { @@ -46,6 +52,12 @@ func TestOutputKafkaJSON(t *testing.T) { resp := <-producer.Successes() + key, _ := resp.Key.Encode() + + if string(key) != "2" { + t.Errorf("Key not properly encoded: %q", key) + } + data, _ := resp.Value.Encode() if string(data) != `{"Req_URL":"","Req_Type":"1","Req_ID":"2","Req_Ts":"3","Req_Method":"GET"}` {