Skip to content

Commit

Permalink
Add ZIO2 context propagation (#1318)
Browse files Browse the repository at this point in the history
* Add ZIO2 context propagation via adding a Supervisor

* Skip fiber init advice, use the supervisor exclusively

* Attempt to close the scope on end
  • Loading branch information
dispalt authored Mar 1, 2024
1 parent 25386b2 commit d362be3
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 0 deletions.
18 changes: 18 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ val instrumentationProjects = Seq[ProjectReference](
`kamon-scalaz-future`,
`kamon-cats-io`,
`kamon-cats-io-3`,
`kamon-zio-2`,
`kamon-logback`,
`kamon-jdbc`,
`kamon-kafka`,
Expand Down Expand Up @@ -268,6 +269,21 @@ lazy val `kamon-cats-io-3` = (project in file("instrumentation/kamon-cats-io-3")

).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

lazy val `kamon-zio-2` = (project in file("instrumentation/kamon-zio-2"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"dev.zio" %% "zio" % "2.0.21" % "provided",
scalatest % "test",
logbackClassic % "test"
),

).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")


lazy val `kamon-logback` = (project in file("instrumentation/kamon-logback"))
.disablePlugins(AssemblyPlugin)
Expand Down Expand Up @@ -1028,6 +1044,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo
`kamon-bundle-dependencies-all`,
`kamon-akka-grpc`,
`kamon-cats-io-3`,
`kamon-zio-2`,
`kamon-finagle`,
`kamon-pekko`,
`kamon-pekko-http`,
Expand Down Expand Up @@ -1066,6 +1083,7 @@ lazy val `kamon-bundle-dependencies-3` = (project in file("bundle/kamon-bundle-d
`kamon-caffeine`,
`kamon-aws-sdk`,
`kamon-cats-io-3`,
`kamon-zio-2`,
`kamon-pekko`,
`kamon-pekko-http`,
`kamon-pekko-grpc`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kamon.instrumentation.zio2;

import kanela.agent.libs.net.bytebuddy.asm.Advice;
import zio.Supervisor;

public class SupervisorAdvice {

public static class OverrideDefaultSupervisor {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) Supervisor<?> supervisor) {
supervisor = supervisor.$plus$plus(new NewSupervisor());
}
}
}
20 changes: 20 additions & 0 deletions instrumentation/kamon-zio-2/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#############################################
# Kamon ZIO 2 Reference Configuration #
#############################################
kanela.modules {
zio-2 {
name = "ZIO 2 Instrumentation"
description = "Provides instrumentation for ZIO"

instrumentations = [
"kamon.instrumentation.zio2.ZIO2Instrumentation"
]

within = [
"zio.internal.FiberRuntime",
"zio\\.Runtime.*",
]
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package kamon.instrumentation.zio2

import kamon.Kamon
import kamon.context.Storage
import kamon.instrumentation.context.HasContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import zio.{Exit, Fiber, Supervisor, UIO, Unsafe, ZEnvironment, ZIO}


/**
* This works as follows.
* - patches the defaultSupervisor val from Runtime to add our own supervisor.
* - Mixes in the [[HasContext.Mixin]] class so we don't have to keep a separate map of Fiber -> Context
* - Performs context shifting based on starting/suspending of fibers.
*
*/
class ZIO2Instrumentation extends InstrumentationBuilder {

onType("zio.internal.FiberRuntime")
.mixin(classOf[HasContext.Mixin])
.mixin(classOf[HasStorage.Mixin])

onType("zio.Runtime$")
.advise(method("defaultSupervisor"), classOf[SupervisorAdvice.OverrideDefaultSupervisor])
}

/**
* Mixin that exposes access to the scope captured by an instrumented instance. The interface exposes means of getting and more importantly
* closing of the scope.
*/
trait HasStorage {

/**
* Returns the [[Storage.Scope]] stored in the instrumented instance.
*/
def kamonScope: Storage.Scope

/**
* Updates the [[Storage.Scope]] stored in the instrumented instance
*/
def setKamonScope(scope: Storage.Scope): Unit

}

object HasStorage {

/**
* [[HasStorage]] implementation that keeps the scope in a mutable field.
*/
class Mixin(@transient private var _scope: Storage.Scope) extends HasStorage {

override def kamonScope: Storage.Scope = if (_scope != null) _scope else Storage.Scope.Empty

override def setKamonScope(scope: Storage.Scope): Unit = _scope = scope
}
}


class NewSupervisor extends Supervisor[Any] {

override def value(implicit trace: zio.Trace): UIO[Any] = ZIO.unit

override def onStart[R, E, A_](environment: ZEnvironment[R], effect: ZIO[R, E, A_], parent: Option[Fiber.Runtime[Any, Any]], fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = {
fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext())
}

override def onSuspend[E, A_](fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = {
fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext())
}

override def onResume[E, A_](fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = {
val fiberInstance = fiber.asInstanceOf[HasContext with HasStorage]
val ctx = fiberInstance.context
fiberInstance.setKamonScope(Kamon.storeContext(ctx))
}

override def onEnd[R, E, A_](value: Exit[E, A_], fiber: Fiber.Runtime[E, A_])(implicit unsafe: Unsafe): Unit = {
val fiberInstance = fiber.asInstanceOf[HasContext with HasStorage]
fiberInstance.kamonScope.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package kamon.instrumentation.zio2


import kamon.Kamon
import kamon.context.Context
import kamon.tag.Lookups.plain
import org.scalatest.{Assertion, BeforeAndAfterEach, OptionValues}
import org.scalatest.concurrent.{Eventually, PatienceConfiguration, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import kamon.trace.Identifier.Scheme
import kamon.trace.{Identifier, Span, Trace}
import zio._

import scala.concurrent.duration.FiniteDuration

class ZIO2InstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration
with OptionValues with Eventually with BeforeAndAfterEach {

protected implicit val zioRuntime: Runtime[Any] =
Runtime.default

protected def unsafeRunZIO[E, A](r: ZIO[Any, E, A])(implicit trace: zio.Trace): A = {
Unsafe.unsafe(implicit unsafe => zioRuntime.unsafe.run(r)) match {
case Exit.Success(value) => value
case f @ Exit.Failure(cause) => fail(cause.squashWith(_ => new Exception(f.toString)))
}
}

java.lang.System.setProperty("kamon.context.debug", "true")

"a ZIO created when instrumentation is active" should {
"capture the active span available when created" which {

"must capture the current context when creating and running fibers" in {

val context = Context.of("tool", "kamon")

val effect = for {
contextOnAnotherThread <- ZIO.succeed(Kamon.currentContext())
_ <- ZIO.succeed(Seq("hello", "world"))
_ <- ZIO.sleep(Duration(10, TimeUnit.MILLISECONDS))
contextAnother <- ZIO.attemptBlocking(Kamon.currentContext())
} yield {
val currentContext = Kamon.currentContext()
currentContext shouldBe context
contextOnAnotherThread shouldBe context
currentContext
}

val contextInsideYield = Kamon.runWithContext(context) {
// This is what would happen at the edges of the system, when Kamon has already
// started a Span in an outer layer (usually the HTTP server instrumentation) and
// when processing gets to user-level code, the users want to run their business
// logic as an effect. We should always propagate the context that was available
// at this point to the moment when the effect runs.

unsafeRunZIO(effect)
}

context shouldBe contextInsideYield
}

"must allow the context to be cleaned" in {
val anotherExecutor = Executor.fromExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)))
val context = Context.of("key", "value")

val test =
for {
_ <- ZIO.succeed(Kamon.storeContext(context))
_ <- ZIO.onExecutor(anotherExecutor)(ZIO.sleep(10.millis))
beforeCleaning <- ZIO.succeed(Kamon.currentContext())
_ <- ZIO.succeed(Kamon.storeContext(Context.Empty))
_ <- ZIO.onExecutor(anotherExecutor)(ZIO.sleep(10.millis))
afterCleaning <- ZIO.succeed(Kamon.currentContext())
} yield {
afterCleaning shouldBe Context.Empty
beforeCleaning shouldBe context
}

unsafeRunZIO(test)
}

"must be available across asynchronous boundaries" in {
val anotherExecutor: Executor = Executor.fromExecutionContext(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))) //pool 7
val context = Context.of("key", "value")
val test =
for {
scope <- ZIO.succeed(Kamon.storeContext(context))
len <- ZIO.succeed("Hello Kamon!").map(_.length)
_ <- ZIO.succeed(len.toString)
beforeChanging <- getKey
evalOnGlobalRes <-ZIO.sleep(Duration.Zero) *> getKey
outerSpanIdBeginning <- ZIO.succeed(Kamon.currentSpan().id.string)
innerSpan <- ZIO.succeed(Kamon.clientSpanBuilder("Foo", "attempt").context(context).start())
innerSpanId1 <- ZIO.onExecutor(anotherExecutor)(ZIO.succeed(Kamon.currentSpan()))
innerSpanId2 <- ZIO.succeed(Kamon.currentSpan())
_ <- ZIO.succeed(innerSpan.finish())
outerSpanIdEnd <- ZIO.succeed(Kamon.currentSpan().id.string)
evalOnAnotherEx <-ZIO.onExecutor(anotherExecutor)(ZIO.sleep(Duration.Zero).flatMap(_ => getKey))
} yield {
scope.close()
withClue("before changing")(beforeChanging shouldBe "value")
withClue("on the global exec context")(evalOnGlobalRes shouldBe "value")
withClue("on a different exec context")(evalOnAnotherEx shouldBe "value")
withClue("final result")(evalOnAnotherEx shouldBe "value")
withClue("inner span should be the same on different exec")(innerSpanId1 shouldBe innerSpan)
withClue("inner span should be the same on same exec")(innerSpanId2 shouldBe innerSpan)
withClue("inner and outer should be different")(outerSpanIdBeginning should not equal innerSpan)
}

unsafeRunZIO(test)

}

"must allow complex Span topologies to be created" in {
val parentSpan = Span.Remote(
Scheme.Single.spanIdFactory.generate(),
Identifier.Empty,
Trace.create(Scheme.Single.traceIdFactory.generate(), Trace.SamplingDecision.Sample)
)
val context = Context.of(Span.Key, parentSpan)
implicit val ec = ExecutionContext.global
/**
* test
* - nestedLevel0
* - nestedUpToLevel2
* - nestedUpToLevel2._2._1
* - fiftyInParallel
*/
val test = for {
span <- ZIO.succeed(Kamon.currentSpan())
nestedLevel0 <- meteredWithSpanCapture("level1-A")(ZIO.sleep(100.millis))
nestedUpToLevel2 <- meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(ZIO.sleep(100.millis)))
fiftyInParallel <- ZIO.foreachPar((0 to 49).toList)(i => meteredWithSpanCapture(s"operation$i")(ZIO.sleep(100.millis)))
afterCede <- meteredWithSpanCapture("yieldNow")(ZIO.yieldNow.as(Kamon.currentSpan()))
afterEverything <- ZIO.succeed(Kamon.currentSpan())
} yield {
span.id.string should not be empty
span.id.string shouldBe nestedLevel0._1.parentId.string
span.id.string shouldBe nestedUpToLevel2._1.parentId.string
nestedUpToLevel2._1.id.string shouldBe nestedUpToLevel2._2._1.parentId.string
fiftyInParallel.map(_._1.parentId.string).toSet shouldBe Set(span.id.string)
fiftyInParallel.map(_._1.id.string).toSet should have size 50
afterCede._1.id.string shouldBe afterCede._2.id.string //A cede should not cause the span to be lost
afterEverything.id.string shouldBe span.id.string
}



val result = scala.concurrent.Future.sequence(
(1 to 100).toList.map { _ =>
Unsafe.unsafe { implicit unsafe =>
zioRuntime.unsafe.runToFuture {
(ZIO.succeed(Kamon.init()) *> ZIO.succeed(Kamon.storeContext(context)) *> test)
}: Future[Assertion]
}
}
)
Await.result(result, FiniteDuration(100, "seconds"))
}
}
}

private def getKey: UIO[String] = {
ZIO.succeed(Kamon.currentContext().getTag(plain("key")))
}

private def meteredWithSpanCapture[A](operation: String)(io: UIO[A]): UIO[(Span, A)] = {
ZIO.scoped {
ZIO.acquireRelease {
for {
initialCtx <- ZIO.succeed(Kamon.currentContext())
parentSpan <- ZIO.succeed(Kamon.currentSpan())
newSpan <- ZIO.succeed(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start())
_ <- ZIO.succeed(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan)))
} yield (initialCtx, newSpan)
}{
case (initialCtx, span) =>
for {
_ <- ZIO.succeed(span.finish())
_ <- ZIO.succeed(Kamon.storeContext(initialCtx))
} yield ()
} *> ZIO.succeed(Kamon.currentSpan()).zipPar(io)
}
}
}

0 comments on commit d362be3

Please sign in to comment.