diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy index eafe94a23db..6aa13bbd17a 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/latestDepTest/groovy/KafkaClientTest.groovy @@ -182,6 +182,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 4 + payloadSize.minValue > 0.0 } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -194,6 +195,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 5 + payloadSize.minValue > 0.0 } cleanup: @@ -321,6 +323,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 4 + payloadSize.minValue > 0.0 } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -333,6 +336,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 5 + payloadSize.minValue > 0.0 } cleanup: @@ -981,6 +985,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 4 + payloadSize.minValue > 0.0 } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -993,6 +998,7 @@ class KafkaClientTest extends AgentTestRunner { "type:kafka" ] edgeTags.size() == 5 + payloadSize.minValue > 0.0 } cleanup: diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 6e89064ee81..46de0f8ce1a 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -17,6 +17,7 @@ import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPrivate; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; @@ -26,7 +27,9 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.bootstrap.instrumentation.api.StatsPoint; import java.util.LinkedHashMap; import java.util.Map; import net.bytebuddy.asm.Advice; @@ -76,6 +79,14 @@ public void adviceTransformations(AdviceTransformation transformation) { .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))) .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))), KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice"); + + transformation.applyAdvice( + isMethod() + .and(isPrivate()) + .and(takesArgument(0, int.class)) + .and(named("ensureValidRecordSize")), // intercepting this call allows us to see the + // estimated message size + KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice"); } public static class ProducerAdvice { @@ -121,7 +132,12 @@ public static AgentScope onEnter( try { propagate().inject(span, record.headers(), SETTER); if (STREAMING_CONTEXT.empty() || STREAMING_CONTEXT.isSinkTopic(record.topic())) { - propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags); + // inject the context in the headers, but delay sending the stats until we know the + // message size. + // The stats are saved in the pathway context and sent in PayloadSizeAdvice. + propagate() + .injectPathwayContextWithoutSendingStats( + span, record.headers(), SETTER, sortedTags); AvroSchemaExtractor.tryExtractProducer(record, span); } } catch (final IllegalStateException e) { @@ -137,7 +153,9 @@ record = propagate().inject(span, record.headers(), SETTER); if (STREAMING_CONTEXT.empty() || STREAMING_CONTEXT.isSinkTopic(record.topic())) { - propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags); + propagate() + .injectPathwayContextWithoutSendingStats( + span, record.headers(), SETTER, sortedTags); AvroSchemaExtractor.tryExtractProducer(record, span); } } @@ -157,4 +175,31 @@ public static void stopSpan( scope.close(); } } + + public static class PayloadSizeAdvice { + + /** + * Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of + * sending a kafka payload. This gives us access to an estimate of the payload size "for free", + * that we send as a metric. + */ + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) { + StatsPoint saved = activeSpan().context().getPathwayContext().getSavedStats(); + if (saved != null) { + // create new stats including the payload size + StatsPoint updated = + new StatsPoint( + saved.getEdgeTags(), + saved.getHash(), + saved.getParentHash(), + saved.getTimestampNanos(), + saved.getPathwayLatencyNano(), + saved.getEdgeLatencyNano(), + estimatedPayloadSize); + // then send the point + AgentTracer.get().getDataStreamsMonitoring().add(updated); + } + } + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java index 2858facd4b9..d354a72c457 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextInjector.java @@ -24,7 +24,7 @@ public void injectPathwayContext( C carrier, AgentPropagation.Setter setter, LinkedHashMap sortedTags) { - injectPathwayContext(span, carrier, setter, sortedTags, 0, 0); + injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, true); } public void injectPathwayContext( @@ -34,14 +34,37 @@ public void injectPathwayContext( LinkedHashMap sortedTags, long defaultTimestamp, long payloadSizeBytes) { - PathwayContext pathwayContext = span.context().getPathwayContext(); + injectPathwayContext( + span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes, true); + } + + /** Same as injectPathwayContext, but the stats collected in the StatsPoint are not sent. */ + public void injectPathwayContextWithoutSendingStats( + AgentSpan span, + C carrier, + AgentPropagation.Setter setter, + LinkedHashMap sortedTags) { + injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, false); + } + private void injectPathwayContext( + AgentSpan span, + C carrier, + AgentPropagation.Setter setter, + LinkedHashMap sortedTags, + long defaultTimestamp, + long payloadSizeBytes, + boolean sendCheckpoint) { + PathwayContext pathwayContext = span.context().getPathwayContext(); if (pathwayContext == null || (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) { return; } pathwayContext.setCheckpoint( - sortedTags, dataStreamsMonitoring::add, defaultTimestamp, payloadSizeBytes); + sortedTags, + sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats, + defaultTimestamp, + payloadSizeBytes); boolean injected = setter instanceof AgentPropagation.BinarySetter diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 7e4beb835fc..340af2c5596 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -44,6 +44,7 @@ public class DefaultPathwayContext implements PathwayContext { private long pathwayStartNanos; private long pathwayStartNanoTicks; private long edgeStartNanoTicks; + private StatsPoint savedStats; private long hash; private boolean started; // state variables used to memoize the pathway hash with @@ -174,7 +175,7 @@ public void setCheckpoint( allTags, newHash, hash, - timeSource.getCurrentTimeNanos(), + startNanos, pathwayLatencyNano, edgeLatencyNano, payloadSizeBytes); @@ -188,6 +189,16 @@ public void setCheckpoint( } } + @Override + public void saveStats(StatsPoint point) { + this.savedStats = point; + } + + @Override + public StatsPoint getSavedStats() { + return this.savedStats; + } + @Override public byte[] encode() throws IOException { lock.lock(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java b/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java index 01d882020d1..135fff03275 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java @@ -83,6 +83,13 @@ public void injectPathwayContext( span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes); } + @Override + public void injectPathwayContextWithoutSendingStats( + AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) { + this.dataStreamContextInjector.injectPathwayContextWithoutSendingStats( + span, carrier, setter, sortedTags); + } + @Override public AgentSpan.Context.Extracted extract(final C carrier, final ContextVisitor getter) { return extractor.extract(carrier, getter); diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java index 1a084bee969..5edcf448690 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java @@ -22,6 +22,9 @@ void injectPathwayContext( long defaultTimestamp, long payloadSizeBytes); + void injectPathwayContextWithoutSendingStats( + AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags); + interface Setter { void set(C carrier, String key, String value); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index f0dcba51b92..0e9c9564ddf 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -882,6 +882,10 @@ public void injectPathwayContext( long defaultTimestamp, long payloadSizeBytes) {} + @Override + public void injectPathwayContextWithoutSendingStats( + AgentSpan span, C carrier, Setter setter, LinkedHashMap sortedTags) {} + @Override public Context.Extracted extract(final C carrier, final ContextVisitor getter) { return NoopContext.INSTANCE; @@ -1102,6 +1106,14 @@ public void setCheckpoint( public void setCheckpoint( LinkedHashMap sortedTags, Consumer pointConsumer) {} + @Override + public void saveStats(StatsPoint point) {} + + @Override + public StatsPoint getSavedStats() { + return null; + } + @Override public byte[] encode() { return null; diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java index 0419c6f7844..b9cd729fc19 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/PathwayContext.java @@ -27,6 +27,10 @@ void setCheckpoint( // The input tags should be sorted. void setCheckpoint(LinkedHashMap sortedTags, Consumer pointConsumer); + void saveStats(StatsPoint point); + + StatsPoint getSavedStats(); + byte[] encode() throws IOException; String strEncode() throws IOException;