contrib/IBM/sarama.v1: add DSM support #2788

Merged · 6 commits · Jul 23, 2024
18 changes: 18 additions & 0 deletions contrib/IBM/sarama.v1/option.go
@@ -20,6 +20,8 @@ type config struct {
consumerSpanName string
producerSpanName string
analyticsRate float64
dataStreamsEnabled bool
groupID string
}

func defaults(cfg *config) {
@@ -29,6 +31,8 @@ func defaults(cfg *config) {
cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound)

cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false)

// cfg.analyticsRate = globalconfig.AnalyticsRate()
if internal.BoolEnv("DD_TRACE_SARAMA_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
@@ -48,6 +52,20 @@ func WithServiceName(name string) Option {
}
}

// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithDataStreams() Option {
return func(cfg *config) {
cfg.dataStreamsEnabled = true
}
}

// WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group)
func WithGroupID(groupID string) Option {
return func(cfg *config) {
cfg.groupID = groupID
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(cfg *config) {
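As a usage sketch of the two new options (not part of this diff — the broker address and group name are placeholders, and WrapConsumer is the package's pre-existing wrapper; the contrib import path is the one declared in sarama.go below):

    package main

    import (
        "log"

        "github.com/IBM/sarama"
        saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama"
    )

    func main() {
        c, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
        if err != nil {
            log.Fatal(err)
        }
        defer c.Close()
        // WithDataStreams has the same effect as DD_DATA_STREAMS_ENABLED=true;
        // WithGroupID adds a group:<groupID> edge tag to the pathway metrics.
        c = saramatrace.WrapConsumer(c,
            saramatrace.WithDataStreams(),
            saramatrace.WithGroupID("checkout-consumers"), // placeholder consumer group
        )
        _ = c
    }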
39 changes: 39 additions & 0 deletions contrib/IBM/sarama.v1/option_test.go
@@ -0,0 +1,39 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDataStreamsActivation(t *testing.T) {
t.Run("default", func(t *testing.T) {
cfg := new(config)
defaults(cfg)
assert.False(t, cfg.dataStreamsEnabled)
})
t.Run("withOption", func(t *testing.T) {
cfg := new(config)
defaults(cfg)
WithDataStreams()(cfg)
assert.True(t, cfg.dataStreamsEnabled)
})
t.Run("withEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "true")
cfg := new(config)
defaults(cfg)
assert.True(t, cfg.dataStreamsEnabled)
})
t.Run("optionOverridesEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "false")
cfg := new(config)
defaults(cfg)
WithDataStreams()(cfg)
assert.True(t, cfg.dataStreamsEnabled)
})
}
74 changes: 74 additions & 0 deletions contrib/IBM/sarama.v1/sarama.go
@@ -7,8 +7,11 @@
package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama"

import (
"context"
"math"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -76,6 +79,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
next := tracer.StartSpan(cfg.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
tracer.Inject(next.Context(), carrier)
setConsumeCheckpoint(cfg.dataStreamsEnabled, cfg.groupID, msg)

wrapped.messages <- msg

@@ -127,8 +131,12 @@ type syncProducer struct {
// SendMessage calls sarama.SyncProducer.SendMessage and traces the request.
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
span := startProducerSpan(p.cfg, p.version, msg)
setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
partition, offset, err = p.SyncProducer.SendMessage(msg)
finishProducerSpan(span, partition, offset, err)
if err == nil && p.cfg.dataStreamsEnabled {
tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset)
}
return partition, offset, err
}
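A hedged sketch of how this sync path would be exercised (reusing the imports from the option.go sketch above; broker and topic are placeholders, and WrapSyncProducer is the package's existing wrapper):

    cfg := sarama.NewConfig()
    cfg.Version = sarama.V0_11_0_0       // record headers, and thus DSM context propagation, need >= 0.11
    cfg.Producer.Return.Successes = true // required by sarama.NewSyncProducer
    p, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
    if err != nil {
        log.Fatal(err)
    }
    p = saramatrace.WrapSyncProducer(cfg, p, saramatrace.WithDataStreams())
    // On a successful SendMessage, the wrapper above now also records the
    // produce offset via tracer.TrackKafkaProduceOffset.
    _, _, err = p.SendMessage(&sarama.ProducerMessage{
        Topic: "orders", // placeholder topic
        Value: sarama.StringEncoder("hello"),
    })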

@@ -138,12 +146,19 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
// treated individually, so we create a span for each one
spans := make([]ddtrace.Span, len(msgs))
for i, msg := range msgs {
setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
spans[i] = startProducerSpan(p.cfg, p.version, msg)
}
err := p.SyncProducer.SendMessages(msgs)
for i, span := range spans {
finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
if err == nil && p.cfg.dataStreamsEnabled {
// we only track Kafka lag if messages have been sent successfully. Otherwise, we have no way to know which partition the data was sent to.
for _, msg := range msgs {
tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
}
}
return err
}

@@ -221,6 +236,7 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
select {
case msg := <-wrapped.input:
span := startProducerSpan(cfg, saramaConfig.Version, msg)
setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version)
p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
spanID := span.Context().SpanID()
@@ -236,6 +252,10 @@
// producer was closed, so exit
return
}
if cfg.dataStreamsEnabled {
// we only track Kafka lag if returning successes is enabled. Otherwise, we have no way to know which partition the data was sent to.
tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
}
if spanctx, spanFound := getSpanContext(msg); spanFound {
spanID := spanctx.SpanID()
if span, ok := spans[spanID]; ok {
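Worth noting the dependency this branch introduces: in the async path the produce offset is only tracked when the caller enables success returns. A sketch under the same placeholder setup as above:

    cfg := sarama.NewConfig()
    cfg.Version = sarama.V0_11_0_0
    cfg.Producer.Return.Successes = true // without this, the TrackKafkaProduceOffset branch above is never reached
    p, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
    if err != nil {
        log.Fatal(err)
    }
    p = saramatrace.WrapAsyncProducer(cfg, p, saramatrace.WithDataStreams())
    p.Input() <- &sarama.ProducerMessage{Topic: "orders", Value: sarama.StringEncoder("hi")}
    <-p.Successes() // the offset is tracked as the wrapper forwards this success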
@@ -303,3 +323,57 @@ func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) {

return spanctx, true
}

func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"}
carrier := NewProducerMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...)
if !ok || !version.IsAtLeast(sarama.V0_11_0_0) {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
}
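Note the asymmetry here: the checkpoint is set whenever DSM is enabled, but the resulting pathway context is only injected into the message headers on Kafka >= 0.11.0.0, since older protocol versions do not support record headers.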

func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
if groupID != "" {
edges = append(edges, "group:"+groupID)
}
carrier := NewConsumerMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if groupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset)
}
}
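Tying the consume side together, a sketch of how this fires in practice (topic, partition, and group are placeholders, and pc comes from a plain, unwrapped sarama.Consumer so the checkpoint is only set once):

    pc, err := consumer.ConsumePartition("orders", 0, sarama.OffsetNewest)
    if err != nil {
        log.Fatal(err)
    }
    pc = saramatrace.WrapPartitionConsumer(pc,
        saramatrace.WithDataStreams(),
        saramatrace.WithGroupID("checkout-consumers"),
    )
    for msg := range pc.Messages() {
        // By the time a message arrives here, setConsumeCheckpoint has set the
        // direction:in checkpoint and, because a group ID is configured,
        // reported the commit offset via tracer.TrackKafkaCommitOffset.
        _ = msg
    }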

func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
if msg.Value != nil {
size += int64(msg.Value.Length())
}
if msg.Key != nil {
size += int64(msg.Key.Length())
}
return size
}

func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
return size + int64(len(msg.Value)+len(msg.Key))
}
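A hypothetical sanity test for the consumer-side size helper (not part of this PR; it just illustrates that key, value, and header bytes are all counted):

    func TestGetConsumerMsgSize(t *testing.T) {
        msg := &sarama.ConsumerMessage{
            Key:     []byte("k"),     // 1 byte
            Value:   []byte("hello"), // 5 bytes
            Headers: []*sarama.RecordHeader{{Key: []byte("h"), Value: []byte("v")}}, // 2 bytes
        }
        assert.Equal(t, int64(8), getConsumerMsgSize(msg))
    }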