Skip to content

Commit

Permalink
change instrumented method
Browse files Browse the repository at this point in the history
  • Loading branch information
vandonr committed Nov 16, 2023
1 parent 46e8651 commit 5ac65d1
Showing 1 changed file with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
Expand All @@ -30,9 +29,9 @@
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.RecordBatch;

@AutoService(Instrumenter.class)
Expand Down Expand Up @@ -68,14 +67,18 @@ public void adviceTransformations(AdviceTransformation transformation) {
KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");

transformation.applyAdvice(
isMethod().and(isPublic()).and(isStatic()).and(named("estimateSizeInBytesUpperBound")),
KafkaProducerInstrumentation.class.getName() + "$EstimateSizeAdvice");
isMethod()
.and(isPrivate())
.and(takesArgument(0, int.class))
.and(named("ensureValidRecordSize")),
KafkaProducerInstrumentation.class.getName() + "$PayloadSizeAdvice");
}

public static class ProducerAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter(
@Advice.This KafkaProducer producer,
@Advice.FieldValue("apiVersions") final ApiVersions apiVersions,
@Advice.FieldValue("producerConfig") ProducerConfig producerConfig,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
Expand Down Expand Up @@ -131,29 +134,23 @@ record =
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) {
// reset depth count for EstimateSizeAdvice
CallDepthThreadLocalMap.reset(AbstractRecords.class);

PRODUCER_DECORATE.onError(scope, throwable);
PRODUCER_DECORATE.beforeFinish(scope);
scope.close();
}
}

public static class EstimateSizeAdvice {
public static class PayloadSizeAdvice {

/**
* Instrumentation for the method AbstractRecords.estimateSizeInBytesUpperBound that is called
* as part of sending a kafka payload. This gives us an estimate of the payload size "for free",
* Instrumentation for the method KafkaProducer.ensureValidRecordSize that is called as part of
* sending a kafka payload. This gives us access to an estimate of the payload size "for free",
* that we send as a metric.
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return int estimatedPayloadSize) {
// prevent multiple calls
final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(AbstractRecords.class);
if (callDepth > 0) return;

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize) {
final AgentSpan encompassingSpan = activeSpan();
System.out.println("current span = " + encompassingSpan);
if (encompassingSpan == null) return;

String topic = encompassingSpan.getBaggageItem("kafka.topic");
Expand Down

0 comments on commit 5ac65d1

Please sign in to comment.