From bdf963ec25e7e23728e3bca5c9f4e12dba077d25 Mon Sep 17 00:00:00 2001 From: Alexander Mikhailov <33699084+alexmihailov@users.noreply.github.com> Date: Wed, 5 Jun 2024 10:26:31 +0300 Subject: [PATCH] + kamon-executors: providers for Runnable and Callable wrappers (#1331) (#1333) --- ...CaptureContextOnSubmitInstrumentation.java | 134 ++++++--- .../executor/ContextAware.java | 280 ++++++++++++++++++ .../src/main/resources/reference.conf | 10 +- .../src/test/resources/application.conf | 3 + .../OnSubmitContextPropagationSpec.scala | 27 +- .../executor/TestContextAware.scala | 42 +++ 6 files changed, 451 insertions(+), 45 deletions(-) create mode 100644 instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/ContextAware.java create mode 100644 instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/TestContextAware.scala diff --git a/instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/CaptureContextOnSubmitInstrumentation.java b/instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/CaptureContextOnSubmitInstrumentation.java index c1594d36b..9931f611f 100644 --- a/instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/CaptureContextOnSubmitInstrumentation.java +++ b/instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/CaptureContextOnSubmitInstrumentation.java @@ -16,21 +16,36 @@ package kamon.instrumentation.executor; +import com.typesafe.config.Config; import kamon.Kamon; -import kamon.context.Context; -import kamon.context.Storage.Scope; import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableCollectionWrapperAdvisor; import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.CallableWrapperAdvisor; import kamon.instrumentation.executor.CaptureContextOnSubmitAdvices.RunnableWrapperAdvisor; +import kamon.instrumentation.executor.ContextAware.ContextAwareCallableProvider; +import kamon.instrumentation.executor.ContextAware.ContextAwareRunnableProvider; +import kamon.instrumentation.executor.ContextAware.DefaultContextAwareCallable; +import kamon.instrumentation.executor.ContextAware.DefaultContextAwareRunnable; import kanela.agent.api.instrumentation.InstrumentationBuilder; import kanela.agent.bootstrap.context.ContextHandler; import kanela.agent.bootstrap.context.ContextProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.List; +import java.util.Optional; import java.util.concurrent.Callable; +import static java.text.MessageFormat.format; +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toList; + public final class CaptureContextOnSubmitInstrumentation extends InstrumentationBuilder { + private static final Logger LOG = LoggerFactory.getLogger(CaptureContextOnSubmitInstrumentation.class); + + private volatile static Settings settings = readSettings(Kamon.config()); + public CaptureContextOnSubmitInstrumentation() { /** @@ -38,6 +53,8 @@ public CaptureContextOnSubmitInstrumentation() { */ ContextHandler.setContextProvider(new KamonContextProvider()); + Kamon.onReconfigure(newConfig -> { settings = readSettings(newConfig); }); + /** * Instrument all implementations of: * @@ -74,65 +91,98 @@ public CaptureContextOnSubmitInstrumentation() { } - /** - * Runs a Runnable within Kamon Context - */ - private static class ContextAwareRunnable implements Runnable { + private static final class Settings { + public final List runnableAwareProviders; + public final List callableAwareProviders; - private final Runnable underlying; - private final Context context; - - ContextAwareRunnable(Runnable r) { - this.context = Kamon.currentContext(); - this.underlying = r; + private Settings( + List runnableAwareProviders, + List callableAwareProviders + ) { + this.runnableAwareProviders = runnableAwareProviders; + this.callableAwareProviders = callableAwareProviders; } + } - @Override - public void run() { - final Scope scope = Kamon.storeContext(context); - try { - underlying.run(); - } finally { - scope.close(); - } + private static Settings readSettings(Config config) { + Config executorCaptureConfig = config.getConfig("kanela.modules.executor-service-capture-on-submit"); + List runnableAwareProviders ; + if (executorCaptureConfig.hasPath("context-aware-runnable-providers")) { + runnableAwareProviders = executorCaptureConfig.getStringList("context-aware-runnable-providers") + .stream() + .map(CaptureContextOnSubmitInstrumentation::loadRunnableProvider) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } else { + runnableAwareProviders = emptyList(); } - } - /** - * Runs a Callable within Kamon Context - */ - private static class ContextAwareCallable implements Callable { + List callableAwareProviders; + if (executorCaptureConfig.hasPath("context-aware-callable-providers")) { + callableAwareProviders = executorCaptureConfig.getStringList("context-aware-callable-providers") + .stream() + .map(CaptureContextOnSubmitInstrumentation::loadCallableProvider) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toList()); + } else { + callableAwareProviders = emptyList(); + } - private final Callable underlying; - private final Context context; + return new Settings(runnableAwareProviders, callableAwareProviders); + } - ContextAwareCallable(Callable c) { - this.context = Kamon.currentContext(); - this.underlying = c; + private static Optional loadRunnableProvider(String providerClassName) { + Optional providerOpt; + try { + providerOpt = Optional.of( + (ContextAwareRunnableProvider) Class.forName(providerClassName).getConstructor().newInstance() + ); + } catch (Exception e) { + LOG.warn(format("Error trying to load ContextAwareRunnableProvider: {0}.", providerClassName), e); + providerOpt = Optional.empty(); } + return providerOpt; + } - public A call() throws Exception { - final Scope scope = Kamon.storeContext(context); - try { - return underlying.call(); - } finally { - scope.close(); - } + private static Optional loadCallableProvider(String providerClassName) { + Optional providerOpt; + try { + providerOpt = Optional.of( + (ContextAwareCallableProvider) Class.forName(providerClassName).getConstructor().newInstance() + ); + } catch (Exception e) { + LOG.warn(format("Error trying to load ContextAwareCallableProvider: {0}.", providerClassName), e); + providerOpt = Optional.empty(); } + return providerOpt; } /** * implementation of kanela.agent.bootstrap.context.ContextProvider */ private static class KamonContextProvider implements ContextProvider { + @Override - public Runnable wrapInContextAware(Runnable runnable) { - return new ContextAwareRunnable(runnable); + public Runnable wrapInContextAware(Runnable r) { + return settings.runnableAwareProviders + .stream() + .filter(p -> p.test(r)) + .findFirst() + .map(it -> it.provide(r)) + .orElse(new DefaultContextAwareRunnable(r)); } + @SuppressWarnings("rawtypes") @Override - public Callable wrapInContextAware(Callable callable) { - return new ContextAwareCallable<>(callable); + public Callable wrapInContextAware(Callable c) { + return settings.callableAwareProviders + .stream() + .filter(p -> p.test(c)) + .findFirst() + .map(it -> it.provide(c)) + .orElse(new DefaultContextAwareCallable<>(c)); } } -} \ No newline at end of file +} diff --git a/instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/ContextAware.java b/instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/ContextAware.java new file mode 100644 index 000000000..a9c0732ed --- /dev/null +++ b/instrumentation/kamon-executors/src/main/java/kamon/instrumentation/executor/ContextAware.java @@ -0,0 +1,280 @@ +package kamon.instrumentation.executor; + +import kamon.Kamon; +import kamon.context.Context; +import kamon.context.Storage; + +import java.util.concurrent.Callable; + +/** + * The idea is to make it possible to extend wrappers over {@link Runnable} and {@link Callable}, + * so that {@link ClassCastException} can be avoided when instrumenting implementations of + * {@link java.util.concurrent.Executor}, {@link java.util.concurrent.ExecutorService#submit(Runnable)}, + * {@link java.util.concurrent.ExecutorService#submit(Callable)}, + * {@link java.util.concurrent.ExecutorService#invokeAll}. + *

+ * {@link ContextAwareRunnable} interface for wrappers with the {@link Runnable} type. + *

+ * {@link ContextAwareCallable} interface for wrappers with the {@link Callable} type. + *

+ * The specified interfaces implement the logic of the {@link Runnable#run()} and {@link Callable#call()} methods as the default method. + * Thus, a specific implementation should implement {@link ContextAwareRunnable} or {@link ContextAwareCallable}, as well as the necessary interfaces, + * which will avoid the error {@link ClassCastException} after code instrumentation. + *

+ * Using {@link ContextAwareRunnableProvider}, {@link ContextAwareCallableProvider}, you can provide a specific implementation + * for {@link ContextAwareRunnable} and {@link ContextAwareCallable}. To do this, you need to implement a provider, you must have a default constructor! + * In the configuration

kanela.modules.executor-service-capture-on-submit.context-aware-runnable-providers
or + *
kanela.modules.executor-service-capture-on-submit.context-aware-callable-providers
specify the provider. + *

+ * Example: + *

+ *

+ * kanela.modules.executor-service-capture-on-submit.context-aware-runnable-providers = [
+ *   "instrumentation.executor.SlickContextAwareRunnableProvider"
+ * ]
+ * 
+ *

+ * When executing the code, {@link ContextAwareRunnableProvider#test(Runnable)}, {@link ContextAwareCallableProvider#test(Callable)} is called. + * The first provider is used, for which test returns true. + *

+ * If no matches are found, {@link DefaultContextAwareRunnable}, {@link DefaultContextAwareCallable} are used by default. + *

+ * An example for a PrioritizedRunnable from the slick library. + * If you use the standard implementation, the error {@link ClassCastException} will occur when trying to put a task in a queue, + * because the queue is parameterized by the PrioritizedRunnable type. + * Therefore, a class is created that implements {@link ContextAwareRunnable} and PrioritizedRunnable: + *

+ *

+ * {@code
+ * import kamon.Kamon;
+ * import kamon.context.Context;
+ * import slick.util.AsyncExecutor;
+ * import ru.tele2.ds.kamon.executor.ContextAware.ContextAwareRunnable;
+ * import slick.util.AsyncExecutor.PrioritizedRunnable;
+ * public class SlickContextAwareRunnable implements ContextAwareRunnable, PrioritizedRunnable {
+ *
+ *     private final PrioritizedRunnable underlying;
+ *     private final Context context;
+ *
+ *     public SlickContextAwareRunnable(Runnable r) {
+ *         this.context = Kamon.currentContext();
+ *         this.underlying = (PrioritizedRunnable) r;
+ *     }
+ *
+ *     @Override
+ *     public Runnable getUnderlying() {
+ *         return underlying;
+ *     }
+ *
+ *     @Override
+ *     public Context getContext() {
+ *         return context;
+ *     }
+ *
+ *     @Override
+ *     public AsyncExecutor.Priority priority() {
+ *         return underlying.priority();
+ *     }
+ *
+ *     @Override
+ *     public boolean connectionReleased() {
+ *         return underlying.connectionReleased();
+ *     }
+ *
+ *     @Override
+ *     public void connectionReleased_$eq(boolean connectionReleased) {
+ *         underlying.connectionReleased_$eq(connectionReleased);
+ *     }
+ *
+ *     @Override
+ *     public void inUseCounterSet_$eq(boolean inUseCounterSet) {
+ *         underlying.inUseCounterSet_$eq(inUseCounterSet);
+ *     }
+ *
+ *     @Override
+ *     public boolean inUseCounterSet() {
+ *         return underlying.inUseCounterSet();
+ *     }
+ * }
+ * }
+ * 
+ *

+ * Next, we set a new implementation through the provider: + *

+ *

+ * {@code
+ * public class SlickContextAwareRunnableProvider implements ContextAware.ContextAwareRunnableProvider {
+ *     @Override
+ *     public ContextAware.ContextAwareRunnable provide(Runnable original) {
+ *         return new SlickContextAwareRunnable(original);
+ *     }
+ *     @Override
+ *     public boolean test(Runnable r) {
+ *         return r instanceof AsyncExecutor.PrioritizedRunnable;
+ *     }
+ * }
+ * }
+ * 
+ *

+ * Setting the provider class in the configuration: + *

+ *

+ * kanela.modules.executor-service-capture-on-submit.context-aware-runnable-providers = [
+ *   "instrumentation.executor.SlickContextAwareRunnableProvider"
+ * ]
+ * 
+ */ +public final class ContextAware { + + private ContextAware() { + // nothing + } + + /** + * Wrapper over {@link Runnable}. + * Specifies the default implementation of the {@link Runnable#run} method to run in the Kamon {@link Context}. + */ + public interface ContextAwareRunnable extends Runnable { + + /** + * @return original {@link Runnable} + */ + Runnable getUnderlying(); + + /** + * @return current Kamon {@link Context} + */ + Context getContext(); + + @Override + default void run() { + final Storage.Scope scope = Kamon.storeContext(getContext()); + try { + getUnderlying().run(); + } finally { + scope.close(); + } + } + } + + /** + * Wrapper over {@link Callable}. + * Specifies the default implementation of the {@link Callable#call} method to run in the Kamon {@link Context}. + * @param
result type + */ + public interface ContextAwareCallable extends Callable { + + /** + * @return original {@link Callable} + */ + Callable getUnderlying(); + + /** + * @return current Kamon {@link Context} + */ + Context getContext(); + + @Override + default A call() throws Exception { + final Storage.Scope scope = Kamon.storeContext(getContext()); + try { + return getUnderlying().call(); + } finally { + scope.close(); + } + } + } + + /** + * Default implementation {@link ContextAwareRunnable}. + */ + public static final class DefaultContextAwareRunnable implements ContextAwareRunnable { + + private final Runnable underlying; + private final Context context; + + public DefaultContextAwareRunnable(Runnable r) { + this.context = Kamon.currentContext(); + this.underlying = r; + } + + @Override + public Runnable getUnderlying() { + return underlying; + } + + @Override + public Context getContext() { + return context; + } + } + + /** + * Default implementation {@link ContextAwareCallable}. + * + * @param result type + */ + public static final class DefaultContextAwareCallable implements ContextAwareCallable { + + private final Callable underlying; + private final Context context; + + public DefaultContextAwareCallable(Callable c) { + this.context = Kamon.currentContext(); + this.underlying = c; + } + + @Override + public Callable getUnderlying() { + return underlying; + } + + @Override + public Context getContext() { + return context; + } + } + + /** + * Provides a method for creating {@link ContextAwareRunnable}. + * The implementation must have a default constructor! + */ + public interface ContextAwareRunnableProvider { + + /** + * @param original {@link Runnable} + * @return {@link ContextAwareRunnable} + */ + ContextAwareRunnable provide(Runnable original); + + /** + * Check that the provider can provide the wrapper implementation. + * + * @param r {@link Runnable} + * @return true, if it can provide a wrapper implementation + */ + boolean test(Runnable r); + } + + /** + * Provides a method for creating {@link ContextAwareCallable}. + * The implementation must have a default constructor! + */ + public interface ContextAwareCallableProvider { + + /** + * @param original {@link Callable} + * @param result type + * @return {@link ContextAwareCallable} + */ + ContextAwareCallable provide(Callable original); + + /** + * Check that the provider can provide the wrapper implementation. + * + * @param r {@link Callable} + * @return true, if it can provide a wrapper implementation + */ + boolean test(Callable r); + } +} diff --git a/instrumentation/kamon-executors/src/main/resources/reference.conf b/instrumentation/kamon-executors/src/main/resources/reference.conf index c29576078..826388459 100644 --- a/instrumentation/kamon-executors/src/main/resources/reference.conf +++ b/instrumentation/kamon-executors/src/main/resources/reference.conf @@ -63,5 +63,13 @@ kanela.modules { "com.google.common.util.concurrent..*", "scala.concurrent.forkjoin.ForkJoinPool" ] + + # Provider classes for Runnable wrappers. + # See kamon.instrumentation.executor.ContextAware. + context-aware-runnable-providers = [] + + # Provider classes for Callalbe wrappers. + # See kamon.instrumentation.executor.ContextAware. + context-aware-callable-providers = [] } -} \ No newline at end of file +} diff --git a/instrumentation/kamon-executors/src/test/resources/application.conf b/instrumentation/kamon-executors/src/test/resources/application.conf index 65cd7fa1f..4c2318c8b 100644 --- a/instrumentation/kamon-executors/src/test/resources/application.conf +++ b/instrumentation/kamon-executors/src/test/resources/application.conf @@ -6,6 +6,7 @@ kanela.modules { } executor-service-capture-on-submit { + enabled = true within = ${?kanela.modules.executor-service-capture-on-submit.within} [ "com.google.common.util.concurrent..*", "java.util.concurrent..*", @@ -15,5 +16,7 @@ kanela.modules { "akka.actor..*", "play.api.libs.streams..*", ] + context-aware-runnable-providers += kamon.instrumentation.executor.TestContextAwareRunnableProvider + context-aware-callable-providers += kamon.instrumentation.executor.TestContextAwareCallableProvider } } \ No newline at end of file diff --git a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala index 5927cdf24..326cfdf40 100644 --- a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala +++ b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala @@ -15,11 +15,12 @@ package kamon.instrumentation.executor -import java.util.concurrent.{ExecutorService, Executors => JavaExecutors} - +import java.util.concurrent.{Callable, ExecutorService, Executors => JavaExecutors} import com.google.common.util.concurrent.MoreExecutors import kamon.Kamon +import kamon.instrumentation.executor.ContextAware.{DefaultContextAwareCallable, DefaultContextAwareRunnable} import kamon.testkit.InitAndStopKamonAfterAll +import kanela.agent.bootstrap.context.ContextHandler import org.scalatest.OptionValues import org.scalatest.concurrent.Eventually import org.scalatest.matchers.should.Matchers @@ -169,6 +170,28 @@ class OnSubmitContextPropagationSpec extends AnyWordSpec with Matchers with Cont } values should contain allOf ("all-callables-should-see-this-key-A", "all-callables-should-see-this-key-B", "all-callables-should-see-this-key-C") } + + "wrap Runnable to TestContextAwareRunnable when call ContextHandler.wrapInContextAware" in { + val simpleRunnable = ContextHandler.wrapInContextAware(new SimpleRunnable) + simpleRunnable.isInstanceOf[TestContextAwareRunnable] should be(true) + simpleRunnable.isInstanceOf[DefaultContextAwareRunnable] should be(false) + + val notSimpleRunnable = ContextHandler.wrapInContextAware(new Runnable { override def run(): Unit = {} }) + notSimpleRunnable.isInstanceOf[TestContextAwareRunnable] should be(false) + notSimpleRunnable.isInstanceOf[DefaultContextAwareRunnable] should be(true) + } + + "wrap Callable to TestContextAwareCallable when call ContextHandler.wrapInContextAware" in { + val simpleCallable = ContextHandler.wrapInContextAware(new SimpleCallable) + simpleCallable.isInstanceOf[TestContextAwareCallable[_]] should be(true) + simpleCallable.isInstanceOf[DefaultContextAwareCallable[_]] should be(false) + + val notSimpleCallable = ContextHandler.wrapInContextAware(new Callable[String] { + override def call(): String = "test" + }) + notSimpleCallable.isInstanceOf[TestContextAwareCallable[_]] should be(false) + notSimpleCallable.isInstanceOf[DefaultContextAwareCallable[_]] should be(true) + } } def instrument(executor: ExecutorService): ExecutorService = { diff --git a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/TestContextAware.scala b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/TestContextAware.scala new file mode 100644 index 000000000..57a314474 --- /dev/null +++ b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/TestContextAware.scala @@ -0,0 +1,42 @@ +package kamon.instrumentation.executor + +import kamon.Kamon +import kamon.context.Context +import kamon.instrumentation.executor.ContextAware.{ + ContextAwareCallable, + ContextAwareCallableProvider, + ContextAwareRunnable, + ContextAwareRunnableProvider +} + +import java.util.concurrent.Callable + +class TestContextAwareRunnable(private val underlying: Runnable, private val ctx: Context) + extends ContextAwareRunnable { + override def getUnderlying: Runnable = underlying + override def getContext: Context = ctx +} + +class TestContextAwareCallable[A](private val underlying: Callable[A], private val ctx: Context) + extends ContextAwareCallable[A] { + + override def getUnderlying: Callable[A] = underlying + + override def getContext: Context = ctx +} + +class TestContextAwareRunnableProvider extends ContextAwareRunnableProvider { + + override def provide(original: Runnable): ContextAwareRunnable = + new TestContextAwareRunnable(original, Kamon.currentContext()) + + override def test(r: Runnable): Boolean = r.isInstanceOf[SimpleRunnable] +} + +class TestContextAwareCallableProvider extends ContextAwareCallableProvider { + + override def provide[A](original: Callable[A]): ContextAwareCallable[A] = + new TestContextAwareCallable(original, Kamon.currentContext()) + + override def test(r: Callable[_]): Boolean = r.isInstanceOf[SimpleCallable] +}