Skip to content

Commit

Permalink
kamon-redis supports redisson client
Browse files Browse the repository at this point in the history
  • Loading branch information
Whitilied committed Aug 8, 2022
1 parent 1212250 commit cfcf801
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 1 deletion.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ lazy val `kamon-redis` = (project in file("instrumentation/kamon-redis"))
"redis.clients" % "jedis" % "3.6.0" % "provided",
"io.lettuce" % "lettuce-core" % "6.1.2.RELEASE" % "provided",
"com.github.etaty" %% "rediscala" % "1.9.0" % "provided",
"org.redisson" % "redisson" % "3.11.6" % "provided",

scalatest % "test",
logbackClassic % "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
kanela.modules {
redis {
name = "Redis Instrumentation"
description = "Provides tracing for Jedis, Lettuce and Rediscala libraries"
description = "Provides tracing for Jedis, Lettuce, Rediscala and Redisson libraries"

instrumentations = [
"kamon.instrumentation.jedis.JedisInstrumentation",
"kamon.instrumentation.lettuce.LettuceInstrumentation",
"kamon.instrumentation.rediscala.RediscalaInstrumentation",
"kamon.instrumentation.redisson.RedissonInstrumentation",
]

within = [
"redis.clients.jedis..*",
"io.lettuce.core..*",
"redis..*",
"org.redisson..*",
]
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package kamon.instrumentation.redisson

import io.netty.buffer.ByteBuf
import io.netty.channel.{ChannelFuture, ChannelFutureListener}
import kamon.Kamon
import kamon.trace.Span
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice
import org.redisson.client.protocol.{CommandData, CommandsData}

import scala.collection.JavaConverters._

class RedissonInstrumentation extends InstrumentationBuilder {

onType("org.redisson.client.RedisConnection")
.advise(method("send").and(takesArguments(1)), classOf[RedisConnectionInstrumentation])

}

class RedisConnectionInstrumentation

object RedisConnectionInstrumentation {
@Advice.OnMethodEnter()
def enter(@Advice.Argument(0) command: Any): Span = {

def parseCommand(command: CommandData[_, _]): String = {
command.getParams.map {
case _: ByteBuf => ""
case bytes => String.valueOf(bytes)
}.filter(_.nonEmpty).mkString(" ")
}

val (commandName, statements) = command match {
case commands: CommandsData =>
val spanName = "redis.command.batch_execute"
val statements = commands.getCommands.asScala.map(parseCommand).mkString(",")
(spanName, statements)
case command: CommandData[_, _] =>
val spanName = s"redis.command.${command.getCommand.getName}"
val statement = parseCommand(command)
(spanName, statement)
}

Kamon.clientSpanBuilder(commandName, "redisson")
.tag("db.statement", statements)
.tagMetrics("db.system", "redis")
.start()
}

@Advice.OnMethodExit(onThrowable = classOf[Throwable], suppress = classOf[Throwable])
def exit(@Advice.Enter span: Span,
@Advice.Thrown t: Throwable,
@Advice.Return future: ChannelFuture) = {
if (t != null) {
span.fail(t)
}
future.addListener(new ChannelFutureListener() {
override def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) {
span.finish()
} else {
span.fail(future.cause())
span.finish()
}
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import kamon.tag.Lookups
import kamon.tag.Lookups._
import kamon.testkit.{InitAndStopKamonAfterAll, MetricInspection, TestSpanReporter}
import kamon.trace.Span.Kind
import org.redisson.Redisson
import org.redisson.api.RBloomFilter
import org.redisson.config.Config
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
Expand Down Expand Up @@ -129,6 +132,82 @@ class RedisInstrumentationsSpec extends AnyWordSpec
}
}

"the Redisson instrumentation" should {
"generate a client span for set operation" in {
val config: Config = new Config()
config.useSingleServer().setAddress(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val redisson = Redisson.create(config)

val mySet = redisson.getSet[String]("redisson:foo")
mySet.add("bar")

eventually(timeout(30.seconds)) {
val span = testSpanReporter().nextSpan().get
span.kind shouldBe Kind.Client
span.operationName shouldBe "redis.command.SADD"
span.metricTags.get(plain("db.system")) shouldBe "redis"
testSpanReporter().spans() shouldBe empty
testSpanReporter().clear()
}
redisson.shutdown()
}

"generate a client span for blocking queue operation" in {
val config: Config = new Config()
config.useSingleServer().setAddress(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val redisson = Redisson.create(config)

val queue = redisson.getBlockingQueue[String]("myQueue")
queue.add("1")
queue.add("2")
queue.add("3")
queue.add("4")

queue.contains("1")
queue.peek()
queue.poll()
queue.element()

eventually(timeout(30.seconds)) {
val span = testSpanReporter().nextSpan().get
span.kind shouldBe Kind.Client
span.metricTags.get(plain("db.system")) shouldBe "redis"
testSpanReporter().spans() shouldBe empty
testSpanReporter().clear()
}
redisson.shutdown()
}

"generate a client span for bloom filter operation" in {
val config: Config = new Config()
config.useSingleServer().setAddress(s"redis://${container.getHost}:${container.getFirstMappedPort}")
val redisson = Redisson.create(config)

val bloomFilter: RBloomFilter[String] = redisson.getBloomFilter("bloomFilter")
bloomFilter.tryInit(100000000, 0.03)

bloomFilter.add("a")
bloomFilter.add("b")
bloomFilter.add("c")
bloomFilter.add("d")

bloomFilter.getExpectedInsertions
bloomFilter.getFalseProbability
bloomFilter.getHashIterations

bloomFilter.contains("a")

eventually(timeout(30.seconds)) {
val span = testSpanReporter().nextSpan().get
span.kind shouldBe Kind.Client
span.metricTags.get(plain("db.system")) shouldBe "redis"
testSpanReporter().spans() shouldBe empty
testSpanReporter().clear()
}
redisson.shutdown()
}
}

"the Rediscala instrumentation" should {
implicit val akkaSystem = akka.actor.ActorSystem()
"generate only one client span for commands" in {
Expand Down

0 comments on commit cfcf801

Please sign in to comment.