Skip to content

Commit

Permalink
instrument continuation (un)mounting
Browse files Browse the repository at this point in the history
  • Loading branch information
richardstartin committed Jun 27, 2024
1 parent 7afb3eb commit 8fa2800
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ public void close() {
@Override
public void activate(Object context) {
if (context instanceof ProfilerContext) {
ProfilerContext profilerContext = (ProfilerContext) context;
DDPROF.setSpanContext(profilerContext.getSpanId(), profilerContext.getRootSpanId());
DDPROF.setContextValue(SPAN_NAME_INDEX, profilerContext.getEncodedOperationName());
DDPROF.setContextValue(RESOURCE_NAME_INDEX, profilerContext.getEncodedResourceName());
setContext((ProfilerContext) context);
}
}
};
Expand Down Expand Up @@ -93,6 +90,14 @@ public String name() {
return "ddprof";
}

@Override
public void setContext(ProfilerContext profilerContext) {
DDPROF.setSpanContext(profilerContext.getSpanId(), profilerContext.getRootSpanId());
DDPROF.setContextValue(SPAN_NAME_INDEX, profilerContext.getEncodedOperationName());
DDPROF.setContextValue(RESOURCE_NAME_INDEX, profilerContext.getEncodedResourceName());
}

@Override
public void clearContext() {
DDPROF.clearSpanContext();
DDPROF.clearContextValue(SPAN_NAME_INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
0 java.io.PushbackInputStream
1 jdk.*
0 jdk.internal.net.http.*
# for updating profiler context on mount/unmount
0 jdk.internal.vm.Continuation
1 net.bytebuddy.*
1 org.apache.felix.framework.URLHandlers*
1 org.apache.groovy.*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package datadog.trace.instrumentation.java.concurrent.virtualthread;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Platform;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.ProfilerContext;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public class ContinuationInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType {
public ContinuationInstrumentation() {
super("java_concurrent", "virtual-thread-continuation");
}

@Override
public String instrumentedType() {
return "jdk.internal.vm.Continuation";
}

@Override
public boolean isEnabled() {
return Platform.isJavaVersionAtLeast(19) && super.isEnabled();
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(takesArguments(0)).and(isPrivate()).and(named("mount")),
getClass().getName() + "$Mount");
transformer.applyAdvice(
isMethod().and(takesArguments(0)).and(isPrivate()).and(named("unmount")),
getClass().getName() + "$Unmount");
}

public static final class Mount {
@Advice.OnMethodEnter
public static void mount() {
AgentScope scope = AgentTracer.activeScope();
if (scope != null && scope.span().context() instanceof ProfilerContext) {
AgentTracer.get()
.getProfilingContext()
.setContext((ProfilerContext) scope.span().context());
}
}
}

public static final class Unmount {
@Advice.OnMethodExit
public static void unmount() {
AgentTracer.get().getProfilingContext().clearContext();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import datadog.trace.agent.test.AgentTestRunner

import java.util.concurrent.atomic.AtomicLong

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

class ContinuationMountingTest extends AgentTestRunner {

@Override
protected void configurePreAgent() {
super.configurePreAgent()
injectSysConfig("dd.profiling.enabled", "true")
}

def "test continuation mounting intercepted"() {
setup:
def spanName = UUID.randomUUID().toString()
def duration = new AtomicLong()

when:
runUnderTrace(spanName, {
Thread.ofVirtual()
.name("test thread")
.start({
long start = System.nanoTime()
for (int i = 0; i < 3; i++) {
Thread.sleep(20)
long now = System.nanoTime()
duration.addAndGet(now - start)
start = now
}
}).join(5000)
})

then:
TEST_PROFILING_CONTEXT_INTEGRATION.getActivationBalance(spanName) == 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import datadog.trace.api.profiling.QueueTiming
import datadog.trace.api.profiling.Timing
import datadog.trace.bootstrap.instrumentation.api.AgentSpan
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
import datadog.trace.bootstrap.instrumentation.api.ProfilerContext
import datadog.trace.bootstrap.instrumentation.api.ProfilingContextIntegration
import datadog.trace.bootstrap.instrumentation.api.TaskWrapper
import org.eclipse.jetty.util.ConcurrentHashSet
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.util.concurrent.BlockingDeque
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -24,6 +27,10 @@ class TestProfilingContextIntegration implements ProfilingContextIntegration {
final AtomicInteger counter = new AtomicInteger()
final BlockingDeque<Timing> closedTimings = new LinkedBlockingDeque<>()
final Logger logger = LoggerFactory.getLogger(TestProfilingContextIntegration)
final ConcurrentHashMap<CharSequence, AtomicInteger> spanIds = new ConcurrentHashMap<>()
final ThreadLocal<CharSequence> currentSpan = ThreadLocal.withInitial {
""
}
@Override
void onAttach() {
attachments.incrementAndGet()
Expand All @@ -34,11 +41,33 @@ class TestProfilingContextIntegration implements ProfilingContextIntegration {
detachments.incrementAndGet()
}

int getActivationBalance(CharSequence spanName) {
return spanIds.get(spanName)?.get() ?: -1
}

void clear() {
attachments.set(0)
detachments.set(0)
}

@Override
void setContext(ProfilerContext profilerContext) {
new Throwable("mount " + profilerContext.operationName).printStackTrace(System.err)
spanIds.computeIfAbsent(profilerContext.operationName, {
return new AtomicInteger()
}).getAndIncrement()
currentSpan.set(profilerContext.operationName)
}

@Override
void clearContext() {
def spanName = currentSpan.get()
if (spanName?.length() != 0) {
spanIds.getOrDefault(spanName, new AtomicInteger()).getAndDecrement()
currentSpan.remove()
}
}

@Override
String name() {
return "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ default Stateful newScopeState(ProfilerContext profilerContext) {
return Stateful.DEFAULT;
}

default void setContext(ProfilerContext profilerContext) {}

default void clearContext() {}

default int encode(CharSequence constant) {
return 0;
}
Expand Down

0 comments on commit 8fa2800

Please sign in to comment.