Couple of fixes following advice from Andrea
 - share the payload-size computation method between the kafka-clients and kafka-streams instrumentations
 - fix code that would not have been inlined (see the sketch below the changed-files summary)
 - add a class cast check just to be sure
vandonr committed Oct 19, 2023
1 parent 969ff58 commit 17eacbc
Showing 4 changed files with 32 additions and 35 deletions.
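
A note on the second fix, since the constraint is easy to trip over: the tracer's advice bodies are inlined by ByteBuddy into the instrumented class, and the advice class itself is never loaded in the application's classloader; only classes listed in helperClassNames() are injected there. A call from an advice body to a private method of its own class therefore points at a class that is absent where the inlined code runs. A minimal before/after sketch (assumed mechanics and made-up names, not code from this commit):

    import net.bytebuddy.asm.Advice;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class InliningSketch {
      // Broken shape: the advice body is inlined into the target class, but the
      // call still targets privateHelper(...) on the advice class, which is not
      // injected into the application classloader, so it cannot resolve at runtime.
      public static class BrokenAdvice {
        @Advice.OnMethodEnter
        public static void onEnter(@Advice.Argument(0) final ConsumerRecord<?, ?> record) {
          final long size = privateHelper(record); // unresolvable once inlined
        }

        private static long privateHelper(final ConsumerRecord<?, ?> record) {
          return record.serializedKeySize() + record.serializedValueSize();
        }
      }

      // Fixed shape, mirroring this commit: the helper lives in a standalone
      // class (the new Utils below), registered in helperClassNames() so the
      // agent injects it alongside the instrumentation.
      public static class FixedAdvice {
        @Advice.OnMethodEnter
        public static void onEnter(@Advice.Argument(0) final ConsumerRecord<?, ?> record) {
          final long size = Utils.computePayloadSizeBytes(record);
        }
      }
    }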
@@ -48,7 +48,8 @@ public String[] helperClassNames() {
       packageName + ".TracingIterator",
       packageName + ".TracingList",
       packageName + ".TracingListIterator",
-      packageName + ".Base64Decoder"
+      packageName + ".Base64Decoder",
+      packageName + ".Utils"
     };
   }
@@ -13,19 +13,17 @@
 import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_DELIVER;
 import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
 import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER;
+import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import datadog.trace.api.Config;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan.Context;
 import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
-import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -114,16 +112,6 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
     }
   }
 
-  private static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
-    long headersSize = 0;
-    Headers headers = val.headers();
-    if (headers != null)
-      for (Header h : headers) {
-        headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
-      }
-    return headersSize + val.serializedKeySize() + val.serializedValueSize();
-  }
-
   @Override
   public void remove() {
     delegateIterator.remove();
@@ -0,0 +1,19 @@
+package datadog.trace.instrumentation.kafka_clients;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+
+public class Utils {
+  // this method is used in kafka-clients and kafka-streams instrumentations
+  public static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
+    long headersSize = 0;
+    Headers headers = val.headers();
+    if (headers != null)
+      for (Header h : headers) {
+        headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
+      }
+    return headersSize + val.serializedKeySize() + val.serializedValueSize();
+  }
+}
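
To make the size accounting concrete, here is a small standalone example of the header term (runnable against kafka-clients; the header name is made up for illustration):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class PayloadSizeExample {
      public static void main(String[] args) {
        // Each header contributes its raw value bytes plus its UTF-8 key bytes.
        final RecordHeaders headers = new RecordHeaders();
        headers.add("trace-id", "1234".getBytes(StandardCharsets.UTF_8));

        long headersSize = 0;
        for (final Header h : headers) {
          headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
        }
        // key "trace-id" = 8 bytes, value "1234" = 4 bytes
        System.out.println(headersSize); // 12
      }
    }

One caveat worth knowing: ConsumerRecord reports serializedKeySize() and serializedValueSize() as -1 when the key or value is null, so the helper's total can come out slightly low for such records.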
@@ -9,6 +9,7 @@
 import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
+import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.BROKER_DECORATE;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
 import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.KAFKA_CONSUME;
@@ -32,14 +33,11 @@
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
 import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator;
-import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import net.bytebuddy.asm.Advice;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -63,6 +61,7 @@ public String instrumentedType() {
   public String[] helperClassNames() {
     return new String[] {
       "datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator",
+      "datadog.trace.instrumentation.kafka_clients.Utils",
       packageName + ".KafkaStreamsDecorator",
       packageName + ".ProcessorRecordContextVisitor",
       packageName + ".StampedRecordContextVisitor",
@@ -268,16 +267,6 @@ public static void start(
       InstrumentationContext.get(StreamTask.class, StreamTaskContext.class)
           .put(task, streamTaskContext);
     }
-
-    private static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
-      long headersSize = 0;
-      Headers headers = val.headers();
-      if (headers != null)
-        for (Header h : headers) {
-          headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
-        }
-      return headersSize + val.serializedKeySize() + val.serializedValueSize();
-    }
   }

/** Very similar to StartSpanAdvice, but with a different argument type for record. */
@@ -323,17 +312,17 @@ public static void start(
         sortedTags.put(TOPIC_TAG, record.topic());
         sortedTags.put(TYPE_TAG, "kafka");
 
-        // the type ProcessorRecordContext inherits from RecordMetadata starting with 2.7
-        // we need access to that type to be able to get the payload size info
-        RecordMetadata metadata = (RecordMetadata) (Object) record;
+        long payloadSize = 0;
+        // we have to go through Object to get the RecordMetadata here because the class of `record`
+        // only implements it after 2.7 (and this class is only used if v >= 2.7)
+        if ((Object) record instanceof RecordMetadata) { // should always be true
+          RecordMetadata metadata = (RecordMetadata) (Object) record;
+          payloadSize = metadata.serializedKeySize() + metadata.serializedValueSize();
+        }
 
         AgentTracer.get()
             .getDataStreamsMonitoring()
-            .setCheckpoint(
-                span,
-                sortedTags,
-                record.timestamp(),
-                metadata.serializedKeySize() + metadata.serializedValueSize());
+            .setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
       } else {
         span = startSpan(KAFKA_CONSUME, null);
       }
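
The guard added in the last hunk leans on a standard Java idiom: when the declared type of `record` and RecordMetadata are unrelated at compile time, javac rejects both the direct cast and the instanceof as inconvertible types, so the code upcasts to Object first to defer the type check to runtime. A self-contained sketch with made-up stand-in types:

    public class CastThroughObjectSketch {
      // Stand-ins: Legacy models the pre-2.7 declared type, Metadata the
      // capability that only newer versions provide. Legacy is final, so a
      // direct `record instanceof Metadata` would not compile here.
      static final class Legacy {}

      interface Metadata {
        int serializedSize();
      }

      public static void main(String[] args) {
        final Legacy record = new Legacy();
        long payloadSize = 0;
        // Going through Object makes the check legal and defers it to runtime.
        if ((Object) record instanceof Metadata) { // false for this pre-2.7 stand-in
          payloadSize = ((Metadata) (Object) record).serializedSize();
        }
        System.out.println(payloadSize); // 0
      }
    }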
