Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add instrumentation to allow sending kafka payload size on produce #6228

Merged
merged 11 commits into from
Jan 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

Expand All @@ -22,11 +23,13 @@
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import java.util.LinkedHashMap;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.record.RecordBatch;
Expand Down Expand Up @@ -62,12 +65,20 @@ public void adviceTransformations(AdviceTransformation transformation) {
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");

transformation.applyAdvice(
isMethod()
.and(isPrivate())
.and(takesArgument(0, int.class))
.and(named("ensureValidRecordSize")),
KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice");
}

public static class ProducerAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(
@Advice.This KafkaProducer producer,
@Advice.FieldValue("apiVersions") final ApiVersions apiVersions,
@Advice.FieldValue("producerConfig") ProducerConfig producerConfig,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
Expand All @@ -82,6 +93,8 @@ public static AgentScope onEnter(
if (record.value() == null) {
span.setTag(InstrumentationTags.TOMBSTONE, true);
}
// save the topic for EstimateSizeAdvice to read it.
span.setBaggageItem("kafka.topic", record.topic());

// Do not inject headers for batch versions below 2
// This is how similar check is being done in Kafka client itself:
Expand All @@ -93,13 +106,9 @@ public static AgentScope onEnter(
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
&& Config.get().isKafkaClientPropagationEnabled()
&& !Config.get().isKafkaClientPropagationDisabledForTopic(record.topic())) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(TOPIC_TAG, record.topic());
sortedTags.put(TYPE_TAG, "kafka");
try {
propagate().inject(span, record.headers(), SETTER);
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate().injectPathwayContext(span, record.headers(), SETTER);
} catch (final IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
Expand All @@ -112,7 +121,7 @@ record =
record.headers());

propagate().inject(span, record.headers(), SETTER);
propagate().injectPathwayContext(span, record.headers(), SETTER, sortedTags);
propagate().injectPathwayContext(span, record.headers(), SETTER);
}
if (TIME_IN_QUEUE_ENABLED) {
SETTER.injectTimeInQueue(record.headers());
Expand All @@ -130,4 +139,32 @@ public static void stopSpan(
scope.close();
}
}

public static class PayloadSizeAdvice {

/**
* Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of
* sending a kafka payload. This gives us access to an estimate of the payload size "for free",
* that we send as a metric.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) {
final AgentSpan encompassingSpan = activeSpan();
System.out.println("current span = " + encompassingSpan);
vandonr marked this conversation as resolved.
Show resolved Hide resolved
if (encompassingSpan == null) return;

String topic = encompassingSpan.getBaggageItem("kafka.topic");
// if not available, the call is not coming from where we expect it.
if (topic == null) return;

LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(TOPIC_TAG, topic);
sortedTags.put(TYPE_TAG, "kafka");

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(encompassingSpan, sortedTags, 0, estimatedPayloadSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,28 @@ public <C> void injectPathwayContext(
AgentPropagation.Setter<C> setter,
LinkedHashMap<String, String> sortedTags) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (shouldInject(pathwayContext, span)) {
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);
doInject(pathwayContext, span, carrier, setter);
}
}

if (pathwayContext == null
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled())) {
return;
public <C> void injectPathwayContext(
AgentSpan span, C carrier, AgentPropagation.Setter<C> setter) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (shouldInject(pathwayContext, span)) {
// Checkpoint is not set here. Should be done manually later on.
doInject(pathwayContext, span, carrier, setter);
}
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);
}

private boolean shouldInject(PathwayContext pathwayContext, AgentSpan span) {
return !(pathwayContext == null
|| (span.traceConfig() != null && !span.traceConfig().isDataStreamsEnabled()));
}

private static <C> void doInject(
PathwayContext pathwayContext, AgentSpan span, C carrier, AgentPropagation.Setter<C> setter) {
boolean injected =
setter instanceof AgentPropagation.BinarySetter
? injectBinaryPathwayContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public <C> void injectPathwayContext(
this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter, sortedTags);
}

@Override
public <C> void injectPathwayContext(AgentSpan span, C carrier, Setter<C> setter) {
this.dataStreamContextInjector.injectPathwayContext(span, carrier, setter);
}

@Override
public <C> AgentSpan.Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return extractor.extract(carrier, getter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public interface AgentPropagation {
<C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags);

/** This method does not set a checkpoint */
<C> void injectPathwayContext(AgentSpan span, C carrier, Setter<C> setter);
vandonr marked this conversation as resolved.
Show resolved Hide resolved

interface Setter<C> {
void set(C carrier, String key, String value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,9 @@ public <C> void inject(
public <C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {}

@Override
public <C> void injectPathwayContext(AgentSpan span, C carrier, Setter<C> setter) {}

@Override
public <C> Context.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
return NoopContext.INSTANCE;
Expand Down
Loading