Skip to content

Commit

Permalink
add opentelemetry metrics exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
hughsimpson committed Aug 4, 2023
1 parent cebcc5a commit 1b8eaa0
Show file tree
Hide file tree
Showing 11 changed files with 600 additions and 155 deletions.
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -812,8 +812,10 @@ lazy val `kamon-opentelemetry` = (project in file("reporters/kamon-opentelemetry
.settings(
crossScalaVersions += `scala_3_version`,
libraryDependencies ++= Seq(
"io.opentelemetry" % "opentelemetry-exporter-otlp-http-trace" % "1.13.0",
"io.opentelemetry" % "opentelemetry-exporter-otlp-trace" % "1.13.0",
"io.opentelemetry" % "opentelemetry-exporter-otlp-http-trace" % "1.14.0",
"io.opentelemetry" % "opentelemetry-exporter-otlp-trace" % "1.14.0",
"io.opentelemetry" % "opentelemetry-exporter-otlp-http-metrics" % "1.14.0",
"io.opentelemetry" % "opentelemetry-exporter-otlp-metrics" % "1.14.0",
// Compile-time dependency required in scala 3
"com.google.auto.value" % "auto-value-annotations" % "1.9" % "compile",

Expand Down
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object BaseProject extends AutoPlugin {
val `scala_2.11_version` = "2.11.12"
val `scala_2.12_version` = "2.12.15"
val `scala_2.13_version` = "2.13.8"
val scala_3_version = "3.2.0"
val scala_3_version = "3.3.0"

// This installs the GPG signing key from the
setupGpg()
Expand Down
25 changes: 24 additions & 1 deletion reporters/kamon-opentelemetry/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ kamon.otel {
attributes = ""
attributes = ${?OTEL_RESOURCE_ATTRIBUTES}

metrics {
endpoint = ${kamon.otel.endpoint}
full-endpoint = ${?OTEL_EXPORTER_OTLP_METRICS_ENDPOINT}

compression = ${kamon.otel.compression}
compression = ${?OTEL_EXPORTER_OTLP_METRICS_COMPRESSION}

headers = ${kamon.otel.headers}
headers = ${?OTEL_EXPORTER_OTLP_METRICS_HEADERS}

timeout = ${kamon.otel.timeout}
timeout = ${?OTEL_EXPORTER_OTLP_METRICS_TIMEOUT}

protocol = ${kamon.otel.protocol}
protocol = ${?OTEL_EXPORTER_OTLP_METRICS_PROTOCOL}
}

trace {
endpoint = ${kamon.otel.endpoint}
full-endpoint = ${?OTEL_EXPORTER_OTLP_TRACES_ENDPOINT}
Expand Down Expand Up @@ -64,7 +81,13 @@ kamon.modules {
otel-trace-reporter {
enabled = true
name = "OpenTelemetry Trace Reporter"
description = "Sends trace data to a OpenTelemetry server via gRPC"
description = "Sends trace data to a OpenTelemetry server via gRPC/REST+json"
factory = "kamon.otel.OpenTelemetryTraceReporter$Factory"
}
otel-metrics-reporter {
enabled = true
name = "OpenTelemetry Metrics Reporter"
description = "Sends metrics data to a OpenTelemetry server via gRPC/REST+json"
factory = "kamon.otel.OpenTelemetryMetricsReporter$Factory"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kamon.otel

import io.opentelemetry.sdk.common.InstrumentationScopeInfo
import io.opentelemetry.sdk.metrics.data._
import io.opentelemetry.sdk.metrics.internal.data._
import io.opentelemetry.sdk.resources.Resource
import kamon.metric.Instrument.Snapshot
import kamon.metric.{Distribution, MeasurementUnit, MetricSnapshot, PeriodSnapshot}
import kamon.tag.Lookups
import kamon.trace.Span.TagKeys

import java.lang.{Double => JDouble, Long => JLong}
import java.time.Instant
import java.util.{Collection => JCollection}
import scala.collection.JavaConverters._

class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, from: Instant, to: Instant) {
private val fromNs = from.toEpochMilli * 1000000
private val toNs = to.toEpochMilli * 1000000

private def instrumentationScopeInfo(snapshot: MetricSnapshot[_, _]): InstrumentationScopeInfo = {
// logic for looking up the component doesn't really seem to make sense - to be compliant we should probably be grouping the metrics by component before calling this
InstrumentationScopeInfo.create(snapshot.instruments.headOption.flatMap(_.tags.get(Lookups.option(TagKeys.Component))) getOrElse "kamon-instrumentation", kamonVersion, null)
}

private def toString(unit: MeasurementUnit): String = unit.magnitude.name

def toGaugeDatum(g: Snapshot[Double]): DoublePointData = ImmutableDoublePointData.create(fromNs, toNs, SpanConverter.toAttributes(g.tags), g.value)

def toGaugeData(g: Seq[Snapshot[Double]]): GaugeData[DoublePointData] = ImmutableGaugeData.create(g.map(toGaugeDatum).asJava)

def convertGauge(gauge: MetricSnapshot.Values[Double]): MetricData =
ImmutableMetricData.createDoubleGauge(
resource,
instrumentationScopeInfo(gauge),
gauge.name,
gauge.description,
toString(gauge.settings.unit),
toGaugeData(gauge.instruments))

def toHistogramDatum(s: Snapshot[Distribution]): HistogramPointData =
ImmutableHistogramPointData.create(
fromNs,
toNs,
SpanConverter.toAttributes(s.tags),
JDouble valueOf s.value.sum.toDouble,
JDouble valueOf s.value.min.toDouble,
JDouble valueOf s.value.max.toDouble,
s.value.buckets.map(JDouble valueOf _.value.toDouble).asJava,
s.value.buckets.map(JLong valueOf _.frequency).asJava
)

def toHistogramData(any: Seq[Snapshot[Distribution]]): HistogramData =
ImmutableHistogramData.create(AggregationTemporality.CUMULATIVE, any.map(toHistogramDatum).asJava)

def convertHistogram(histogram: MetricSnapshot.Distributions): MetricData =
ImmutableMetricData.createDoubleHistogram(
resource,
instrumentationScopeInfo(histogram),
histogram.name,
histogram.description,
toString(histogram.settings.unit),
toHistogramData(histogram.instruments))

def toCounterDatum(g: Snapshot[Long]): LongPointData =
ImmutableLongPointData.create(fromNs, toNs, SpanConverter.toAttributes(g.tags), g.value)

def toCounterData(g: Seq[Snapshot[Long]]): SumData[LongPointData] =
ImmutableSumData.create(false, AggregationTemporality.CUMULATIVE, g.map(toCounterDatum).asJava)

def convertCounter(counter: MetricSnapshot.Values[Long]): MetricData =
ImmutableMetricData.createLongSum(
resource,
instrumentationScopeInfo(counter),
counter.name,
counter.description,
toString(counter.settings.unit),
toCounterData(counter.instruments))

}

/**
* Converts Kamon metrics to OpenTelemetry [[MetricData]]s
*/
private[otel] object MetricsConverter {
def convert(resource: Resource, kamonVersion: String)(metrics: PeriodSnapshot): JCollection[MetricData] = {
val converter = new WithResourceMetricsConverter(resource, kamonVersion, metrics.from, metrics.to)
val gauges = metrics.gauges.map(converter.convertGauge)
val histograms = (metrics.histograms ++ metrics.timers ++ metrics.rangeSamplers).map(converter.convertHistogram)
val counters = metrics.counters.map(converter.convertCounter)

(gauges ++ histograms ++ counters).asJava
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package kamon.otel

import com.typesafe.config.Config
import io.opentelemetry.sdk.resources.Resource
import kamon.Kamon
import kamon.status.Status
import kamon.tag.Tag
import org.slf4j.LoggerFactory

import java.net.URL
import java.time.Duration

case class OpenTelemetryConfiguration(protocol: String, endpoint: String, compressionEnabled: Boolean, headers: Seq[(String, String)], timeout: Duration)

object OpenTelemetryConfiguration {
private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryConfiguration])

object Component extends Enumeration {
val Trace, Metrics = Value
type Component = Value
}

import Component._

/**
* Builds an otel configuration object using the provided typesafe configuration.
*
* @param config
* @return
*/
def apply(config: Config, component: Component): OpenTelemetryConfiguration = {
val name = component.toString.toLowerCase
val otelExporterConfig = config.getConfig(s"kamon.otel.$name")
val endpoint = otelExporterConfig.getString("endpoint")
val fullEndpoint = if (otelExporterConfig.hasPath("full-endpoint")) Some(otelExporterConfig.getString("full-endpoint")) else None
val compression = otelExporterConfig.getString("compression") match {
case "gzip" => true
case x =>
if (x != "") logger.warn(s"unrecognised compression $x. Defaulting to no compression")
false
}
val protocol = otelExporterConfig.getString("protocol") match {
case "http/protobuf" => "http/protobuf"
case "grpc" => "grpc"
case x =>
logger.warn(s"Unrecognised opentelemetry schema type $x. Defaulting to grpc")
"grpc"
}
val headers = otelExporterConfig.getString("headers").split(',').filter(_.nonEmpty).map(_.split("=", 2)).map {
case Array(k) => k -> ""
case Array(k, v) => k -> v
}.toSeq
val timeout = otelExporterConfig.getDuration("timeout")
// See https://opentelemetry.io/docs/reference/specification/protocol/exporter/#endpoint-urls-for-otlphttp
val httpSuffix = component match {
case Trace => "traces"
case Metrics => "metrics"
}
val url = (protocol, fullEndpoint) match {
case ("http/protobuf", Some(full)) =>
val parsed = new URL(full)
if (parsed.getPath.isEmpty) full :+ '/' else full
// Seems to be some dispute as to whether the / should technically be added in the case that the base path doesn't
// include it. Adding because it's probably what's desired most of the time, and can always be overridden by full-endpoint
case ("http/protobuf", None) => if (endpoint.endsWith("/")) s"${endpoint}v1/$httpSuffix" else s"$endpoint/v1/$httpSuffix"
case (_, Some(full)) => full
case (_, None) => endpoint
}

logger.info(s"Configured endpoint for OpenTelemetry $name reporting [$url] using $protocol protocol")

OpenTelemetryConfiguration(protocol, url, compression, headers, timeout)
}

private val kamonSettings: Status.Settings = Kamon.status().settings()
/**
* Builds the resource information added as resource labels to the exported metrics/traces
*
* @return
*/
def buildResource(attributes: Map[String, String]): Resource = {
val env = Kamon.environment
val builder = Resource.builder()
.put("host.name", kamonSettings.environment.host)
.put("service.instance.id", kamonSettings.environment.instance)
.put("service.name", env.service)
.put("telemetry.sdk.name", "kamon")
.put("telemetry.sdk.language", "scala")
.put("telemetry.sdk.version", kamonSettings.version)

attributes.foreach { case (k, v) => builder.put(k, v) }
//add all kamon.environment.tags as KeyValues to the Resource object
env.tags.iterator().foreach {
case t: Tag.String => builder.put(t.key, t.value)
case t: Tag.Boolean => builder.put(t.key, t.value)
case t: Tag.Long => builder.put(t.key, t.value)
}

builder.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kamon.otel

import com.typesafe.config.Config
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.resources.Resource
import kamon.Kamon
import kamon.metric.PeriodSnapshot
import kamon.module.{MetricReporter, Module, ModuleFactory}
import kamon.status.Status
import kamon.tag.Tag
import org.slf4j.LoggerFactory

import java.net.URLDecoder
import java.util
import java.util.{Collection => JCollection}
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

object OpenTelemetryMetricsReporter {
private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryMetricsReporter])
private val kamonSettings: Status.Settings = Kamon.status().settings()

class Factory extends ModuleFactory {
override def create(settings: ModuleFactory.Settings): Module = {
logger.info("Creating OpenTelemetry Metrics Reporter")

val module = new OpenTelemetryMetricsReporter(OtlpMetricsService.apply)(settings.executionContext)
module.reconfigure(settings.config)
module
}
}
}

import kamon.otel.OpenTelemetryMetricsReporter._

/**
* Converts internal Kamon metrics to OpenTelemetry format and sends to a configured OpenTelemetry endpoint using gRPC or REST.
*/
class OpenTelemetryMetricsReporter(metricsServiceFactory: Config => MetricsService)(implicit ec: ExecutionContext) extends MetricReporter {
private var metricsService: Option[MetricsService] = None
private var metricsConverterFunc: PeriodSnapshot => JCollection[MetricData] = (_ => new util.ArrayList[MetricData](0))

def isEmpty(snapshot: PeriodSnapshot): Boolean =
snapshot.gauges.isEmpty && snapshot.timers.isEmpty && snapshot.counters.isEmpty && snapshot.histograms.isEmpty && snapshot.rangeSamplers.isEmpty

override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
if (!isEmpty(snapshot)) {
metricsService.foreach(ts => ts.exportMetrics(metricsConverterFunc(snapshot)).onComplete {
case Success(_) => logger.debug("Successfully exported metrics")

//TODO is there result for which a retry is relevant? Perhaps a glitch in the receiving service
//Keeping logs to debug as the underlying exporter will log if it fails to export metrics, and the failure isn't surfaced in the response anyway
case Failure(t) => logger.debug("Failed to export metrics", t)
})
}
}

override def reconfigure(newConfig: Config): Unit = {
logger.info("Reconfigure OpenTelemetry Metrics Reporter")

//pre-generate the function for converting Kamon metrics to proto metrics
val attributes: Map[String, String] =
newConfig.getString("kamon.otel.attributes").split(',').filter(_ contains '=').map(_.trim.split("=", 2)).map {
case Array(k, v) =>
val decoded = Try(URLDecoder.decode(v.trim, "UTF-8"))
decoded.failed.foreach(t => throw new IllegalArgumentException(s"value for attribute ${k.trim} is not a url-encoded string", t))
k.trim -> decoded.get
}.toMap
val resource: Resource = OpenTelemetryConfiguration.buildResource(attributes)
this.metricsConverterFunc = MetricsConverter.convert(resource, kamonSettings.version)

this.metricsService = Option(metricsServiceFactory.apply(newConfig))
}

override def stop(): Unit = {
logger.info("Stopping OpenTelemetry Metrics Reporter")
this.metricsService.foreach(_.close())
this.metricsService = None
}

}
Loading

0 comments on commit 1b8eaa0

Please sign in to comment.