Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add instrumentation to allow sending kafka payload size on produce #6228

Merged
merged 11 commits into from
Jan 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

Expand All @@ -22,7 +23,9 @@
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import java.util.LinkedHashMap;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.ApiVersions;
Expand Down Expand Up @@ -62,6 +65,13 @@ public void adviceTransformations(AdviceTransformation transformation) {
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");

transformation.applyAdvice(
isMethod()
.and(isPrivate())
.and(takesArgument(0, int.class))
.and(named("ensureValidRecordSize")),
KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice");
}

public static class ProducerAdvice {
Expand Down Expand Up @@ -99,7 +109,8 @@ public static AgentScope onEnter(
sortedTags.put(TYPE_TAG, "kafka");
try {
propagate().inject(span, record.headers(), SETTER);
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate()
.injectPathwayContextWithoutSendingStats(span, record.headers(), SETTER, sortedTags);
} catch (final IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
Expand All @@ -112,7 +123,8 @@ record =
record.headers());

propagate().inject(span, record.headers(), SETTER);
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate()
.injectPathwayContextWithoutSendingStats(span, record.headers(), SETTER, sortedTags);
}
if (TIME_IN_QUEUE_ENABLED) {
SETTER.injectTimeInQueue(record.headers());
Expand All @@ -130,4 +142,31 @@ public static void stopSpan(
scope.close();
}
}

public static class PayloadSizeAdvice {

/**
* Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of
* sending a kafka payload. This gives us access to an estimate of the payload size "for free",
* that we send as a metric.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) {
StatsPoint saved = activeSpan().context().getPathwayContext().getSavedStats();
if (saved != null) {
// create new stats including the payload size
StatsPoint updated =
new StatsPoint(
saved.getEdgeTags(),
saved.getHash(),
saved.getParentHash(),
saved.getTimestampNanos(),
saved.getPathwayLatencyNano(),
saved.getEdgeLatencyNano(),
estimatedPayloadSize);
// then send the point
AgentTracer.get().getDataStreamsMonitoring().add(updated);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,31 @@ public <C> void injectPathwayContext(
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
PathwayContext pathwayContext = span.context().getPathwayContext();
injectPathwayContext(span, carrier, setter, sortedTags, true);
}

/** Same as injectPathwayContext, but the stats collected in the StatsPoint are not sent. */
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span,
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
injectPathwayContext(span, carrier, setter, sortedTags, false);
}

private <C> void injectPathwayContext(
AgentSpan span,
C carrier,
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags,
boolean sendCheckpoint) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (pathwayContext == null
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) {
return;
}
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);

pathwayContext.setCheckpoint(
sortedTags, sendCheckpoint ? dataStreamsMonitoring::add : pathwayContext::saveStats);
boolean injected =
setter instanceof AgentPropagation.BinarySetter
? injectBinaryPathwayContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class DefaultPathwayContext implements PathwayContext {
private long pathwayStartNanos;
private long pathwayStartNanoTicks;
private long edgeStartNanoTicks;
private StatsPoint savedStats;
private long hash;
private boolean started;
// state variables used to memoize the pathway hash with
Expand Down Expand Up @@ -174,7 +175,7 @@ public void setCheckpoint(
allTags,
newHash,
hash,
timeSource.getCurrentTimeNanos(),
startNanos,
pathwayLatencyNano,
edgeLatencyNano,
payloadSizeBytes);
Expand All @@ -188,6 +189,16 @@ public void setCheckpoint(
}
}

@Override
public void saveStats(StatsPoint point) {
this.savedStats = point;
}

@Override
public StatsPoint getSavedStats() {
return this.savedStats;
}

@Override
public byte[] encode() throws IOException {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public <C> void injectPathwayContext(
this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter, sortedTags);
}

@Override
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {
this.dataStreamContextInjector.injectPathwayContextWithoutSendingStats(
span, carrier, setter, sortedTags);
}

@Override
public <C> AgentSpan.Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return extractor.extract(carrier, getter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public interface AgentPropagation {
<C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags);

<C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags);

interface Setter<C> {
void set(C carrier, String key, String value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,10 @@ public <C> void inject(
public <C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {}

@Override
public <C> void injectPathwayContextWithoutSendingStats(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {}

@Override
public <C> Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return NoopContext.INSTANCE;
Expand Down Expand Up @@ -1082,6 +1086,14 @@ public void setCheckpoint(
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer) {}

@Override
public void saveStats(StatsPoint point) {}

@Override
public StatsPoint getSavedStats() {
return null;
}

@Override
public byte[] encode() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ void setCheckpoint(
// The input tags should be sorted.
void setCheckpoint(LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer);

void saveStats(StatsPoint point);

StatsPoint getSavedStats();

byte[] encode() throws IOException;

String strEncode() throws IOException;
Expand Down
Loading