From 17c097d2c07df68e7b0135623208f9a0e26d4550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Tue, 31 Oct 2023 11:17:30 +0100 Subject: [PATCH] Track payload size for Kafka integration (#6045) --- .../decorator/HttpServerDecorator.java | 2 +- .../grpc/server/TracingServerInterceptor.java | 4 +- .../aws/v2/sqs/TracingIterator.java | 2 +- .../grpc/server/TracingServerInterceptor.java | 4 +- .../KafkaConsumerInstrumentation.java | 3 +- .../kafka_clients/TracingIterator.java | 3 +- .../instrumentation/kafka_clients/Utils.java | 21 +++++++ .../KafkaStreamTaskInstrumentation.java | 17 +++++- .../rabbitmq/amqp/RabbitDecorator.java | 4 +- .../DefaultDataStreamsMonitoring.java | 14 +++-- .../datastreams/DefaultPathwayContext.java | 14 ++++- .../MsgPackDatastreamsPayloadWriter.java | 11 +++- .../trace/core/datastreams/StatsBucket.java | 5 +- .../trace/core/datastreams/StatsGroup.java | 11 +++- .../datastreams/DataStreamsWritingTest.groovy | 29 +++++---- .../DefaultDataStreamsMonitoringTest.groovy | 60 +++++++++---------- .../DefaultPathwayContextTest.groovy | 22 +++++++ .../api/AgentDataStreamsMonitoring.java | 7 ++- .../instrumentation/api/AgentTracer.java | 12 +++- .../instrumentation/api/PathwayContext.java | 6 ++ .../instrumentation/api/StatsPoint.java | 11 +++- 21 files changed, 196 insertions(+), 66 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java index f3523391515..334b2d665ba 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java @@ -143,7 +143,7 @@ public AgentSpan startSpan( } AgentPropagation.ContextVisitor getter = getter(); if (null != carrier && null != getter) { - tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0); + tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); } return span; } diff --git a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java index 9e7aed6b735..158c7c72d01 100644 --- a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java @@ -68,7 +68,9 @@ public ServerCall.Listener interceptCall( final AgentSpan span = startSpan(DECORATE.instrumentationNames()[0], GRPC_SERVER, spanContext).setMeasured(true); - AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0); + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); RequestContext reqContext = span.getRequestContext(); if (reqContext != null) { diff --git a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java index c5b47dff8af..ee58cbf3538 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java @@ -91,7 +91,7 @@ protected void startNewMessageSpan(Message message) { sortedTags.put(DIRECTION_TAG, DIRECTION_IN); sortedTags.put(TOPIC_TAG, urlFileName(queueUrl)); sortedTags.put(TYPE_TAG, "sqs"); - AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0); + AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0); CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.onConsume(span, queueUrl, requestId); diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java index 9deaaa32e11..a238d7ac5f6 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java @@ -67,7 +67,9 @@ public ServerCall.Listener interceptCall( CallbackProvider cbp = tracer.getCallbackProvider(RequestContextSlot.APPSEC); final AgentSpan span = startSpan(GRPC_SERVER, spanContext).setMeasured(true); - AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0); + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0); RequestContext reqContext = span.getRequestContext(); if (reqContext != null) { diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java index bfbbfd39278..ee9ef210ef9 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java @@ -47,7 +47,8 @@ public String[] helperClassNames() { packageName + ".TracingIterable", packageName + ".TracingIterator", packageName + ".TracingList", - packageName + ".TracingListIterator" + packageName + ".TracingListIterator", + packageName + ".Utils" }; } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java index 57449cfbe05..7e14f23d69d 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java @@ -13,6 +13,7 @@ import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_DELIVER; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED; import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER; +import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes; import static java.util.concurrent.TimeUnit.MILLISECONDS; import datadog.trace.api.Config; @@ -92,7 +93,7 @@ protected void startNewRecordSpan(ConsumerRecord val) { AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, val.timestamp()); + .setCheckpoint(span, sortedTags, val.timestamp(), computePayloadSizeBytes(val)); } else { span = startSpan(operationName, null); } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java new file mode 100644 index 00000000000..5a1fdc86e50 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/Utils.java @@ -0,0 +1,21 @@ +package datadog.trace.instrumentation.kafka_clients; + +import java.nio.charset.StandardCharsets; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +public final class Utils { + private Utils() {} // prevent instanciation + + // this method is used in kafka-clients and kafka-streams instrumentations + public static long computePayloadSizeBytes(ConsumerRecord val) { + long headersSize = 0; + Headers headers = val.headers(); + if (headers != null) + for (Header h : headers) { + headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length; + } + return headersSize + val.serializedKeySize() + val.serializedValueSize(); + } +} diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java index c01a54328d5..871b3fdc240 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java @@ -9,6 +9,7 @@ import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG; import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG; import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; +import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes; import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.BROKER_DECORATE; import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE; import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.KAFKA_CONSUME; @@ -36,6 +37,7 @@ import java.util.Map; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -59,6 +61,7 @@ public String instrumentedType() { public String[] helperClassNames() { return new String[] { "datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator", + "datadog.trace.instrumentation.kafka_clients.Utils", packageName + ".KafkaStreamsDecorator", packageName + ".ProcessorRecordContextVisitor", packageName + ".StampedRecordContextVisitor", @@ -244,7 +247,8 @@ public static void start( sortedTags.put(TYPE_TAG, "kafka"); AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, record.timestamp); + .setCheckpoint( + span, sortedTags, record.timestamp, computePayloadSizeBytes(record.value)); } else { span = startSpan(KAFKA_CONSUME, null); } @@ -307,9 +311,18 @@ public static void start( } sortedTags.put(TOPIC_TAG, record.topic()); sortedTags.put(TYPE_TAG, "kafka"); + + long payloadSize = 0; + // we have to go through Object to get the RecordMetadata here because the class of `record` + // only implements it after 2.7 (and this class is only used if v >= 2.7) + if ((Object) record instanceof RecordMetadata) { // should always be true + RecordMetadata metadata = (RecordMetadata) (Object) record; + payloadSize = metadata.serializedKeySize() + metadata.serializedValueSize(); + } + AgentTracer.get() .getDataStreamsMonitoring() - .setCheckpoint(span, sortedTags, record.timestamp()); + .setCheckpoint(span, sortedTags, record.timestamp(), payloadSize); } else { span = startSpan(KAFKA_CONSUME, null); } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java index e83e1065c84..8b8aba561e3 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java @@ -247,7 +247,9 @@ public static AgentScope startReceivingSpan( sortedTags.put(DIRECTION_TAG, DIRECTION_IN); sortedTags.put(TOPIC_TAG, queue); sortedTags.put(TYPE_TAG, "rabbitmq"); - AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, produceMillis); + AgentTracer.get() + .getDataStreamsMonitoring() + .setCheckpoint(span, sortedTags, produceMillis, 0); } CONSUMER_DECORATE.afterStart(span); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index c1e6b465606..c1106e52114 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -52,9 +52,10 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even static final long DEFAULT_BUCKET_DURATION_NANOS = TimeUnit.SECONDS.toNanos(10); static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5); - private static final StatsPoint REPORT = new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0); + private static final StatsPoint REPORT = + new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0); private static final StatsPoint POISON_PILL = - new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0); + new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0); private final Map timeToBucket = new HashMap<>(); private final BlockingQueue inbox = new MpscBlockingConsumerArrayQueue<>(1024); @@ -204,10 +205,13 @@ public void trackBacklog(LinkedHashMap sortedTags, long value) { @Override public void setCheckpoint( - AgentSpan span, LinkedHashMap sortedTags, long defaultTimestamp) { + AgentSpan span, + LinkedHashMap sortedTags, + long defaultTimestamp, + long payloadSizeBytes) { PathwayContext pathwayContext = span.context().getPathwayContext(); if (pathwayContext != null) { - pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp); + pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp, payloadSizeBytes); if (pathwayContext.getHash() != 0) { span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash())); } @@ -233,7 +237,7 @@ public void setConsumeCheckpoint(String type, String source, DataStreamsContextC sortedTags.put(TOPIC_TAG, source); sortedTags.put(TYPE_TAG, type); - setCheckpoint(span, sortedTags, 0); + setCheckpoint(span, sortedTags, 0, 0); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index eac3d1a310e..7e4beb835fc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -95,7 +95,7 @@ public long getHash() { @Override public void setCheckpoint( LinkedHashMap sortedTags, Consumer pointConsumer) { - setCheckpoint(sortedTags, pointConsumer, 0); + setCheckpoint(sortedTags, pointConsumer, 0, 0); } @Override @@ -103,6 +103,15 @@ public void setCheckpoint( LinkedHashMap sortedTags, Consumer pointConsumer, long defaultTimestamp) { + setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0); + } + + @Override + public void setCheckpoint( + LinkedHashMap sortedTags, + Consumer pointConsumer, + long defaultTimestamp, + long payloadSizeBytes) { long startNanos = timeSource.getCurrentTimeNanos(); long nanoTicks = timeSource.getNanoTicks(); lock.lock(); @@ -167,7 +176,8 @@ public void setCheckpoint( hash, timeSource.getCurrentTimeNanos(), pathwayLatencyNano, - edgeLatencyNano); + edgeLatencyNano, + payloadSizeBytes); edgeStartNanoTicks = nanoTicks; hash = newHash; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 0c31e52fa08..461e5457673 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -23,6 +23,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] DURATION = "Duration".getBytes(ISO_8859_1); private static final byte[] PATHWAY_LATENCY = "PathwayLatency".getBytes(ISO_8859_1); private static final byte[] EDGE_LATENCY = "EdgeLatency".getBytes(ISO_8859_1); + private static final byte[] PAYLOAD_SIZE = "PayloadSize".getBytes(ISO_8859_1); private static final byte[] SERVICE = "Service".getBytes(ISO_8859_1); private static final byte[] EDGE_TAGS = "EdgeTags".getBytes(ISO_8859_1); private static final byte[] BACKLOGS = "Backlogs".getBytes(ISO_8859_1); @@ -118,7 +119,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) { for (StatsGroup group : groups) { boolean firstNode = group.getEdgeTags().isEmpty(); - packer.startMap(firstNode ? 4 : 5); + packer.startMap(firstNode ? 5 : 6); /* 1 */ packer.writeUTF8(PATHWAY_LATENCY); @@ -129,15 +130,19 @@ private void writeBucket(StatsBucket bucket, Writable packer) { packer.writeBinary(group.getEdgeLatency().serialize()); /* 3 */ + packer.writeUTF8(PAYLOAD_SIZE); + packer.writeBinary(group.getPayloadSize().serialize()); + + /* 4 */ packer.writeUTF8(HASH); packer.writeUnsignedLong(group.getHash()); - /* 4 */ + /* 5 */ packer.writeUTF8(PARENT_HASH); packer.writeUnsignedLong(group.getParentHash()); if (!firstNode) { - /* 5 */ + /* 6 */ packer.writeUTF8(EDGE_TAGS); packer.startArray(group.getEdgeTags().size()); for (String tag : group.getEdgeTags()) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java index 59fe6149547..de58e3cccf2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java @@ -29,7 +29,10 @@ public void addPoint(StatsPoint statsPoint) { hashToGroup.put(statsPoint.getHash(), statsGroup); } - statsGroup.add(statsPoint.getPathwayLatencyNano(), statsPoint.getEdgeLatencyNano()); + statsGroup.add( + statsPoint.getPathwayLatencyNano(), + statsPoint.getEdgeLatencyNano(), + statsPoint.getPayloadSizeBytes()); } public void addBacklog(Backlog backlog) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java index 4034758e936..bbdb7dcc4e2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsGroup.java @@ -12,6 +12,7 @@ public class StatsGroup { private final long parentHash; private final Histogram pathwayLatency; private final Histogram edgeLatency; + private final Histogram payloadSize; public StatsGroup(List edgeTags, long hash, long parentHash) { this.edgeTags = edgeTags; @@ -19,11 +20,15 @@ public StatsGroup(List edgeTags, long hash, long parentHash) { this.parentHash = parentHash; pathwayLatency = Histograms.newHistogram(); edgeLatency = Histograms.newHistogram(); + payloadSize = Histograms.newHistogram(); } - public void add(long pathwayLatencyNano, long edgeLatencyNano) { + public void add(long pathwayLatencyNano, long edgeLatencyNano, long payloadSizeBytes) { pathwayLatency.accept(((double) pathwayLatencyNano) / NANOSECONDS_TO_SECOND); edgeLatency.accept(((double) edgeLatencyNano) / NANOSECONDS_TO_SECOND); + // payload size is set to zero when we cannot compute it + // in that case, it's probably better to have an empty histogram than filling it with zeros + if (payloadSizeBytes != 0) payloadSize.accept((double) payloadSizeBytes); } public List getEdgeTags() { @@ -46,6 +51,10 @@ public Histogram getEdgeLatency() { return edgeLatency; } + public Histogram getPayloadSize() { + return payloadSize; + } + @Override public String toString() { return "StatsGroup{" diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index e1ec2533b88..35b5a595754 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -20,11 +20,10 @@ import spock.lang.AutoCleanup import spock.lang.Shared import spock.util.concurrent.PollingConditions -import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer import static DefaultDataStreamsMonitoring.DEFAULT_BUCKET_DURATION_NANOS +import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer import static java.util.concurrent.TimeUnit.SECONDS - /** * This test class exists because a real integration test is not possible. see DataStreamsIntegrationTest */ @@ -79,15 +78,15 @@ class DataStreamsWritingTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) dataStreams.start() - dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0)) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 100) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 130) + dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 100) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10))) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5))) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -132,15 +131,17 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == "Stats" assert unpacker.unpackArrayHeader() == 2 // 2 groups in first bucket - Set availableSizes = [4, 5] // we don't know the order the groups will be reported + Set availableSizes = [5, 6] // we don't know the order the groups will be reported 2.times { int mapHeaderSize = unpacker.unpackMapHeader() assert availableSizes.remove(mapHeaderSize) - if (mapHeaderSize == 4) { // empty topic group + if (mapHeaderSize == 5) { // empty topic group assert unpacker.unpackString() == "PathwayLatency" unpacker.skipValue() assert unpacker.unpackString() == "EdgeLatency" unpacker.skipValue() + assert unpacker.unpackString() == "PayloadSize" + unpacker.skipValue() assert unpacker.unpackString() == "Hash" assert unpacker.unpackLong() == 9 assert unpacker.unpackString() == "ParentHash" @@ -150,6 +151,8 @@ class DataStreamsWritingTest extends DDCoreSpecification { unpacker.skipValue() assert unpacker.unpackString() == "EdgeLatency" unpacker.skipValue() + assert unpacker.unpackString() == "PayloadSize" + unpacker.skipValue() assert unpacker.unpackString() == "Hash" assert unpacker.unpackLong() == 1 assert unpacker.unpackString() == "ParentHash" @@ -185,11 +188,13 @@ class DataStreamsWritingTest extends DDCoreSpecification { Set availableHashes = [1L, 3L] // we don't know the order the groups will be reported 2.times { - assert unpacker.unpackMapHeader() == 5 + assert unpacker.unpackMapHeader() == 6 assert unpacker.unpackString() == "PathwayLatency" unpacker.skipValue() assert unpacker.unpackString() == "EdgeLatency" unpacker.skipValue() + assert unpacker.unpackString() == "PayloadSize" + unpacker.skipValue() assert unpacker.unpackString() == "Hash" def hash = unpacker.unpackLong() assert availableHashes.remove(hash) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy index 1a734ba9057..ce99bae9041 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy @@ -38,7 +38,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 0, 0, timeSource.currentTimeNanos, 0, 0, 0)) dataStreams.report() then: @@ -97,7 +97,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -143,7 +143,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, bucketDuration) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(bucketDuration) then: @@ -186,9 +186,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 3, 4, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.report() @@ -232,9 +232,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) dataStreams.close() @@ -288,11 +288,11 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group":"testGroup", "partition":"2", "topic":"testTopic", "type":"kafka_commit"]), 23) - dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group":"testGroup", "partition":"2", "topic":"testTopic", "type":"kafka_commit"]), 24) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"2", "topic":"testTopic", "type":"kafka_produce"]), 23) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"2", "topic":"testTopic2", "type":"kafka_produce"]), 23) - dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"2", "topic":"testTopic", "type":"kafka_produce"]), 45) + dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group": "testGroup", "partition": "2", "topic": "testTopic", "type": "kafka_commit"]), 23) + dataStreams.trackBacklog(new LinkedHashMap<>(["consumer_group": "testGroup", "partition": "2", "topic": "testTopic", "type": "kafka_commit"]), 24) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "2", "topic": "testTopic", "type": "kafka_produce"]), 23) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "2", "topic": "testTopic2", "type": "kafka_produce"]), 23) + dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "2", "topic": "testTopic", "type": "kafka_produce"]), 45) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -343,9 +343,9 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -400,12 +400,12 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10))) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5))) - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -477,7 +477,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -507,7 +507,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -553,7 +553,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dataStreams.start() supportsDataStreaming = false dataStreams.onEvent(EventListener.EventType.DOWNGRADED, "") - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -570,7 +570,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -615,7 +615,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not enabled" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -631,7 +631,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -664,7 +664,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "submitting points after being disabled" payloadWriter.buckets.clear() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -698,7 +698,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -715,7 +715,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { timeSource.advance(FEATURE_CHECK_INTERVAL_NANOS) dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -729,7 +729,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -774,7 +774,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { when: "reporting points when data streams is not supported" def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) dataStreams.start() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() @@ -790,7 +790,7 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { dsmEnabled = true dataStreams.report() - dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0)) + dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0)) timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) dataStreams.report() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index 7c94a829b97..5888c87a291 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -36,6 +36,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { assert point.parentHash == 0 assert point.pathwayLatencyNano == 0 assert point.edgeLatencyNano == 0 + assert point.payloadSizeBytes == 0 } def "First Set checkpoint starts the context."() { @@ -79,6 +80,27 @@ class DefaultPathwayContextTest extends DDCoreSpecification { } } + def "Checkpoint with payload size"() { + given: + def timeSource = new ControllableTimeSource() + def context = new DefaultPathwayContext(timeSource, wellKnownTags) + + when: + timeSource.advance(25) + context.setCheckpoint( + new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer, 0, 72) + + then: + context.isStarted() + pointConsumer.points.size() == 1 + with(pointConsumer.points[0]) { + edgeTags == ["group:group", "topic:topic", "type:kafka"] + edgeTags.size() == 3 + hash != 0 + payloadSizeBytes == 72 + } + } + def "Multiple checkpoints generated"() { given: def timeSource = new ControllableTimeSource() diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java index afc3730e577..97185d488a5 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentDataStreamsMonitoring.java @@ -15,9 +15,14 @@ public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer { * checkpoint in the chain. Zero should be passed if we can't extract the timestamp from the * message / payload itself (for instance: produce operations; http produce / consume etc). * Value will be ignored for checkpoints happening not at the start of the pipeline. + * @param payloadSizeBytes size of the message (body + headers) in bytes. Zero should be passed if + * the size cannot be evaluated. */ void setCheckpoint( - AgentSpan span, LinkedHashMap sortedTags, long defaultTimestamp); + AgentSpan span, + LinkedHashMap sortedTags, + long defaultTimestamp, + long payloadSizeBytes); PathwayContext newPathwayContext(); diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index e1fc08400b4..ed910b31b65 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -1020,7 +1020,10 @@ public void trackBacklog(LinkedHashMap sortedTags, long value) { @Override public void setCheckpoint( - AgentSpan span, LinkedHashMap sortedTags, long defaultTimestamp) {} + AgentSpan span, + LinkedHashMap sortedTags, + long defaultTimestamp, + long payloadSizeBytes) {} @Override public PathwayContext newPathwayContext() { @@ -1052,6 +1055,13 @@ public long getHash() { return 0L; } + @Override + public void setCheckpoint( + LinkedHashMap sortedTags, + Consumer pointConsumer, + long defaultTimestamp, + long payloadSizeBytes) {} + @Override public void setCheckpoint( LinkedHashMap sortedTags, diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java index 38ad2dc4d43..0419c6f7844 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java @@ -13,6 +13,12 @@ public interface PathwayContext { long getHash(); + void setCheckpoint( + LinkedHashMap sortedTags, + Consumer pointConsumer, + long defaultTimestamp, + long payloadSizeBytes); + void setCheckpoint( LinkedHashMap sortedTags, Consumer pointConsumer, diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java index 024b315df09..caa6540699b 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/StatsPoint.java @@ -9,6 +9,7 @@ public class StatsPoint implements InboxItem { private final long timestampNanos; private final long pathwayLatencyNano; private final long edgeLatencyNano; + private final long payloadSizeBytes; public StatsPoint( List edgeTags, @@ -16,13 +17,15 @@ public StatsPoint( long parentHash, long timestampNanos, long pathwayLatencyNano, - long edgeLatencyNano) { + long edgeLatencyNano, + long payloadSizeBytes) { this.edgeTags = edgeTags; this.hash = hash; this.parentHash = parentHash; this.timestampNanos = timestampNanos; this.pathwayLatencyNano = pathwayLatencyNano; this.edgeLatencyNano = edgeLatencyNano; + this.payloadSizeBytes = payloadSizeBytes; } public List getEdgeTags() { @@ -49,6 +52,10 @@ public long getEdgeLatencyNano() { return edgeLatencyNano; } + public long getPayloadSizeBytes() { + return payloadSizeBytes; + } + @Override public String toString() { return "StatsPoint{" @@ -65,6 +72,8 @@ public String toString() { + pathwayLatencyNano + ", edgeLatencyNano=" + edgeLatencyNano + + ", payloadSizeBytes=" + + payloadSizeBytes + '}'; } }