From 53081cc25cd0659b7281d3bd0efcead198754cb5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?=
Date: Fri, 13 Oct 2023 11:48:31 +0200
Subject: [PATCH] track payload size for kafka integration

---
 .../KafkaConsumerInstrumentation.java         |  3 +-
 .../kafka_clients/TracingIterator.java        |  3 +-
 .../instrumentation/kafka_clients/Utils.java  | 19 ++++
 .../KafkaStreamTaskInstrumentation.java       | 17 +++++-
 .../DefaultDataStreamsMonitoring.java         | 16 ++++-
 .../datastreams/DefaultPathwayContext.java    | 14 ++++-
 .../MsgPackDatastreamsPayloadWriter.java      | 11 +++-
 .../trace/core/datastreams/StatsBucket.java   |  5 +-
 .../trace/core/datastreams/StatsGroup.java    | 11 +++-
 .../datastreams/DataStreamsWritingTest.groovy | 29 +++++----
 .../DefaultDataStreamsMonitoringTest.groovy   | 60 +++++++++----------
 .../DefaultPathwayContextTest.groovy          | 22 +++++++
 .../api/AgentDataStreamsMonitoring.java       | 18 ++++++
 .../instrumentation/api/AgentTracer.java      | 14 +++++
 .../instrumentation/api/PathwayContext.java   |  6 ++
 .../instrumentation/api/StatsPoint.java       | 11 +++-
 16 files changed, 202 insertions(+), 57 deletions(-)
 create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java

diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java
index bf840a76e83..a3161879a86 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java
@@ -48,7 +48,8 @@ public String[] helperClassNames() {
       packageName + ".TracingIterator",
       packageName + ".TracingList",
       packageName + ".TracingListIterator",
-      packageName + ".Base64Decoder"
+      packageName + ".Base64Decoder",
+      packageName + ".Utils"
     };
   }
 
diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java
index 57449cfbe05..7e14f23d69d 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java
@@ -13,6 +13,7 @@
 import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_DELIVER;
 import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
 import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER;
+import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import datadog.trace.api.Config;
@@ -92,7 +93,7 @@ protected void startNewRecordSpan(ConsumerRecord val) {
         AgentTracer.get()
             .getDataStreamsMonitoring()
-            .setCheckpoint(span, sortedTags, val.timestamp());
+            .setCheckpoint(span, sortedTags, val.timestamp(), computePayloadSizeBytes(val));
       } else {
         span = startSpan(operationName, null);
       }
diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java
new file mode 100644
index 00000000000..a07d333be57
--- /dev/null
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java
@@ -0,0 +1,19 @@
+package datadog.trace.instrumentation.kafka_clients;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+public class Utils {
+  // this method is used in kafka-clients and kafka-streams instrumentations
+  public static long computePayloadSizeBytes(ConsumerRecord val) {
+    long headersSize = 0;
+    Headers headers = val.headers();
+    if (headers != null)
+      for (Header h : headers) {
+        headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
+      }
+    return headersSize + val.serializedKeySize() + val.serializedValueSize();
+  }
+}
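A caveat on computePayloadSizeBytes: in Kafka's API, Header.value() may be null, and ConsumerRecord.serializedKeySize() / serializedValueSize() return -1 when the key or value is null, so the sum above can throw or come out negative for such records. A defensive variant could look like the sketch below (illustrative only, not the code this patch adds; same imports as Utils.java):

    public static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
      long size = 0;
      Headers headers = val.headers();
      if (headers != null) {
        for (Header h : headers) {
          size += h.key().getBytes(StandardCharsets.UTF_8).length;
          if (h.value() != null) { // header values are allowed to be null
            size += h.value().length;
          }
        }
      }
      // serializedKeySize()/serializedValueSize() return -1 for null keys/values
      if (val.serializedKeySize() > 0) {
        size += val.serializedKeySize();
      }
      if (val.serializedValueSize() > 0) {
        size += val.serializedValueSize();
      }
      return size;
    }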
diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java
index c01a54328d5..871b3fdc240 100644
--- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java
@@ -9,6 +9,7 @@
 import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
+import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.BROKER_DECORATE;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.KAFKA_CONSUME;
@@ -36,6 +37,7 @@
 import java.util.Map;
 import net.bytebuddy.asm.Advice;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -59,6 +61,7 @@ public String instrumentedType() {
   public String[] helperClassNames() {
     return new String[] {
       "datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator",
+      "datadog.trace.instrumentation.kafka_clients.Utils",
       packageName + ".KafkaStreamsDecorator",
       packageName + ".ProcessorRecordContextVisitor",
       packageName + ".StampedRecordContextVisitor",
@@ -244,7 +247,8 @@ public static void start(
         sortedTags.put(TYPE_TAG, "kafka");
         AgentTracer.get()
             .getDataStreamsMonitoring()
-            .setCheckpoint(span, sortedTags, record.timestamp);
+            .setCheckpoint(
+                span, sortedTags, record.timestamp, computePayloadSizeBytes(record.value));
       } else {
         span = startSpan(KAFKA_CONSUME, null);
       }
@@ -307,9 +311,18 @@ public static void start(
       }
       sortedTags.put(TOPIC_TAG, record.topic());
       sortedTags.put(TYPE_TAG, "kafka");
+
+      long payloadSize = 0;
+      // we have to go through Object to get the RecordMetadata here because the class of `record`
+      // only implements it after 2.7 (and this class is only used if v >= 2.7)
+      if ((Object) record instanceof RecordMetadata) { // should always be true
+        RecordMetadata metadata = (RecordMetadata) (Object) record;
+        payloadSize = metadata.serializedKeySize() + metadata.serializedValueSize();
+      }
+
       AgentTracer.get()
           .getDataStreamsMonitoring()
-          .setCheckpoint(span, sortedTags, record.timestamp());
+          .setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
     } else {
       span = startSpan(KAFKA_CONSUME, null);
     }
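On the metadata path above, the serialized sizes come from RecordMetadata rather than from a ConsumerRecord. The same accessors are available to ordinary producer code; this sketch (a plain KafkaProducer callback, not this instrumentation, with `producer` and `record` assumed in scope) shows where those numbers come from and the -1 sentinel they use:

    producer.send(record, (RecordMetadata metadata, Exception exception) -> {
      if (exception == null) {
        // serializedKeySize()/serializedValueSize() return -1 when the key/value is null
        long payloadSize = Math.max(0, metadata.serializedKeySize())
            + Math.max(0, metadata.serializedValueSize());
        // a checkpoint could then record payloadSize, as the advice above does
      }
    });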
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
index c1e6b465606..d1c3b9930eb 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java
@@ -52,9 +52,10 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   static final long DEFAULT_BUCKET_DURATION_NANOS = TimeUnit.SECONDS.toNanos(10);
   static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);
 
-  private static final StatsPoint REPORT = new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0);
+  private static final StatsPoint REPORT =
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
   private static final StatsPoint POISON_PILL =
-      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0);
+      new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
 
   private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
   private final BlockingQueue<InboxItem> inbox = new MpscBlockingConsumerArrayQueue<>(1024);
@@ -205,9 +206,18 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
   @Override
   public void setCheckpoint(
       AgentSpan span, LinkedHashMap<String, String> sortedTags, long defaultTimestamp) {
+    setCheckpoint(span, sortedTags, defaultTimestamp, 0);
+  }
+
+  @Override
+  public void setCheckpoint(
+      AgentSpan span,
+      LinkedHashMap<String, String> sortedTags,
+      long defaultTimestamp,
+      long payloadSizeBytes) {
     PathwayContext pathwayContext = span.context().getPathwayContext();
     if (pathwayContext != null) {
-      pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp);
+      pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp, payloadSizeBytes);
       if (pathwayContext.getHash() != 0) {
         span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash()));
       }
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java
index eac3d1a310e..7e4beb835fc 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java
@@ -95,7 +95,7 @@ public long getHash() {
   @Override
   public void setCheckpoint(
       LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer) {
-    setCheckpoint(sortedTags, pointConsumer, 0);
+    setCheckpoint(sortedTags, pointConsumer, 0, 0);
   }
 
   @Override
@@ -103,6 +103,15 @@ public void setCheckpoint(
       LinkedHashMap<String, String> sortedTags,
       Consumer<StatsPoint> pointConsumer,
       long defaultTimestamp) {
+    setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0);
+  }
+
+  @Override
+  public void setCheckpoint(
+      LinkedHashMap<String, String> sortedTags,
+      Consumer<StatsPoint> pointConsumer,
+      long defaultTimestamp,
+      long payloadSizeBytes) {
     long startNanos = timeSource.getCurrentTimeNanos();
     long nanoTicks = timeSource.getNanoTicks();
     lock.lock();
@@ -167,7 +176,8 @@ public void setCheckpoint(
             hash,
             timeSource.getCurrentTimeNanos(),
             pathwayLatencyNano,
-            edgeLatencyNano);
+            edgeLatencyNano,
+            payloadSizeBytes);
       edgeStartNanoTicks = nanoTicks;
       hash = newHash;
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
index 0c31e52fa08..461e5457673 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java
@@ -23,6 +23,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
   private static final byte[] DURATION = "Duration".getBytes(ISO_8859_1);
   private static final byte[] PATHWAY_LATENCY = "PathwayLatency".getBytes(ISO_8859_1);
   private static final byte[] EDGE_LATENCY = "EdgeLatency".getBytes(ISO_8859_1);
+  private static final byte[] PAYLOAD_SIZE = "PayloadSize".getBytes(ISO_8859_1);
   private static final byte[] SERVICE = "Service".getBytes(ISO_8859_1);
   private static final byte[] EDGE_TAGS = "EdgeTags".getBytes(ISO_8859_1);
   private static final byte[] BACKLOGS = "Backlogs".getBytes(ISO_8859_1);
@@ -118,7 +119,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
     for (StatsGroup group : groups) {
       boolean firstNode = group.getEdgeTags().isEmpty();
 
-      packer.startMap(firstNode ? 4 : 5);
+      packer.startMap(firstNode ? 5 : 6);
 
       /* 1 */
       packer.writeUTF8(PATHWAY_LATENCY);
@@ -129,15 +130,19 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
       packer.writeBinary(group.getEdgeLatency().serialize());
 
       /* 3 */
+      packer.writeUTF8(PAYLOAD_SIZE);
+      packer.writeBinary(group.getPayloadSize().serialize());
+
+      /* 4 */
       packer.writeUTF8(HASH);
       packer.writeUnsignedLong(group.getHash());
 
-      /* 4 */
+      /* 5 */
       packer.writeUTF8(PARENT_HASH);
       packer.writeUnsignedLong(group.getParentHash());
 
       if (!firstNode) {
-        /* 5 */
+        /* 6 */
         packer.writeUTF8(EDGE_TAGS);
         packer.startArray(group.getEdgeTags().size());
         for (String tag : group.getEdgeTags()) {
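For reference, the map written per stats group now carries six entries (five for the first node, whose EdgeTags array is omitted). A comment-style summary of the layout produced by writeBucket above; the binary blobs are the serialize() output of the corresponding histograms:

    // "PathwayLatency" -> bin: serialized pathway-latency histogram (seconds)
    // "EdgeLatency"    -> bin: serialized edge-latency histogram (seconds)
    // "PayloadSize"    -> bin: serialized payload-size histogram (bytes)
    // "Hash"           -> unsigned long
    // "ParentHash"     -> unsigned long
    // "EdgeTags"       -> array of strings (omitted for the first node)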
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java
index 59fe6149547..de58e3cccf2 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java
@@ -29,7 +29,10 @@ public void addPoint(StatsPoint statsPoint) {
       hashToGroup.put(statsPoint.getHash(), statsGroup);
     }
 
-    statsGroup.add(statsPoint.getPathwayLatencyNano(), statsPoint.getEdgeLatencyNano());
+    statsGroup.add(
+        statsPoint.getPathwayLatencyNano(),
+        statsPoint.getEdgeLatencyNano(),
+        statsPoint.getPayloadSizeBytes());
   }
 
   public void addBacklog(Backlog backlog) {
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java
index 4034758e936..bbdb7dcc4e2 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java
@@ -12,6 +12,7 @@ public class StatsGroup {
   private final long parentHash;
   private final Histogram pathwayLatency;
   private final Histogram edgeLatency;
+  private final Histogram payloadSize;
 
   public StatsGroup(List<String> edgeTags, long hash, long parentHash) {
     this.edgeTags = edgeTags;
@@ -19,11 +20,15 @@ public StatsGroup(List<String> edgeTags, long hash, long parentHash) {
     this.parentHash = parentHash;
     pathwayLatency = Histograms.newHistogram();
     edgeLatency = Histograms.newHistogram();
+    payloadSize = Histograms.newHistogram();
   }
 
-  public void add(long pathwayLatencyNano, long edgeLatencyNano) {
+  public void add(long pathwayLatencyNano, long edgeLatencyNano, long payloadSizeBytes) {
     pathwayLatency.accept(((double) pathwayLatencyNano) / NANOSECONDS_TO_SECOND);
     edgeLatency.accept(((double) edgeLatencyNano) / NANOSECONDS_TO_SECOND);
+    // payload size is set to zero when we cannot compute it;
+    // in that case, it's probably better to have an empty histogram than to fill it with zeros
+    if (payloadSizeBytes != 0) payloadSize.accept((double) payloadSizeBytes);
   }
 
   public List<String> getEdgeTags() {
@@ -46,6 +51,10 @@ public Histogram getEdgeLatency() {
     return edgeLatency;
   }
 
+  public Histogram getPayloadSize() {
+    return payloadSize;
+  }
+
   @Override
   public String toString() {
     return "StatsGroup{"
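The payloadSizeBytes != 0 guard keeps unknown sizes out of the distribution. A quick illustration of why (hypothetical numbers, plain array math instead of the Histogram type used here):

    // 99 messages of unknown size recorded as 0 alongside one 1 KiB message:
    double[] withZeros = new double[100];
    withZeros[99] = 1024.0;
    java.util.Arrays.sort(withZeros);
    double p50 = withZeros[49]; // 0.0 -- placeholder zeros dominate every percentile

    // skipping unknown sizes instead leaves only real measurements:
    double[] knownOnly = {1024.0};
    double p50KnownOnly = knownOnly[0]; // 1024.0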
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy
index e1ec2533b88..35b5a595754 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy
@@ -20,11 +20,10 @@ import spock.lang.AutoCleanup
 import spock.lang.Shared
 import spock.util.concurrent.PollingConditions
 
-import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
 import static DefaultDataStreamsMonitoring.DEFAULT_BUCKET_DURATION_NANOS
+import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
 import static java.util.concurrent.TimeUnit.SECONDS
 
-
 /**
  * This test class exists because a real integration test is not possible. see DataStreamsIntegrationTest
  */
@@ -79,15 +78,15 @@ class DataStreamsWritingTest extends DDCoreSpecification {
     when:
     def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig })
     dataStreams.start()
-    dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0))
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
-    dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 100)
-    dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 130)
+    dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
+    dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 100)
+    dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130)
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l)
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10)))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5)))
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -132,15 +131,17 @@ class DataStreamsWritingTest extends DDCoreSpecification {
     assert unpacker.unpackString() == "Stats"
     assert unpacker.unpackArrayHeader() == 2 // 2 groups in first bucket
 
-    Set availableSizes = [4, 5] // we don't know the order the groups will be reported
+    Set availableSizes = [5, 6] // we don't know the order the groups will be reported
     2.times {
       int mapHeaderSize = unpacker.unpackMapHeader()
       assert availableSizes.remove(mapHeaderSize)
-      if (mapHeaderSize == 4) { // empty topic group
+      if (mapHeaderSize == 5) { // empty topic group
         assert unpacker.unpackString() == "PathwayLatency"
         unpacker.skipValue()
         assert unpacker.unpackString() == "EdgeLatency"
         unpacker.skipValue()
+        assert unpacker.unpackString() == "PayloadSize"
+        unpacker.skipValue()
         assert unpacker.unpackString() == "Hash"
         assert unpacker.unpackLong() == 9
         assert unpacker.unpackString() == "ParentHash"
@@ -150,6 +151,8 @@ class DataStreamsWritingTest extends DDCoreSpecification {
         unpacker.skipValue()
         assert unpacker.unpackString() == "EdgeLatency"
         unpacker.skipValue()
+        assert unpacker.unpackString() == "PayloadSize"
+        unpacker.skipValue()
         assert unpacker.unpackString() == "Hash"
         assert unpacker.unpackLong() == 1
== "ParentHash" @@ -185,11 +188,13 @@ class DataStreamsWritingTest extends DDCoreSpecification { Set availableHashes = [1L, 3L] // we don't know the order the groups will be reported 2.times { - assert unpacker.unpackMapHeader() == 5 + assert unpacker.unpackMapHeader() == 6 assert unpacker.unpackString() == "PathwayLatency" unpacker.skipValue() assert unpacker.unpackString() == "EdgeLatency" unpacker.skipValue() + assert unpacker.unpackString() == "PayloadSize" + unpacker.skipValue() assert unpacker.unpackString() == "Hash" def hash = unpacker.unpackLong() assert availableHashes.remove(hash) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy index 1a734ba9057..ce99bae9041 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy @@ -38,7 +38,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, timeSource.currentTimeNanos, 0, 0, 0)) dataStreams.report() then: @@ -97,7 +97,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -143,7 +143,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, bucketDuration) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(bucketDuration) then: @@ -186,9 +186,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new 
StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.report() @@ -232,9 +232,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.close() @@ -288,11 +288,11 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group":"testGroup", "partition":"2", "topic":"testTopic", "type":"kafka_commit"]), 23) - dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group":"testGroup", "partition":"2", "topic":"testTopic", "type":"kafka_commit"]), 24) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"2", "topic":"testTopic", "type":"kafka_produce"]), 23) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"2", "topic":"testTopic2", "type":"kafka_produce"]), 23) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"2", "topic":"testTopic", "type":"kafka_produce"]), 45) + dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group": "testGroup", "partition": "2", "topic": "testTopic", "type": "kafka_commit"]), 23) + dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group": "testGroup", "partition": "2", "topic": "testTopic", "type": "kafka_commit"]), 24) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "2", "topic": "testTopic", "type": "kafka_produce"]), 23) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "2", "topic": "testTopic2", "type": "kafka_produce"]), 23) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "2", "topic": "testTopic", "type": "kafka_produce"]), 45) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -343,9 +343,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", 
"group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -400,12 +400,12 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10))) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5))) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -477,7 +477,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -507,7 +507,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -553,7 +553,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dataStreams.start() supportsDataStreaming = false dataStreams.onEvent(EventListener.EventType.DOWNGRADED, "") - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ 
@@ -570,7 +570,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS)
     dataStreams.report()
 
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -615,7 +615,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     when: "reporting points when data streams is not enabled"
     def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.start()
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -631,7 +631,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     dsmEnabled = true
     dataStreams.report()
 
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -664,7 +664,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     when: "submitting points after being disabled"
     payloadWriter.buckets.clear()
 
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -698,7 +698,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     when: "reporting points when data streams is not supported"
     def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.start()
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -715,7 +715,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS)
     dataStreams.report()
 
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -729,7 +729,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     dsmEnabled = true
     dataStreams.report()
 
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -774,7 +774,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     when: "reporting points when data streams is not supported"
     def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.start()
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
@@ -790,7 +790,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
     dsmEnabled = true
     dataStreams.report()
 
-    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
+    dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
     timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
     dataStreams.report()
 
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy
index 7c94a829b97..5888c87a291 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy
@@ -36,6 +36,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
       assert point.parentHash == 0
       assert point.pathwayLatencyNano == 0
       assert point.edgeLatencyNano == 0
+      assert point.payloadSizeBytes == 0
     }
 
   def "First Set checkpoint starts the context."() {
@@ -79,6 +80,27 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
     }
   }
 
+  def "Checkpoint with payload size"() {
+    given:
+    def timeSource = new ControllableTimeSource()
+    def context = new DefaultPathwayContext(timeSource, wellKnownTags)
+
+    when:
+    timeSource.advance(25)
+    context.setCheckpoint(
+      new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer, 0, 72)
+
+    then:
+    context.isStarted()
+    pointConsumer.points.size() == 1
+    with(pointConsumer.points[0]) {
+      edgeTags == ["group:group", "topic:topic", "type:kafka"]
+      edgeTags.size() == 3
+      hash != 0
+      payloadSizeBytes == 72
+    }
+  }
+
   def "Multiple checkpoints generated"() {
     given:
     def timeSource = new ControllableTimeSource()
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java
index afc3730e577..911605898e5 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java
@@ -19,6 +19,24 @@ public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer {
   void setCheckpoint(
       AgentSpan span, LinkedHashMap<String, String> sortedTags, long defaultTimestamp);
 
+  /**
+   * Sets a data streams checkpoint, used for both produce and consume operations.
+   *
+   * @param span active span
+   * @param sortedTags alphabetically sorted tags for the checkpoint (direction, queue type, etc.)
+   * @param defaultTimestamp unix timestamp to use as the start of the pathway if this is the first
+   *     checkpoint in the chain. Zero should be passed if we can't extract the timestamp from the
+   *     message / payload itself (for instance: produce operations; http produce / consume etc.).
+   *     The value is ignored for checkpoints that are not at the start of the pipeline.
+   * @param payloadSizeBytes size of the message (body + headers) in bytes. Zero should be passed
+   *     if the size cannot be evaluated.
+   */
+  void setCheckpoint(
+      AgentSpan span,
+      LinkedHashMap<String, String> sortedTags,
+      long defaultTimestamp,
+      long payloadSizeBytes);
+
   PathwayContext newPathwayContext();
 
   void add(StatsPoint statsPoint);
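A typical call site for the new overload, mirroring the kafka-clients instrumentation earlier in this patch (sketch only; assumes a span and a consumed record in scope, and uses literal tag keys where the instrumentations use TagsProcessor constants):

    LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
    sortedTags.put("group", "my-consumer-group"); // keys added in alphabetical order
    sortedTags.put("topic", record.topic());
    sortedTags.put("type", "kafka");
    AgentTracer.get()
        .getDataStreamsMonitoring()
        .setCheckpoint(span, sortedTags, record.timestamp(), computePayloadSizeBytes(record));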
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java
index e1fc08400b4..467e9c5457b 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java
@@ -1022,6 +1022,13 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
     public void setCheckpoint(
         AgentSpan span, LinkedHashMap<String, String> sortedTags, long defaultTimestamp) {}
 
+    @Override
+    public void setCheckpoint(
+        AgentSpan span,
+        LinkedHashMap<String, String> sortedTags,
+        long defaultTimestamp,
+        long payloadSizeBytes) {}
+
     @Override
     public PathwayContext newPathwayContext() {
       return NoopPathwayContext.INSTANCE;
@@ -1052,6 +1059,13 @@ public long getHash() {
       return 0L;
     }
 
+    @Override
+    public void setCheckpoint(
+        LinkedHashMap<String, String> sortedTags,
+        Consumer<StatsPoint> pointConsumer,
+        long defaultTimestamp,
+        long payloadSizeBytes) {}
+
     @Override
     public void setCheckpoint(
         LinkedHashMap<String, String> sortedTags,
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java
index 38ad2dc4d43..0419c6f7844 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java
@@ -13,6 +13,12 @@ public interface PathwayContext {
 
   long getHash();
 
+  void setCheckpoint(
+      LinkedHashMap<String, String> sortedTags,
+      Consumer<StatsPoint> pointConsumer,
+      long defaultTimestamp,
+      long payloadSizeBytes);
+
   void setCheckpoint(
       LinkedHashMap<String, String> sortedTags,
       Consumer<StatsPoint> pointConsumer,
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java
index 024b315df09..caa6540699b 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java
@@ -9,6 +9,7 @@ public class StatsPoint implements InboxItem {
   private final long timestampNanos;
   private final long pathwayLatencyNano;
   private final long edgeLatencyNano;
+  private final long payloadSizeBytes;
 
   public StatsPoint(
       List<String> edgeTags,
@@ -16,13 +17,15 @@ public StatsPoint(
       long parentHash,
       long timestampNanos,
       long pathwayLatencyNano,
-      long edgeLatencyNano) {
+      long edgeLatencyNano,
+      long payloadSizeBytes) {
     this.edgeTags = edgeTags;
     this.hash = hash;
     this.parentHash = parentHash;
     this.timestampNanos = timestampNanos;
     this.pathwayLatencyNano = pathwayLatencyNano;
     this.edgeLatencyNano = edgeLatencyNano;
+    this.payloadSizeBytes = payloadSizeBytes;
   }
 
   public List<String> getEdgeTags() {
@@ -49,6 +52,10 @@ public long getEdgeLatencyNano() {
     return edgeLatencyNano;
   }
 
+  public long getPayloadSizeBytes() {
+    return payloadSizeBytes;
+  }
+
   @Override
   public String toString() {
     return "StatsPoint{"
@@ -65,6 +72,8 @@
         + pathwayLatencyNano
         + ", edgeLatencyNano="
         + edgeLatencyNano
+        + ", payloadSizeBytes="
+        + payloadSizeBytes
         + '}';
   }
 }
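End to end, a checkpoint now yields a StatsPoint whose last field is the payload size; call sites that cannot measure one pass 0, which StatsGroup then skips. For example (hypothetical values):

    StatsPoint point = new StatsPoint(
        Collections.singletonList("topic:testTopic"), // edgeTags
        1L,                // hash
        2L,                // parentHash
        System.nanoTime(), // timestampNanos
        0L,                // pathwayLatencyNano
        0L,                // edgeLatencyNano
        512L);             // payloadSizeBytes -- 0 when the size cannot be computed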