Skip to content

Commit

Permalink
+ kafka: permit specifying custom propagation implementation (#1292)
Browse files Browse the repository at this point in the history
* + kamon: permit specifying custom propagation implementation
  • Loading branch information
hughsimpson authored Oct 3, 2023
1 parent 14cba52 commit 0539e62
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package kamon.instrumentation.kafka.client

import com.typesafe.config.Config
import kamon.Kamon
import kamon.{ClassLoading, Kamon}
import kamon.context.Context
import kamon.instrumentation.context.HasContext
import kamon.trace.Span
Expand Down Expand Up @@ -50,7 +50,10 @@ object KafkaInstrumentation {
log.warn("W3C TraceContext propagation should be used only with identifier-scheme = double")
}
SpanPropagation.W3CTraceContext()
case other => sys.error(s"Unrecognized option [$other] for the kamon.instrumentation.kafka.client.tracing.propagator config.")
case fqcn => try ClassLoading.createInstance[KafkaPropagator](fqcn) catch {
case t: Throwable =>
sys.error(s"Failed to create kafka propagator instance from FQCN [$fqcn]. Reason: ${t.getMessage}")
}
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,45 @@ class KafkaClientsTracingInstrumentationSpec extends AnyWordSpec with Matchers
span.parentId shouldBe sendingSpan.get.id
}
}

"create a Producer/Consumer Span when publish/consume a message with custom format" in new SpanReportingTestScope(reporter) {
applyConfig("kamon.instrumentation.kafka.client.tracing.propagator = kamon.instrumentation.kafka.testutil.CustomPropagationImplementation")

val testTopicName = "custom-context-propagation"
publishStringMessageToKafka(testTopicName, "Hello world!!!")

val consumedRecord = consumeFirstRawRecord(testTopicName)

consumedRecord.headers().lastHeader("x-trace-id").value() should not be empty
consumedRecord.headers().lastHeader("traceparent") shouldBe null
consumedRecord.headers().lastHeader("kctx") shouldBe null
consumedRecord.value() shouldBe "Hello world!!!"

awaitNumReportedSpans(2)

var sendingSpan: Option[Span.Finished] = None
assertReportedSpan(_.operationName == "producer.send") { span =>
span.metricTags.get(plain("component")) shouldBe "kafka.producer"
span.metricTags.get(plain("span.kind")) shouldBe "producer"
span.tags.get(plain("kafka.topic")) shouldBe testTopicName
span.tags.get(plain("kafka.key")) shouldBe KafkaInstrumentation.Keys.Null
span.tags.get(plainLong("kafka.partition")) shouldBe 0L
sendingSpan = Some(span)
}

assertReportedSpan(_.operationName == "consumer.process") { span =>
span.metricTags.get(plain("component")) shouldBe "kafka.consumer"
span.metricTags.get(plain("span.kind")) shouldBe "consumer"
span.tags.get(plain("kafka.topic")) shouldBe testTopicName
span.tags.get(plain("kafka.client-id")) should not be empty
span.tags.get(plain("kafka.group-id")) should not be empty
span.tags.get(plainLong("kafka.partition")) shouldBe 0L
span.tags.get(plainLong("kafka.timestamp")) shouldBe consumedRecord.timestamp()
span.tags.get(plain("kafka.timestamp-type")) shouldBe consumedRecord.timestampType().name
span.trace.id shouldBe sendingSpan.get.trace.id
span.parentId shouldBe sendingSpan.get.id
}
}
}

private def publishStringMessageToKafka(topicName: String, message: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kamon.instrumentation.kafka.testutil

import kamon.context.Context
import kamon.instrumentation.kafka.client.KafkaPropagator
import kamon.trace.Trace.SamplingDecision
import kamon.trace.{Identifier, Span, Trace}

import org.apache.kafka.common.header.{Headers => KafkaHeaders}

class CustomPropagationImplementation extends KafkaPropagator {

override def read(medium: KafkaHeaders, context: Context): Context = {

val contextWithParent = for {
traceId <- Option(medium.lastHeader("x-trace-id")).map(_.value())
traceIdStr = new String(traceId, "utf-8")
spanId <- Option(medium.lastHeader("x-span-id")).map(_.value())
spanIdStr = new String(spanId, "utf-8")
sampled <- Option(medium.lastHeader("x-trace-sampled")).map(_.value()).map{
case Array(1) => SamplingDecision.Sample
case Array(0) => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
}
span = Span.Remote(Identifier(spanIdStr, spanId), Identifier.Empty, Trace(Identifier(traceIdStr, traceId), sampled))
} yield context.withEntry(Span.Key, span)

contextWithParent.getOrElse(context)
}

override def write(context: Context, medium: KafkaHeaders): Unit = {
val span = context.get(Span.Key)

if (span != Span.Empty) {
medium.add("x-trace-id", span.trace.id.string.getBytes("utf-8"))
medium.add("x-span-id", span.id.string.getBytes("utf-8"))
medium.add("x-trace-sampled", if (span.trace.samplingDecision == SamplingDecision.Sample) Array(1) else Array(0))
}
}
}

0 comments on commit 0539e62

Please sign in to comment.