From c72108e862c63b553953bbed8b55af368a8fb28f Mon Sep 17 00:00:00 2001 From: Mark Gregan Date: Tue, 21 Dec 2021 10:49:18 -0500 Subject: [PATCH] Handle kafka headers containing binary values --- backends/kafka/kafka.go | 11 +++- backends/kafka/kafka_test.go | 118 ++++++++++++++++++++--------------- relay/kafka.go | 12 +++- relay/kafka_test.go | 26 ++++++++ relay/relay_suite_test.go | 13 ++++ 5 files changed, 128 insertions(+), 52 deletions(-) create mode 100644 relay/kafka_test.go create mode 100644 relay/relay_suite_test.go diff --git a/backends/kafka/kafka.go b/backends/kafka/kafka.go index 33afed31..4635646f 100644 --- a/backends/kafka/kafka.go +++ b/backends/kafka/kafka.go @@ -7,10 +7,12 @@ package kafka import ( "context" "crypto/tls" + "encoding/base64" "fmt" "strings" "syscall" "time" + "unicode/utf8" "github.com/pkg/errors" skafka "github.com/segmentio/kafka-go" @@ -267,9 +269,16 @@ func convertKafkaHeadersToProto(original []skafka.Header) []*records.KafkaHeader converted := make([]*records.KafkaHeader, 0) for _, o := range original { + v := string(o.Value) + + // gRPC will fail the call if the value isn't valid utf-8 + if !utf8.ValidString(v) { + v = base64.StdEncoding.EncodeToString(o.Value) + } + converted = append(converted, &records.KafkaHeader{ Key: o.Key, - Value: string(o.Value), + Value: v, }) } diff --git a/backends/kafka/kafka_test.go b/backends/kafka/kafka_test.go index 519f83eb..b5e903c8 100644 --- a/backends/kafka/kafka_test.go +++ b/backends/kafka/kafka_test.go @@ -1,52 +1,70 @@ package kafka -//import ( -// . "github.com/onsi/ginkgo" -// . "github.com/onsi/gomega" -// -// "github.com/batchcorp/plumber/options" -//) -// -//var _ = Describe("Kafka", func() { -// Context("getAuthenticationMechanism", func() { -// It("Returns nil when no username/password is specified", func() { -// opts := &options.Options{Kafka: &options.KafkaOptions{ -// Username: "", -// Password: "", -// }} -// -// m, err := getAuthenticationMechanism(opts) -// -// Expect(err).ToNot(HaveOccurred()) -// Expect(m).To(BeNil()) -// }) -// -// It("Returns SCRAM mechanism", func() { -// opts := &options.Options{Kafka: &options.KafkaOptions{ -// Username: "testing", -// Password: "hunter2", -// AuthenticationType: "scram", -// }} -// -// m, err := getAuthenticationMechanism(opts) -// -// Expect(err).ToNot(HaveOccurred()) -// Expect(m).ToNot(BeNil()) -// Expect(m.Name()).To(Equal("SCRAM-SHA-512")) -// }) -// -// It("Returns Plain mechanism", func() { -// opts := &options.Options{Kafka: &options.KafkaOptions{ -// Username: "testing", -// Password: "hunter2", -// AuthenticationType: "plain", -// }} -// -// m, err := getAuthenticationMechanism(opts) -// -// Expect(err).ToNot(HaveOccurred()) -// Expect(m).ToNot(BeNil()) -// Expect(m.Name()).To(Equal("PLAIN")) -// }) -// }) -//}) +import ( + "encoding/binary" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + skafka "github.com/segmentio/kafka-go" + + "github.com/batchcorp/plumber-schemas/build/go/protos/args" +) + +var _ = Describe("Kafka", func() { + Context("getAuthenticationMechanism", func() { + It("Returns nil when no username/password is specified", func() { + opts := &args.KafkaConn{ + SaslUsername: "", + SaslPassword: "", + } + + m, err := getAuthenticationMechanism(opts) + + Expect(err).ToNot(HaveOccurred()) + Expect(m).To(BeNil()) + }) + + It("Returns SCRAM mechanism", func() { + opts := &args.KafkaConn{ + SaslUsername: "testing", + SaslPassword: "hunter2", + SaslType: args.SASLType_SCRAM, + } + + m, err := getAuthenticationMechanism(opts) + + Expect(err).ToNot(HaveOccurred()) + Expect(m).ToNot(BeNil()) + Expect(m.Name()).To(Equal("SCRAM-SHA-512")) + }) + + It("Returns Plain mechanism", func() { + opts := &args.KafkaConn{ + SaslUsername: "testing", + SaslPassword: "hunter2", + SaslType: args.SASLType_PLAIN, + } + + m, err := getAuthenticationMechanism(opts) + + Expect(err).ToNot(HaveOccurred()) + Expect(m).ToNot(BeNil()) + Expect(m.Name()).To(Equal("PLAIN")) + }) + }) + + Context("convertKafkaHeaders", func() { + It("handles values which cannot be converted to utf8 string", func() { + var ts [8]byte + binary.LittleEndian.PutUint64(ts[:], uint64(1640101168)) + + headers := []skafka.Header{ + {Key: "key", Value: ts[:]}, + } + + converted := convertKafkaHeadersToProto(headers) + Expect(len(converted)).To(Equal(1)) + Expect(converted[0].Value).To(Equal("MPXBYQAAAAA=")) + }) + }) +}) diff --git a/relay/kafka.go b/relay/kafka.go index 3c87dddf..293ad6f3 100644 --- a/relay/kafka.go +++ b/relay/kafka.go @@ -2,8 +2,10 @@ package relay import ( "context" + "encoding/base64" "fmt" "time" + "unicode/utf8" "github.com/pkg/errors" skafka "github.com/segmentio/kafka-go" @@ -87,9 +89,17 @@ func convertKafkaHeaders(kafkaHeaders []skafka.Header) []*records.KafkaHeader { sinkRecordHeaders := make([]*records.KafkaHeader, 0) for _, h := range kafkaHeaders { + v := string(h.Value) + + // gRPC will fail the call if the value isn't valid utf-8 + // TODO: ship original header value so they can be sent back correctly in a replay + if !utf8.ValidString(v) { + v = base64.StdEncoding.EncodeToString(h.Value) + } + sinkRecordHeaders = append(sinkRecordHeaders, &records.KafkaHeader{ Key: h.Key, - Value: string(h.Value), + Value: v, }) } diff --git a/relay/kafka_test.go b/relay/kafka_test.go new file mode 100644 index 00000000..5422a275 --- /dev/null +++ b/relay/kafka_test.go @@ -0,0 +1,26 @@ +package relay + +import ( + "encoding/binary" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + skafka "github.com/segmentio/kafka-go" +) + +var _ = Describe("Relay", func() { + Context("convertKafkaHeaders", func() { + It("handles values which cannot be converted to utf8 string", func() { + var ts [8]byte + binary.LittleEndian.PutUint64(ts[:], uint64(1640101168)) + + headers := []skafka.Header{ + {Key: "key", Value: ts[:]}, + } + + converted := convertKafkaHeaders(headers) + Expect(len(converted)).To(Equal(1)) + Expect(converted[0].Value).To(Equal("MPXBYQAAAAA=")) + }) + }) +}) diff --git a/relay/relay_suite_test.go b/relay/relay_suite_test.go new file mode 100644 index 00000000..61ff629e --- /dev/null +++ b/relay/relay_suite_test.go @@ -0,0 +1,13 @@ +package relay_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestRelay(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Relay Suite") +}