Skip to content

Commit

Permalink
Merge pull request #214 from batchcorp/blinktag/handle_binary_kafka_headers
Browse files Browse the repository at this point in the history

Handle kafka headers containing binary values
  • Loading branch information
blinktag committed Dec 30, 2021
2 parents 8b13415 + c72108e commit dd94f91
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 52 deletions.
11 changes: 10 additions & 1 deletion backends/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package kafka
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"strings"
"syscall"
"time"
"unicode/utf8"

"github.com/pkg/errors"
skafka "github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -267,9 +269,16 @@ func convertKafkaHeadersToProto(original []skafka.Header) []*records.KafkaHeader
converted := make([]*records.KafkaHeader, 0)

for _, o := range original {
v := string(o.Value)

// gRPC will fail the call if the value isn't valid utf-8
if !utf8.ValidString(v) {
v = base64.StdEncoding.EncodeToString(o.Value)
}

converted = append(converted, &records.KafkaHeader{
Key: o.Key,
Value: string(o.Value),
Value: v,
})
}

Expand Down
118 changes: 68 additions & 50 deletions backends/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,70 @@
package kafka

//import (
// . "github.com/onsi/ginkgo"
// . "github.com/onsi/gomega"
//
// "github.com/batchcorp/plumber/options"
//)
//
//var _ = Describe("Kafka", func() {
// Context("getAuthenticationMechanism", func() {
// It("Returns nil when no username/password is specified", func() {
// opts := &options.Options{Kafka: &options.KafkaOptions{
// Username: "",
// Password: "",
// }}
//
// m, err := getAuthenticationMechanism(opts)
//
// Expect(err).ToNot(HaveOccurred())
// Expect(m).To(BeNil())
// })
//
// It("Returns SCRAM mechanism", func() {
// opts := &options.Options{Kafka: &options.KafkaOptions{
// Username: "testing",
// Password: "hunter2",
// AuthenticationType: "scram",
// }}
//
// m, err := getAuthenticationMechanism(opts)
//
// Expect(err).ToNot(HaveOccurred())
// Expect(m).ToNot(BeNil())
// Expect(m.Name()).To(Equal("SCRAM-SHA-512"))
// })
//
// It("Returns Plain mechanism", func() {
// opts := &options.Options{Kafka: &options.KafkaOptions{
// Username: "testing",
// Password: "hunter2",
// AuthenticationType: "plain",
// }}
//
// m, err := getAuthenticationMechanism(opts)
//
// Expect(err).ToNot(HaveOccurred())
// Expect(m).ToNot(BeNil())
// Expect(m.Name()).To(Equal("PLAIN"))
// })
// })
//})
import (
"encoding/binary"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
skafka "github.com/segmentio/kafka-go"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
)

// Specs for the kafka backend: SASL mechanism selection and conversion
// of kafka message headers into their protobuf representation.
var _ = Describe("Kafka", func() {
	Context("getAuthenticationMechanism", func() {
		It("Returns nil when no username/password is specified", func() {
			// No credentials means no SASL mechanism should be configured.
			conn := &args.KafkaConn{
				SaslUsername: "",
				SaslPassword: "",
			}

			mech, err := getAuthenticationMechanism(conn)

			Expect(err).ToNot(HaveOccurred())
			Expect(mech).To(BeNil())
		})

		It("Returns SCRAM mechanism", func() {
			conn := &args.KafkaConn{
				SaslUsername: "testing",
				SaslPassword: "hunter2",
				SaslType:     args.SASLType_SCRAM,
			}

			mech, err := getAuthenticationMechanism(conn)

			Expect(err).ToNot(HaveOccurred())
			Expect(mech).ToNot(BeNil())
			Expect(mech.Name()).To(Equal("SCRAM-SHA-512"))
		})

		It("Returns Plain mechanism", func() {
			conn := &args.KafkaConn{
				SaslUsername: "testing",
				SaslPassword: "hunter2",
				SaslType:     args.SASLType_PLAIN,
			}

			mech, err := getAuthenticationMechanism(conn)

			Expect(err).ToNot(HaveOccurred())
			Expect(mech).ToNot(BeNil())
			Expect(mech.Name()).To(Equal("PLAIN"))
		})
	})

	Context("convertKafkaHeaders", func() {
		It("handles values which cannot be converted to utf8 string", func() {
			// A little-endian uint64 timestamp is not valid UTF-8, so the
			// converter is expected to base64-encode the raw bytes.
			raw := make([]byte, 8)
			binary.LittleEndian.PutUint64(raw, uint64(1640101168))

			hdrs := []skafka.Header{
				{Key: "key", Value: raw},
			}

			got := convertKafkaHeadersToProto(hdrs)
			Expect(got).To(HaveLen(1))
			Expect(got[0].Value).To(Equal("MPXBYQAAAAA="))
		})
	})
})
12 changes: 11 additions & 1 deletion relay/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package relay

import (
"context"
"encoding/base64"
"fmt"
"time"
"unicode/utf8"

"github.com/pkg/errors"
skafka "github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -87,9 +89,17 @@ func convertKafkaHeaders(kafkaHeaders []skafka.Header) []*records.KafkaHeader {
sinkRecordHeaders := make([]*records.KafkaHeader, 0)

for _, h := range kafkaHeaders {
v := string(h.Value)

// gRPC will fail the call if the value isn't valid utf-8
// TODO: ship original header value so they can be sent back correctly in a replay
if !utf8.ValidString(v) {
v = base64.StdEncoding.EncodeToString(h.Value)
}

sinkRecordHeaders = append(sinkRecordHeaders, &records.KafkaHeader{
Key: h.Key,
Value: string(h.Value),
Value: v,
})
}

Expand Down
26 changes: 26 additions & 0 deletions relay/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package relay

import (
"encoding/binary"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
skafka "github.com/segmentio/kafka-go"
)

// Specs for the relay package's kafka header conversion helper.
var _ = Describe("Relay", func() {
	Context("convertKafkaHeaders", func() {
		It("handles values which cannot be converted to utf8 string", func() {
			// Binary (non-UTF-8) header values should come back base64-encoded.
			raw := make([]byte, 8)
			binary.LittleEndian.PutUint64(raw, uint64(1640101168))

			hdrs := []skafka.Header{
				{Key: "key", Value: raw},
			}

			got := convertKafkaHeaders(hdrs)
			Expect(got).To(HaveLen(1))
			Expect(got[0].Value).To(Equal("MPXBYQAAAAA="))
		})
	})
})
13 changes: 13 additions & 0 deletions relay/relay_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package relay_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

// TestRelay wires Ginkgo into the standard `go test` runner: it registers
// Ginkgo's Fail as the Gomega failure handler and then executes every spec
// registered in this package under the "Relay Suite" label.
func TestRelay(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Relay Suite")
}

0 comments on commit dd94f91

Please sign in to comment.