Skip to content

Commit

Permalink
Merge pull request #1052 from SimunKaracic/rediscala-initial
Browse files Browse the repository at this point in the history
Rediscala initial
  • Loading branch information
SimunKaracic committed Jul 19, 2021
2 parents ba3eb83 + 3ed143e commit 21117d4
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 66 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ lazy val `kamon-redis` = (project in file("instrumentation/kamon-redis"))
kanelaAgent % "provided",
"redis.clients" % "jedis" % "3.6.0" % "provided",
"io.lettuce" % "lettuce-core" % "6.1.2.RELEASE" % "provided",
"com.github.etaty" %% "rediscala" % "1.9.0" % "provided",

scalatest % "test",
logbackClassic % "test",
Expand Down
15 changes: 14 additions & 1 deletion instrumentation/kamon-redis/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
kanela.modules {
redis {
name = "Redis Instrumentation"
description = "Provides tracing for Jedis and Lettuce libraries"
description = "Provides tracing for Jedis, Lettuce and Rediscala libraries"

instrumentations = [
"kamon.instrumentation.jedis.JedisInstrumentation",
"kamon.instrumentation.lettuce.LettuceInstrumentation",
"kamon.instrumentation.rediscala.RediscalaInstrumentation",
]

within = [
"redis.clients.jedis.Protocol",
"io.lettuce.core..*",
"redis..*",
]
}
}


# when using multiple clients, the extension will be alphabetical
# e.g. $a, $b, $c.
# so add exclude clauses as needed
kamon.instrumentation.akka.filters.actors.trace {
excludes += "*/user/RedisClient-$a/**"
excludes += "*/user/RedisClient-$a"
excludes += "*/user/RedisBlockingClient-$a/**"
excludes += "*/user/RedisBlockingClient-$a"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package kamon.instrumentation.rediscala

import kamon.Kamon
import kamon.trace.Span
import kamon.util.CallingThreadExecutionContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.concurrent.Future
import scala.util.{Failure, Success}

class RediscalaInstrumentation extends InstrumentationBuilder {
onTypes("redis.Request", "redis.ActorRequest", "redis.BufferedRequest",
"redis.commands.BLists", "redis.RoundRobinPoolRequest", "ActorRequest")
.advise(method("send").and(takesArguments(1)), classOf[RequestInstrumentation])

onTypes("redis.ActorRequest$class")
.advise(method("send"), classOf[ActorRequestAdvice])

}

class RequestInstrumentation
object RequestInstrumentation {
@Advice.OnMethodEnter()
def enter(@Advice.Argument(0) command: Any): Span = {
val spanName = s"redis.command.${command.getClass.getSimpleName}"

Kamon.clientSpanBuilder(spanName, "redis.client.rediscala")
.start()
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
def exit(@Advice.Enter span: Span,
@Advice.Thrown t: Throwable,
@Advice.Return future: Future[_]) = {
if (t != null) {
span.fail(t);
}

future.onComplete {
case Success(_value) =>
span.finish()

case Failure(exception) =>
span.fail(exception)
span.finish()

}(CallingThreadExecutionContext)
}
}

class RoundRobinRequestInstrumentation

object RoundRobinRequestInstrumentation {
@Advice.OnMethodEnter()
def enter(@Advice.Argument(1) command: Any): Span = {
println("Entering round robin")
val spanName = s"redis.command.${command.getClass.getSimpleName}"
Kamon.clientSpanBuilder(spanName, "redis.client.rediscala")
.start()
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
def exit(@Advice.Enter span: Span,
@Advice.Thrown t: Throwable,
@Advice.Return future: Future[_]) = {
println("Exiting round robin")
if (t != null) {
span.fail(t);
}

future.onComplete {
case Success(_value) =>
span.finish()

case Failure(exception) =>
span.fail(exception)
span.finish()

}(CallingThreadExecutionContext)
}
}

class ActorRequestAdvice
object ActorRequestAdvice {
@Advice.OnMethodEnter()
def enter(@Advice.Argument(1) command: Any): Span = {
val spanName = s"redis.command.${command.getClass.getSimpleName}"
Kamon.clientSpanBuilder(spanName, "redis.client.rediscala")
.start()
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
def exit(@Advice.Enter span: Span,
@Advice.Thrown t: Throwable,
@Advice.Return future: Future[_]) = {
if (t != null) {
span.fail(t);
}

future.onComplete {
case Success(_value) =>
span.finish()

case Failure(exception) =>
span.fail(exception)
span.finish()

}(CallingThreadExecutionContext)
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package kamon.instrumentation.lettuce
package kamon.instrumentation.combined

import io.lettuce.core.RedisClient

import io.lettuce.core.{RedisClient => LettuceClient}
import kamon.testkit.{MetricInspection, TestSpanReporter}
import kamon.trace.Span.Kind
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec}
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import redis.clients.jedis.Jedis
import redis.{RedisBlockingClient, RedisClient}

import java.time.Duration
import scala.concurrent.duration.DurationInt
import scala.util.control.NonFatal

class LettuceInstrumentationSpec extends WordSpec

class RedisInstrumentationsSpec extends WordSpec
with Matchers
with ScalaFutures
with Eventually
Expand All @@ -21,20 +26,45 @@ class LettuceInstrumentationSpec extends WordSpec
with OptionValues
with TestSpanReporter {

private val logger = LoggerFactory.getLogger(classOf[RedisInstrumentationsSpec])
var container: GenericContainer[Nothing] = _

override def beforeAll: Unit = {
val REDIS_IMAGE = DockerImageName.parse("redis")
container = new GenericContainer(REDIS_IMAGE).withExposedPorts(6379)

container.start()
}

override def afterAll: Unit = {
container.stop()
}

"the Jedis instrumentation" should {
"generate a client span for get and set commands" in {
val jedis = new Jedis(container.getHost, container.getFirstMappedPort)
jedis.set("foo", "bar")

eventually(timeout(2.seconds)) {
val span = testSpanReporter().nextSpan().get
span.operationName shouldBe "redis.command.SET"
span.kind shouldBe Kind.Client
}

testSpanReporter().clear()

jedis.get("foo")
eventually(timeout(2.seconds)) {
val span = testSpanReporter().nextSpan().get
span.operationName shouldBe "redis.command.GET"
span.kind shouldBe Kind.Client
}
}
}

"the Lettuce instrumentation" should {
"generate a client span for async commands" in {
val client = RedisClient.create(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val client = LettuceClient.create(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val connection = client.connect
val asyncCommands = connection.async()
asyncCommands.set("key", "Hello, Redis!")
Expand All @@ -49,7 +79,7 @@ class LettuceInstrumentationSpec extends WordSpec
}

"generate a client span for sync commands" in {
val client = RedisClient.create(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val client = LettuceClient.create(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val connection = client.connect
val commands = connection.sync()

Expand All @@ -65,7 +95,7 @@ class LettuceInstrumentationSpec extends WordSpec
}

"fail a span that times out" in {
val client = RedisClient.create(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val client = LettuceClient.create(s"redis://${container.getHost}:${container.getFirstMappedPort}")
client.setDefaultTimeout(Duration.ofNanos(1))
val connection = client.connect
val commands = connection.sync()
Expand All @@ -86,4 +116,32 @@ class LettuceInstrumentationSpec extends WordSpec
client.shutdown()
}
}

"the Rediscala instrumentation" should {
implicit val akkaSystem = akka.actor.ActorSystem()
"generate only one client span for commands" in {
val client = RedisClient(host = container.getHost, port = container.getFirstMappedPort)
client.set("a", "a")

eventually(timeout(30.seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "redis.command.Set"
span.hasError shouldBe false
}
client.shutdown()
}

"generate only one client span when using the blocking client" in {
val blockingClient = RedisBlockingClient(host = container.getHost, port = container.getFirstMappedPort)
blockingClient.blpop(Seq("a", "b", "c"))

eventually(timeout(30.seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "redis.command.Blpop"
span.hasError shouldBe true
}
blockingClient.stop()
}

}
}

This file was deleted.

0 comments on commit 21117d4

Please sign in to comment.