Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add instrumentation to allow sending kafka payload size on produce #6228

Merged
merged 11 commits into from
Jan 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@
import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import java.util.LinkedHashMap;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.RecordBatch;

@AutoService(Instrumenter.class)
Expand Down Expand Up @@ -62,6 +66,10 @@ public void adviceTransformations(AdviceTransformation transformation) {
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");

transformation.applyAdvice(
isMethod().and(isPublic()).and(isStatic()).and(named("estimateSizeInBytesUpperBound")),
KafkaProducerInstrumentation.class.getName() + "$EstimateSizeAdvice");
}

public static class ProducerAdvice {
Expand All @@ -82,6 +90,8 @@ public static AgentScope onEnter(
if (record.value() == null) {
span.setTag(InstrumentationTags.TOMBSTONE, true);
}
// save the topic for EstimateSizeAdvice to read it.
span.setBaggageItem("kafka.topic", record.topic());

// Do not inject headers for batch versions below 2
// This is how similar check is being done in Kafka client itself:
Expand All @@ -93,13 +103,9 @@ public static AgentScope onEnter(
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
&& Config.get().isKafkaClientPropagationEnabled()
&& !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(TOPIC_TAG, record.topic());
sortedTags.put(TYPE_TAG, "kafka");
try {
propagate().inject(span, record.headers(), SETTER);
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate().injectPathwayContext(span, record.headers(), SETTER);
} catch (final IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
Expand All @@ -112,7 +118,7 @@ record =
record.headers());

propagate().inject(span, record.headers(), SETTER);
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate().injectPathwayContext(span, record.headers(), SETTER);
}
if (TIME_IN_QUEUE_ENABLED) {
SETTER.injectTimeInQueue(record.headers());
Expand All @@ -130,4 +136,35 @@ public static void stopSpan(
scope.close();
}
}

public static class EstimateSizeAdvice {

/**
* Instrumentation for the method AbstractRecords.estimateSizeInBytesUpperBound that is called
* as part of sending a kafka payload. This gives us an estimate of the payload size "for free",
* that we send as a metric.
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return int estimatedPayloadSize) {
// prevent multiple calls
final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(AbstractRecords.class);
if (callDepth > 0) return;

final AgentSpan encompassingSpan = activeSpan();
if (encompassingSpan == null) return;

String topic = encompassingSpan.getBaggageItem("kafka.topic");
// if not available, the call is not coming from where we expect it.
if (topic == null) return;

LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(TOPIC_TAG, topic);
sortedTags.put(TYPE_TAG, "kafka");

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(encompassingSpan, sortedTags, 0, estimatedPayloadSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,28 @@ public <C> void injectPathwayContext(
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (shouldInject(pathwayContext, span)) {
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);
doInject(pathwayContext, span, carrier, setter);
}
}

if (pathwayContext == null
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) {
return;
public <C> void injectPathwayContext(
AgentSpan span, C carrier, AgentPropagation.Setter<C> setter) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (shouldInject(pathwayContext, span)) {
// Checkpoint is not set here. Should be done manually later one.
doInject(pathwayContext, span, carrier, setter);
}
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);
}

private boolean shouldInject(PathwayContext pathwayContext, AgentSpan span) {
return !(pathwayContext == null
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled()));
}

private static <C> void doInject(
PathwayContext pathwayContext, AgentSpan span, C carrier, AgentPropagation.Setter<C> setter) {
boolean injected =
setter instanceof AgentPropagation.BinarySetter
? injectBinaryPathwayContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public <C> void injectPathwayContext(
this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter, sortedTags);
}

@Override
public <C> void injectPathwayContext(AgentSpan span, C carrier, Setter<C> setter) {
this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter);
}

@Override
public <C> AgentSpan.Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return extractor.extract(carrier, getter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public interface AgentPropagation {
<C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags);

/** This method does not set a checkpoint */
<C> void injectPathwayContext(AgentSpan span, C carrier, Setter<C> setter);
vandonr marked this conversation as resolved.
Show resolved Hide resolved

interface Setter<C> {
void set(C carrier, String key, String value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,9 @@ public <C> void inject(
public <C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {}

@Override
public <C> void injectPathwayContext(AgentSpan span, C carrier, Setter<C> setter) {}

@Override
public <C> Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return NoopContext.INSTANCE;
Expand Down
Loading