Skip to content

Commit

Permalink
Attempt to handle the Dispatcher case
Browse files Browse the repository at this point in the history
  • Loading branch information
dispalt committed Jun 7, 2024
1 parent b7904d9 commit ff945ac
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 27 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ lazy val `kamon-cats-io-3` = (project in file("instrumentation/kamon-cats-io-3")
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.typelevel" %% "cats-effect" % "3.3.14" % "provided",
// "org.typelevel" %% "cats-effect" % "3.3.14" % "provided",
"org.typelevel" %% "cats-effect" % "3.5.4" % "provided",
scalatest % "test",
logbackClassic % "test"
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kamon.instrumentation.cats3;

import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import scala.Function1;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;
import scala.util.Right;

public class CleanSchedulerContextAdvice35 {
@Advice.OnMethodEnter
public static void enter(@Advice.Argument(value = 1, readOnly = false) Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> callback) {
callback = new CleanSchedulerContextAdvice35.ContextCleaningWrapper(callback, Kamon.currentContext());
}


public static class ContextCleaningWrapper implements Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> {
private final Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> runnable;
private final Context context;

public ContextCleaningWrapper(Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> runnable, Context context) {
this.runnable = runnable;
this.context = context;
}

@Override
public BoxedUnit apply(Right<Nothing$, BoxedUnit> v1) {
try (Storage.Scope ignored = Kamon.storeContext(context)) {
return runnable.apply(v1);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ kanela.modules {

within = [
"cats\\.effect\\.IOFiber",
"cats\\.effect\\.std\\.Dispatcher",
"cats\\.effect\\.unsafe\\.WorkStealingThreadPool",
"cats\\.effect\\.unsafe\\.SchedulerCompanionPlatform.*"
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cats.effect.kamonCats

import org.slf4j.LoggerFactory

/**
* Utility class to make accessing some internals from Kamon, more accessible.
*
*/
object PackageAccessor {

private val LOG = LoggerFactory.getLogger(getClass)

/**
* This uses reflection to get the objectState, which acts like a stack
* of effects so we can determine something about the lineage of a particular fiber.
*
* @param fiber A runnable or IOFiber
* @return
*/
def fiberObjectStackBuffer(fiber: Any): Array[AnyRef] = {
try {
val field = fiber.getClass.getDeclaredField("objectState")
field.setAccessible(true)
field.get(fiber).asInstanceOf[cats.effect.ArrayStack[AnyRef]].unsafeBuffer()
} catch {
case _: Exception =>
if (LOG.isWarnEnabled)
LOG.warn("Unable to get the object stack buffer.")
Array.empty
}

}

/** This frankly kinda isn't great, but I couldn't figure out how to do this */
def isDispatcherWorker(obj: AnyRef): Boolean = {
if (obj != null)
obj.getClass.getName.contains("Dispatcher$Worker")
else false
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package kamon.instrumentation.cats3

import cats.effect.kamonCats.PackageAccessor
import kamon.Kamon
import kamon.context.Storage.Scope
import kamon.context.Storage
import kamon.instrumentation.context.HasContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice
Expand All @@ -12,8 +12,9 @@ class IOFiberInstrumentation extends InstrumentationBuilder {

onType("cats.effect.IOFiber")
.mixin(classOf[HasContext.Mixin])
.mixin(classOf[HasStorage.Mixin])
.advise(isConstructor.and(takesArguments(5).or(takesArguments(3))), AfterFiberInit)
.advise(method("suspend"), SaveCurrentContextOnExit)
.advise(method("suspend"), SaveCurrentContextOnSuspend)
.advise(method("resume"), RestoreContextOnSuccessfulResume)
.advise(method("run"), RunLoopWithContext)

Expand All @@ -31,23 +32,57 @@ class IOFiberInstrumentation extends InstrumentationBuilder {
)

onTypes("cats.effect.unsafe.WorkStealingThreadPool")
.advise(method("sleepInternal"), classOf[CleanSchedulerContextAdvice35]) // > 3.3
.advise(
anyMethods(
"scheduleFiber",
"rescheduleFiber",
"scheduleFiber", // <3.4
"rescheduleFiber", // <3.4
"reschedule",
"scheduleExternal"
),
SetContextOnNewFiberForWSTP
)

// Scheduled actions like `IO.sleep` end up calling `resume` from the scheduler thread,
// For < 3.4 cats, Scheduled actions like `IO.sleep` end up calling `resume` from the scheduler thread,
// which always leaves a dirty thread. This wrapper ensures that scheduled actions are
// executed with the same Context that was available when they were scheduled, and then
// reset the scheduler thread to the empty context.
onSubTypesOf("cats.effect.unsafe.Scheduler")
.advise(method("sleep"), classOf[CleanSchedulerContextAdvice])
}

/**
* Mixin that exposes access to the scope captured by an instrumented instance.
* The interface exposes means of getting and more importantly closing of the
* scope.
*/
trait HasStorage {

/**
* Returns the [[Storage.Scope]] stored in the instrumented instance.
*/
def kamonScope: Storage.Scope

/**
* Updates the [[Storage.Scope]] stored in the instrumented instance
*/
def setKamonScope(scope: Storage.Scope): Unit

}

object HasStorage {

/**
* [[HasStorage]] implementation that keeps the scope in a mutable field.
*/
class Mixin(@transient private var _scope: Storage.Scope) extends HasStorage {

override def kamonScope: Storage.Scope = if (_scope != null) _scope else Storage.Scope.Empty

override def setKamonScope(scope: Storage.Scope): Unit = _scope = scope
}
}

class AfterFiberInit
object AfterFiberInit {

Expand All @@ -61,13 +96,13 @@ class RunLoopWithContext
object RunLoopWithContext {

@Advice.OnMethodEnter()
@static def enter(@Advice.This fiber: Any): Scope = {
@static def enter(@Advice.This fiber: Any): Storage.Scope = {
val ctxFiber = fiber.asInstanceOf[HasContext].context
Kamon.storeContext(ctxFiber)
}

@Advice.OnMethodExit()
@static def exit(@Advice.Enter scope: Scope, @Advice.This fiber: Any): Unit = {
@static def exit(@Advice.Enter scope: Storage.Scope, @Advice.This fiber: Any): Unit = {
val leftContext = Kamon.currentContext()
scope.close()

Expand All @@ -80,19 +115,41 @@ object RestoreContextOnSuccessfulResume {

@Advice.OnMethodExit()
@static def exit(@Advice.This fiber: Any, @Advice.Return wasSuspended: Boolean): Unit = {
if (wasSuspended) {
val ctxFiber = fiber.asInstanceOf[HasContext].context
Kamon.storeContext(ctxFiber)

// Resume is tricky, most of the time we want to keep the `wasSuspended` behavior,
// but there's a single issue with Dispatcher, basically it resumes differently, so
// we try to catch that case and identify it and do something different with it.
val dispatcherIsRunningDirectly =
PackageAccessor.fiberObjectStackBuffer(fiber).exists(PackageAccessor.isDispatcherWorker)

val fi = fiber.asInstanceOf[HasContext with HasStorage]

val setContext = wasSuspended && !dispatcherIsRunningDirectly
if (setContext) {
fi.setKamonScope(Kamon.storeContext(fi.context))
} else if (dispatcherIsRunningDirectly) {
fi.setContext(Kamon.currentContext())
}
}
}

class SaveCurrentContextOnExit
object SaveCurrentContextOnExit {
class SaveCurrentContextOnSuspend
object SaveCurrentContextOnSuspend {

@Advice.OnMethodExit()
@static def exit(@Advice.This fiber: Any): Unit = {
fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext())
val fi = fiber.asInstanceOf[HasContext with HasStorage]
fi.setContext(Kamon.currentContext())
}
}

class CleanFiberUp
object CleanFiberUp {

@Advice.OnMethodExit()
@static def exit(@Advice.This fiber: Any): Unit = {
val fi = fiber.asInstanceOf[HasContext with HasStorage]
fi.kamonScope.close()
}
}

Expand All @@ -108,7 +165,6 @@ class SetContextOnNewFiberForWSTP
object SetContextOnNewFiberForWSTP {

@Advice.OnMethodEnter()
@static def enter(@Advice.Argument(0) fiber: Any): Unit = {
@static def enter(@Advice.Argument(0) fiber: Any): Unit =
fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext())
}
}
Loading

0 comments on commit ff945ac

Please sign in to comment.