Skip to content

Commit

Permalink
fix: fix kafka extractor issues (#494)
Browse files Browse the repository at this point in the history
* chore: fix lint issues

* chore: fix build issues
  • Loading branch information
ravisuhag authored Oct 31, 2024
1 parent 01a148e commit fb743a9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
29 changes: 16 additions & 13 deletions plugins/extractors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/IBM/sarama"
"github.com/raystack/meteor/models"
v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
"github.com/raystack/meteor/plugins"
Expand Down Expand Up @@ -110,21 +111,21 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
}

consumerConfig := sarama.NewConfig()

if e.config.Auth.TLS.Enabled {
tlsConfig, err := e.createTLSConfig()
if err != nil {
return fmt.Errorf("create tls config: %w", err)
}
consumerConfig.Net.TLS.Enable = true
consumerConfig.Net.TLS.Config = tlsConfig
}

if e.config.Auth.SASL.Enabled {
consumerConfig.Net.SASL.Enable = true
if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth {
consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
}
if e.config.Auth.SASL.Enabled {
consumerConfig.Net.SASL.Enable = true
if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth {
consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
}
}

consumer, err := sarama.NewConsumer([]string{e.config.Broker}, consumerConfig)
Expand All @@ -133,6 +134,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
return fmt.Errorf("failed to create kafka consumer for brokers %s and config %+v. Error %s", e.config.Broker,
consumerConfig, err.Error())
}

e.conn = consumer
return nil
}
Expand Down Expand Up @@ -162,6 +164,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
)
}(time.Now())

topics, err := e.conn.Topics()
if err != nil {
return fmt.Errorf("fetch topics: %w", err)
Expand Down Expand Up @@ -200,11 +203,6 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) {
}, nil
}

cert, err := tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile)
if err != nil {
return nil, fmt.Errorf("create cert: %w", err)
}

var cert tls.Certificate
var err error
if authConfig.CertFile != "" && authConfig.KeyFile != "" {
Expand All @@ -214,6 +212,11 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) {
}
}

caCert, err := os.ReadFile(authConfig.CAFile)
if err != nil {
return nil, fmt.Errorf("read ca cert file: %w", err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

Expand All @@ -231,7 +234,7 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2.
Profile: &v1beta2.TopicProfile{
NumberOfPartitions: int64(numOfPartitions),
},
Attributes: &structpb.Struct{},
Attributes: &structpb.Struct{}, // ensure attributes don't get overwritten if present
})
if err != nil {
e.logger.Warn("error creating Any struct", "error", err)
Expand Down
10 changes: 5 additions & 5 deletions plugins/extractors/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var (

func TestMain(m *testing.M) {
var broker *kafkaLib.Broker

// setup test
opts := dockertest.RunOptions{
Repository: "moeenz/docker-kafka-kraft",
Expand All @@ -55,6 +54,7 @@ func TestMain(m *testing.M) {
time.Sleep(30 * time.Second)
conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
if err != nil {
fmt.Printf("error creating client ")
return
}

Expand All @@ -70,9 +70,9 @@ func TestMain(m *testing.M) {
conn.Close()
return
}

return
}

purgeContainer, err := utils.CreateContainer(opts, retryFn)
if err != nil {
log.Fatal(err)
Expand All @@ -86,8 +86,6 @@ func TestMain(m *testing.M) {
// run tests
code := m.Run()

conn.Close()

// purge container
if err := purgeContainer(); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -179,7 +177,6 @@ func TestExtract(t *testing.T) {
if err != nil {
t.Fatal(err)
}

emitter := mocks.NewEmitter()
err = extr.Extract(ctx, emitter.Push)
assert.NoError(t, err)
Expand Down Expand Up @@ -226,12 +223,14 @@ func TestExtract(t *testing.T) {
}

func setup(broker *kafkaLib.Broker) (err error) {

// create client connection to create topics
conn, err := kafkaLib.NewClient([]string{brokerHost}, nil)
if err != nil {
fmt.Printf("error creating client ")
return
}

defer conn.Close()

// create topics
Expand All @@ -241,6 +240,7 @@ func setup(broker *kafkaLib.Broker) (err error) {
"meteor-test-topic-3": {NumPartitions: 1, ReplicationFactor: 1},
"__consumer_offsets": {NumPartitions: 1, ReplicationFactor: 1},
}

createTopicRequest := &kafkaLib.CreateTopicsRequest{TopicDetails: topicConfigs}
_, err = broker.CreateTopics(createTopicRequest)
if err != nil {
Expand Down

0 comments on commit fb743a9

Please sign in to comment.