Skip to content

Commit

Permalink
instrumented Avro DatumReader
Browse files Browse the repository at this point in the history
  • Loading branch information
nayeem-kamal committed Jun 24, 2024
1 parent 64f4dee commit bb7a49a
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 287 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.avro;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
Expand All @@ -9,56 +10,56 @@
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.concurrent.ExecutionException;
import net.bytebuddy.asm.Advice;
import org.apache.avro.hadoop.io.AvroSerializer;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.avro.Schema;

@AutoService(InstrumenterModule.class)
public final class AvroSerializerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType {
implements Instrumenter.ForTypeHierarchy {

/** Instrumentation name handed to the superclass constructor. */
static final String instrumentationName = "avro";

/**
 * Fully-qualified name of the Avro interface used as the hierarchy marker; types implementing it
 * are the ones matched by this instrumentation.
 */
static final String TARGET_TYPE = "org.apache.avro.io.DatumReader";

/** Creates the module, passing {@code instrumentationName} ("avro") to the superclass. */
public AvroSerializerInstrumentation() {
super(instrumentationName);
}

@Override
public String instrumentedType() {
return TARGET_TYPE;
public String[] helperClassNames() {
return new String[] {
packageName + ".SchemaExtractor", "datadog.trace.instrumentation.avro.SchemaExtractor$1",
};
}

/**
 * Applies {@code SerializeAdvice} to every one-argument method named {@code setSchema} on the
 * matched types.
 *
 * @param transformer receives the method matcher and the advice class name
 */
@Override
public void methodAdvice(MethodTransformer transformer) {
  transformer.applyAdvice(
      isMethod().and(named("setSchema").and(takesArguments(1))),
      // The advice is referenced by its binary name (outer class + "$" + nested class).
      AvroSerializerInstrumentation.class.getName() + "$SerializeAdvice");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SchemaExtractor", "datadog.trace.instrumentation.avro.SchemaExtractor$1",
};
public String hierarchyMarkerType() {
return TARGET_TYPE;
}

/**
 * Builds the type matcher: any type implementing the interface named by
 * {@link #hierarchyMarkerType()}.
 *
 * @return matcher selecting implementations of the marker interface
 */
@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
  final String markerInterface = hierarchyMarkerType();
  return implementsInterface(named(markerInterface));
}

public static class SerializeAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.This final AvroSerializer serializer, @Advice.Thrown final Throwable throwable) {
public static void stopSpan(@Advice.Argument(0) final Schema schema) {
AgentSpan span = activeSpan();
if (span == null) {
return;
}
if (throwable != null) {
span.addThrowable(
throwable instanceof ExecutionException ? throwable.getCause() : throwable);
}

if (serializer != null) {
SchemaExtractor.attachSchemaOnSpan(
serializer.getWriterSchema(), span, SchemaExtractor.serialization);
if (schema != null) {
System.out.println("Attaching schema to span:" + schema.toString());
SchemaExtractor.attachSchemaOnSpan(schema, span, SchemaExtractor.serialization);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDTags
import org.apache.avro.Schema
import org.apache.avro.io.DatumReader
import org.apache.avro.specific.SpecificDatumReader


import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

/**
 * Verifies that calling {@code DatumReader.setSchema} inside an active trace tags the enclosing
 * span with the schema definition, id, name, type, weight and operation.
 */
class AvroDatumReaderTest extends AgentTestRunner {

  // Expected value of the SCHEMA_ID tag for the record schema below.
  String schemaID = "5493435211744749109"

  // Expected value of the SCHEMA_DEFINITION tag for the record schema below.
  String openApiSchemaDef = "{\"components\":{\"schemas\":{\"TestRecord\":{\"properties\":{\"stringField\":{\"type\":\"string\"},\"intField\":{\"format\":\"int32\",\"type\":\"integer\"},\"longField\":{\"format\":\"int64\",\"type\":\"integer\"},\"floatField\":{\"format\":\"float\",\"type\":\"number\"},\"doubleField\":{\"format\":\"double\",\"type\":\"number\"},\"booleanField\":{\"type\":\"boolean\"},\"bytesField\":{\"format\":\"byte\",\"type\":\"string\"},\"nullField\":{\"type\":\"null\"},\"enumField\":{\"enum\":[\"A\",\"B\",\"C\"],\"type\":\"string\"},\"fixedField\":{\"type\":\"string\"},\"recordField\":{\"type\":\"object\"},\"arrayField\":{\"items\":{\"type\":\"integer\"},\"type\":\"array\"},\"mapField\":{\"description\":\"Map type\",\"type\":\"object\"}},\"type\":\"object\"}}},\"openapi\":\"3.0.0\"}"

  // Avro record schema with one field per type; the literal is kept exactly as written.
  String schemaStr = '''
{
"type": "record",
"name": "TestRecord",
"fields": [
{"name": "stringField", "type": "string"},
{"name": "intField", "type": "int"},
{"name": "longField", "type": "long"},
{"name": "floatField", "type": "float"},
{"name": "doubleField", "type": "double"},
{"name": "booleanField", "type": "boolean"},
{"name": "bytesField", "type": "bytes"},
{"name": "nullField", "type": "null"},
{"name": "enumField", "type": {"type": "enum", "name": "TestEnum", "symbols": ["A", "B", "C"]}},
{"name": "fixedField", "type": {"type": "fixed", "name": "TestFixed", "size": 16}},
{"name": "recordField", "type": {"type": "record", "name": "NestedRecord", "fields": [{"name": "nestedString", "type": "string"}]}},
{"name": "arrayField", "type": {"type": "array", "items": "int"}},
{"name": "mapField", "type": {"type": "map", "values": "string"}}
]
}
'''

  // Parsed form of schemaStr, handed to the reader under test.
  Schema schemaDef = new Schema.Parser().parse(schemaStr)

  @Override
  protected boolean isDataStreamsEnabled() {
    return true
  }

  void 'test extract avro schema on serialize'() {
    given:
    DatumReader reader = new SpecificDatumReader()

    when:
    // setSchema is the instrumented call; it runs inside a parent span so there is an active span.
    runUnderTrace("parent_serialize") {
      reader.setSchema(schemaDef)
    }
    TEST_WRITER.waitForTraces(1)

    then:
    assertTraces(1, SORT_TRACES_BY_ID) {
      trace(1) {
        span {
          hasServiceName()
          operationName "parent_serialize"
          resourceName "parent_serialize"
          errored false
          measured false
          tags {
            "$DDTags.SCHEMA_DEFINITION" openApiSchemaDef
            "$DDTags.SCHEMA_WEIGHT" 1
            "$DDTags.SCHEMA_TYPE" "avro"
            "$DDTags.SCHEMA_NAME" "TestRecord"
            "$DDTags.SCHEMA_OPERATION" "serialization"
            "$DDTags.SCHEMA_ID" schemaID
            defaultTags(false)
          }
        }
      }
    }
  }
}
Loading

0 comments on commit bb7a49a

Please sign in to comment.