add instrumentation to allow sending kafka payload size on produce #6228

Merged
11 commits, merged on Jan 2, 2024
@@ -182,6 +182,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 4
payloadSize.minValue > 0.0
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
@@ -194,6 +195,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 5
payloadSize.minValue > 0.0
}

cleanup:
@@ -321,6 +323,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 4
payloadSize.minValue > 0.0
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
@@ -333,6 +336,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 5
payloadSize.minValue > 0.0
}

cleanup:
@@ -981,6 +985,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 4
payloadSize.minValue > 0.0
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
@@ -993,6 +998,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 5
payloadSize.minValue > 0.0
}

cleanup:
@@ -17,6 +17,7 @@
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

@@ -26,7 +27,9 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import java.util.LinkedHashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
@@ -76,6 +79,14 @@ public void adviceTransformations(AdviceTransformation transformation) {
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");

transformation.applyAdvice(
isMethod()
.and(isPrivate())
.and(takesArgument(0, int.class))
.and(named("ensureValidRecordSize")), // intercepting this call allows us to see the
// estimated message size
KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice");
}

public static class ProducerAdvice {
@@ -121,7 +132,12 @@ public static AgentScope onEnter(
try {
propagate().inject(span, record.headers(), SETTER);
if (STREAMING_CONTEXT.empty() || STREAMING_CONTEXT.isSinkTopic(record.topic())) {
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
// inject the context in the headers, but delay sending the stats until we know the
// message size.
// The stats are saved in the pathway context and sent in PayloadSizeAdvice.
propagate()
.injectPathwayContextWithoutSendingStats(
span, record.headers(), SETTER, sortedTags);
AvroSchemaExtractor.tryExtractProducer(record, span);
}
} catch (final IllegalStateException e) {
@@ -137,7 +153,9 @@ record =

propagate().inject(span, record.headers(), SETTER);
if (STREAMING_CONTEXT.empty() || STREAMING_CONTEXT.isSinkTopic(record.topic())) {
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate()
.injectPathwayContextWithoutSendingStats(
span, record.headers(), SETTER, sortedTags);
AvroSchemaExtractor.tryExtractProducer(record, span);
}
}
@@ -157,4 +175,31 @@ public static void stopSpan(
scope.close();
}
}

public static class PayloadSizeAdvice {

    /**
     * Instrumentation for the method KafkaProducer.ensureValidRecordSize, which is called as part
     * of sending a Kafka payload. This gives us access to an estimate of the payload size "for
     * free", which we then send as a metric.
     */
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) {
StatsPoint saved = activeSpan().context().getPathwayContext().getSavedStats();
if (saved != null) {
// create new stats including the payload size
StatsPoint updated =
new StatsPoint(
saved.getEdgeTags(),
saved.getHash(),
saved.getParentHash(),
saved.getTimestampNanos(),
saved.getPathwayLatencyNano(),
saved.getEdgeLatencyNano(),
estimatedPayloadSize);
// then send the point
AgentTracer.get().getDataStreamsMonitoring().add(updated);
}
}
}
}
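
For context, the two advices above are driven by an ordinary producer send. Below is a minimal sketch of application code that would exercise them; the broker address, topic name, and an attached dd-java-agent with Data Streams Monitoring enabled are assumptions for illustration, not part of this PR:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PayloadSizeDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // producer.send(record) delegates to send(record, callback), which ProducerAdvice matches:
      // the pathway context is injected into the headers, but the StatsPoint is saved rather
      // than sent. The producer's private ensureValidRecordSize(int) is then matched by
      // PayloadSizeAdvice, which re-emits the saved StatsPoint with the estimated record size.
      producer.send(new ProducerRecord<>("payload-size-demo", "key", "hello, data streams"));
    }
  }
}
```

Nothing changes in application code; the size comes from the argument the producer already computes before appending the record to its accumulator.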
@@ -24,7 +24,7 @@ public <C> void injectPathwayContext(
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
injectPathwayContext(span, carrier, setter, sortedTags, 0, 0);
injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, true);
}

public <C> void injectPathwayContext(
@@ -34,14 +34,37 @@ public <C> void injectPathwayContext(
LinkedHashMap<String, String> sortedTags,
long defaultTimestamp,
long payloadSizeBytes) {
PathwayContext pathwayContext = span.context().getPathwayContext();
injectPathwayContext(
span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes, false);
}

  /**
   * Same as injectPathwayContext, but the StatsPoint collected at the checkpoint is saved on the
   * pathway context instead of being sent.
   */
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span,
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, false);
}

private <C> void injectPathwayContext(
AgentSpan span,
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags,
long defaultTimestamp,
long payloadSizeBytes,
boolean sendCheckpoint) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (pathwayContext == null
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) {
return;
}
pathwayContext.setCheckpoint(
sortedTags, dataStreamsMonitoring::add, defaultTimestamp, payloadSizeBytes);
sortedTags,
sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats,
defaultTimestamp,
payloadSizeBytes);

boolean injected =
setter instanceof AgentPropagation.BinarySetter
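
The design choice worth noting in this injector is that setCheckpoint already takes a Consumer&lt;StatsPoint&gt;, so deferring emission needs no new code path inside the context: the caller simply passes pathwayContext::saveStats instead of dataStreamsMonitoring::add. A minimal, self-contained sketch of that pattern follows; the names (Point, Checkpointer) are illustrative only and are not the tracer's real types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Self-contained model of the "emit now vs. save for later" checkpoint pattern used above.
public class DeferredEmissionSketch {

  static final class Point {
    final long hash;
    final long payloadSizeBytes;

    Point(long hash, long payloadSizeBytes) {
      this.hash = hash;
      this.payloadSizeBytes = payloadSizeBytes;
    }

    @Override
    public String toString() {
      return "Point{hash=" + hash + ", payloadSizeBytes=" + payloadSizeBytes + "}";
    }
  }

  static final class Checkpointer {
    Point saved; // plays the role of DefaultPathwayContext.savedStats

    // The checkpoint does not decide where the point goes; the caller chooses via the consumer.
    void setCheckpoint(Consumer<Point> pointConsumer) {
      pointConsumer.accept(new Point(42L, 0L)); // payload size is unknown at injection time
    }
  }

  public static void main(String[] args) {
    List<Point> emitted = new ArrayList<>(); // stands in for DataStreamsMonitoring
    Checkpointer ctx = new Checkpointer();

    // Equivalent of sendCheckpoint == true: emit immediately.
    ctx.setCheckpoint(emitted::add);

    // Equivalent of sendCheckpoint == false: stash the point on the context instead.
    ctx.setCheckpoint(p -> ctx.saved = p);

    // Later, once the payload size is known, emit an enriched copy of the saved point.
    emitted.add(new Point(ctx.saved.hash, 512L));

    System.out.println(emitted);
  }
}
```

In the real classes, the saved point is read back through getSavedStats() and re-emitted from PayloadSizeAdvice with the payload size filled in.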
@@ -44,6 +44,7 @@ public class DefaultPathwayContext implements PathwayContext {
private long pathwayStartNanos;
private long pathwayStartNanoTicks;
private long edgeStartNanoTicks;
private StatsPoint savedStats;
private long hash;
private boolean started;
// state variables used to memoize the pathway hash with
@@ -174,7 +175,7 @@ public void setCheckpoint(
allTags,
newHash,
hash,
timeSource.getCurrentTimeNanos(),
startNanos,
pathwayLatencyNano,
edgeLatencyNano,
payloadSizeBytes);
@@ -188,6 +189,16 @@
}
}

@Override
public void saveStats(StatsPoint point) {
this.savedStats = point;
}

@Override
public StatsPoint getSavedStats() {
return this.savedStats;
}

@Override
public byte[] encode() throws IOException {
lock.lock();
@@ -83,6 +83,13 @@ public <C> void injectPathwayContext(
span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes);
}

@Override
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {
this.dataStreamContextInjector.injectPathwayContextWithoutSendingStats(
span, carrier, setter, sortedTags);
}

@Override
public <C> AgentSpan.Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return extractor.extract(carrier, getter);
@@ -22,6 +22,9 @@ <C> void injectPathwayContext(
long defaultTimestamp,
long payloadSizeBytes);

<C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags);

interface Setter<C> {
void set(C carrier, String key, String value);
}
@@ -882,6 +882,10 @@ public <C> void injectPathwayContext(
long defaultTimestamp,
long payloadSizeBytes) {}

@Override
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {}

@Override
public <C> Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return NoopContext.INSTANCE;
@@ -1102,6 +1106,14 @@ public void setCheckpoint(
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer) {}

@Override
public void saveStats(StatsPoint point) {}

@Override
public StatsPoint getSavedStats() {
return null;
}

@Override
public byte[] encode() {
return null;
@@ -27,6 +27,10 @@ void setCheckpoint(
// The input tags should be sorted.
void setCheckpoint(LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer);

void saveStats(StatsPoint point);

StatsPoint getSavedStats();

byte[] encode() throws IOException;

String strEncode() throws IOException;