Skip to content

Commit

Permalink
add instrumentation to allow sending kafka payload size on produce (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vandonr committed Jan 2, 2024
1 parent 8f20bc6 commit 7440eac
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 4
payloadSize.minValue > 0.0
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
Expand All @@ -194,6 +195,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 5
payloadSize.minValue > 0.0
}

cleanup:
Expand Down Expand Up @@ -321,6 +323,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 4
payloadSize.minValue > 0.0
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
Expand All @@ -333,6 +336,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 5
payloadSize.minValue > 0.0
}

cleanup:
Expand Down Expand Up @@ -981,6 +985,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 4
payloadSize.minValue > 0.0
}
StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
Expand All @@ -993,6 +998,7 @@ class KafkaClientTest extends AgentTestRunner {
"type:kafka"
]
edgeTags.size() == 5
payloadSize.minValue > 0.0
}
cleanup:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

Expand All @@ -26,7 +27,9 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import java.util.LinkedHashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -76,6 +79,14 @@ public void adviceTransformations(AdviceTransformation transformation) {
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");

transformation.applyAdvice(
isMethod()
.and(isPrivate())
.and(takesArgument(0, int.class))
.and(named("ensureValidRecordSize")), // intercepting this call allows us to see the
// estimated message size
KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice");
}

public static class ProducerAdvice {
Expand Down Expand Up @@ -121,7 +132,12 @@ public static AgentScope onEnter(
try {
propagate().inject(span, record.headers(), SETTER);
if (STREAMING_CONTEXT.empty() || STREAMING_CONTEXT.isSinkTopic(record.topic())) {
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
// inject the context in the headers, but delay sending the stats until we know the
// message size.
// The stats are saved in the pathway context and sent in PayloadSizeAdvice.
propagate()
.injectPathwayContextWithoutSendingStats(
span, record.headers(), SETTER, sortedTags);
AvroSchemaExtractor.tryExtractProducer(record, span);
}
} catch (final IllegalStateException e) {
Expand All @@ -137,7 +153,9 @@ record =

propagate().inject(span, record.headers(), SETTER);
if (STREAMING_CONTEXT.empty() || STREAMING_CONTEXT.isSinkTopic(record.topic())) {
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate()
.injectPathwayContextWithoutSendingStats(
span, record.headers(), SETTER, sortedTags);
AvroSchemaExtractor.tryExtractProducer(record, span);
}
}
Expand All @@ -157,4 +175,31 @@ public static void stopSpan(
scope.close();
}
}

public static class PayloadSizeAdvice {

/**
* Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of
* sending a kafka payload. This gives us access to an estimate of the payload size "for free",
* that we send as a metric.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) {
StatsPoint saved = activeSpan().context().getPathwayContext().getSavedStats();
if (saved != null) {
// create new stats including the payload size
StatsPoint updated =
new StatsPoint(
saved.getEdgeTags(),
saved.getHash(),
saved.getParentHash(),
saved.getTimestampNanos(),
saved.getPathwayLatencyNano(),
saved.getEdgeLatencyNano(),
estimatedPayloadSize);
// then send the point
AgentTracer.get().getDataStreamsMonitoring().add(updated);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public <C> void injectPathwayContext(
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
injectPathwayContext(span, carrier, setter, sortedTags, 0, 0);
injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, true);
}

public <C> void injectPathwayContext(
Expand All @@ -34,14 +34,37 @@ public <C> void injectPathwayContext(
LinkedHashMap<String, String> sortedTags,
long defaultTimestamp,
long payloadSizeBytes) {
PathwayContext pathwayContext = span.context().getPathwayContext();
injectPathwayContext(
span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes, true);
}

/** Same as injectPathwayContext, but the stats collected in the StatsPoint are not sent. */
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span,
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
injectPathwayContext(span, carrier, setter, sortedTags, 0, 0, false);
}

private <C> void injectPathwayContext(
AgentSpan span,
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags,
long defaultTimestamp,
long payloadSizeBytes,
boolean sendCheckpoint) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (pathwayContext == null
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) {
return;
}
pathwayContext.setCheckpoint(
sortedTags, dataStreamsMonitoring::add, defaultTimestamp, payloadSizeBytes);
sortedTags,
sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats,
defaultTimestamp,
payloadSizeBytes);

boolean injected =
setter instanceof AgentPropagation.BinarySetter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class DefaultPathwayContext implements PathwayContext {
private long pathwayStartNanos;
private long pathwayStartNanoTicks;
private long edgeStartNanoTicks;
private StatsPoint savedStats;
private long hash;
private boolean started;
// state variables used to memoize the pathway hash with
Expand Down Expand Up @@ -174,7 +175,7 @@ public void setCheckpoint(
allTags,
newHash,
hash,
timeSource.getCurrentTimeNanos(),
startNanos,
pathwayLatencyNano,
edgeLatencyNano,
payloadSizeBytes);
Expand All @@ -188,6 +189,16 @@ public void setCheckpoint(
}
}

@Override
public void saveStats(StatsPoint point) {
this.savedStats = point;
}

@Override
public StatsPoint getSavedStats() {
return this.savedStats;
}

@Override
public byte[] encode() throws IOException {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ public <C> void injectPathwayContext(
span, carrier, setter, sortedTags, defaultTimestamp, payloadSizeBytes);
}

@Override
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {
this.dataStreamContextInjector.injectPathwayContextWithoutSendingStats(
span, carrier, setter, sortedTags);
}

@Override
public <C> AgentSpan.Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return extractor.extract(carrier, getter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ <C> void injectPathwayContext(
long defaultTimestamp,
long payloadSizeBytes);

<C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags);

interface Setter<C> {
void set(C carrier, String key, String value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,10 @@ public <C> void injectPathwayContext(
long defaultTimestamp,
long payloadSizeBytes) {}

@Override
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {}

@Override
public <C> Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return NoopContext.INSTANCE;
Expand Down Expand Up @@ -1102,6 +1106,14 @@ public void setCheckpoint(
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer) {}

@Override
public void saveStats(StatsPoint point) {}

@Override
public StatsPoint getSavedStats() {
return null;
}

@Override
public byte[] encode() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ void setCheckpoint(
// The input tags should be sorted.
void setCheckpoint(LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer);

void saveStats(StatsPoint point);

StatsPoint getSavedStats();

byte[] encode() throws IOException;

String strEncode() throws IOException;
Expand Down

0 comments on commit 7440eac

Please sign in to comment.