diff --git a/kafka.go b/kafka.go index ca0e83ba..9bd5a408 100644 --- a/kafka.go +++ b/kafka.go @@ -31,7 +31,7 @@ type InputKafkaConfig struct { Host string `json:"input-kafka-host"` Topic string `json:"input-kafka-topic"` UseJSON bool `json:"input-kafka-json-format"` - Offset string `json:"input-kafka-offset"` + Offset string `json:"input-kafka-offset"` SASLConfig SASLKafkaConfig } diff --git a/output_kafka.go b/output_kafka.go index faa50519..86ef27c1 100644 --- a/output_kafka.go +++ b/output_kafka.go @@ -2,12 +2,13 @@ package goreplay import ( "encoding/json" - "github.com/buger/goreplay/internal/byteutils" - "github.com/buger/goreplay/proto" "log" "strings" "time" + "github.com/buger/goreplay/internal/byteutils" + "github.com/buger/goreplay/proto" + "github.com/Shopify/sarama" "github.com/Shopify/sarama/mocks" ) @@ -64,6 +65,8 @@ func (o *KafkaOutput) ErrorHandler() { // PluginWrite writes a message to this plugin func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) { var message sarama.StringEncoder + meta := payloadMeta(msg.Meta) + recordKey := byteutils.SliceToString(meta[1]) if !o.config.UseJSON { message = sarama.StringEncoder(byteutils.SliceToString(msg.Meta) + byteutils.SliceToString(msg.Data)) @@ -73,10 +76,7 @@ func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) { for k, v := range mimeHeader { header[k] = strings.Join(v, ", ") } - - meta := payloadMeta(msg.Meta) req := msg.Data - kafkaMessage := KafkaMessage{ ReqURL: byteutils.SliceToString(proto.Path(req)), ReqType: byteutils.SliceToString(meta[0]), @@ -91,8 +91,12 @@ func (o *KafkaOutput) PluginWrite(msg *Message) (n int, err error) { } o.producer.Input() <- &sarama.ProducerMessage{ - Topic: o.config.Topic, - Value: message, + Topic: o.config.Topic, + Value: message, + Timestamp: time.Now(), + // + // Making ReqID as Kafka record's key + Key: sarama.StringEncoder(recordKey), } return len(message), nil diff --git a/output_kafka_test.go b/output_kafka_test.go index b4481611..306663ad 100644 --- a/output_kafka_test.go +++ b/output_kafka_test.go @@ -23,6 +23,12 @@ func TestOutputKafkaRAW(t *testing.T) { resp := <-producer.Successes() + key, _ := resp.Key.Encode() + + if string(key) != "2" { + t.Errorf("Key not properly encoded: %q", key) + } + data, _ := resp.Value.Encode() if string(data) != "1 2 3\nGET / HTTP1.1\r\nHeader: 1\r\n\r\n" { @@ -46,6 +52,12 @@ func TestOutputKafkaJSON(t *testing.T) { resp := <-producer.Successes() + key, _ := resp.Key.Encode() + + if string(key) != "2" { + t.Errorf("Key not properly encoded: %q", key) + } + data, _ := resp.Value.Encode() if string(data) != `{"Req_URL":"","Req_Type":"1","Req_ID":"2","Req_Ts":"3","Req_Method":"GET"}` {