Skip to content

Commit

Permalink
merge the kamon-bundle project and several build changes required, related #626
Browse files Browse the repository at this point in the history
  • Loading branch information
ivantopo committed Apr 1, 2020
2 parents 7032aff + 18665ba commit 9c2d8d5
Show file tree
Hide file tree
Showing 15 changed files with 436 additions and 219 deletions.
370 changes: 223 additions & 147 deletions build.sbt

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions bundle/kamon-bundle/src/main/scala/kamon/bundle/Bundle.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package kamon.bundle

import java.lang.management.ManagementFactory
import java.nio.file.{Files, StandardCopyOption}

import net.bytebuddy.agent.ByteBuddyAgent

object Bundle {

  // System property read by Kanela to decide which ClassLoader should be used when loading instrumentation modules.
  private val _instrumentationClassLoaderProp = "kanela.instrumentation.classLoader"

  /**
   * Attaches the Kanela agent to the current JVM. This method will ignore any attempts to attach the agent if it has
   * been attached already; in that case, when running on a Spring Boot application, the agent is reloaded instead so
   * that instrumentations are resolved through Spring Boot's launcher ClassLoader.
   */
  def attach(): Unit = {
    val springBootClassLoader = findSpringBootJarLauncherClassLoader()

    if(isKanelaLoaded()) {

      // If Kanela has already been loaded and we are running on a Spring Boot application, we might need to reload
      // Kanela to ensure it will use the proper ClassLoader for loading the instrumentations.
      springBootClassLoader.foreach(sbClassLoader => {
        withInstrumentationClassLoader(sbClassLoader)(reloadKanela())
      })

    } else {

      // The Kanela agent jar is embedded as a resource in this bundle; it must be copied to a temporary file on
      // disk because the attach API can only work with real files.
      val embeddedAgentFile = Bundle.getClass.getClassLoader.getResourceAsStream(BuildInfo.kanelaAgentJarName)
      try {
        val temporaryAgentFile = Files.createTempFile(BuildInfo.kanelaAgentJarName, ".jar")
        Files.copy(embeddedAgentFile, temporaryAgentFile, StandardCopyOption.REPLACE_EXISTING)

        withInstrumentationClassLoader(springBootClassLoader.orNull) {
          ByteBuddyAgent.attach(temporaryAgentFile.toFile, pid())
        }
      } finally {
        // Always close the resource stream (it was previously leaked).
        if(embeddedAgentFile != null)
          embeddedAgentFile.close()
      }
    }
  }

  /**
   * Tries to determine whether the Kanela agent has been loaded already. Since there are no APIs to determine what
   * agents have been loaded on the current JVM, we rely on two cues that indicate that Kanela is present: first, the
   * "kanela.loaded" System property which is set to "true" when the Kanela agent is started and second, the presence
   * of the Kanela class in the System ClassLoader. None of these two cues are definite proof, but having both of them
   * gives a level of certainty of the Kanela agent being loaded already.
   */
  private def isKanelaLoaded(): Boolean = {
    val isLoadedProperty = java.lang.Boolean.parseBoolean(System.getProperty("kanela.loaded"))

    // Only swallow the failures Class.forName is expected to produce; fatal errors (e.g. OutOfMemoryError,
    // InterruptedException) propagate instead of being silently reported as "not loaded".
    val hasKanelaClasses = try {
      Class.forName("kanela.agent.Kanela", false, ClassLoader.getSystemClassLoader) != null
    } catch { case _: ClassNotFoundException | _: LinkageError => false }

    hasKanelaClasses && isLoadedProperty
  }

  /**
   * Tries to find Spring Boot's classloader, if any. When running a Spring Boot application packaged with the
   * "spring-boot-maven-plugin", a fat jar will be created with all the dependencies in it and a special ClassLoader is
   * used to unpack them when the jar launches. This function will try to find that ClassLoader which should be used to
   * load all Kanela modules.
   */
  private def findSpringBootJarLauncherClassLoader(): Option[ClassLoader] = {
    Option(Thread.currentThread().getContextClassLoader())
      .filter(cl => cl.getClass.getName == "org.springframework.boot.loader.LaunchedURLClassLoader")
  }

  /**
   * Reloads the Kanela agent. This will cause all instrumentation definitions to be dropped and re-initialized.
   */
  private def reloadKanela(): Unit = {

    // We know that if the agent has been attached, its classes are in the System ClassLoader so we try to find
    // the Kanela class from there and call reload on it.
    Class.forName("kanela.agent.Kanela", true, ClassLoader.getSystemClassLoader)
      .getDeclaredMethod("reload")
      .invoke(null)
  }

  /** Extracts the current process ID from the RuntimeMXBean name, which has the "pid@hostname" form. */
  private def pid(): String = {
    val jvm = ManagementFactory.getRuntimeMXBean.getName
    jvm.substring(0, jvm.indexOf('@'))
  }

  /**
   * Runs the provided thunk with the instrumentation ClassLoader System property pointing to the given ClassLoader.
   * The property is only set (and removed afterwards) when a non-null ClassLoader is provided, so passing null can
   * never clobber a value that was set elsewhere.
   */
  def withInstrumentationClassLoader[T](classLoader: ClassLoader)(thunk: => T): T = {
    if(classLoader == null)
      thunk
    else {
      System.getProperties.put(_instrumentationClassLoaderProp, classLoader)
      try thunk finally System.getProperties.remove(_instrumentationClassLoaderProp)
    }
  }
}
14 changes: 14 additions & 0 deletions core/kamon-core-tests/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<configuration>
<!-- Suppress Logback's own status messages so test output stays clean. -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>

<!-- Console appender used for all test logging, filtered through Kamon's custom Logback filter. -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="kamon.log.LogbackFilter"/>
<encoder>
<!-- time [thread] level logger - message -->
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
34 changes: 2 additions & 32 deletions core/kamon-core/src/main/scala/kamon/metric/Distribution.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ object Distribution {
* to the provided dynamic range if necessary.
*/
def merge(left: Distribution, right: Distribution, dynamicRange: DynamicRange): Distribution = {
val h = LocalHistogram.get(dynamicRange)
val h = Histogram.Local.get(dynamicRange)
left.bucketsIterator.foreach(b => h.recordValueWithCount(b.value, b.frequency))
right.bucketsIterator.foreach(b => h.recordValueWithCount(b.value, b.frequency))
h.snapshot(true)
Expand Down Expand Up @@ -199,7 +199,7 @@ object Distribution {
if(unit == actualToUnit && distribution.dynamicRange == toDynamicRange)
distribution
else {
val scaledHistogram = LocalHistogram.get(toDynamicRange)
val scaledHistogram = Histogram.Local.get(toDynamicRange)
distribution.bucketsIterator.foreach(bucket => {
val roundValue = Math.round(MeasurementUnit.convert(bucket.value, unit, toUnit))
val convertedValue = if(roundValue == 0L) 1L else roundValue
Expand Down Expand Up @@ -333,34 +333,4 @@ object Distribution {
case class Bucket(value: Long, frequency: Long) extends Distribution.Bucket
case class Percentile(rank: Double, value: Long, countAtRank: Long) extends Distribution.Percentile
}

/**
* Histogram implementation that can only be used local to a thread or with external synchronization. This is only
* used to power aggregation of distributions given that it is a lot simpler to create histograms and dump all the
* values on them rather than trying to manually aggregate the buckets and take into account adjustments on dynamic
* range.
*/
private[kamon] class LocalHistogram(val dynamicRange: DynamicRange) extends BaseLocalHdrHistogram(dynamicRange)
with Instrument.Snapshotting[Distribution] with DistributionSnapshotBuilder

private[kamon] object LocalHistogram {

/** Keeps a thread-local cache of local histograms that can be reused when aggregating distributions. */
private val _localHistograms = new ThreadLocal[collection.mutable.Map[DynamicRange, LocalHistogram]] {
override def initialValue(): collection.mutable.Map[DynamicRange, LocalHistogram] =
collection.mutable.Map.empty
}

/**
* Creates or retrieves a local histogram for the provided dynamic range. In theory, it is safe to do this from the
* memory usage perspective since distribution merging or converting (the use cases for a Local Histogram) are only
* expected to happen on reporter threads and all reporters have their own dedicated thread so we wont have many
* instances around.
*/
def get(dynamicRange: DynamicRange): LocalHistogram = {
val histogram = _localHistograms.get().getOrElseUpdate(dynamicRange, new LocalHistogram(dynamicRange))
histogram.reset()
histogram
}
}
}
32 changes: 31 additions & 1 deletion core/kamon-core/src/main/scala/kamon/metric/Histogram.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.nio.ByteBuffer

import kamon.metric.Metric.{BaseMetric, BaseMetricAutoUpdate}
import kamon.tag.TagSet
import org.HdrHistogram.{BaseAtomicHdrHistogram, HdrHistogramInternalState, ZigZag}
import org.HdrHistogram.{BaseAtomicHdrHistogram, BaseLocalHdrHistogram, HdrHistogramInternalState, ZigZag}
import org.slf4j.LoggerFactory


Expand Down Expand Up @@ -162,4 +162,34 @@ object Histogram {
override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792)
}
}

/**
* Histogram implementation that can only be used local to a thread or with external synchronization. This is only
* used to power aggregation of distributions given that it is a lot simpler to create histograms and dump all the
* values on them rather than trying to manually aggregate the buckets and take into account adjustments on dynamic
* range.
*/
private[kamon] class Local(val dynamicRange: DynamicRange) extends BaseLocalHdrHistogram(dynamicRange)
with Instrument.Snapshotting[Distribution] with DistributionSnapshotBuilder

private[kamon] object Local {

  // Per-thread cache of Local histograms, keyed by dynamic range, so that reporter threads can reuse
  // instances when aggregating distributions instead of allocating a new histogram every time.
  private val _localHistograms = new ThreadLocal[collection.mutable.Map[DynamicRange, Histogram.Local]] {
    override def initialValue(): collection.mutable.Map[DynamicRange, Histogram.Local] =
      collection.mutable.Map.empty
  }

  /**
   * Creates or retrieves a local histogram for the provided dynamic range, resetting it before handing it out.
   * In theory, it is safe to do this from the memory usage perspective since distribution merging or converting
   * (the use cases for a Local Histogram) are only expected to happen on reporter threads and all reporters have
   * their own dedicated thread so we wont have many instances around.
   */
  def get(dynamicRange: DynamicRange): Histogram.Local = {
    val cache = _localHistograms.get()
    val reusable = cache.getOrElseUpdate(dynamicRange, new Histogram.Local(dynamicRange))
    reusable.reset()
    reusable
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package kamon.testkit

import java.time.Duration

import kamon.metric.Distribution.LocalHistogram
import kamon.metric.Histogram
import kamon.metric.Metric.Settings
import kamon.metric.{DynamicRange, Instrument, MeasurementUnit, MetricSnapshot}
import kamon.tag.TagSet
Expand Down Expand Up @@ -79,7 +79,7 @@ object MetricSnapshotBuilder {
* attributes and values.
*/
def histogram(name: String, description: String, tags: TagSet, unit: MeasurementUnit)(values: Long*): MetricSnapshot.Distributions = {
val localHistogram = LocalHistogram.get(DynamicRange.Default)
val localHistogram = Histogram.Local.get(DynamicRange.Default)
localHistogram.reset()

values.foreach(v => localHistogram.recordValue(v))
Expand Down
8 changes: 4 additions & 4 deletions instrumentation/kamon-akka/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ configs(
// The Common configuration should always depend on the latest version of Akka. All code in the Common configuration
// should be source compatible with all Akka versions.
inConfig(Common)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq("2.12.10", "2.13.1")
crossScalaVersions := Seq("2.12.11", "2.13.1")
))

libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else Seq(
Expand All @@ -50,7 +50,7 @@ libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else


inConfig(`Compile-Akka-2.6`)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq("2.12.10", "2.13.1"),
crossScalaVersions := Seq("2.12.11", "2.13.1"),
sources := joinSources(Common, `Compile-Akka-2.6`).value
))

Expand Down Expand Up @@ -130,7 +130,7 @@ lazy val baseTestSettings = Seq(
)

inConfig(TestCommon)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq("2.12.10", "2.13.1")
crossScalaVersions := Seq("2.12.11", "2.13.1")
))

inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
Expand All @@ -140,7 +140,7 @@ inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ ba
))

inConfig(`Test-Akka-2.6`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq("2.12.10", "2.13.1"),
crossScalaVersions := Seq("2.12.11", "2.13.1"),
sources := joinSources(TestCommon, `Test-Akka-2.6`).value,
unmanagedResourceDirectories ++= (unmanagedResourceDirectories in Common).value,
unmanagedResourceDirectories ++= (unmanagedResourceDirectories in TestCommon).value
Expand Down
23 changes: 23 additions & 0 deletions instrumentation/kamon-executors/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import sbt.Tests.{Group, SubProcess}

testGrouping in Test := groupByExperimentalExecutorTests((definedTests in Test).value, kanelaAgentJar.value)

/**
 * Splits the test definitions into two forked JVM groups: the capture-on-submit spec needs the experimental
 * executor instrumentation flags, while every other spec runs with the default Kanela module configuration.
 */
def groupByExperimentalExecutorTests(tests: Seq[TestDefinition], kanelaJar: File): Seq[Group] = {
  // The only spec that requires the experimental capture-on-submit instrumentation.
  val experimentalSpecName = "kamon.instrumentation.executor.CaptureContextOnSubmitInstrumentationSpec"
  val (experimental, stable) = tests.partition(_.name == experimentalSpecName)

  // Builds a forked-JVM group with the given JVM options.
  def forkedGroup(groupName: String, groupTests: Seq[TestDefinition], jvmOptions: Vector[String]): Group =
    Group(groupName, groupTests, SubProcess(ForkOptions().withRunJVMOptions(jvmOptions)))

  Seq(
    forkedGroup("stableTests", stable, Vector(
      "-javaagent:" + kanelaJar.toString
    )),
    forkedGroup("experimentalTests", experimental, Vector(
      "-javaagent:" + kanelaJar.toString,
      "-Dkanela.modules.executor-service.enabled=false",
      "-Dkanela.modules.executor-service-capture-on-submit.enabled=true"
    ))
  )
}
4 changes: 2 additions & 2 deletions instrumentation/kamon-play/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ lazy val baseTestSettings = Seq(
)

inConfig(TestCommon)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq("2.11.12", "2.12.10")
crossScalaVersions := Seq("2.11.12", "2.12.11")
))

inConfig(`Test-Play-2.6`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
sources := joinSources(TestCommon, `Test-Play-2.6`).value,
crossScalaVersions := Seq("2.11.12", "2.12.10"),
crossScalaVersions := Seq("2.11.12", "2.12.11"),
testGrouping := singleTestPerJvm(definedTests.value, javaOptions.value),
unmanagedResourceDirectories ++= (unmanagedResourceDirectories in Compile).value,
unmanagedResourceDirectories ++= (unmanagedResourceDirectories in TestCommon).value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,25 @@ class PlayServerInstrumentation extends InstrumentationBuilder {
.mixin(classOf[HasServerInstrumentation.Mixin])
.advise(isConstructor, NettyServerInitializationAdvice)

onType("play.core.server.netty.PlayRequestHandler")
if(hasGenericFutureListener()) {
onType("play.core.server.netty.PlayRequestHandler")
.when(isNettyAround)
.mixin(classOf[HasServerInstrumentation.Mixin])
.mixin(classOf[HasTimestamp.Mixin])
.advise(isConstructor, PlayRequestHandlerConstructorAdvice)
.advise(isConstructor, CaptureCurrentTimestampOnExit)
.advise(method("handle"), NettyPlayRequestHandlerHandleAdvice)
}

/**
* This final bit ensures that we will apply an operation name right before filters get to execute.
*/
onType("play.api.http.DefaultHttpRequestHandler")
.advise(method("filterHandler").and(takesArguments(2)), GenerateOperationNameOnFilterHandler)

/**
 * Checks whether Netty's GenericFutureListener class is available on the classpath; the Netty request handler
 * instrumentation above is only applied when it is present.
 */
private def hasGenericFutureListener(): Boolean = {
  // Only swallow class-loading failures; the previous bare `case _` also hid fatal errors such as
  // OutOfMemoryError and InterruptedException.
  try {
    Class.forName("io.netty.util.concurrent.GenericFutureListener") != null
  } catch { case _: ClassNotFoundException | _: LinkageError => false }
}
}


Expand Down
Loading

0 comments on commit 9c2d8d5

Please sign in to comment.