From b9fcdac403972de38cdb6b8d759a9fe3bbceb7f2 Mon Sep 17 00:00:00 2001 From: Rodrigo Arguello Date: Tue, 17 Dec 2024 10:45:28 +0100 Subject: [PATCH 1/3] contrib/IBM/sarama.v1: add WrapConsumerGroupHandler to trace consumer groups --- contrib/IBM/sarama.v1/consumer.go | 100 +++++ contrib/IBM/sarama.v1/consumer_group.go | 54 +++ contrib/IBM/sarama.v1/consumer_group_test.go | 155 ++++++++ contrib/IBM/sarama.v1/consumer_test.go | 100 +++++ contrib/IBM/sarama.v1/dispatcher.go | 93 +++++ contrib/IBM/sarama.v1/example_test.go | 43 ++- contrib/IBM/sarama.v1/producer.go | 252 ++++++++++++ contrib/IBM/sarama.v1/producer_test.go | 217 +++++++++++ contrib/IBM/sarama.v1/sarama.go | 368 ------------------ contrib/IBM/sarama.v1/sarama_test.go | 384 ++++--------------- 10 files changed, 1075 insertions(+), 691 deletions(-) create mode 100644 contrib/IBM/sarama.v1/consumer.go create mode 100644 contrib/IBM/sarama.v1/consumer_group.go create mode 100644 contrib/IBM/sarama.v1/consumer_group_test.go create mode 100644 contrib/IBM/sarama.v1/consumer_test.go create mode 100644 contrib/IBM/sarama.v1/dispatcher.go create mode 100644 contrib/IBM/sarama.v1/producer.go create mode 100644 contrib/IBM/sarama.v1/producer_test.go diff --git a/contrib/IBM/sarama.v1/consumer.go b/contrib/IBM/sarama.v1/consumer.go new file mode 100644 index 0000000000..519ad1dc09 --- /dev/null +++ b/contrib/IBM/sarama.v1/consumer.go @@ -0,0 +1,100 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package sarama + +import ( + "context" + "github.com/IBM/sarama" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" +) + +type partitionConsumer struct { + sarama.PartitionConsumer + dispatcher dispatcher +} + +// Messages returns the read channel for the messages that are returned by +// the broker. +func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { + return pc.dispatcher.Messages() +} + +// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received +// message to be traced. +func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer { + cfg := new(config) + defaults(cfg) + for _, opt := range opts { + opt(cfg) + } + log.Debug("contrib/IBM/sarama: Wrapping Partition Consumer: %#v", cfg) + + d := wrapDispatcher(pc, cfg) + go d.Run() + + wrapped := &partitionConsumer{ + PartitionConsumer: pc, + dispatcher: d, + } + return wrapped +} + +type consumer struct { + sarama.Consumer + opts []Option +} + +// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting +// PartitionConsumer. +func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { + pc, err := c.Consumer.ConsumePartition(topic, partition, offset) + if err != nil { + return pc, err + } + return WrapPartitionConsumer(pc, c.opts...), nil +} + +// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created +// via Consumer.ConsumePartition. 
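+// +// A minimal usage sketch (the broker address, topic, and offset below are +// illustrative placeholders, not part of this package): +// +// consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) +// if err != nil { +// panic(err) +// } +// consumer = WrapConsumer(consumer) +// pc, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest) +// if err != nil { +// panic(err) +// } +// for msg := range pc.Messages() { +// // each message received here already carries the injected span context in its headers +// _ = msg +// }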
+func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer { + return &consumer{ + Consumer: c, + opts: opts, + } +} + +func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) { + if !enabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"} + if groupID != "" { + edges = append(edges, "group:"+groupID) + } + carrier := NewConsumerMessageCarrier(msg) + + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...) + if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) + if groupID != "" { + // only track Kafka lag if a consumer group is set. + // since there is no ack mechanism, we consider that messages read are committed right away. + tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset) + } +} + +func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + return size + int64(len(msg.Value)+len(msg.Key)) +} diff --git a/contrib/IBM/sarama.v1/consumer_group.go b/contrib/IBM/sarama.v1/consumer_group.go new file mode 100644 index 0000000000..afb5a85107 --- /dev/null +++ b/contrib/IBM/sarama.v1/consumer_group.go @@ -0,0 +1,54 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package sarama + +import ( + "github.com/IBM/sarama" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" +) + +type consumerGroupHandler struct { + sarama.ConsumerGroupHandler + cfg *config +} + +func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + // Wrap claim + wd := wrapDispatcher(claim, h.cfg) + go wd.Run() + claim = &consumerGroupClaim{ + ConsumerGroupClaim: claim, + dispatcher: wd, + } + + return h.ConsumerGroupHandler.ConsumeClaim(session, claim) +} + +// WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received +// message to be traced. +func WrapConsumerGroupHandler(handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler { + cfg := new(config) + defaults(cfg) + for _, opt := range opts { + opt(cfg) + } + log.Debug("contrib/IBM/sarama: Wrapping Consumer Group Handler: %#v", cfg) + + return &consumerGroupHandler{ + ConsumerGroupHandler: handler, + cfg: cfg, + } +} + +type consumerGroupClaim struct { + sarama.ConsumerGroupClaim + dispatcher dispatcher +} + +func (c *consumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { + return c.dispatcher.Messages() +} diff --git a/contrib/IBM/sarama.v1/consumer_group_test.go b/contrib/IBM/sarama.v1/consumer_group_test.go new file mode 100644 index 0000000000..f2a0ba4796 --- /dev/null +++ b/contrib/IBM/sarama.v1/consumer_group_test.go @@ -0,0 +1,155 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
+ +package sarama + +import ( + "context" + "log" + "sync" + "testing" + + "github.com/IBM/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" +) + +func TestWrapConsumerGroupHandler(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 // first version that supports headers + cfg.Producer.Return.Successes = true + cfg.Producer.Flush.Messages = 1 + + cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg) + require.NoError(t, err) + defer func() { + assert.NoError(t, cg.Close()) + }() + + handler := &testConsumerGroupHandler{ + T: t, + ready: make(chan bool), + rcvMessages: make(chan *sarama.ConsumerMessage, 1), + } + tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for { + // `Consume` should be called inside an infinite loop; when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err := cg.Consume(ctx, []string{testTopic}, tracedHandler); err != nil { + assert.ErrorIs(t, err, sarama.ErrClosedConsumerGroup) + return + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + } + }() + + <-handler.ready // wait until the consumer has been set up + log.Println("Sarama consumer up and running!...") + + p, err := sarama.NewSyncProducer(kafkaBrokers, cfg) + require.NoError(t, err) + p = WrapSyncProducer(cfg, p, WithDataStreams()) + + produceMsg := &sarama.ProducerMessage{ + Topic: testTopic, + Value: sarama.StringEncoder("test 1"), + Metadata: "test", + } + _, _, err = p.SendMessage(produceMsg) + require.NoError(t, err) + + waitForSpans(mt, 2) + cancel() + wg.Wait() + + spans := mt.FinishedSpans() + require.Len(t, spans, 2) + consumeMsg := <-handler.rcvMessages + + s0 := spans[0] + assert.Equal(t, "kafka", s0.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s0.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic gotest", s0.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s0.OperationName()) + assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition)) + assert.NotEmpty(t, s0.Tag("offset")) + assert.Equal(t, "IBM/sarama", s0.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) + + assertDSMProducerPathway(t, testTopic, produceMsg) + + s1 := spans[1] + assert.Equal(t, "kafka", s1.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s1.Tag(ext.SpanType)) + assert.Equal(t, "Consume Topic gotest", s1.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.consume", s1.OperationName()) + assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition)) + assert.NotEmpty(t, s1.Tag("offset")) + assert.Equal(t, "IBM/sarama", s1.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) + + assertDSMConsumerPathway(t, testTopic, "", consumeMsg, true) + + assert.Equal(t, s0.SpanID(), s1.ParentID(), "spans are not parent-child") +} + +type testConsumerGroupHandler struct { + *testing.T + ready chan bool + rcvMessages chan *sarama.ConsumerMessage +} + +func (t *testConsumerGroupHandler) Setup(_ 
sarama.ConsumerGroupSession) error { + // Mark the consumer as ready + close(t.ready) + return nil +} + +func (t *testConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + return nil +} + +func (t *testConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for { + select { + case msg, ok := <-claim.Messages(): + if !ok { + t.T.Log("message channel was closed") + return nil + } + t.T.Logf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic) + session.MarkMessage(msg, "") + t.rcvMessages <- msg + + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/IBM/sarama/issues/1192 + case <-session.Context().Done(): + return nil + } + } +} diff --git a/contrib/IBM/sarama.v1/consumer_test.go b/contrib/IBM/sarama.v1/consumer_test.go new file mode 100644 index 0000000000..8b373b2555 --- /dev/null +++ b/contrib/IBM/sarama.v1/consumer_test.go @@ -0,0 +1,100 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package sarama + +import ( + "testing" + + "github.com/IBM/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +func TestWrapConsumer(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + broker := sarama.NewMockBroker(t, 0) + defer broker.Close() + + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker.Addr(), broker.BrokerID()). + SetLeader("test-topic", 0, broker.BrokerID()), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset("test-topic", 0, sarama.OffsetOldest, 0). + SetOffset("test-topic", 0, sarama.OffsetNewest, 1), + "FetchRequest": sarama.NewMockFetchResponse(t, 1). + SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")). 
+ SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")), + }) + cfg := sarama.NewConfig() + cfg.Version = sarama.MinVersion + + client, err := sarama.NewClient([]string{broker.Addr()}, cfg) + require.NoError(t, err) + defer client.Close() + + consumer, err := sarama.NewConsumerFromClient(client) + require.NoError(t, err) + defer consumer.Close() + + consumer = WrapConsumer(consumer, WithDataStreams()) + + partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0) + require.NoError(t, err) + msg1 := <-partitionConsumer.Messages() + msg2 := <-partitionConsumer.Messages() + err = partitionConsumer.Close() + require.NoError(t, err) + // wait for the channel to be closed + <-partitionConsumer.Messages() + + spans := mt.FinishedSpans() + require.Len(t, spans, 2) + { + s := spans[0] + spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1)) + assert.NoError(t, err) + assert.Equal(t, spanctx.TraceID(), s.TraceID(), + "span context should be injected into the consumer message headers") + + assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) + assert.Equal(t, int64(0), s.Tag("offset")) + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "kafka.consume", s.OperationName()) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + assertDSMConsumerPathway(t, "test-topic", "", msg1, false) + } + { + s := spans[1] + spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg2)) + assert.NoError(t, err) + assert.Equal(t, spanctx.TraceID(), s.TraceID(), + "span context should be injected into the consumer message headers") + + assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) + assert.Equal(t, int64(1), s.Tag("offset")) + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "kafka.consume", s.OperationName()) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + assertDSMConsumerPathway(t, "test-topic", "", msg2, false) + } +} diff --git a/contrib/IBM/sarama.v1/dispatcher.go b/contrib/IBM/sarama.v1/dispatcher.go new file mode 100644 index 0000000000..dafa5577d8 --- /dev/null +++ b/contrib/IBM/sarama.v1/dispatcher.go @@ -0,0 +1,93 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
+ +package sarama + +import ( + "math" + + "github.com/IBM/sarama" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +type dispatcher interface { + Messages() <-chan *sarama.ConsumerMessage +} + +type wrappedDispatcher struct { + d dispatcher + messages chan *sarama.ConsumerMessage + + cfg *config +} + +func wrapDispatcher(d dispatcher, cfg *config) *wrappedDispatcher { + return &wrappedDispatcher{ + d: d, + messages: make(chan *sarama.ConsumerMessage), + cfg: cfg, + } +} + +func (w *wrappedDispatcher) Messages() <-chan *sarama.ConsumerMessage { + return w.messages +} + +func (w *wrappedDispatcher) Run() { + msgs := w.d.Messages() + var prev ddtrace.Span + + for msg := range msgs { + // create the next span from the message + opts := []tracer.StartSpanOption{ + tracer.ServiceName(w.cfg.consumerServiceName), + tracer.ResourceName("Consume Topic " + msg.Topic), + tracer.SpanType(ext.SpanTypeMessageConsumer), + tracer.Tag(ext.MessagingKafkaPartition, msg.Partition), + tracer.Tag("offset", msg.Offset), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + tracer.Measured(), + } + if !math.IsNaN(w.cfg.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate)) + } + // kafka supports headers, so try to extract a span context + carrier := NewConsumerMessageCarrier(msg) + if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } + opts = append(opts, tracer.ChildOf(spanctx)) + } + next := tracer.StartSpan(w.cfg.consumerSpanName, opts...) 
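+ // note: this span is deliberately left open here; it is finished only when + // the consumer reads the following message (or the channel closes), so its + // duration approximates the time spent processing this message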
+ // reinject the span context so consumers can pick it up + tracer.Inject(next.Context(), carrier) + setConsumeCheckpoint(w.cfg.dataStreamsEnabled, w.cfg.groupID, msg) + + w.messages <- msg + + // if the next message was received, finish the previous span + if prev != nil { + prev.Finish() + } + prev = next + } + // finish any remaining span + if prev != nil { + prev.Finish() + } + close(w.messages) +} diff --git a/contrib/IBM/sarama.v1/example_test.go b/contrib/IBM/sarama.v1/example_test.go index 3990625b15..b26eb98391 100644 --- a/contrib/IBM/sarama.v1/example_test.go +++ b/contrib/IBM/sarama.v1/example_test.go @@ -6,6 +6,8 @@ package sarama_test import ( + "context" + "errors" "log" saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1" @@ -14,7 +16,7 @@ import ( "github.com/IBM/sarama" ) -func Example_asyncProducer() { +func ExampleWrapAsyncProducer() { cfg := sarama.NewConfig() cfg.Version = sarama.V0_11_0_0 // minimum version that supports headers which are required for tracing @@ -33,7 +35,7 @@ func Example_asyncProducer() { producer.Input() <- msg } -func Example_syncProducer() { +func ExampleWrapSyncProducer() { cfg := sarama.NewConfig() cfg.Producer.Return.Successes = true @@ -55,7 +57,7 @@ func Example_syncProducer() { } } -func Example_consumer() { +func ExampleWrapConsumer() { consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) @@ -82,3 +84,38 @@ func Example_consumer() { consumed++ } } + +func ExampleWrapConsumerGroupHandler() { + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 // first version that supports headers + cfg.Producer.Return.Successes = true + cfg.Producer.Flush.Messages = 1 + + const groupID = "group-id" + + consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, groupID, cfg) + if err != nil { + panic(err) + } + + // trace your sarama.ConsumerGroupHandler implementation + var myHandler sarama.ConsumerGroupHandler + handler := saramatrace.WrapConsumerGroupHandler(myHandler) + + ctx := context.Background() + for { + // `Consume` should be called inside an infinite loop; when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + if err := consumerGroup.Consume(ctx, []string{"my-topic"}, handler); err != nil { + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + log.Panicf("Error from consumer: %v", err) + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + } +} diff --git a/contrib/IBM/sarama.v1/producer.go b/contrib/IBM/sarama.v1/producer.go new file mode 100644 index 0000000000..b3c312eeb6 --- /dev/null +++ b/contrib/IBM/sarama.v1/producer.go @@ -0,0 +1,252 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
+ +package sarama + +import ( + "context" + "math" + + "github.com/IBM/sarama" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" +) + +type syncProducer struct { + sarama.SyncProducer + version sarama.KafkaVersion + cfg *config +} + +// SendMessage calls sarama.SyncProducer.SendMessage and traces the request. +func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + span := startProducerSpan(p.cfg, p.version, msg) + setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) + partition, offset, err = p.SyncProducer.SendMessage(msg) + finishProducerSpan(span, partition, offset, err) + if err == nil && p.cfg.dataStreamsEnabled { + tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset) + } + return partition, offset, err +} + +// SendMessages calls sarama.SyncProducer.SendMessages and traces the requests. +func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + // although there's only one call made to the SyncProducer, the messages are + // treated individually, so we create a span for each one + spans := make([]ddtrace.Span, len(msgs)) + for i, msg := range msgs { + setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) + spans[i] = startProducerSpan(p.cfg, p.version, msg) + } + err := p.SyncProducer.SendMessages(msgs) + for i, span := range spans { + finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err) + } + if err == nil && p.cfg.dataStreamsEnabled { + // we only track Kafka lag if messages have been sent successfully. Otherwise, we have no way to know to which partition the data was sent. + for _, msg := range msgs { + tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) + } + } + return err +} + +// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages +// are traced. +func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer { + cfg := new(config) + defaults(cfg) + for _, opt := range opts { + opt(cfg) + } + log.Debug("contrib/IBM/sarama: Wrapping Sync Producer: %#v", cfg) + if saramaConfig == nil { + saramaConfig = sarama.NewConfig() + } + return &syncProducer{ + SyncProducer: producer, + version: saramaConfig.Version, + cfg: cfg, + } +} + +type asyncProducer struct { + sarama.AsyncProducer + input chan *sarama.ProducerMessage + successes chan *sarama.ProducerMessage + errors chan *sarama.ProducerError +} + +// Input returns the input channel. +func (p *asyncProducer) Input() chan<- *sarama.ProducerMessage { + return p.input +} + +// Successes returns the successes channel. +func (p *asyncProducer) Successes() <-chan *sarama.ProducerMessage { + return p.successes +} + +// Errors returns the errors channel. +func (p *asyncProducer) Errors() <-chan *sarama.ProducerError { + return p.errors +} + +// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages +// are traced. It requires the underlying sarama Config to know whether +// successes will be returned. Tracing requires sarama.V0_11_0_0 or newer, the +// first version that supports headers. Only spans of successfully published +// messages have partition and offset tags set. 
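+// +// A minimal usage sketch (the broker address and topic are illustrative +// placeholders, not part of this package): +// +// cfg := sarama.NewConfig() +// cfg.Version = sarama.V0_11_0_0 +// cfg.Producer.Return.Successes = true +// producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg) +// if err != nil { +// panic(err) +// } +// producer = WrapAsyncProducer(cfg, producer) +// producer.Input() <- &sarama.ProducerMessage{Topic: "my-topic", Value: sarama.StringEncoder("hello")} +// <-producer.Successes()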
+func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer { + cfg := new(config) + defaults(cfg) + for _, opt := range opts { + opt(cfg) + } + log.Debug("contrib/IBM/sarama: Wrapping Async Producer: %#v", cfg) + if saramaConfig == nil { + saramaConfig = sarama.NewConfig() + saramaConfig.Version = sarama.V0_11_0_0 + } else if !saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) { + log.Error("Tracing Sarama async producer requires sarama.V0_11_0_0 or newer") + } + wrapped := &asyncProducer{ + AsyncProducer: p, + input: make(chan *sarama.ProducerMessage), + successes: make(chan *sarama.ProducerMessage), + errors: make(chan *sarama.ProducerError), + } + go func() { + spans := make(map[uint64]ddtrace.Span) + defer close(wrapped.input) + defer close(wrapped.successes) + defer close(wrapped.errors) + for { + select { + case msg := <-wrapped.input: + span := startProducerSpan(cfg, saramaConfig.Version, msg) + setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version) + p.Input() <- msg + if saramaConfig.Producer.Return.Successes { + spanID := span.Context().SpanID() + spans[spanID] = span + } else { + // if returning successes isn't enabled, we just finish the + // span right away because there's no way to know when it will + // be done + span.Finish() + } + case msg, ok := <-p.Successes(): + if !ok { + // producer was closed, so exit + return + } + if cfg.dataStreamsEnabled { + // we only track Kafka lag if returning successes is enabled. Otherwise, we have no way to know to which partition the data was sent. + tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) + } + if spanctx, spanFound := getProducerSpanContext(msg); spanFound { + spanID := spanctx.SpanID() + if span, ok := spans[spanID]; ok { + delete(spans, spanID) + finishProducerSpan(span, msg.Partition, msg.Offset, nil) + } + } + wrapped.successes <- msg + case err, ok := <-p.Errors(): + if !ok { + // producer was closed + return + } + if spanctx, spanFound := getProducerSpanContext(err.Msg); spanFound { + spanID := spanctx.SpanID() + if span, ok := spans[spanID]; ok { + delete(spans, spanID) + span.Finish(tracer.WithError(err)) + } + } + wrapped.errors <- err + } + } + }() + return wrapped +} + +func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.ProducerMessage) ddtrace.Span { + carrier := NewProducerMessageCarrier(msg) + opts := []tracer.StartSpanOption{ + tracer.ServiceName(cfg.producerServiceName), + tracer.ResourceName("Produce Topic " + msg.Topic), + tracer.SpanType(ext.SpanTypeMessageProducer), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindProducer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + } + if !math.IsNaN(cfg.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate)) + } + // if there's a span context in the headers, use that as the parent + if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } + opts = append(opts, tracer.ChildOf(spanctx)) + } + span := tracer.StartSpan(cfg.producerSpanName, opts...) 
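+ // Kafka only supports record headers from version 0.11.0.0 on, so the span + // context can only be propagated to consumers on newer versions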
+ if version.IsAtLeast(sarama.V0_11_0_0) { + // re-inject the span context so consumers can pick it up + tracer.Inject(span.Context(), carrier) + } + return span +} + +func finishProducerSpan(span ddtrace.Span, partition int32, offset int64, err error) { + span.SetTag(ext.MessagingKafkaPartition, partition) + span.SetTag("offset", offset) + span.Finish(tracer.WithError(err)) +} + +func getProducerSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) { + carrier := NewProducerMessageCarrier(msg) + spanctx, err := tracer.Extract(carrier) + if err != nil { + return nil, false + } + + return spanctx, true +} + +func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) { + if !enabled || msg == nil { + return + } + edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"} + carrier := NewProducerMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...) + if !ok || !version.IsAtLeast(sarama.V0_11_0_0) { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + if msg.Value != nil { + size += int64(msg.Value.Length()) + } + if msg.Key != nil { + size += int64(msg.Key.Length()) + } + return size +} diff --git a/contrib/IBM/sarama.v1/producer_test.go b/contrib/IBM/sarama.v1/producer_test.go new file mode 100644 index 0000000000..92308a9eb5 --- /dev/null +++ b/contrib/IBM/sarama.v1/producer_test.go @@ -0,0 +1,217 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
+ +package sarama + +import ( + "testing" + + "github.com/IBM/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" +) + +func TestSyncProducer(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + seedBroker := sarama.NewMockBroker(t, 1) + defer seedBroker.Close() + + leader := sarama.NewMockBroker(t, 2) + defer leader.Close() + + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + seedBroker.Returns(metadataResponse) + + prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 + prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) + leader.Returns(prodSuccess) + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 // first version that supports headers + cfg.Producer.Return.Successes = true + + producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) + require.NoError(t, err) + producer = WrapSyncProducer(cfg, producer, WithDataStreams()) + + msg1 := &sarama.ProducerMessage{ + Topic: "my_topic", + Value: sarama.StringEncoder("test 1"), + Metadata: "test", + } + _, _, err = producer.SendMessage(msg1) + require.NoError(t, err) + + spans := mt.FinishedSpans() + assert.Len(t, spans, 1) + { + s := spans[0] + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s.OperationName()) + assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) + assert.Equal(t, int64(0), s.Tag("offset")) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + assertDSMProducerPathway(t, "my_topic", msg1) + } +} + +func TestSyncProducerSendMessages(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + seedBroker := sarama.NewMockBroker(t, 1) + defer seedBroker.Close() + leader := sarama.NewMockBroker(t, 2) + defer leader.Close() + + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + seedBroker.Returns(metadataResponse) + + prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 + prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) + leader.Returns(prodSuccess) + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 // first version that supports headers + cfg.Producer.Return.Successes = true + cfg.Producer.Flush.Messages = 2 + + producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) + require.NoError(t, err) + producer = WrapSyncProducer(cfg, producer, WithDataStreams()) + + msg1 := &sarama.ProducerMessage{ + Topic: "my_topic", + Value: sarama.StringEncoder("test 1"), + Metadata: "test", + } + msg2 := &sarama.ProducerMessage{ + Topic: "my_topic", + Value: sarama.StringEncoder("test 2"), + Metadata: "test", + } + err = producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2}) + require.NoError(t, err) + + spans := mt.FinishedSpans() + require.Len(t, spans, 2) + for _, s := range spans { + assert.Equal(t, 
"kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s.OperationName()) + assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + } + + for _, msg := range []*sarama.ProducerMessage{msg1, msg2} { + assertDSMProducerPathway(t, "my_topic", msg) + } +} + +func TestWrapAsyncProducer(t *testing.T) { + // the default for producers is a fire-and-forget model that doesn't return + // successes + t.Run("Without Successes", func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + broker := newMockBroker(t) + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 + producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + require.NoError(t, err) + producer = WrapAsyncProducer(nil, producer, WithDataStreams()) + + msg1 := &sarama.ProducerMessage{ + Topic: "my_topic", + Value: sarama.StringEncoder("test 1"), + } + producer.Input() <- msg1 + + waitForSpans(mt, 1) + + spans := mt.FinishedSpans() + require.Len(t, spans, 1) + { + s := spans[0] + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s.OperationName()) + + // these tags are set in the finishProducerSpan function, but in this case it's never used, and instead we + // automatically finish spans after being started because we don't have a way to know when they are finished. + assert.Nil(t, s.Tag(ext.MessagingKafkaPartition)) + assert.Nil(t, s.Tag("offset")) + + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + assertDSMProducerPathway(t, "my_topic", msg1) + } + }) + + t.Run("With Successes", func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + broker := newMockBroker(t) + + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 + cfg.Producer.Return.Successes = true + + producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg) + require.NoError(t, err) + producer = WrapAsyncProducer(cfg, producer, WithDataStreams()) + + msg1 := &sarama.ProducerMessage{ + Topic: "my_topic", + Value: sarama.StringEncoder("test 1"), + } + producer.Input() <- msg1 + <-producer.Successes() + + spans := mt.FinishedSpans() + require.Len(t, spans, 1) + { + s := spans[0] + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s.OperationName()) + assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) + assert.Equal(t, int64(0), s.Tag("offset")) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + + assertDSMProducerPathway(t, "my_topic", msg1) + } + }) +} diff --git a/contrib/IBM/sarama.v1/sarama.go b/contrib/IBM/sarama.v1/sarama.go index 3166977abc..1431daa044 100644 --- a/contrib/IBM/sarama.v1/sarama.go +++ b/contrib/IBM/sarama.v1/sarama.go @@ -7,18 +7,8 @@ package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama" import ( - "context" - "math" - 
- "gopkg.in/DataDog/dd-trace-go.v1/datastreams" - "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - - "github.com/IBM/sarama" ) const componentName = "IBM/sarama" @@ -27,361 +17,3 @@ func init() { telemetry.LoadIntegration(componentName) tracer.MarkIntegrationImported("github.com/IBM/sarama") } - -type partitionConsumer struct { - sarama.PartitionConsumer - messages chan *sarama.ConsumerMessage -} - -// Messages returns the read channel for the messages that are returned by -// the broker. -func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { - return pc.messages -} - -// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received -// message to be traced. -func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer { - cfg := new(config) - defaults(cfg) - for _, opt := range opts { - opt(cfg) - } - log.Debug("contrib/IBM/sarama: Wrapping Partition Consumer: %#v", cfg) - wrapped := &partitionConsumer{ - PartitionConsumer: pc, - messages: make(chan *sarama.ConsumerMessage), - } - go func() { - msgs := pc.Messages() - var prev ddtrace.Span - for msg := range msgs { - // create the next span from the message - opts := []tracer.StartSpanOption{ - tracer.ServiceName(cfg.consumerServiceName), - tracer.ResourceName("Consume Topic " + msg.Topic), - tracer.SpanType(ext.SpanTypeMessageConsumer), - tracer.Tag(ext.MessagingKafkaPartition, msg.Partition), - tracer.Tag("offset", msg.Offset), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), - tracer.Measured(), - } - if !math.IsNaN(cfg.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate)) - } - // kafka supports headers, so try to extract a span context - carrier := NewConsumerMessageCarrier(msg) - if spanctx, err := tracer.Extract(carrier); err == nil { - // If there are span links as a result of context extraction, add them as a StartSpanOption - if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { - opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) - } - opts = append(opts, tracer.ChildOf(spanctx)) - } - next := tracer.StartSpan(cfg.consumerSpanName, opts...) - // reinject the span context so consumers can pick it up - tracer.Inject(next.Context(), carrier) - setConsumeCheckpoint(cfg.dataStreamsEnabled, cfg.groupID, msg) - - wrapped.messages <- msg - - // if the next message was received, finish the previous span - if prev != nil { - prev.Finish() - } - prev = next - } - // finish any remaining span - if prev != nil { - prev.Finish() - } - close(wrapped.messages) - }() - return wrapped -} - -type consumer struct { - sarama.Consumer - opts []Option -} - -// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting -// PartitionConsumer. -func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { - pc, err := c.Consumer.ConsumePartition(topic, partition, offset) - if err != nil { - return pc, err - } - return WrapPartitionConsumer(pc, c.opts...), nil -} - -// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created -// via Consumer.ConsumePartition. 
-func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer { - return &consumer{ - Consumer: c, - opts: opts, - } -} - -type syncProducer struct { - sarama.SyncProducer - version sarama.KafkaVersion - cfg *config -} - -// SendMessage calls sarama.SyncProducer.SendMessage and traces the request. -func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { - span := startProducerSpan(p.cfg, p.version, msg) - setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) - partition, offset, err = p.SyncProducer.SendMessage(msg) - finishProducerSpan(span, partition, offset, err) - if err == nil && p.cfg.dataStreamsEnabled { - tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset) - } - return partition, offset, err -} - -// SendMessages calls sarama.SyncProducer.SendMessages and traces the requests. -func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { - // although there's only one call made to the SyncProducer, the messages are - // treated individually, so we create a span for each one - spans := make([]ddtrace.Span, len(msgs)) - for i, msg := range msgs { - setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) - spans[i] = startProducerSpan(p.cfg, p.version, msg) - } - err := p.SyncProducer.SendMessages(msgs) - for i, span := range spans { - finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err) - } - if err == nil && p.cfg.dataStreamsEnabled { - // we only track Kafka lag if messages have been sent successfully. Otherwise, we have no way to know to which partition data was sent to. - for _, msg := range msgs { - tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) - } - } - return err -} - -// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages -// are traced. -func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer { - cfg := new(config) - defaults(cfg) - for _, opt := range opts { - opt(cfg) - } - log.Debug("contrib/IBM/sarama: Wrapping Sync Producer: %#v", cfg) - if saramaConfig == nil { - saramaConfig = sarama.NewConfig() - } - return &syncProducer{ - SyncProducer: producer, - version: saramaConfig.Version, - cfg: cfg, - } -} - -type asyncProducer struct { - sarama.AsyncProducer - input chan *sarama.ProducerMessage - successes chan *sarama.ProducerMessage - errors chan *sarama.ProducerError -} - -// Input returns the input channel. -func (p *asyncProducer) Input() chan<- *sarama.ProducerMessage { - return p.input -} - -// Successes returns the successes channel. -func (p *asyncProducer) Successes() <-chan *sarama.ProducerMessage { - return p.successes -} - -// Errors returns the errors channel. -func (p *asyncProducer) Errors() <-chan *sarama.ProducerError { - return p.errors -} - -// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages -// are traced. It requires the underlying sarama Config so we can know whether -// or not successes will be returned. Tracing requires at least sarama.V0_11_0_0 -// version which is the first version that supports headers. Only spans of -// successfully published messages have partition and offset tags set. 
-func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer { - cfg := new(config) - defaults(cfg) - for _, opt := range opts { - opt(cfg) - } - log.Debug("contrib/IBM/sarama: Wrapping Async Producer: %#v", cfg) - if saramaConfig == nil { - saramaConfig = sarama.NewConfig() - saramaConfig.Version = sarama.V0_11_0_0 - } else if !saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) { - log.Error("Tracing Sarama async producer requires at least sarama.V0_11_0_0 version") - } - wrapped := &asyncProducer{ - AsyncProducer: p, - input: make(chan *sarama.ProducerMessage), - successes: make(chan *sarama.ProducerMessage), - errors: make(chan *sarama.ProducerError), - } - go func() { - spans := make(map[uint64]ddtrace.Span) - defer close(wrapped.input) - defer close(wrapped.successes) - defer close(wrapped.errors) - for { - select { - case msg := <-wrapped.input: - span := startProducerSpan(cfg, saramaConfig.Version, msg) - setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version) - p.Input() <- msg - if saramaConfig.Producer.Return.Successes { - spanID := span.Context().SpanID() - spans[spanID] = span - } else { - // if returning successes isn't enabled, we just finish the - // span right away because there's no way to know when it will - // be done - span.Finish() - } - case msg, ok := <-p.Successes(): - if !ok { - // producer was closed, so exit - return - } - if cfg.dataStreamsEnabled { - // we only track Kafka lag if returning successes is enabled. Otherwise, we have no way to know to which partition data was sent to. - tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) - } - if spanctx, spanFound := getSpanContext(msg); spanFound { - spanID := spanctx.SpanID() - if span, ok := spans[spanID]; ok { - delete(spans, spanID) - finishProducerSpan(span, msg.Partition, msg.Offset, nil) - } - } - wrapped.successes <- msg - case err, ok := <-p.Errors(): - if !ok { - // producer was closed - return - } - if spanctx, spanFound := getSpanContext(err.Msg); spanFound { - spanID := spanctx.SpanID() - if span, ok := spans[spanID]; ok { - delete(spans, spanID) - span.Finish(tracer.WithError(err)) - } - } - wrapped.errors <- err - } - } - }() - return wrapped -} - -func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.ProducerMessage) ddtrace.Span { - carrier := NewProducerMessageCarrier(msg) - opts := []tracer.StartSpanOption{ - tracer.ServiceName(cfg.producerServiceName), - tracer.ResourceName("Produce Topic " + msg.Topic), - tracer.SpanType(ext.SpanTypeMessageProducer), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindProducer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), - } - if !math.IsNaN(cfg.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate)) - } - // if there's a span context in the headers, use that as the parent - if spanctx, err := tracer.Extract(carrier); err == nil { - // If there are span links as a result of context extraction, add them as a StartSpanOption - if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { - opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) - } - opts = append(opts, tracer.ChildOf(spanctx)) - } - span := tracer.StartSpan(cfg.producerSpanName, opts...) 
- if version.IsAtLeast(sarama.V0_11_0_0) { - // re-inject the span context so consumers can pick it up - tracer.Inject(span.Context(), carrier) - } - return span -} - -func finishProducerSpan(span ddtrace.Span, partition int32, offset int64, err error) { - span.SetTag(ext.MessagingKafkaPartition, partition) - span.SetTag("offset", offset) - span.Finish(tracer.WithError(err)) -} - -func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) { - carrier := NewProducerMessageCarrier(msg) - spanctx, err := tracer.Extract(carrier) - if err != nil { - return nil, false - } - - return spanctx, true -} - -func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) { - if !enabled || msg == nil { - return - } - edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"} - carrier := NewProducerMessageCarrier(msg) - ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...) - if !ok || !version.IsAtLeast(sarama.V0_11_0_0) { - return - } - datastreams.InjectToBase64Carrier(ctx, carrier) -} - -func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) { - if !enabled || msg == nil { - return - } - edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"} - if groupID != "" { - edges = append(edges, "group:"+groupID) - } - carrier := NewConsumerMessageCarrier(msg) - ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...) - if !ok { - return - } - datastreams.InjectToBase64Carrier(ctx, carrier) - if groupID != "" { - // only track Kafka lag if a consumer group is set. - // since there is no ack mechanism, we consider that messages read are committed right away. 
- tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset) - } -} - -func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) { - for _, header := range msg.Headers { - size += int64(len(header.Key) + len(header.Value)) - } - if msg.Value != nil { - size += int64(msg.Value.Length()) - } - if msg.Key != nil { - size += int64(msg.Key.Length()) - } - return size -} - -func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) { - for _, header := range msg.Headers { - size += int64(len(header.Key) + len(header.Value)) - } - return size + int64(len(msg.Value)+len(msg.Key)) -} diff --git a/contrib/IBM/sarama.v1/sarama_test.go b/contrib/IBM/sarama.v1/sarama_test.go index 277da155af..8855e2e267 100644 --- a/contrib/IBM/sarama.v1/sarama_test.go +++ b/contrib/IBM/sarama.v1/sarama_test.go @@ -7,12 +7,11 @@ package sarama import ( "context" "testing" "time" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" "gopkg.in/DataDog/dd-trace-go.v1/datastreams" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -21,6 +20,17 @@ import ( "github.com/stretchr/testify/require" ) +var kafkaBrokers = []string{"localhost:9092", "localhost:9093", "localhost:9094"} + +const ( + testGroupID = "gotest" + testTopic = "gotest" +) + +func TestNamingSchema(t *testing.T) { + namingschematest.NewKafkaTest(genTestSpans)(t) +} + func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { var opts []Option if serviceOverride != "" { @@ -82,324 +92,6 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span { return spans } -func TestConsumer(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - broker := sarama.NewMockBroker(t, 0) - defer broker.Close() - - broker.SetHandlerByMap(map[string]sarama.MockResponse{ - "MetadataRequest": sarama.NewMockMetadataResponse(t). - SetBroker(broker.Addr(), broker.BrokerID()). - SetLeader("test-topic", 0, broker.BrokerID()), - "OffsetRequest": sarama.NewMockOffsetResponse(t). - SetOffset("test-topic", 0, sarama.OffsetOldest, 0). - SetOffset("test-topic", 0, sarama.OffsetNewest, 1), - "FetchRequest": sarama.NewMockFetchResponse(t, 1). -SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")). 
- SetMessage("test-topic", 0, 1, sarama.StringEncoder("world")), - }) - cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion - - client, err := sarama.NewClient([]string{broker.Addr()}, cfg) - require.NoError(t, err) - defer client.Close() - - consumer, err := sarama.NewConsumerFromClient(client) - require.NoError(t, err) - defer consumer.Close() - - consumer = WrapConsumer(consumer, WithDataStreams()) - - partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0) - require.NoError(t, err) - msg1 := <-partitionConsumer.Messages() - msg2 := <-partitionConsumer.Messages() - err = partitionConsumer.Close() - require.NoError(t, err) - // wait for the channel to be closed - <-partitionConsumer.Messages() - - spans := mt.FinishedSpans() - require.Len(t, spans, 2) - { - s := spans[0] - spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1)) - assert.NoError(t, err) - assert.Equal(t, spanctx.TraceID(), s.TraceID(), - "span context should be injected into the consumer message headers") - - assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) - assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) - assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) - assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "kafka.consume", s.OperationName()) - assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) - assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) - assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1))) - require.True(t, ok, "pathway not found in context") - expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") - expected, _ := datastreams.PathwayFromContext(expectedCtx) - assert.NotEqual(t, expected.GetHash(), 0) - assert.Equal(t, expected.GetHash(), p.GetHash()) - } - { - s := spans[1] - spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg2)) - assert.NoError(t, err) - assert.Equal(t, spanctx.TraceID(), s.TraceID(), - "span context should be injected into the consumer message headers") - - assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(1), s.Tag("offset")) - assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) - assert.Equal(t, "Consume Topic test-topic", s.Tag(ext.ResourceName)) - assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "kafka.consume", s.OperationName()) - assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) - assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) - assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg2))) - require.True(t, ok, "pathway not found in context") - expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") - expected, _ := datastreams.PathwayFromContext(expectedCtx) - assert.NotEqual(t, expected.GetHash(), 0) - assert.Equal(t, expected.GetHash(), p.GetHash()) - } -} - -func TestSyncProducer(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - seedBroker := sarama.NewMockBroker(t, 1) - defer seedBroker.Close() - - leader := sarama.NewMockBroker(t, 2) - defer leader.Close() - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - 
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - seedBroker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - leader.Returns(prodSuccess) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - - producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) - require.NoError(t, err) - producer = WrapSyncProducer(cfg, producer, WithDataStreams()) - - msg1 := &sarama.ProducerMessage{ - Topic: "my_topic", - Value: sarama.StringEncoder("test 1"), - Metadata: "test", - } - _, _, err = producer.SendMessage(msg1) - require.NoError(t, err) - - spans := mt.FinishedSpans() - assert.Len(t, spans, 1) - { - s := spans[0] - assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) - assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName)) - assert.Equal(t, "kafka.produce", s.OperationName()) - assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, int64(0), s.Tag("offset")) - assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) - assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) - assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) - - p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) - require.True(t, ok, "pathway not found in context") - expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") - expected, _ := datastreams.PathwayFromContext(expectedCtx) - assert.NotEqual(t, expected.GetHash(), 0) - assert.Equal(t, expected.GetHash(), p.GetHash()) - } -} - -func TestSyncProducerSendMessages(t *testing.T) { - mt := mocktracer.Start() - defer mt.Stop() - - seedBroker := sarama.NewMockBroker(t, 1) - defer seedBroker.Close() - leader := sarama.NewMockBroker(t, 2) - defer leader.Close() - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.Version = 1 - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - seedBroker.Returns(metadataResponse) - - prodSuccess := new(sarama.ProduceResponse) - prodSuccess.Version = 2 - prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) - leader.Returns(prodSuccess) - - cfg := sarama.NewConfig() - cfg.Version = sarama.V0_11_0_0 // first version that supports headers - cfg.Producer.Return.Successes = true - cfg.Producer.Flush.Messages = 2 - - producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) - require.NoError(t, err) - producer = WrapSyncProducer(cfg, producer, WithDataStreams()) - - msg1 := &sarama.ProducerMessage{ - Topic: "my_topic", - Value: sarama.StringEncoder("test 1"), - Metadata: "test", - } - msg2 := &sarama.ProducerMessage{ - Topic: "my_topic", - Value: sarama.StringEncoder("test 2"), - Metadata: "test", - } - err = producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2}) - require.NoError(t, err) - - spans := mt.FinishedSpans() - require.Len(t, spans, 2) - for _, s := range spans { - assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) - assert.Equal(t, "queue", s.Tag(ext.SpanType)) - assert.Equal(t, "Produce Topic my_topic", 
-		assert.Equal(t, "kafka.produce", s.OperationName())
-		assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
-		assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
-		assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
-		assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
-	}
-
-	for _, msg := range []*sarama.ProducerMessage{msg1, msg2} {
-		p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg)))
-		if !assert.True(t, ok, "pathway not found in context") {
-			continue
-		}
-		expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
-		expected, _ := datastreams.PathwayFromContext(expectedCtx)
-		assert.NotEqual(t, expected.GetHash(), 0)
-		assert.Equal(t, expected.GetHash(), p.GetHash())
-	}
-}
-
-func TestAsyncProducer(t *testing.T) {
-	// the default for producers is a fire-and-forget model that doesn't return
-	// successes
-	t.Run("Without Successes", func(t *testing.T) {
-		mt := mocktracer.Start()
-		defer mt.Stop()
-
-		broker := newMockBroker(t)
-
-		cfg := sarama.NewConfig()
-		cfg.Version = sarama.V0_11_0_0
-		producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
-		require.NoError(t, err)
-		producer = WrapAsyncProducer(nil, producer, WithDataStreams())
-
-		msg1 := &sarama.ProducerMessage{
-			Topic: "my_topic",
-			Value: sarama.StringEncoder("test 1"),
-		}
-		producer.Input() <- msg1
-
-		waitForSpans(mt, 1)
-
-		spans := mt.FinishedSpans()
-		require.Len(t, spans, 1)
-		{
-			s := spans[0]
-			assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
-			assert.Equal(t, "queue", s.Tag(ext.SpanType))
-			assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
-			assert.Equal(t, "kafka.produce", s.OperationName())
-
-			// these tags are set in the finishProducerSpan function, but in this case it's never used, and instead we
-			// automatically finish spans after being started because we don't have a way to know when they are finished.
-			assert.Nil(t, s.Tag(ext.MessagingKafkaPartition))
-			assert.Nil(t, s.Tag("offset"))
-
-			assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
-			assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
-			assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
-
-			p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1)))
-			require.True(t, ok, "pathway not found in context")
-			expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
-			expected, _ := datastreams.PathwayFromContext(expectedCtx)
-			assert.NotEqual(t, expected.GetHash(), 0)
-			assert.Equal(t, expected.GetHash(), p.GetHash())
-		}
-	})
-
-	t.Run("With Successes", func(t *testing.T) {
-		mt := mocktracer.Start()
-		defer mt.Stop()
-
-		broker := newMockBroker(t)
-
-		cfg := sarama.NewConfig()
-		cfg.Version = sarama.V0_11_0_0
-		cfg.Producer.Return.Successes = true
-
-		producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
-		require.NoError(t, err)
-		producer = WrapAsyncProducer(cfg, producer, WithDataStreams())
-
-		msg1 := &sarama.ProducerMessage{
-			Topic: "my_topic",
-			Value: sarama.StringEncoder("test 1"),
-		}
-		producer.Input() <- msg1
-		<-producer.Successes()
-
-		spans := mt.FinishedSpans()
-		require.Len(t, spans, 1)
-		{
-			s := spans[0]
-			assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
-			assert.Equal(t, "queue", s.Tag(ext.SpanType))
-			assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
-			assert.Equal(t, "kafka.produce", s.OperationName())
-			assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
-			assert.Equal(t, int64(0), s.Tag("offset"))
-			assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
-			assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
-			assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
-
-			p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1)))
-			require.True(t, ok, "pathway not found in context")
-			expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
-			expected, _ := datastreams.PathwayFromContext(expectedCtx)
-			assert.NotEqual(t, expected.GetHash(), 0)
-			assert.Equal(t, expected.GetHash(), p.GetHash())
-		}
-	})
-}
-
-func TestNamingSchema(t *testing.T) {
-	namingschematest.NewKafkaTest(genTestSpans)(t)
-}
-
 func newMockBroker(t *testing.T) *sarama.MockBroker {
 	broker := sarama.NewMockBroker(t, 1)
@@ -433,3 +125,55 @@ func waitForSpans(mt mocktracer.Tracer, sz int) {
 		time.Sleep(time.Millisecond * 100)
 	}
 }
+
+func assertDSMProducerPathway(t *testing.T, topic string, msg *sarama.ProducerMessage) {
+	t.Helper()
+
+	got, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(
+		context.Background(),
+		NewProducerMessageCarrier(msg),
+	))
+	require.True(t, ok, "pathway not found in kafka message")
+
+	ctx, _ := tracer.SetDataStreamsCheckpoint(
+		context.Background(),
+		"direction:out", "topic:"+topic, "type:kafka",
+	)
+	want, _ := datastreams.PathwayFromContext(ctx)
+
+	assert.NotEqual(t, want.GetHash(), 0)
+	assert.Equal(t, want.GetHash(), got.GetHash())
+}
+
+func assertDSMConsumerPathway(t *testing.T, topic, groupID string, msg *sarama.ConsumerMessage, withProducer bool) {
+	t.Helper()
+
+	carrier := NewConsumerMessageCarrier(msg)
+	got, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(
+		context.Background(),
+		carrier,
+	))
+	require.True(t, ok, "pathway not found in kafka message")
+
+	edgeTags := []string{"direction:in", "topic:" + topic, "type:kafka"}
+	if groupID != "" {
+		edgeTags = append(edgeTags, "group:"+groupID)
+	}
+
+	ctx := context.Background()
+	if withProducer {
+		ctx, _ = tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+topic, "type:kafka")
+	}
+
+	ctx, _ = tracer.SetDataStreamsCheckpoint(ctx, edgeTags...)
+	want, _ := datastreams.PathwayFromContext(ctx)
+
+	assert.NotEqual(t, want.GetHash(), 0)
+	assert.Equal(t, want.GetHash(), got.GetHash())
+}

From 94fa0badb194d6ffe90d9bd755704501a0a7dbe0 Mon Sep 17 00:00:00 2001
From: Rodrigo Arguello
Date: Tue, 17 Dec 2024 15:41:31 +0100
Subject: [PATCH 2/3] fix test

---
 contrib/IBM/sarama.v1/consumer_group_test.go | 4 ++--
 contrib/IBM/sarama.v1/dispatcher.go          | 6 ------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/contrib/IBM/sarama.v1/consumer_group_test.go b/contrib/IBM/sarama.v1/consumer_group_test.go
index f2a0ba4796..f448c69b10 100644
--- a/contrib/IBM/sarama.v1/consumer_group_test.go
+++ b/contrib/IBM/sarama.v1/consumer_group_test.go
@@ -94,7 +94,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
 	assert.Equal(t, "Produce Topic gotest", s0.Tag(ext.ResourceName))
 	assert.Equal(t, "kafka.produce", s0.OperationName())
 	assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition))
-	assert.NotEmpty(t, s0.Tag("offset"))
+	assert.NotNil(t, s0.Tag("offset"))
 	assert.Equal(t, "IBM/sarama", s0.Tag(ext.Component))
 	assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
 	assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
@@ -107,7 +107,7 @@
 	assert.Equal(t, "Consume Topic gotest", s1.Tag(ext.ResourceName))
 	assert.Equal(t, "kafka.consume", s1.OperationName())
 	assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition))
-	assert.NotEmpty(t, s1.Tag("offset"))
+	assert.NotNil(t, s1.Tag("offset"))
 	assert.Equal(t, "IBM/sarama", s1.Tag(ext.Component))
 	assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
 	assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
diff --git a/contrib/IBM/sarama.v1/dispatcher.go b/contrib/IBM/sarama.v1/dispatcher.go
index dafa5577d8..ea1bbfaee9 100644
--- a/contrib/IBM/sarama.v1/dispatcher.go
+++ b/contrib/IBM/sarama.v1/dispatcher.go
@@ -6,7 +6,6 @@
 package sarama
 
 import (
-	"fmt"
 	"math"
 
 	"github.com/IBM/sarama"
@@ -72,11 +71,6 @@ func (w *wrappedDispatcher) Run() {
 		// reinject the span context so consumers can pick it up
 		tracer.Inject(next.Context(), carrier)
 		setConsumeCheckpoint(w.cfg.dataStreamsEnabled, w.cfg.groupID, msg)
-
-		for _, h := range msg.Headers {
-			fmt.Printf("--- key: %s, value: %s\n", h.Key, h.Value)
-		}
-
 		w.messages <- msg
 
 		// if the next message was received, finish the previous span

From c2088e03fe320ed0b65aacf14dd7e1f333757e2e Mon Sep 17 00:00:00 2001
From: Rodrigo Arguello
Date: Thu, 19 Dec 2024 12:29:47 +0100
Subject: [PATCH 3/3] add group id

---
 contrib/IBM/sarama.v1/consumer_group_test.go | 4 ++--
 contrib/IBM/sarama.v1/example_test.go        | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/contrib/IBM/sarama.v1/consumer_group_test.go b/contrib/IBM/sarama.v1/consumer_group_test.go
index f448c69b10..f258d89fda 100644
--- a/contrib/IBM/sarama.v1/consumer_group_test.go
+++ b/contrib/IBM/sarama.v1/consumer_group_test.go
@@ -39,7 +39,7 @@ func TestWrapConsumerGroupHandler(t *testing.T) {
 		ready:       make(chan bool),
 		rcvMessages: make(chan *sarama.ConsumerMessage, 1),
 	}
-	tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams())
+	tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(testGroupID))
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -112,7 +112,7 @@
 	assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
 	assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
 
-	assertDSMConsumerPathway(t, testTopic, "", consumeMsg, true)
+	assertDSMConsumerPathway(t, testTopic, testGroupID, consumeMsg, true)
 
 	assert.Equal(t, s0.SpanID(), s1.ParentID(), "spans are not parent-child")
 }
diff --git a/contrib/IBM/sarama.v1/example_test.go b/contrib/IBM/sarama.v1/example_test.go
index b26eb98391..092a435880 100644
--- a/contrib/IBM/sarama.v1/example_test.go
+++ b/contrib/IBM/sarama.v1/example_test.go
@@ -100,7 +100,7 @@ func ExampleWrapConsumerGroupHandler() {
 	// trace your sarama.ConsumerGroupHandler implementation
 	var myHandler sarama.ConsumerGroupHandler
-	handler := saramatrace.WrapConsumerGroupHandler(myHandler)
+	handler := saramatrace.WrapConsumerGroupHandler(myHandler, saramatrace.WithGroupID(groupID))
 
 	ctx := context.Background()
 	for {
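
Reviewer note: a minimal end-to-end sketch of the API this series introduces, for anyone trying it locally. It assumes the wrappers and options from these patches (WrapConsumerGroupHandler, WithDataStreams, WithGroupID); the broker address, topic, group name, and the printHandler type are illustrative only, not part of the patch.

	package main

	import (
		"context"
		"log"

		"github.com/IBM/sarama"
		saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/IBM/sarama.v1"
		"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
	)

	// printHandler is a hypothetical sarama.ConsumerGroupHandler implementation.
	type printHandler struct{}

	func (printHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
	func (printHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
	func (printHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
		// claim.Messages() is served by the tracing dispatcher, so each message
		// already carries the injected span context in its headers.
		for msg := range claim.Messages() {
			log.Printf("topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
			sess.MarkMessage(msg, "")
		}
		return nil
	}

	func main() {
		tracer.Start()
		defer tracer.Stop()

		cfg := sarama.NewConfig()
		cfg.Version = sarama.V0_11_0_0 // headers are required to propagate trace context

		group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", cfg)
		if err != nil {
			log.Fatal(err)
		}
		defer group.Close()

		// Wrap the handler so every received message is traced; passing the
		// group ID lets Data Streams Monitoring also track consumer lag.
		handler := saramatrace.WrapConsumerGroupHandler(printHandler{},
			saramatrace.WithDataStreams(),
			saramatrace.WithGroupID("my-group"),
		)
		for {
			if err := group.Consume(context.Background(), []string{"my-topic"}, handler); err != nil {
				log.Fatal(err)
			}
		}
	}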