Skip to content

Commit

Permalink
+ kamon-executors: providers for Runnable and Callable wrappers (#1331)…
Browse files Browse the repository at this point in the history
… (#1333)
  • Loading branch information
alexmihailov committed Jun 5, 2024
1 parent 170aad7 commit bdf963e
Show file tree
Hide file tree
Showing 6 changed files with 451 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,45 @@

package kamon.instrumentation.executor;

import com.typesafe.config.Config;
import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage.Scope;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableCollectionWrapperAdvisor;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableWrapperAdvisor;
import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.RunnableWrapperAdvisor;
import kamon.instrumentation.executor.ContextAware.ContextAwareCallableProvider;
import kamon.instrumentation.executor.ContextAware.ContextAwareRunnableProvider;
import kamon.instrumentation.executor.ContextAware.DefaultContextAwareCallable;
import kamon.instrumentation.executor.ContextAware.DefaultContextAwareRunnable;
import kanela.agent.api.instrumentation.InstrumentationBuilder;
import kanela.agent.bootstrap.context.ContextHandler;
import kanela.agent.bootstrap.context.ContextProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

import static java.text.MessageFormat.format;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;

public final class CaptureContextOnSubmitInstrumentation extends InstrumentationBuilder {

private static final Logger LOG = LoggerFactory.getLogger(CaptureContextOnSubmitInstrumentation.class);

private volatile static Settings settings = readSettings(Kamon.config());

public CaptureContextOnSubmitInstrumentation() {

/**
* Set the ContextProvider
*/
ContextHandler.setContextProvider(new KamonContextProvider());

Kamon.onReconfigure(newConfig -> { settings = readSettings(newConfig); });

/**
* Instrument all implementations of:
*
Expand Down Expand Up @@ -74,65 +91,98 @@ public CaptureContextOnSubmitInstrumentation() {

}

/**
* Runs a Runnable within Kamon Context
*/
private static class ContextAwareRunnable implements Runnable {
private static final class Settings {
public final List<ContextAwareRunnableProvider> runnableAwareProviders;
public final List<ContextAwareCallableProvider> callableAwareProviders;

private final Runnable underlying;
private final Context context;

ContextAwareRunnable(Runnable r) {
this.context = Kamon.currentContext();
this.underlying = r;
private Settings(
List<ContextAwareRunnableProvider> runnableAwareProviders,
List<ContextAwareCallableProvider> callableAwareProviders
) {
this.runnableAwareProviders = runnableAwareProviders;
this.callableAwareProviders = callableAwareProviders;
}
}

@Override
public void run() {
final Scope scope = Kamon.storeContext(context);
try {
underlying.run();
} finally {
scope.close();
}
private static Settings readSettings(Config config) {
Config executorCaptureConfig = config.getConfig("kanela.modules.executor-service-capture-on-submit");
List<ContextAwareRunnableProvider> runnableAwareProviders ;
if (executorCaptureConfig.hasPath("context-aware-runnable-providers")) {
runnableAwareProviders = executorCaptureConfig.getStringList("context-aware-runnable-providers")
.stream()
.map(CaptureContextOnSubmitInstrumentation::loadRunnableProvider)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
} else {
runnableAwareProviders = emptyList();
}
}

/**
* Runs a Callable within Kamon Context
*/
private static class ContextAwareCallable<A> implements Callable<A> {
List<ContextAwareCallableProvider> callableAwareProviders;
if (executorCaptureConfig.hasPath("context-aware-callable-providers")) {
callableAwareProviders = executorCaptureConfig.getStringList("context-aware-callable-providers")
.stream()
.map(CaptureContextOnSubmitInstrumentation::loadCallableProvider)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
} else {
callableAwareProviders = emptyList();
}

private final Callable<A> underlying;
private final Context context;
return new Settings(runnableAwareProviders, callableAwareProviders);
}

ContextAwareCallable(Callable<A> c) {
this.context = Kamon.currentContext();
this.underlying = c;
private static Optional<ContextAwareRunnableProvider> loadRunnableProvider(String providerClassName) {
Optional<ContextAwareRunnableProvider> providerOpt;
try {
providerOpt = Optional.of(
(ContextAwareRunnableProvider) Class.forName(providerClassName).getConstructor().newInstance()
);
} catch (Exception e) {
LOG.warn(format("Error trying to load ContextAwareRunnableProvider: {0}.", providerClassName), e);
providerOpt = Optional.empty();
}
return providerOpt;
}

public A call() throws Exception {
final Scope scope = Kamon.storeContext(context);
try {
return underlying.call();
} finally {
scope.close();
}
private static Optional<ContextAwareCallableProvider> loadCallableProvider(String providerClassName) {
Optional<ContextAwareCallableProvider> providerOpt;
try {
providerOpt = Optional.of(
(ContextAwareCallableProvider) Class.forName(providerClassName).getConstructor().newInstance()
);
} catch (Exception e) {
LOG.warn(format("Error trying to load ContextAwareCallableProvider: {0}.", providerClassName), e);
providerOpt = Optional.empty();
}
return providerOpt;
}

/**
* implementation of kanela.agent.bootstrap.context.ContextProvider
*/
private static class KamonContextProvider implements ContextProvider {

@Override
public Runnable wrapInContextAware(Runnable runnable) {
return new ContextAwareRunnable(runnable);
public Runnable wrapInContextAware(Runnable r) {
return settings.runnableAwareProviders
.stream()
.filter(p -> p.test(r))
.findFirst()
.map(it -> it.provide(r))
.orElse(new DefaultContextAwareRunnable(r));
}

@SuppressWarnings("rawtypes")
@Override
public <A> Callable wrapInContextAware(Callable<A> callable) {
return new ContextAwareCallable<>(callable);
public <A> Callable wrapInContextAware(Callable<A> c) {
return settings.callableAwareProviders
.stream()
.filter(p -> p.test(c))
.findFirst()
.map(it -> it.provide(c))
.orElse(new DefaultContextAwareCallable<>(c));
}
}
}
}
Loading

0 comments on commit bdf963e

Please sign in to comment.