Skip to content

Commit

Permalink
ServiceTalk async context propagation instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ygree committed Jun 26, 2024
1 parent d19ceac commit 9852006
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 2 deletions.
36 changes: 36 additions & 0 deletions dd-java-agent/instrumentation/servicetalk/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
plugins {
id 'java-test-fixtures'
}

muzzle {
pass {
group = 'io.servicetalk'
module = 'servicetalk-concurrent-api'
// prev versions missing ContextMap
versions = '[0.41.12,0.42.45]'
assertInverse = true
}
pass {
group = 'io.servicetalk'
module = 'servicetalk-context-api'
versions = '[0.1.0,0.42.45]'
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')

dependencies {
compileOnly group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '0.42.45'
compileOnly group: 'io.servicetalk', name: 'servicetalk-context-api', version: '0.42.45'

testImplementation group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '0.42.0'
testImplementation group: 'io.servicetalk', name: 'servicetalk-context-api', version: '0.42.0'

latestDepTestImplementation group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '0.42+'
latestDepTestImplementation group: 'io.servicetalk', name: 'servicetalk-context-api', version: '0.42+'
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package datadog.trace.instrumentation.servicetalk;

import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.Collections;
import java.util.Map;

public abstract class AbstractAsyncContextInstrumentation extends InstrumenterModule.Tracing {

public AbstractAsyncContextInstrumentation() {
super("servicetalk", "servicetalk-concurrent");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
"io.servicetalk.context.api.ContextMap", AgentSpan.class.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package datadog.trace.instrumentation.servicetalk;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import io.servicetalk.context.api.ContextMap;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public class ContextMapInstrumentation extends AbstractAsyncContextInstrumentation
implements Instrumenter.ForSingleType {

@Override
public String instrumentedType() {
return "io.servicetalk.concurrent.api.CopyOnWriteContextMap";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
}

private static final class Construct {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean enter(@Advice.Origin Class<?> clazz) {
int level = CallDepthThreadLocalMap.incrementCallDepth(clazz);
return level == 0;
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void exit(
@Advice.Origin Class<?> clazz,
@Advice.Enter final boolean topLevel,
@Advice.This ContextMap contextMap) {
if (!topLevel) {
return;
}
CallDepthThreadLocalMap.reset(clazz);

InstrumentationContext.get(ContextMap.class, AgentSpan.class)
.put(contextMap, AgentTracer.activeSpan());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package datadog.trace.instrumentation.servicetalk;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import io.servicetalk.context.api.ContextMap;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public class ContextPreservingInstrumentation extends AbstractAsyncContextInstrumentation
implements Instrumenter.ForKnownTypes {

@Override
public String[] knownMatchingTypes() {
return new String[] {
"io.servicetalk.concurrent.api.ContextPreservingBiConsumer",
"io.servicetalk.concurrent.api.ContextPreservingBiFunction",
"io.servicetalk.concurrent.api.ContextPreservingCallable",
"io.servicetalk.concurrent.api.ContextPreservingCancellable",
"io.servicetalk.concurrent.api.ContextPreservingCompletableSubscriber",
"io.servicetalk.concurrent.api.ContextPreservingConsumer",
"io.servicetalk.concurrent.api.ContextPreservingFunction",
"io.servicetalk.concurrent.api.ContextPreservingRunnable",
"io.servicetalk.concurrent.api.ContextPreservingSingleSubscriber",
"io.servicetalk.concurrent.api.ContextPreservingSubscriber",
"io.servicetalk.concurrent.api.ContextPreservingSubscription",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
namedOneOf(
"accept",
"apply",
"call",
"cancel",
"onComplete",
"onError",
"onSuccess",
"request",
"onNext",
"onSubscribe",
"run"),
getClass().getName() + "$Wrapper");
}

public static final class Wrapper {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope enter(@Advice.FieldValue("saved") final ContextMap contextMap) {
AgentSpan parent =
InstrumentationContext.get(ContextMap.class, AgentSpan.class).get(contextMap);
if (parent != null) {
return AgentTracer.activateSpan(parent);
}
return null;
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void exit(@Advice.Enter final AgentScope agentScope) {
if (agentScope != null) {
agentScope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.bootstrap.instrumentation.api.AgentScope
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
import io.servicetalk.concurrent.api.AsyncContext
import io.servicetalk.context.api.ContextMap

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

class ContextPreservingInstrumentationTest extends AgentTestRunner {

def "wrapBiConsumer"() {
setup:
def parent = startParentContext()
def wrapped =
asyncContextProvider.wrapBiConsumer({ t, u -> childSpan() }, parent.contextMap)

when:
runInSeparateThread{ wrapped.accept(null, null) }
parent.releaseParentSpan()

then:
assertParentChildTrace()
}

def "wrapBiFunction"() {
setup:
def parent = startParentContext()
def wrapped =
asyncContextProvider.wrapBiFunction({ t, u -> childSpan() }, parent.contextMap)

when:
runInSeparateThread{ wrapped.apply(null, null) }
parent.releaseParentSpan()

then:
assertParentChildTrace()
}

def "wrapCallable"() {
setup:
def parent = startParentContext()
def wrapped =
asyncContextProvider.wrapCallable({ -> childSpan() }, parent.contextMap)

when:
runInSeparateThread{ wrapped.call() }
parent.releaseParentSpan()

then:
assertParentChildTrace()
}

def "wrapConsumer"() {
setup:
def parent = startParentContext()
def wrapped =
asyncContextProvider.wrapConsumer({ t -> childSpan() }, parent.contextMap)

when:
runInSeparateThread{ wrapped.accept(null) }
parent.releaseParentSpan()

then:
assertParentChildTrace()
}

def "wrapFunction"() {
setup:
def parent = startParentContext()
def wrapped =
asyncContextProvider.wrapFunction({ t -> childSpan() }, parent.contextMap)

when:
runInSeparateThread { wrapped.apply(null) }
parent.releaseParentSpan()

then:
assertParentChildTrace()
}

def "wrapRunnable"() {
setup:
def parent = startParentContext()
def wrapped =
asyncContextProvider.wrapRunnable({ -> childSpan() }, parent.contextMap)

when:
runInSeparateThread(wrapped)
parent.releaseParentSpan()

then:
assertParentChildTrace()
}

ExecutorService executor = Executors.newFixedThreadPool(5)
def asyncContextProvider = AsyncContext.provider

def cleanup() {
if (executor != null) {
executor.shutdown()
}
}

private runInSeparateThread(Runnable runnable) {
executor.submit(runnable).get()
}

/**
* Captures async context. Also uses continuation to prevent the span from being reported until it is released.
*/
private class ParentContext {
final ContextMap contextMap = AsyncContext.context().copy()
final AgentScope.Continuation spanContinuation = AgentTracer.capture()

def releaseParentSpan() {
spanContinuation.cancel()
}
}

private startParentContext() {
runUnderTrace("parent") {
new ParentContext()
}
}

/**
* Asserts a parent-child trace meaning that async context propagation works correctly.
*/
private void assertParentChildTrace() {
assertTraces(1) {
trace(2) {
sortSpansByStart()
span {
operationName "parent"
tags {
defaultTags()
}
}
span {
childOf span(0)
operationName "child"
tags {
defaultTags()
}
}
}
}
}

private childSpan() {
AgentTracer.startSpan("test", "child").finish()
}
}
5 changes: 3 additions & 2 deletions dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ private void finishAndAddToTrace(final long durationNano) {
PendingTrace.PublishState publishState = context.getTrace().onPublish(this);
log.debug("Finished span ({}): {}", publishState, this);
} else {
log.debug("Already finished: {}", this);
log.debug("Already finished 1: {}", this);
new Throwable().printStackTrace();
}
}

Expand Down Expand Up @@ -249,7 +250,7 @@ public final boolean phasedFinish() {
log.debug("Finished span (PHASED): {}", this);
return true;
} else {
log.debug("Already finished: {}", this);
log.debug("Already finished 2: {}", this);
return false;
}
}
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ include ':dd-java-agent:instrumentation:scala-promise:scala-promise-2.10'
include ':dd-java-agent:instrumentation:scala-promise:scala-promise-2.13'
include ':dd-java-agent:instrumentation:scalatest'
include ':dd-java-agent:instrumentation:selenium'
include ':dd-java-agent:instrumentation:servicetalk'
include ':dd-java-agent:instrumentation:servlet'
include ':dd-java-agent:instrumentation:servlet-common'
include ':dd-java-agent:instrumentation:servlet:request-2'
Expand Down

0 comments on commit 9852006

Please sign in to comment.