feat(kafka): add authentication and authorization support

ravisuhag committed Oct 31, 2024
1 parent f2b32ed commit ef59621

Showing 3 changed files with 150 additions and 76 deletions.
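For context, the new AuthConfig fields map to recipe keys through the mapstructure tags in the first hunk below. An updated sampleConfig could plausibly look like this — a sketch only, since the commit leaves sampleConfig untouched and the top-level auth_config key name is an assumption:

// Hypothetical extension of the extractor's sample config; the tls/sasl
// keys follow the mapstructure tags in AuthConfig, everything else is assumed.
var sampleConfig = `
broker: "localhost:9092"
auth_config:
  tls:
    enabled: true
    cert_file: "/path/to/client-cert.pem"
    key_file: "/path/to/client-key.pem"
    ca_file: "/path/to/ca-cert.pem"
  sasl:
    enabled: true
    mechanism: "OAUTHBEARER"`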
112 changes: 59 additions & 53 deletions plugins/extractors/kafka/kafka.go
@@ -58,6 +58,11 @@ type AuthConfig struct {
// certificate authority file for TLS client authentication
CAFile string `mapstructure:"ca_file"`
} `mapstructure:"tls"`

SASL struct {
Enabled bool `mapstructure:"enabled"`
Mechanism string `mapstructure:"mechanism"`
} `mapstructure:"sasl"`
}

var sampleConfig = `broker: "localhost:9092"`
@@ -74,7 +79,7 @@ var info = plugins.Info{
type Extractor struct {
plugins.BaseExtractor
// internal states
conn *kafka.Conn
conn sarama.Consumer
logger log.Logger
config Config
clientDurn metric.Int64Histogram
@@ -104,69 +109,91 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
return err
}

// create default dialer
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
}
consumerConfig := sarama.NewConfig()

if e.config.Auth.TLS.Enabled {
tlsConfig, err := e.createTLSConfig()
if err != nil {
return fmt.Errorf("create tls config: %w", err)
}

dialer.TLS = tlsConfig
consumerConfig.Net.TLS.Enable = true
consumerConfig.Net.TLS.Config = tlsConfig

if e.config.Auth.SASL.Enabled {
consumerConfig.Net.SASL.Enable = true
if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth {
consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
}
}

// create connection
e.conn, err = dialer.DialContext(ctx, "tcp", e.config.Broker)
consumer, err := sarama.NewConsumer([]string{e.config.Broker}, consumerConfig)
if err != nil {
return fmt.Errorf("create connection: %w", err)
fmt.Printf("Error is here !! %s", err.Error())
return fmt.Errorf("failed to create kafka consumer for brokers %s and config %+v. Error %s", e.config.Broker,
consumerConfig, err.Error())
}

e.conn = consumer
return nil
}

// Extract checks whether the extractor is ready and, if so,
// extracts metadata from the kafka broker
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {

defer e.conn.Close()

partitions, err := e.readPartitions(ctx)
if err != nil {
return fmt.Errorf("fetch partitions: %w", err)
}
defer func(start time.Time) {

attributes := []attribute.KeyValue{
attribute.String("kafka.broker", e.config.Broker),
attribute.Bool("success", err == nil),
}
if err != nil {
errorCode := "UNKNOWN"
var kErr sarama.KError
if errors.As(err, &kErr) {
errorCode = strings.ReplaceAll(
strings.ToUpper(kErr.Error()), " ", "_",
)
}
attributes = append(attributes, attribute.String("kafka.error_code", errorCode))
}

// collect topic list from partition list
topics := map[string]int{}
for _, p := range partitions {
topics[p.Topic]++
e.clientDurn.Record(
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
)
}(time.Now())

topics, err := e.conn.Topics()

if err != nil {

return fmt.Errorf("fetch topics: %w", err)

}

// build and push topics
for topic, numOfPartitions := range topics {
for _, topic := range topics {

// skip if topic is a default topic
_, isDefaultTopic := defaultTopics[topic]
if isDefaultTopic {
continue

}

asset, err := e.buildAsset(topic, numOfPartitions)
partitions, err := e.conn.Partitions(topic)
if err != nil {
e.logger.Error("failed to fetch partitions for topic", "err", err, "topic", topic)
continue
}
asset, err := e.buildAsset(topic, len(partitions))
if err != nil {
e.logger.Error("failed to build asset", "err", err, "topic", topic)
continue
}
emit(models.NewRecord(asset))
}

return nil
}

func (e *Extractor) createTLSConfig() (*tls.Config, error) {
authConfig := e.config.Auth.TLS

if authConfig.CertFile == "" || authConfig.KeyFile == "" || authConfig.CAFile == "" {
if authConfig.CAFile == "" {
//nolint:gosec
return &tls.Config{
InsecureSkipVerify: e.config.Auth.TLS.InsecureSkipVerify,
@@ -178,9 +205,13 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) {
return nil, fmt.Errorf("create cert: %w", err)
}

caCert, err := os.ReadFile(authConfig.CAFile)
if err != nil {
return nil, fmt.Errorf("read ca cert file: %w", err)
var cert tls.Certificate
var err error
if authConfig.CertFile != "" && authConfig.KeyFile != "" {
cert, err = tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile)
if err != nil {
return nil, fmt.Errorf("create cert: %w", err)
}
}

caCertPool := x509.NewCertPool()
@@ -215,31 +246,6 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2.
}, nil
}

func (e *Extractor) readPartitions(ctx context.Context) (partitions []kafka.Partition, err error) {
defer func(start time.Time) {
attributes := []attribute.KeyValue{
attribute.String("kafka.broker", e.config.Broker),
attribute.Bool("success", err == nil),
}
if err != nil {
errorCode := "UNKNOWN"
var kErr kafka.Error
if errors.As(err, &kErr) {
errorCode = strings.ReplaceAll(
strings.ToUpper(kErr.Title()), " ", "_",
)
}
attributes = append(attributes, attribute.String("kafka.error_code", errorCode))
}

e.clientDurn.Record(
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
)
}(time.Now())

return e.conn.ReadPartitions()
}

func init() {
if err := registry.Extractors.Register("kafka", func() plugins.Extractor {
return New(plugins.GetLog())
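Distilled from the hunks above, the connection path Init now wires up amounts to roughly the sketch below. The helper name newAuthenticatedConsumer is hypothetical; the sketch assumes the sarama Net.TLS/Net.SASL fields and the NewKubernetesTokenProvider helper exactly as used in this diff. Note that the SASL branch sits inside the TLS one, so SASL is only configured when TLS is enabled as well:

// Hypothetical helper summarizing the new Init logic.
func newAuthenticatedConsumer(broker string, tlsConfig *tls.Config) (sarama.Consumer, error) {
	consumerConfig := sarama.NewConfig()
	consumerConfig.Net.TLS.Enable = true
	consumerConfig.Net.TLS.Config = tlsConfig // built by createTLSConfig from the cert/key/CA files
	consumerConfig.Net.SASL.Enable = true
	consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
	consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
	return sarama.NewConsumer([]string{broker}, consumerConfig)
}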
47 changes: 24 additions & 23 deletions plugins/extractors/kafka/kafka_test.go
@@ -6,11 +6,13 @@ package kafka_test
import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"strconv"
"testing"
"time"

kafkaLib "github.com/IBM/sarama"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
@@ -27,13 +29,12 @@
)

var (
brokerHost = "localhost:9093"
brokerHost = "0.0.0.0:9093"
urnScope = "test-kafka"
)

func TestMain(m *testing.M) {
var conn *kafkaLib.Conn
var broker kafkaLib.Broker
var broker *kafkaLib.Broker

// setup test
opts := dockertest.RunOptions{
@@ -49,25 +50,23 @@ func TestMain(m *testing.M) {
},
},
}

retryFn := func(resource *dockertest.Resource) (err error) {
// create client
conn, err = kafkaLib.Dial("tcp", brokerHost)
time.Sleep(30 * time.Second)
conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
if err != nil {
return
}

// healthcheck
brokerList, err := conn.Brokers()
if err != nil {
return
}
if len(brokerList) == 0 {
if len(conn.Brokers()) == 0 {
err = errors.New("not ready")
return
}

broker, err = conn.Controller()
if err != nil {
fmt.Printf("error fetching controller %s", err.Error())
conn.Close()
return
}
@@ -163,7 +162,7 @@ func TestInit(t *testing.T) {
},
})

assert.ErrorContains(t, err, "create connection")
assert.ErrorContains(t, err, "failed to create kafka consumer")
})
}

@@ -226,24 +225,26 @@ func TestExtract(t *testing.T) {
})
}

func setup(broker kafkaLib.Broker) (err error) {
// create broker connection to create topics
var conn *kafkaLib.Conn
conn, err = kafkaLib.Dial("tcp", net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)))
func setup(broker *kafkaLib.Broker) (err error) {
// create client connection to create topics
conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
if err != nil {
fmt.Printf("error creating client ")
return
}
defer conn.Close()

// create topics
topicConfigs := []kafkaLib.TopicConfig{
{Topic: "meteor-test-topic-1", NumPartitions: 1, ReplicationFactor: 1},
{Topic: "meteor-test-topic-2", NumPartitions: 1, ReplicationFactor: 1},
{Topic: "meteor-test-topic-3", NumPartitions: 1, ReplicationFactor: 1},
{Topic: "__consumer_offsets", NumPartitions: 1, ReplicationFactor: 1},
topicConfigs := map[string]*kafkaLib.TopicDetail{
"meteor-test-topic-1": {NumPartitions: 1, ReplicationFactor: 1},
"meteor-test-topic-2": {NumPartitions: 1, ReplicationFactor: 1},
"meteor-test-topic-3": {NumPartitions: 1, ReplicationFactor: 1},
"__consumer_offsets": {NumPartitions: 1, ReplicationFactor: 1},
}
err = conn.CreateTopics(topicConfigs...)
createTopicRequest := &kafkaLib.CreateTopicsRequest{TopicDetails: topicConfigs}
_, err = broker.CreateTopics(createTopicRequest)
if err != nil {
fmt.Printf("error creating topics! %s", err.Error())
return
}

67 changes: 67 additions & 0 deletions plugins/extractors/kafka/kubernetes_token_provider.go
@@ -0,0 +1,67 @@
package kafka

import (
"fmt"
"os"
"strings"

"github.com/IBM/sarama"

"github.com/rs/zerolog/log"
)

const (
kubernetesServiceAccountTokenPath = "/var/run/secrets/kafka/serviceaccount/token"
)

// NewKubernetesTokenProvider creates a new TokenProvider that reads the token from the kubernetes pod's projected
// service account token file. By default, the token is read from `/var/run/secrets/kafka/serviceaccount/token`.
// Users must make sure a valid projected service account token exists at that path.
func NewKubernetesTokenProvider(opts ...TokenProviderOption) *KubernetesTokenProvider {
options := &TokenProviderOptions{
FilePath: kubernetesServiceAccountTokenPath,
}
for _, o := range opts {
o(options)
}
log.Info().Str("token_file_path", options.FilePath).Msg("token provider options")
return &KubernetesTokenProvider{
serviceAccountFilePath: options.FilePath,
}
}

type KubernetesTokenProvider struct {
serviceAccountFilePath string
}

// Token returns the token from the service account token file.
func (tp *KubernetesTokenProvider) Token() (*sarama.AccessToken, error) {
token, err := tp.readFile()
if err != nil {
log.Error().Err(err).Msg("failed to read token from service account token file")
return nil, err
}
return &sarama.AccessToken{
Token: token,
}, nil
}

func (tp *KubernetesTokenProvider) readFile() (string, error) {
token, err := os.ReadFile(tp.serviceAccountFilePath)
if err != nil {
return "", fmt.Errorf("failed to read token file: %w", err)
}
tkn := strings.TrimSpace(string(token))
return tkn, nil
}

type TokenProviderOptions struct {
// FilePath is the path to the file containing the token.
FilePath string
}

type TokenProviderOption func(*TokenProviderOptions)

// WithTokenFilePath sets the file path to the token.
func WithTokenFilePath(path string) TokenProviderOption {
return func(o *TokenProviderOptions) {
o.FilePath = path
}
}
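For callers that mount the projected token somewhere else, the functional option above would presumably be used like this (a sketch; the path is illustrative, and the kafka package qualifier applies when calling from outside the package):

// Sketch: pointing the provider at a custom token mount.
provider := kafka.NewKubernetesTokenProvider(
	kafka.WithTokenFilePath("/var/run/secrets/tokens/kafka-token"),
)
consumerConfig.Net.SASL.TokenProvider = provider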
