Skip to content

Commit

Permalink
Scala 3 bundle (#1313)
Browse files Browse the repository at this point in the history
* jdbc support (scala 3)

* fix okhttp + spring for scala 3. Add a kamon-bundle-3

* fix bundle for scala 3

* Self hosted runner isn't picking up jobs, move to GitHub one...
  • Loading branch information
hughsimpson committed Dec 3, 2023
1 parent 5f36f94 commit 25386b2
Show file tree
Hide file tree
Showing 21 changed files with 249 additions and 161 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: [push, pull_request]

jobs:
ci:
runs-on: self-hosted
runs-on: ubuntu-latest
env:
JAVA_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
JVM_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:

jobs:
release:
runs-on: self-hosted
runs-on: ubuntu-latest
env:
JAVA_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
JVM_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
Expand Down
102 changes: 68 additions & 34 deletions build.sbt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import kamon.instrumentation.cassandra.CassandraInstrumentation
import kamon.util.CallingThreadExecutionContext
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.annotation.static

class PoolConstructorAdvice
object PoolConstructorAdvice {

@Advice.OnMethodExit
def onConstructed(
@static def onConstructed(
@Advice.This poolWithMetrics: HostConnectionPool with HasPoolMetrics,
@Advice.FieldValue("host") host: Host,
@Advice.FieldValue("totalInFlight") totalInflight: AtomicInteger
Expand All @@ -46,10 +49,11 @@ object PoolConstructorAdvice {
}
}

class PoolCloseAdvice
object PoolCloseAdvice {

@Advice.OnMethodExit
def onClose(@Advice.This poolWithMetrics: HostConnectionPool with HasPoolMetrics): Unit = {
@static def onClose(@Advice.This poolWithMetrics: HostConnectionPool with HasPoolMetrics): Unit = {
poolWithMetrics.nodeMonitor.cleanup()
}
}
Expand All @@ -62,12 +66,12 @@ class BorrowAdvice
object BorrowAdvice {

@Advice.OnMethodEnter
def startBorrow(@Advice.This poolMetrics: HasPoolMetrics): Long = {
@static def startBorrow(@Advice.This poolMetrics: HasPoolMetrics): Long = {
Kamon.clock().nanos()
}

@Advice.OnMethodExit(suppress = classOf[Throwable], inline = false)
def onBorrowed(
@static def onBorrowed(
@Advice.Return connection: ListenableFuture[Connection],
@Advice.Enter start: Long,
@Advice.This poolMetrics: HasPoolMetrics,
Expand All @@ -91,10 +95,11 @@ object BorrowAdvice {
* Incremented when new connection requested and decremented either on
* connection being explicitly trashed or defunct
*/
class InitPoolAdvice
object InitPoolAdvice {

@Advice.OnMethodExit
def onPoolInited(
@static def onPoolInited(
@Advice.This hasPoolMetrics: HasPoolMetrics,
@Advice.Return done: ListenableFuture[_],
@Advice.FieldValue("open") openConnections: AtomicInteger
Expand All @@ -108,10 +113,11 @@ object InitPoolAdvice {
}
}

class CreateConnectionAdvice
object CreateConnectionAdvice {

@Advice.OnMethodExit
def onConnectionCreated(
@static def onConnectionCreated(
@Advice.This hasPoolMetrics: HasPoolMetrics,
@Advice.Return created: Boolean
): Unit =
Expand All @@ -120,21 +126,23 @@ object CreateConnectionAdvice {
}
}

class TrashConnectionAdvice
object TrashConnectionAdvice {

@Advice.OnMethodExit
def onConnectionTrashed(
@static def onConnectionTrashed(
@Advice.This hasPoolMetrics: HasPoolMetrics,
@Advice.FieldValue("host") host: Host
): Unit = {
hasPoolMetrics.nodeMonitor.connectionTrashed
}
}

class ConnectionDefunctAdvice
object ConnectionDefunctAdvice {

@Advice.OnMethodExit
def onConnectionDefunct(@Advice.This hasPoolMetrics: HasPoolMetrics): Unit = {
@static def onConnectionDefunct(@Advice.This hasPoolMetrics: HasPoolMetrics): Unit = {
hasPoolMetrics.nodeMonitor.connectionClosed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.datastax.driver.core

import java.util.concurrent.atomic.AtomicReference

import com.datastax.driver.core.Message.Response
import com.datastax.driver.core.RequestHandler.QueryState
import kamon.Kamon
Expand All @@ -31,20 +30,23 @@ import kamon.instrumentation.context.HasContext
import kamon.trace.Span
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.annotation.static

object QueryOperations {
val QueryOperationName = "cassandra.query"
val BatchOperationName = "cassandra.batch"
val QueryPrepareOperationName: String = QueryOperationName + ".prepare"
val ExecutionOperationName: String = QueryOperationName + ".execution"
}

class QueryExecutionAdvice
object QueryExecutionAdvice {
import QueryOperations._

val ParentSpanKey: Context.Key[Span] = Context.key[Span]("__parent-span", Span.Empty)

@Advice.OnMethodEnter
def onQueryExec(
@static def onQueryExec(
@Advice.This execution: HasContext,
@Advice.Argument(0) host: Host with HasPoolMetrics,
@Advice.FieldValue("position") position: Int,
Expand Down Expand Up @@ -84,10 +86,11 @@ object QueryExecutionAdvice {
/**
* Transfer context from msg to created result set so it can be used for further page fetches
*/
class OnResultSetConstruction
object OnResultSetConstruction {

@Advice.OnMethodExit
def onCreateResultSet(
@static def onCreateResultSet(
@Advice.Return rs: ArrayBackedResultSet,
@Advice.Argument(0) msg: Responses.Result with HasContext
): Unit = if (rs.isInstanceOf[HasContext]) {
Expand All @@ -96,36 +99,39 @@ object OnResultSetConstruction {

}

class OnFetchMore
object OnFetchMore {

@Advice.OnMethodEnter
def onFetchMore(@Advice.This hasContext: HasContext): Scope = {
@static def onFetchMore(@Advice.This hasContext: HasContext): Scope = {
val clientSpan = hasContext.context.get(QueryExecutionAdvice.ParentSpanKey)
Kamon.storeContext(Context.of(Span.Key, clientSpan))
}

@Advice.OnMethodExit
def onFetched(@Advice.Enter scope: Scope): Unit = {
@static def onFetched(@Advice.Enter scope: Scope): Unit = {
scope.close()
}
}

class QueryWriteAdvice
object QueryWriteAdvice {

@Advice.OnMethodEnter
def onStartWriting(@Advice.This execution: HasContext): Unit = {
@static def onStartWriting(@Advice.This execution: HasContext): Unit = {
execution.context
.get(Span.Key)
.mark("cassandra.connection.write-started")
}
}

//Server timeouts and exceptions
class OnSetAdvice
object OnSetAdvice {
import QueryOperations._

@Advice.OnMethodEnter
def onSetResult(
@static def onSetResult(
@Advice.This execution: Connection.ResponseCallback with HasContext,
@Advice.Argument(0) connection: Connection,
@Advice.Argument(1) response: Message.Response,
Expand Down Expand Up @@ -159,10 +165,11 @@ object OnSetAdvice {
/**
* Handling of client exceptions
*/
class OnExceptionAdvice
object OnExceptionAdvice {

@Advice.OnMethodEnter
def onException(
@static def onException(
@Advice.This execution: HasContext,
@Advice.Argument(0) connection: Connection,
@Advice.Argument(1) exception: Exception,
Expand All @@ -181,10 +188,11 @@ object OnExceptionAdvice {
/**
* Handling of client timeouts
*/
class OnTimeoutAdvice
object OnTimeoutAdvice {

@Advice.OnMethodEnter
def onTimeout(
@static def onTimeout(
@Advice.This execution: HasContext,
@Advice.Argument(0) connection: Connection,
@Advice.FieldValue("current") currentHost: Host with HasPoolMetrics
Expand All @@ -199,10 +207,11 @@ object OnTimeoutAdvice {
}
}

class HostLocationAdvice
object HostLocationAdvice {

@Advice.OnMethodExit
def onHostLocationUpdate(
@static def onHostLocationUpdate(
@Advice.This host: Host with HasPoolMetrics,
@Advice.FieldValue("manager") clusterManager: Any
): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
package com.datastax.driver.core

import java.util.concurrent.Callable

import kamon.instrumentation.cassandra.driver.InstrumentedSession
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation.{RuntimeType, SuperCall}

import scala.annotation.static

class SessionInterceptor
object SessionInterceptor {

@RuntimeType
def newSession(@SuperCall session: Callable[Session]): Session = {
@static def newSession(@SuperCall session: Callable[Session]): Session = {
new InstrumentedSession(session.call())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice

import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer
import scala.annotation.static

class DriverInstrumentation extends InstrumentationBuilder {

Expand All @@ -51,12 +52,12 @@ class DriverInstrumentation extends InstrumentationBuilder {
*/
onType("com.datastax.driver.core.HostConnectionPool")
.advise(method("borrowConnection"), classOf[BorrowAdvice])
.advise(method("trashConnection"), TrashConnectionAdvice)
.advise(method("addConnectionIfUnderMaximum"), CreateConnectionAdvice)
.advise(method("onConnectionDefunct"), ConnectionDefunctAdvice)
.advise(isConstructor, PoolConstructorAdvice)
.advise(method("initAsync"), InitPoolAdvice)
.advise(method("closeAsync"), PoolCloseAdvice)
.advise(method("trashConnection"), classOf[TrashConnectionAdvice])
.advise(method("addConnectionIfUnderMaximum"), classOf[CreateConnectionAdvice])
.advise(method("onConnectionDefunct"), classOf[ConnectionDefunctAdvice])
.advise(isConstructor, classOf[PoolConstructorAdvice])
.advise(method("initAsync"), classOf[InitPoolAdvice])
.advise(method("closeAsync"), classOf[PoolCloseAdvice])
.mixin(classOf[HasPoolMetrics.Mixin])

/**
Expand All @@ -66,18 +67,18 @@ class DriverInstrumentation extends InstrumentationBuilder {
* to be used for further fetches
*/
onType("com.datastax.driver.core.RequestHandler$SpeculativeExecution")
.advise(method("query"), QueryExecutionAdvice)
.advise(method("write"), QueryWriteAdvice)
.advise(method("onException"), OnExceptionAdvice)
.advise(method("onTimeout"), OnTimeoutAdvice)
.advise(method("onSet"), OnSetAdvice)
.advise(method("query"), classOf[QueryExecutionAdvice])
.advise(method("write"), classOf[QueryWriteAdvice])
.advise(method("onException"), classOf[OnExceptionAdvice])
.advise(method("onTimeout"), classOf[OnTimeoutAdvice])
.advise(method("onSet"), classOf[OnSetAdvice])
.mixin(classOf[HasContext.MixinWithInitializer])

onSubTypesOf("com.datastax.driver.core.Message$Response")
.mixin(classOf[HasContext.MixinWithInitializer])

onType("com.datastax.driver.core.ArrayBackedResultSet")
.advise(method("fromMessage"), OnResultSetConstruction)
.advise(method("fromMessage"), classOf[OnResultSetConstruction])

/**
* In order for fetchMore execution to be a sibling of original execution
Expand All @@ -86,15 +87,15 @@ class DriverInstrumentation extends InstrumentationBuilder {
onType("com.datastax.driver.core.ArrayBackedResultSet$MultiPage")
.mixin(classOf[HasContext.MixinWithInitializer])
onType("com.datastax.driver.core.ArrayBackedResultSet$MultiPage")
.advise(method("queryNextPage"), OnFetchMore)
.advise(method("queryNextPage"), classOf[OnFetchMore])

/**
* Query metrics are tagged with target information (based on config)
* so all query metrics are mixed into a Host object
*/
onType("com.datastax.driver.core.Host")
.mixin(classOf[HasPoolMetrics.Mixin])
.advise(method("setLocationInfo"), HostLocationAdvice)
.advise(method("setLocationInfo"), classOf[HostLocationAdvice])


/**
Expand All @@ -104,8 +105,8 @@ class DriverInstrumentation extends InstrumentationBuilder {
"com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler",
"com.datastax.oss.driver.internal.core.cql.CqlRequestHandler")
.mixin(classOf[HasContext.Mixin])
.advise(isConstructor(), OnRequestHandlerConstructorAdvice)
.advise(method("onThrottleReady"), OnThrottleReadyAdvice)
.advise(isConstructor(), classOf[OnRequestHandlerConstructorAdvice])
.advise(method("onThrottleReady"), classOf[OnThrottleReadyAdvice])

}

Expand All @@ -116,10 +117,11 @@ object DriverInstrumentation {
}
}

class OnRequestHandlerConstructorAdvice
object OnRequestHandlerConstructorAdvice {

@Advice.OnMethodExit()
def exit(@Advice.This requestHandler: HasContext, @Advice.Argument(0) req: Any): Unit = {
@static def exit(@Advice.This requestHandler: HasContext, @Advice.Argument(0) req: Any): Unit = {
val (operationName, statement) = req match {
case pr: PrepareRequest => (QueryOperations.QueryPrepareOperationName, pr.getQuery())
case ss: cql.SimpleStatement => (QueryOperations.QueryOperationName, ss.getQuery())
Expand Down Expand Up @@ -158,10 +160,11 @@ object OnRequestHandlerConstructorAdvice {
}
}

class OnThrottleReadyAdvice
object OnThrottleReadyAdvice {

@Advice.OnMethodEnter()
def enter(@Advice.This requestHandler: HasContext): Unit = {
@static def enter(@Advice.This requestHandler: HasContext): Unit = {
val querySpan = requestHandler.context.get(Span.Key)
querySpan.mark("cassandra.throttle.ready")
}
Expand Down
Loading

0 comments on commit 25386b2

Please sign in to comment.