From c2432ea56edb6fc1add61e3b16ce4420d7ca9db7 Mon Sep 17 00:00:00 2001 From: Hugh Simpson Date: Mon, 4 Mar 2024 07:37:48 +0000 Subject: [PATCH] add opentelemetry metrics exporter --- build.sbt | 6 +- .../src/main/resources/reference.conf | 71 +++- .../scala/kamon/otel/MetricsConverter.scala | 377 ++++++++++++++++++ .../otel/OpenTelemetryConfiguration.scala | 137 +++++++ .../otel/OpenTelemetryMetricsReporter.scala | 167 ++++++++ .../otel/OpenTelemetryTraceReporter.scala | 69 +--- .../src/main/scala/kamon/otel/Services.scala | 142 +++++++ .../main/scala/kamon/otel/TraceService.scala | 130 ------ .../src/test/resources/application.conf | 14 +- .../OpenTelemetryMetricReporterSpec.scala | 287 +++++++++++++ .../scala/kamon/otel/TraceServiceSpec.scala | 11 +- 11 files changed, 1220 insertions(+), 191 deletions(-) create mode 100644 reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala create mode 100644 reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryConfiguration.scala create mode 100644 reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryMetricsReporter.scala create mode 100644 reporters/kamon-opentelemetry/src/main/scala/kamon/otel/Services.scala delete mode 100644 reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala create mode 100644 reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala diff --git a/build.sbt b/build.sbt index f520304a3..401ba35cc 100644 --- a/build.sbt +++ b/build.sbt @@ -963,8 +963,10 @@ lazy val `kamon-opentelemetry` = (project in file("reporters/kamon-opentelemetry .disablePlugins(AssemblyPlugin) .settings( libraryDependencies ++= Seq( - "io.opentelemetry" % "opentelemetry-exporter-otlp-http-trace" % "1.13.0", - "io.opentelemetry" % "opentelemetry-exporter-otlp-trace" % "1.13.0", + "io.opentelemetry" % "opentelemetry-exporter-otlp-http-trace" % "1.14.0", + "io.opentelemetry" % "opentelemetry-exporter-otlp-trace" % "1.14.0", + "io.opentelemetry" % "opentelemetry-exporter-otlp-http-metrics" % "1.14.0", + "io.opentelemetry" % "opentelemetry-exporter-otlp-metrics" % "1.14.0", // Compile-time dependency required in scala 3 "com.google.auto.value" % "auto-value-annotations" % "1.9" % "compile", diff --git a/reporters/kamon-opentelemetry/src/main/resources/reference.conf b/reporters/kamon-opentelemetry/src/main/resources/reference.conf index 0d9a3c31b..d16a6440e 100644 --- a/reporters/kamon-opentelemetry/src/main/resources/reference.conf +++ b/reporters/kamon-opentelemetry/src/main/resources/reference.conf @@ -27,6 +27,27 @@ kamon.otel { attributes = "" attributes = ${?OTEL_RESOURCE_ATTRIBUTES} + metrics { + endpoint = ${kamon.otel.endpoint} + full-endpoint = ${?OTEL_EXPORTER_OTLP_METRICS_ENDPOINT} + + compression = ${kamon.otel.compression} + compression = ${?OTEL_EXPORTER_OTLP_METRICS_COMPRESSION} + + headers = ${kamon.otel.headers} + headers = ${?OTEL_EXPORTER_OTLP_METRICS_HEADERS} + + timeout = ${kamon.otel.timeout} + timeout = ${?OTEL_EXPORTER_OTLP_METRICS_TIMEOUT} + + protocol = ${kamon.otel.protocol} + protocol = ${?OTEL_EXPORTER_OTLP_METRICS_PROTOCOL} + + # explicit_bucket_histogram or base2_exponential_bucket_histogram + histogram-format = explicit_bucket_histogram + histogram-format = ${?OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION} + } + trace { endpoint = ${kamon.otel.endpoint} full-endpoint = ${?OTEL_EXPORTER_OTLP_TRACES_ENDPOINT} @@ -47,6 +68,48 @@ kamon.otel { # standard attribute names; enable for 'more full' compliance with 
the OTel standard
    include-error-event = false
  }
+
+  explicit-histo-boundaries {
+    # Same as defaults from https://opentelemetry.io/docs/specs/otel/metrics/sdk/#explicit-bucket-histogram-aggregation
+    default-buckets = [
+      0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000
+    ]
+
+    # The following are the same as the prometheus reporter's default values
+    time-buckets = [
+      0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10
+    ]
+
+    information-buckets = [
+      512, 1024, 2048, 4096, 16384, 65536, 524288, 1048576
+    ]
+
+    percentage-buckets = [
+      20, 40, 60, 70, 80, 90, 95
+    ]
+
+    # Per-metric overrides are possible by specifying the metric name and the histogram buckets here
+    custom {
+      // example:
+      // "akka.actor.processing-time" = [0.1, 1.0, 10.0]
+    }
+  }
+
+  exponential-histo-boundaries {
+    default-bucket-count = 160
+
+    time-bucket-count = 160
+
+    information-bucket-count = 160
+
+    percentage-bucket-count = 100
+
+    # Per-metric overrides are possible by specifying the metric name and the maximum bucket count here
+    custom {
+      // example:
+      // "akka.actor.processing-time" = 3
+    }
+  }
 }

 # Arbitrary key-value pairs that further identify the environment where this service instance is running.
@@ -64,7 +127,13 @@ kamon.modules {
   otel-trace-reporter {
     enabled = true
     name = "OpenTelemetry Trace Reporter"
-    description = "Sends trace data to a OpenTelemetry server via gRPC"
+    description = "Sends trace data to an OpenTelemetry server via gRPC/HTTP+protobuf"
     factory = "kamon.otel.OpenTelemetryTraceReporter$Factory"
   }
+  otel-metrics-reporter {
+    enabled = true
+    name = "OpenTelemetry Metrics Reporter"
+    description = "Sends metrics data to an OpenTelemetry server via gRPC/HTTP+protobuf"
+    factory = "kamon.otel.OpenTelemetryMetricsReporter$Factory"
+  }
 }
diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala
new file mode 100644
index 000000000..d7c73e6a1
--- /dev/null
+++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala
@@ -0,0 +1,377 @@
+/*
+ * Copyright 2013-2021 The Kamon Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package kamon.otel + +import io.opentelemetry.sdk.common.InstrumentationScopeInfo +import io.opentelemetry.sdk.metrics.data._ +import io.opentelemetry.sdk.metrics.internal.data._ +import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.{ + ExponentialHistogramBuckets, + ExponentialHistogramData, + ExponentialHistogramPointData, + ImmutableExponentialHistogramData +} +import io.opentelemetry.sdk.resources.Resource +import kamon.metric.Instrument.Snapshot +import kamon.metric.{Distribution, MeasurementUnit, MetricSnapshot, PeriodSnapshot} +import kamon.otel.HistogramFormat.{Explicit, Exponential, HistogramFormat} +import kamon.otel.MetricsConverter.{ExplBucketFn, ExpoBucketFn} +import org.slf4j.LoggerFactory + +import java.lang.{Double => JDouble, Long => JLong} +import java.time.Instant +import java.util +import java.util.{ArrayList => JArrayList, Collection => JCollection} +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +class WithResourceMetricsConverter( + resource: Resource, + kamonVersion: String, + from: Instant, + to: Instant, + explBucketConfig: ExplBucketFn, + expoBucketConfig: ExpoBucketFn +) { + private val maxDouble: JDouble = JDouble.valueOf(JDouble.MAX_VALUE) + private val logger = LoggerFactory.getLogger(getClass) + private val fromNs = from.toEpochMilli * 1000000 + private val toNs = to.toEpochMilli * 1000000 + + private def instrumentationScopeInfo(snapshot: MetricSnapshot[_, _]): InstrumentationScopeInfo = + InstrumentationScopeInfo.create("kamon-metrics", kamonVersion, null) + + private def toString(unit: MeasurementUnit): String = unit.magnitude.name + + private def toGaugeDatum(g: Snapshot[Double]): DoublePointData = + ImmutableDoublePointData.create(fromNs, toNs, SpanConverter.toAttributes(g.tags), g.value) + + private def toGaugeData(g: Seq[Snapshot[Double]]): GaugeData[DoublePointData] = + ImmutableGaugeData.create(g.map(toGaugeDatum).asJava) + + def convertGauge(gauge: MetricSnapshot.Values[Double]): MetricData = + ImmutableMetricData.createDoubleGauge( + resource, + instrumentationScopeInfo(gauge), + gauge.name, + gauge.description, + toString(gauge.settings.unit), + toGaugeData(gauge.instruments) + ) + + private def getExplBucketCounts(bucketConfiguration: Seq[JDouble])(s: Snapshot[Distribution]) = { + val counts = ArrayBuffer.newBuilder[JLong] + val boundaryIterator: Iterator[JDouble] = (bucketConfiguration :+ maxDouble).iterator + var nextBoundary = boundaryIterator.next() + var inBucketCount = 0L + for (el <- s.value.bucketsIterator) { + while (el.value > nextBoundary) { + nextBoundary = boundaryIterator.next() + counts += inBucketCount + inBucketCount = 0L + } + inBucketCount += el.frequency + } + while (boundaryIterator.hasNext) { + counts += inBucketCount + boundaryIterator.next() + inBucketCount = 0L + } + counts += inBucketCount + counts + } + + private def toExplicitHistogramDatum(bucketConfiguration: Seq[JDouble])(s: Snapshot[Distribution]) + : HistogramPointData = { + val counts = getExplBucketCounts(bucketConfiguration)(s) + ImmutableHistogramPointData.create( + fromNs, + toNs, + SpanConverter.toAttributes(s.tags), + JDouble valueOf s.value.sum.toDouble, + JDouble valueOf s.value.min.toDouble, + JDouble valueOf s.value.max.toDouble, + bucketConfiguration.asJava, + counts.result().asJava + ) + } + + private def toExplicitHistogramData( + bucketConfiguration: Seq[JDouble], + distributions: Seq[Snapshot[Distribution]] + ): Option[HistogramData] = + distributions.filter(_.value.buckets.nonEmpty) 
match { + case Nil => None + case nonEmpty => Some(ImmutableHistogramData.create( + AggregationTemporality.DELTA, + nonEmpty.map(toExplicitHistogramDatum(bucketConfiguration)).asJava + )) + } + + def convertExplicitHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] = { + val bucketConfiguration = explBucketConfig(histogram.name, histogram.settings.unit) + toExplicitHistogramData(bucketConfiguration, histogram.instruments).map(d => + ImmutableMetricData.createDoubleHistogram( + resource, + instrumentationScopeInfo(histogram), + histogram.name, + histogram.description, + toString(histogram.settings.unit), + d + ) + ) + } + + class ItWithLast[T](it: Iterator[T], last: T) extends Iterator[T] { + private var showedLast: Boolean = false + + def hasNext: Boolean = it.hasNext || !showedLast + + def next(): T = if (it.hasNext) it.next() + else if (!showedLast) { + showedLast = true + last + } else throw new RuntimeException("Next on empty Iterator") + } + + private def getExpoBucketCounts(scale: Int, maxBucketCount: Int)(s: Snapshot[Distribution]) = { + val base = Math.pow(2, Math.pow(2, -scale)) + val lowerBoundaryIterator: Iterator[Double] = + ((-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)) :+ Double.MaxValue).iterator + val valuesIterator = new ItWithLast[Distribution.Bucket]( + s.value.bucketsIterator, + new Distribution.Bucket { + def value: Long = Long.MaxValue + + def frequency: Long = 0 + } + ) + var fromLowerBound = valuesIterator.next() + var fromUpperBound = valuesIterator.next() + var toLowerBound = lowerBoundaryIterator.next() + var toUpperBound = lowerBoundaryIterator.next() + var zeroCount: JLong = 0L + var countInBucket = 0L + + val negativeCounts = ArrayBuffer.newBuilder[JLong] + val positiveCounts = ArrayBuffer.newBuilder[JLong] + + def iterFrom: JLong = { + val d = fromLowerBound.frequency + fromLowerBound = fromUpperBound + fromUpperBound = valuesIterator.next() + d + } + + def iterTo: JLong = { + toLowerBound = toUpperBound + toUpperBound = lowerBoundaryIterator.next() + val res = countInBucket + countInBucket = 0 + res + } + // normal case + while (lowerBoundaryIterator.hasNext && valuesIterator.hasNext) { + if (fromUpperBound.value <= toLowerBound) { + countInBucket += iterFrom // Or drop? + } else if (fromLowerBound.value >= toUpperBound) toLowerBound match { + case 1 => zeroCount += iterTo + case b if b < 1 => negativeCounts += iterTo + case b if b > 1 => positiveCounts += iterTo + } + else if (fromUpperBound.value == toUpperBound) toLowerBound match { + case 1 => + countInBucket += iterFrom + zeroCount += iterTo + case b if b < 1 => + countInBucket += iterFrom + negativeCounts += iterTo + case b if b > 1 => + countInBucket += iterFrom + positiveCounts += iterTo + } + else if (fromUpperBound.value > toUpperBound) { + val firstBonus: JLong = countInBucket + var negBuckets = 0 + var zeroBuckets = 0 + var posBuckets = 0 + while (fromUpperBound.value > toUpperBound && lowerBoundaryIterator.hasNext) { + if (toLowerBound < 1) negBuckets += 1 + else if (toLowerBound == 1) zeroBuckets += 1 + else if (toLowerBound >= 1) posBuckets += 1 + toLowerBound = toUpperBound + toUpperBound = lowerBoundaryIterator.next() + } + val total = iterFrom + // Not sure about this... everything's going into the first bucket, even though we might be spanning multiple target buckets. + // Might be better to do something like push the avg.floor into each bucket, interpolating the remainder. 
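+          // (Hypothetical illustration: a single source bucket with count 9 spanning three target
+          // buckets currently yields counts 9/0/0, where an interpolating scheme would yield roughly 3/3/3.)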
+ // OTOH it may not really come up much in practice, since the internal histos are likely to have similar or finer granularity + negativeCounts ++= (if (negBuckets > 0) + JLong.valueOf(firstBonus + total) +: Array.fill(negBuckets - 1)(JLong.valueOf(0)) + else Nil) + zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JLong.valueOf(firstBonus + total) else JLong.valueOf(0)) + positiveCounts ++= ( + if (negBuckets == 0 && zeroBuckets == 0 && posBuckets > 0) + JLong.valueOf(firstBonus + total) +: Array.fill(posBuckets - 1)(JLong.valueOf(0)) + else Array.fill(posBuckets)(JLong.valueOf(0)) + ) + } else /*if (fromUpperBound.value < toUpperBound) */ toLowerBound match { + case 1 => zeroCount += iterFrom + case _ => countInBucket += iterFrom + } + } + var usedLastValue = false + // more buckets left to fill but only one unused value, sitting in fromLowerBound. + while (lowerBoundaryIterator.hasNext) { + if (fromLowerBound.value > toLowerBound && fromLowerBound.value < toUpperBound) { + usedLastValue = true + countInBucket += fromLowerBound.frequency + } + toLowerBound match { + case 1 => zeroCount += iterTo + case b if b < 1 => negativeCounts += iterTo + case b if b > 1 => positiveCounts += iterTo + } + } + // more values left, but only one unfilled bucket, sitting in toLowerBound + while (valuesIterator.hasNext) { + countInBucket += iterFrom + } + if (!usedLastValue) countInBucket += fromLowerBound.frequency + positiveCounts += countInBucket + + val negBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets { + val getOffset: Int = -maxBucketCount + private val longs: ArrayBuffer[JLong] = negativeCounts.result() + val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava) + val getTotalCount: Long = longs.foldLeft(0L)(_ + _) + } + val posBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets { + val getOffset: Int = 1 + private val longs: ArrayBuffer[JLong] = positiveCounts.result() + val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava) + val getTotalCount: Long = longs.foldLeft(0L)(_ + _) + } + (negBucket, zeroCount, posBucket) + } + + private def toExponentialHistogramData( + maxBucketCount: Int, + distributions: Seq[Snapshot[Distribution]] + ): Option[ExponentialHistogramData] = + distributions.filter(_.value.buckets.nonEmpty) match { + case Nil => None + case nonEmpty => + val mapped = nonEmpty.flatMap { s => + def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v) + + // Could also calculate an 'offset' here, but defaulting to offset = 1 for simplicity + val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble)) + val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s) + Some(ExponentialHistogramPointData.create( + scale, + s.value.sum, + zero, + pos, + neg, + fromNs, + toNs, + SpanConverter.toAttributes(s.tags), + new JArrayList[DoubleExemplarData]() + )) + } + if (mapped.nonEmpty) Some(ImmutableExponentialHistogramData.create(AggregationTemporality.DELTA, mapped.asJava)) + else None + } + + def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] = { + val maxBucketCount = expoBucketConfig(histogram.name, histogram.settings.unit) + toExponentialHistogramData(maxBucketCount, histogram.instruments).map(d => + ImmutableMetricData.createExponentialHistogram( + resource, + instrumentationScopeInfo(histogram), + histogram.name, + histogram.description, + toString(histogram.settings.unit), + d + ) + ) + } + + def 
convertHistogram(histogramFormat: HistogramFormat)(histogram: MetricSnapshot.Distributions): Option[MetricData] = + histogramFormat match { + case Explicit => convertExplicitHistogram(histogram) + case Exponential => convertExponentialHistogram(histogram) + } + + private def toCounterDatum(g: Snapshot[Long]): LongPointData = + ImmutableLongPointData.create(fromNs, toNs, SpanConverter.toAttributes(g.tags), g.value) + + private def toCounterData(g: Seq[Snapshot[Long]]): SumData[LongPointData] = + ImmutableSumData.create(false, AggregationTemporality.DELTA, g.map(toCounterDatum).asJava) + + def convertCounter(counter: MetricSnapshot.Values[Long]): MetricData = + ImmutableMetricData.createLongSum( + resource, + instrumentationScopeInfo(counter), + counter.name, + counter.description, + toString(counter.settings.unit), + toCounterData(counter.instruments) + ) + +} + +/** + * Converts Kamon metrics to OpenTelemetry [[MetricData]]s + */ +private[otel] object MetricsConverter { + type ExplBucketFn = (String, MeasurementUnit) => Seq[JDouble] + type ExpoBucketFn = (String, MeasurementUnit) => Int + private val minScale = -10 + private val maxScale = 20 + + def convert( + resource: Resource, + kamonVersion: String, + histogramFormat: HistogramFormat, + explicitBucketConfig: ExplBucketFn, + exponentialBucketConfig: ExpoBucketFn + )(metrics: PeriodSnapshot): JCollection[MetricData] = { + val converter = new WithResourceMetricsConverter( + resource, + kamonVersion, + metrics.from, + metrics.to, + explicitBucketConfig, + exponentialBucketConfig + ) + val gauges = metrics.gauges.filter(_.instruments.nonEmpty).map(converter.convertGauge) + val histograms = (metrics.histograms ++ metrics.timers ++ metrics.rangeSamplers).filter(_.instruments.nonEmpty) + .flatMap(converter.convertHistogram(histogramFormat)) + val counters = metrics.counters.filter(_.instruments.nonEmpty).map(converter.convertCounter) + + (gauges ++ histograms ++ counters).asJava + } + + private val bases = (maxScale to minScale by -1).map(scale => (scale, Math.pow(2, Math.pow(2, -scale)))).toArray + + def maxScale(maxBucketCount: Int)(v: JDouble): Int = { + if (v >= 1) + bases.collectFirst { case (scale, base) if Math.pow(base, maxBucketCount) >= v => scale }.getOrElse(minScale) + else bases.collectFirst { case (scale, base) if Math.pow(base, -maxBucketCount) <= v => scale }.getOrElse(minScale) + } +} diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryConfiguration.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryConfiguration.scala new file mode 100644 index 000000000..075e74107 --- /dev/null +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryConfiguration.scala @@ -0,0 +1,137 @@ +package kamon.otel + +import com.typesafe.config.Config +import io.opentelemetry.sdk.resources.Resource +import kamon.Kamon +import kamon.status.Status +import kamon.tag.Tag +import org.slf4j.LoggerFactory + +import java.net.{URL, URLDecoder} +import java.time.Duration +import scala.util.Try + +object HistogramFormat extends Enumeration { + val Explicit, Exponential = Value + type HistogramFormat = Value +} +import HistogramFormat._ + +case class OpenTelemetryConfiguration( + protocol: String, + endpoint: String, + compressionEnabled: Boolean, + headers: Seq[(String, String)], + timeout: Duration, + histogramFormat: Option[HistogramFormat] +) + +object OpenTelemetryConfiguration { + private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryConfiguration]) + + object Component 
extends Enumeration {
+    val Trace, Metrics = Value
+    type Component = Value
+  }
+
+  import Component._
+
+  /**
+   * Builds an otel configuration object using the provided typesafe configuration.
+   *
+   * @param config the resolved typesafe Config to read the kamon.otel settings from
+   * @param component whether to read the trace or the metrics exporter settings
+   * @return the parsed exporter configuration
+   */
+  def apply(config: Config, component: Component): OpenTelemetryConfiguration = {
+    val name = component.toString.toLowerCase
+    val otelExporterConfig = config.getConfig(s"kamon.otel.$name")
+    val endpoint = otelExporterConfig.getString("endpoint")
+    val fullEndpoint =
+      if (otelExporterConfig.hasPath("full-endpoint")) Some(otelExporterConfig.getString("full-endpoint")) else None
+    val compression = otelExporterConfig.getString("compression") match {
+      case "gzip" => true
+      case x =>
+        if (x != "") logger.warn(s"unrecognised compression $x. Defaulting to no compression")
+        false
+    }
+    val protocol = otelExporterConfig.getString("protocol") match {
+      case "http/protobuf" => "http/protobuf"
+      case "grpc" => "grpc"
+      case x =>
+        logger.warn(s"Unrecognised OpenTelemetry protocol $x. Defaulting to grpc")
+        "grpc"
+    }
+    val headers = otelExporterConfig.getString("headers").split(',').filter(_.nonEmpty).map(_.split("=", 2)).map {
+      case Array(k) => k -> ""
+      case Array(k, v) => k -> v
+    }.toSeq
+    val timeout = otelExporterConfig.getDuration("timeout")
+    // See https://opentelemetry.io/docs/reference/specification/protocol/exporter/#endpoint-urls-for-otlphttp
+    val httpSuffix = component match {
+      case Trace => "traces"
+      case Metrics => "metrics"
+    }
+    val url = (protocol, fullEndpoint) match {
+      case ("http/protobuf", Some(full)) =>
+        val parsed = new URL(full)
+        if (parsed.getPath.isEmpty) full :+ '/' else full
+      // There is some dispute as to whether the trailing / should technically be added when the base path doesn't
+      // include it. We add it because that is probably what is desired most of the time, and it can always be overridden via full-endpoint
+      case ("http/protobuf", None) =>
+        if (endpoint.endsWith("/")) s"${endpoint}v1/$httpSuffix" else s"$endpoint/v1/$httpSuffix"
+      case (_, Some(full)) => full
+      case (_, None) => endpoint
+    }
+    val histogramFormat = if (component == Metrics)
+      Some(otelExporterConfig.getString("histogram-format").toLowerCase match {
+        case "explicit_bucket_histogram" => Explicit
+        case "base2_exponential_bucket_histogram" => Exponential
+        case x =>
+          logger.warn(s"unrecognised histogram-format $x. 
Defaulting to Explicit") + Explicit + }) + else None + + logger.info(s"Configured endpoint for OpenTelemetry $name reporting [$url] using $protocol protocol") + + OpenTelemetryConfiguration(protocol, url, compression, headers, timeout, histogramFormat) + } + + private val kamonSettings: Status.Settings = Kamon.status().settings() + + /** + * Builds the resource information added as resource labels to the exported metrics/traces + * + * @return + */ + def buildResource(attributes: Map[String, String]): Resource = { + val env = Kamon.environment + val builder = Resource.builder() + .put("host.name", kamonSettings.environment.host) + .put("service.instance.id", kamonSettings.environment.instance) + .put("service.name", env.service) + .put("telemetry.sdk.name", "kamon") + .put("telemetry.sdk.language", "scala") + .put("telemetry.sdk.version", kamonSettings.version) + + attributes.foreach { case (k, v) => builder.put(k, v) } + // add all kamon.environment.tags as KeyValues to the Resource object + env.tags.iterator().foreach { + case t: Tag.String => builder.put(t.key, t.value) + case t: Tag.Boolean => builder.put(t.key, t.value) + case t: Tag.Long => builder.put(t.key, t.value) + } + + builder.build() + } + + def getAttributes(config: Config): Map[String, String] = + config.getString("kamon.otel.attributes").split(',').filter(_ contains '=').map(_.trim.split("=", 2)).map { + case Array(k, v) => + val decoded = Try(URLDecoder.decode(v.trim, "UTF-8")) + decoded.failed.foreach(t => + throw new IllegalArgumentException(s"value for attribute ${k.trim} is not a url-encoded string", t) + ) + k.trim -> decoded.get + }.toMap +} diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryMetricsReporter.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryMetricsReporter.scala new file mode 100644 index 000000000..c4a0ff852 --- /dev/null +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryMetricsReporter.scala @@ -0,0 +1,167 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kamon.otel + +import com.typesafe.config.{Config, ConfigUtil} +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.resources.Resource +import kamon.metric.MeasurementUnit.Dimension +import kamon.{Kamon, UtilsOnConfig} +import kamon.metric.{MeasurementUnit, PeriodSnapshot} +import kamon.module.{MetricReporter, Module, ModuleFactory} +import kamon.otel.HistogramFormat.Explicit +import kamon.otel.OpenTelemetryConfiguration.Component.Metrics +import kamon.status.Status +import org.slf4j.LoggerFactory + +import java.util +import java.lang.{Double => JDouble} +import java.util.{Collection => JCollection} +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext +import scala.util.{Failure, Success} + +case class BucketConfig[T]( + defaultBuckets: T, + timeBuckets: T, + informationBuckets: T, + percentageBuckets: T, + customBuckets: Map[String, T] +) + +object Buckets { + + private def readCustomBuckets(customBuckets: Config): Map[String, Seq[java.lang.Double]] = + customBuckets + .topLevelKeys + .map(k => (k, customBuckets.getDoubleList(ConfigUtil.quoteString(k)).asScala.toSeq)) + .toMap + + private def readCustomBucketsExpo(customBuckets: Config): Map[String, Int] = + customBuckets + .topLevelKeys + .map(k => (k, customBuckets.getInt(ConfigUtil.quoteString(k)))) + .toMap + + def parseBucketConfig(newConfig: Config): BucketConfig[Seq[JDouble]] = BucketConfig[Seq[JDouble]]( + newConfig.getDoubleList("default-buckets").asScala.toSeq, + newConfig.getDoubleList("time-buckets").asScala.toSeq, + informationBuckets = newConfig.getDoubleList("information-buckets").asScala.toSeq, + percentageBuckets = newConfig.getDoubleList("percentage-buckets").asScala.toSeq, + readCustomBuckets(newConfig.getConfig("custom")) + ) + + def resolveBucketConfiguration[T](bucketConfig: BucketConfig[T])(metricName: String, unit: MeasurementUnit): T = + bucketConfig.customBuckets.getOrElse( + metricName, + unit.dimension match { + case Dimension.Time => bucketConfig.timeBuckets + case Dimension.Information => bucketConfig.informationBuckets + case Dimension.Percentage => bucketConfig.percentageBuckets + case _ => bucketConfig.defaultBuckets + } + ) + + def parseExpoBucketConfig(newConfig: Config): BucketConfig[Int] = BucketConfig[Int]( + newConfig.getInt("default-bucket-count"), + newConfig.getInt("time-bucket-count"), + informationBuckets = newConfig.getInt("information-bucket-count"), + percentageBuckets = newConfig.getInt("percentage-bucket-count"), + readCustomBucketsExpo(newConfig.getConfig("custom")) + ) + +} +object OpenTelemetryMetricsReporter { + private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryMetricsReporter]) + private val kamonSettings: Status.Settings = Kamon.status().settings() + + class Factory extends ModuleFactory { + override def create(settings: ModuleFactory.Settings): Module = { + logger.info("Creating OpenTelemetry Metrics Reporter") + + val module = new OpenTelemetryMetricsReporter(OtlpMetricsService.apply)(settings.executionContext) + module.reconfigure(settings.config) + module + } + } +} + +import kamon.otel.OpenTelemetryMetricsReporter._ + +/** + * Converts internal Kamon metrics to OpenTelemetry format and sends to a configured OpenTelemetry endpoint using gRPC or REST. 
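+ * (More precisely, OTLP over gRPC or over HTTP/protobuf, depending on the configured protocol.)
+ *
+ * A hypothetical application.conf sketch using keys this module defines in its reference.conf;
+ * the values shown are illustrative, not the defaults:
+ * {{{
+ * kamon.otel.metrics {
+ *   protocol = "http/protobuf"
+ *   histogram-format = base2_exponential_bucket_histogram
+ * }
+ * }}}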
+ */ +class OpenTelemetryMetricsReporter(metricsServiceFactory: OpenTelemetryConfiguration => MetricsService)(implicit + ec: ExecutionContext +) extends MetricReporter { + private var metricsService: Option[MetricsService] = None + private var metricsConverterFunc: PeriodSnapshot => JCollection[MetricData] = (_ => new util.ArrayList[MetricData](0)) + + def isEmpty(snapshot: PeriodSnapshot): Boolean = + snapshot.gauges.isEmpty && snapshot.timers.isEmpty && snapshot.counters.isEmpty && snapshot.histograms.isEmpty && snapshot.rangeSamplers.isEmpty + + override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { + if (!isEmpty(snapshot)) { + metricsService.foreach(ts => + ts.exportMetrics(metricsConverterFunc(snapshot)).onComplete { + case Success(_) => logger.debug("Successfully exported metrics") + + // TODO is there result for which a retry is relevant? Perhaps a glitch in the receiving service + // Keeping logs to debug as the underlying exporter will log if it fails to export metrics, and the failure isn't surfaced in the response anyway + case Failure(t) => logger.debug("Failed to export metrics", t) + } + ) + } + } + + override def reconfigure(newConfig: Config): Unit = { + logger.info("Reconfigure OpenTelemetry Metrics Reporter") + + // pre-generate the function for converting Kamon metrics to proto metrics + val attributes: Map[String, String] = OpenTelemetryConfiguration.getAttributes(newConfig) + val resource: Resource = OpenTelemetryConfiguration.buildResource(attributes) + val config = OpenTelemetryConfiguration(newConfig, Metrics) + val histogramFormat = config.histogramFormat.getOrElse { + logger.warn("Missing histogram-format from metrics configuration, defaulting to Explicit") + Explicit + } + + val explicitBucketConfig = Buckets.parseBucketConfig(newConfig.getConfig("kamon.otel.explicit-histo-boundaries")) + val exponentialBucketConfig = + Buckets.parseExpoBucketConfig(newConfig.getConfig("kamon.otel.exponential-histo-boundaries")) + + val resolveExplicitBucketConfiguration = Buckets.resolveBucketConfiguration(explicitBucketConfig) _ + + val resolveExponentialBucketConfiguration = Buckets.resolveBucketConfiguration(exponentialBucketConfig) _ + + this.metricsConverterFunc = MetricsConverter.convert( + resource, + kamonSettings.version, + histogramFormat, + resolveExplicitBucketConfiguration, + resolveExponentialBucketConfiguration + ) + + this.metricsService = Option(metricsServiceFactory.apply(config)) + } + + override def stop(): Unit = { + logger.info("Stopping OpenTelemetry Metrics Reporter") + this.metricsService.foreach(_.close()) + this.metricsService = None + } + +} diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryTraceReporter.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryTraceReporter.scala index 0a78de2e7..4a4936428 100644 --- a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryTraceReporter.scala +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/OpenTelemetryTraceReporter.scala @@ -15,24 +15,20 @@ */ package kamon.otel -import java.util - import com.typesafe.config.Config +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.sdk.trace.data.SpanData import kamon.Kamon import kamon.module.{Module, ModuleFactory, SpanReporter} +import kamon.otel.OpenTelemetryConfiguration.Component.Trace +import kamon.status.Status import kamon.trace.Span import org.slf4j.LoggerFactory -import java.net.URLDecoder -import java.util.{Collection => JCollection} 
+import java.util +import java.util.{Collection => JCollection} import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success, Try} - -import io.opentelemetry.sdk.common.InstrumentationLibraryInfo -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.sdk.trace.data.SpanData -import kamon.status.Status -import kamon.tag.Tag +import scala.util.{Failure, Success} object OpenTelemetryTraceReporter { private val logger = LoggerFactory.getLogger(classOf[OpenTelemetryTraceReporter]) @@ -49,13 +45,14 @@ object OpenTelemetryTraceReporter { } } -import OpenTelemetryTraceReporter._ +import kamon.otel.OpenTelemetryTraceReporter._ /** - * Converts internal finished Kamon spans to OpenTelemetry format and sends to a configured OpenTelemetry endpoint using gRPC. - */ -class OpenTelemetryTraceReporter(traceServiceFactory: Config => TraceService)(implicit ec: ExecutionContext) - extends SpanReporter { + * Converts internal finished Kamon spans to OpenTelemetry format and sends to a configured OpenTelemetry endpoint using gRPC or REST. + */ +class OpenTelemetryTraceReporter(traceServiceFactory: OpenTelemetryConfiguration => TraceService)(implicit + ec: ExecutionContext +) extends SpanReporter { private var traceService: Option[TraceService] = None private var spanConverterFunc: Seq[Span.Finished] => JCollection[SpanData] = (_ => new util.ArrayList[SpanData](0)) @@ -77,23 +74,16 @@ class OpenTelemetryTraceReporter(traceServiceFactory: Config => TraceService)(im logger.info("Reconfigure OpenTelemetry Trace Reporter") // pre-generate the function for converting Kamon span to proto span - val attributes: Map[String, String] = - newConfig.getString("kamon.otel.attributes").split(',').filter(_ contains '=').map(_.trim.split("=", 2)).map { - case Array(k, v) => - val decoded = Try(URLDecoder.decode(v.trim, "UTF-8")) - decoded.failed.foreach(t => - throw new IllegalArgumentException(s"value for attribute ${k.trim} is not a url-encoded string", t) - ) - k.trim -> decoded.get - }.toMap - val resource: Resource = buildResource(attributes) + val attributes: Map[String, String] = OpenTelemetryConfiguration.getAttributes(newConfig) + val resource: Resource = OpenTelemetryConfiguration.buildResource(attributes) + val config = OpenTelemetryConfiguration(newConfig, Trace) this.spanConverterFunc = SpanConverter.convert( newConfig.getBoolean("kamon.otel.trace.include-error-event"), resource, kamonSettings.version ) - this.traceService = Option(traceServiceFactory.apply(newConfig)) + this.traceService = Option(traceServiceFactory.apply(config)) } override def stop(): Unit = { @@ -102,29 +92,4 @@ class OpenTelemetryTraceReporter(traceServiceFactory: Config => TraceService)(im this.traceService = None } - /** - * Builds the resource information added as resource labels to the exported traces - * - * @return - */ - private def buildResource(attributes: Map[String, String]): Resource = { - val env = Kamon.environment - val builder = Resource.builder() - .put("host.name", kamonSettings.environment.host) - .put("service.instance.id", kamonSettings.environment.instance) - .put("service.name", env.service) - .put("telemetry.sdk.name", "kamon") - .put("telemetry.sdk.language", "scala") - .put("telemetry.sdk.version", kamonSettings.version) - - attributes.foreach { case (k, v) => builder.put(k, v) } - // add all kamon.environment.tags as KeyValues to the Resource object - env.tags.iterator().foreach { - case t: Tag.String => builder.put(t.key, t.value) - case t: Tag.Boolean => builder.put(t.key, 
t.value) - case t: Tag.Long => builder.put(t.key, t.value) - } - - builder.build() - } } diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/Services.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/Services.scala new file mode 100644 index 000000000..0cc76707d --- /dev/null +++ b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/Services.scala @@ -0,0 +1,142 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kamon.otel + +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter +import io.opentelemetry.sdk.metrics.`export`.MetricExporter +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.trace.`export`.SpanExporter +import io.opentelemetry.sdk.trace.data.SpanData + +import java.io.Closeable +import java.util.{Collection => JCollection} +import scala.concurrent.{Future, Promise} + +/** + * Service for exporting OpenTelemetry traces + */ +private[otel] trait TraceService extends Closeable { + def exportSpans(spans: JCollection[SpanData]): Future[Unit] +} + +/** + * Companion object to [[OtlpTraceService]] + */ +private[otel] object OtlpTraceService { + + /** + * Builds the http/protobuf trace exporter using the provided configuration. 
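+ * Depending on the configured protocol, the returned service delegates to either an
+ * OtlpGrpcSpanExporter or an OtlpHttpSpanExporter.
+ *
+ * A usage sketch, as exercised in TraceServiceSpec (`config` is a resolved Kamon/typesafe Config):
+ * {{{
+ * val traceService = OtlpTraceService(OpenTelemetryConfiguration(config, Component.Trace))
+ * traceService.exportSpans(spans)
+ * }}}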
+ * + * @param config + * @return + */ + def apply(config: OpenTelemetryConfiguration): TraceService = new OtlpTraceService(config) +} + +private[otel] class OtlpTraceService(c: OpenTelemetryConfiguration) extends TraceService { + private val compressionMethod = if (c.compressionEnabled) "gzip" else "none" + + private val delegate: SpanExporter = c.protocol match { + case "grpc" => + val builder = + OtlpGrpcSpanExporter.builder().setEndpoint(c.endpoint).setCompression(compressionMethod).setTimeout(c.timeout) + c.headers.foreach { case (k, v) => builder.addHeader(k, v) } + builder.build() + case "http/protobuf" => + val builder = + OtlpHttpSpanExporter.builder().setEndpoint(c.endpoint).setCompression(compressionMethod).setTimeout(c.timeout) + c.headers.foreach { case (k, v) => builder.addHeader(k, v) } + builder.build() + } + + override def exportSpans(spans: JCollection[SpanData]): Future[Unit] = { + val result = Promise[Unit] + val completableResultCode = delegate.`export`(spans) + val runnable: Runnable = new Runnable { + override def run(): Unit = + if (completableResultCode.isSuccess) result.success(()) + else result.failure(SpanStatusRuntimeException) + } + completableResultCode.whenComplete { + runnable + } + result.future + } + + override def close(): Unit = delegate.close() +} + +case object SpanStatusRuntimeException extends RuntimeException("Exporting trace span failed") + +/** + * Service for exporting OpenTelemetry metrics + */ +private[otel] trait MetricsService extends Closeable { + def exportMetrics(metrics: JCollection[MetricData]): Future[Unit] +} + +/** + * Companion object to [[OtlpMetricsService]] + */ +private[otel] object OtlpMetricsService { + + /** + * Builds the http/protobuf metrics exporter using the provided configuration. 
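+ * Depending on the configured protocol, the returned service delegates to either an
+ * OtlpGrpcMetricExporter or an OtlpHttpMetricExporter.
+ *
+ * A usage sketch, mirroring how OpenTelemetryMetricsReporter.Factory wires it up
+ * (`config` is a resolved Kamon/typesafe Config):
+ * {{{
+ * val metricsService = OtlpMetricsService(OpenTelemetryConfiguration(config, Component.Metrics))
+ * metricsService.exportMetrics(metricData)
+ * }}}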
+ * + * @param config + * @return + */ + def apply(config: OpenTelemetryConfiguration): MetricsService = new OtlpMetricsService(config) +} + +private[otel] class OtlpMetricsService(c: OpenTelemetryConfiguration) extends MetricsService { + private val compressionMethod = if (c.compressionEnabled) "gzip" else "none" + private val delegate: MetricExporter = c.protocol match { + case "grpc" => + val builder = + OtlpGrpcMetricExporter.builder().setEndpoint(c.endpoint).setCompression(compressionMethod).setTimeout(c.timeout) + c.headers.foreach { case (k, v) => builder.addHeader(k, v) } + builder.build() + case "http/protobuf" => + val builder = + OtlpHttpMetricExporter.builder().setEndpoint(c.endpoint).setCompression(compressionMethod).setTimeout(c.timeout) + c.headers.foreach { case (k, v) => builder.addHeader(k, v) } + builder.build() + } + + override def exportMetrics(metrics: JCollection[MetricData]): Future[Unit] = + if (metrics.isEmpty) Future.successful(()) + else { + val result = Promise[Unit] + val completableResultCode = delegate.`export`(metrics) + val runnable: Runnable = new Runnable { + override def run(): Unit = + if (completableResultCode.isSuccess) result.success(()) + else result.failure(MetricStatusRuntimeException) + } + completableResultCode.whenComplete { + runnable + } + result.future + } + + override def close(): Unit = delegate.close() +} + +case object MetricStatusRuntimeException extends RuntimeException("Exporting metric failed") diff --git a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala b/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala deleted file mode 100644 index 96915ada7..000000000 --- a/reporters/kamon-opentelemetry/src/main/scala/kamon/otel/TraceService.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kamon.otel - -import java.io.Closeable -import java.net.URL -import java.time.Duration -import java.util.{Collection => JCollection} - -import scala.concurrent.{Future, Promise} - -import com.typesafe.config.Config -import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter -import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter -import io.opentelemetry.sdk.trace.`export`.SpanExporter -import io.opentelemetry.sdk.trace.data.SpanData -import org.slf4j.LoggerFactory - -/** - * Service for exporting OpenTelemetry traces - */ -private[otel] trait TraceService extends Closeable { - def exportSpans(spans: JCollection[SpanData]): Future[Unit] -} - -/** - * Companion object to [[OtlpTraceService]] - */ -private[otel] object OtlpTraceService { - private val logger = LoggerFactory.getLogger(classOf[OtlpTraceService]) - - /** - * Builds the http/protobuf trace exporter using the provided configuration. 
- * - * @param config - * @return - */ - def apply(config: Config): TraceService = { - val otelExporterConfig = config.getConfig("kamon.otel.trace") - val endpoint = otelExporterConfig.getString("endpoint") - val fullEndpoint = - if (otelExporterConfig.hasPath("full-endpoint")) Some(otelExporterConfig.getString("full-endpoint")) else None - val compression = otelExporterConfig.getString("compression") match { - case "gzip" => true - case x => - if (x != "") logger.warn(s"unrecognised compression $x. Defaulting to no compression") - false - } - val protocol = otelExporterConfig.getString("protocol") match { - case "http/protobuf" => "http/protobuf" - case "grpc" => "grpc" - case x => - logger.warn(s"Unrecognised opentelemetry schema type $x. Defaulting to grpc") - "grpc" - } - val headers = otelExporterConfig.getString("headers").split(',').filter(_.nonEmpty).map(_.split("=", 2)).map { - case Array(k) => k -> "" - case Array(k, v) => k -> v - }.toSeq - val timeout = otelExporterConfig.getDuration("timeout") - // See https://opentelemetry.io/docs/reference/specification/protocol/exporter/#endpoint-urls-for-otlphttp - val url = (protocol, fullEndpoint) match { - case ("http/protobuf", Some(full)) => - val parsed = new URL(full) - if (parsed.getPath.isEmpty) full :+ '/' else full - // Seems to be some dispute as to whether the / should technically be added in the case that the base path doesn't - // include it. Adding because it's probably what's desired most of the time, and can always be overridden by full-endpoint - case ("http/protobuf", None) => if (endpoint.endsWith("/")) endpoint + "v1/traces" else endpoint + "/v1/traces" - case (_, Some(full)) => full - case (_, None) => endpoint - } - - logger.info(s"Configured endpoint for OpenTelemetry trace reporting [$url] using $protocol protocol") - - new OtlpTraceService(protocol, url, compression, headers, timeout) - } -} - -private[otel] class OtlpTraceService( - protocol: String, - endpoint: String, - compressionEnabled: Boolean, - headers: Seq[(String, String)], - timeout: Duration -) extends TraceService { - private val compressionMethod = if (compressionEnabled) "gzip" else "none" - private val delegate: SpanExporter = protocol match { - case "grpc" => - val builder = - OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) - headers.foreach { case (k, v) => builder.addHeader(k, v) } - builder.build() - case "http/protobuf" => - val builder = - OtlpHttpSpanExporter.builder().setEndpoint(endpoint).setCompression(compressionMethod).setTimeout(timeout) - headers.foreach { case (k, v) => builder.addHeader(k, v) } - builder.build() - } - - override def exportSpans(spans: JCollection[SpanData]): Future[Unit] = { - val result = Promise[Unit] - val completableResultCode = delegate.`export`(spans) - val runnable: Runnable = new Runnable { - override def run(): Unit = - if (completableResultCode.isSuccess) result.success(()) - else result.failure(StatusRuntimeException) - } - completableResultCode.whenComplete { - runnable - } - result.future - } - - override def close(): Unit = delegate.close() -} - -case object StatusRuntimeException extends RuntimeException("Exporting trace span failed") diff --git a/reporters/kamon-opentelemetry/src/test/resources/application.conf b/reporters/kamon-opentelemetry/src/test/resources/application.conf index 3852acf52..612f753e1 100644 --- a/reporters/kamon-opentelemetry/src/test/resources/application.conf +++ 
b/reporters/kamon-opentelemetry/src/test/resources/application.conf @@ -12,5 +12,17 @@ kamon { env = "kamon-devint" } } - otel.attributes = "att1=v1, att2 = v2 ,att3=+a%3Db%2Cc%3Dd+" + + otel { + attributes = "att1=v1, att2 = v2 ,att3=+a%3Db%2Cc%3Dd+" + explicit-histo-boundaries { + default-buckets = [ + 1, + 2, + 3, + 4, + 10 + ] + } + } } diff --git a/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala b/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala new file mode 100644 index 000000000..9cbf1a4c2 --- /dev/null +++ b/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala @@ -0,0 +1,287 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.otel + +import com.typesafe.config.{Config, ConfigValue, ConfigValueFactory} +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramData +import kamon.Kamon +import kamon.Kamon.config +import kamon.metric._ +import kamon.tag.TagSet +import kamon.testkit.Reconfigure +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import java.lang.{Double => JDouble} +import java.time.Instant +import java.util.{Collection => JCollection} +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Random + +class OpenTelemetryMetricReporterSpec extends AnyWordSpec + with Matchers with Reconfigure { + reconfigure => + + private def openTelemetryMetricsReporter(newConfig: Config = config) + : (OpenTelemetryMetricsReporter, MockMetricsService) = { + val metricsService = new MockMetricsService() + val reporter = new OpenTelemetryMetricsReporter(_ => metricsService)(ExecutionContext.global) + reporter.reconfigure(newConfig) + (reporter, metricsService) + } + + "the OpenTelemetryMetricsReporter" should { + "send counter metrics" in { + val (reporter, mockService) = openTelemetryMetricsReporter() + + val now = Instant.now() + reporter.reportPeriodSnapshot( + PeriodSnapshot.apply( + now.minusMillis(1000), + now, + MetricSnapshot.ofValues[Long]( + "test.counter", + "test", + Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO), + Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 42L) :: Nil + ) :: Nil, + Nil, + Nil, + Nil, + Nil + ) + ) + // basic sanity + mockService.exportMetricsServiceRequest should not be empty + mockService.exportMetricsServiceRequest.get should have size 1 + val exportedMetrics: Seq[MetricData] = mockService.exportMetricsServiceRequest.get.asScala.toSeq + exportedMetrics should have size 1 + val metricData = 
exportedMetrics.head + + // check attributes on resource + val kamonVersion = Kamon.status().settings().version + val attributeMap = metricData.getResource.getAttributes.asMap().asScala + attributeMap should contain(AttributeKey.stringKey("service.name"), "kamon-test-application") + attributeMap should contain(AttributeKey.stringKey("telemetry.sdk.name"), "kamon") + attributeMap should contain(AttributeKey.stringKey("telemetry.sdk.language"), "scala") + attributeMap should contain(AttributeKey.stringKey("telemetry.sdk.version"), kamonVersion) + attributeMap should contain(AttributeKey.stringKey("service.version"), "x.x.x") + attributeMap should contain(AttributeKey.stringKey("env"), "kamon-devint") + attributeMap should contain(AttributeKey.stringKey("att1"), "v1") + attributeMap should contain(AttributeKey.stringKey("att2"), "v2") + attributeMap should contain(AttributeKey.stringKey("att3"), " a=b,c=d ") + val host = attributeMap.get(AttributeKey.stringKey("host.name")) + host shouldBe defined + val instance = attributeMap.get(AttributeKey.stringKey("service.instance.id")) + instance should contain(s"kamon-test-application@${host.get}") + + // assert instrumentation labels + val instrumentationScopeInfo = metricData.getInstrumentationScopeInfo + instrumentationScopeInfo.getName should be("kamon-metrics") + instrumentationScopeInfo.getVersion should be(kamonVersion) + instrumentationScopeInfo.getSchemaUrl should be(null) + + // check value + metricData.getName should equal("test.counter") + metricData.getDescription should equal("test") + val sumData = metricData.getLongSumData + val points = sumData.getPoints.asScala.toSeq + points should have size 1 + points.head.getAttributes should have size 1 + points.head.getAttributes.get(AttributeKey.stringKey("tag1")) should equal("value1") + points.head.getValue shouldEqual 42L + } + "send histogram metrics" in { + val (reporter, mockService) = openTelemetryMetricsReporter() + val now = Instant.now() + reporter.reportPeriodSnapshot( + PeriodSnapshot.apply( + now.minusMillis(1000), + now, + Nil, + Nil, + MetricSnapshot.ofDistributions( + "test.histogram", + "test", + Metric.Settings.ForDistributionInstrument( + MeasurementUnit.none, + java.time.Duration.ZERO, + DynamicRange.Default + ), + Instrument.Snapshot( + TagSet.from(Map("tag1" -> "value1")), + buildHistogramDist(Seq(1L -> 2L, 2L -> 2L, 3L -> 3L, 5L -> 1L, 15L -> 1L)) + ) :: Nil + ) :: Nil, + Nil, + Nil + ) + ) + // basic sanity + mockService.exportMetricsServiceRequest should not be empty + mockService.exportMetricsServiceRequest.get should have size 1 + val exportedMetrics: Seq[MetricData] = mockService.exportMetricsServiceRequest.get.asScala.toSeq + exportedMetrics should have size 1 + val metricData = exportedMetrics.head + + // check value + metricData.getName should equal("test.histogram") + metricData.getDescription should equal("test") + val sumData = metricData.getHistogramData + val points = sumData.getPoints.asScala.toSeq + points should have size 1 + points.head.getAttributes should have size 1 + points.head.getAttributes.get(AttributeKey.stringKey("tag1")) should equal("value1") + points.head.getMin shouldEqual 1L + points.head.getMax shouldEqual 15L + points.head.getSum shouldEqual 35L + points.head.getCount shouldEqual 9L + points.head.getBoundaries.asScala shouldEqual Seq[JDouble](1d, 2d, 3d, 4d, 10d) + points.head.getCounts.asScala shouldEqual Seq[JDouble](2d, 2d, 3d, 0d, 1d, 1d) + } + "send exponential histogram metrics" in { + val newConfig = config.withValue( + 
"kamon.otel.metrics.histogram-format", + ConfigValueFactory.fromAnyRef("base2_exponential_bucket_histogram") + ) + val (reporter, mockService) = openTelemetryMetricsReporter(newConfig) + val now = Instant.now() + reporter.reportPeriodSnapshot( + PeriodSnapshot.apply( + now.minusMillis(1000), + now, + Nil, + Nil, + MetricSnapshot.ofDistributions( + "test.histogram", + "test", + Metric.Settings.ForDistributionInstrument( + MeasurementUnit.none, + java.time.Duration.ZERO, + DynamicRange.Default + ), + Instrument.Snapshot( + TagSet.from(Map("tag1" -> "value1")), + buildHistogramDist(Seq(1L -> 2L, 2L -> 2L, 3L -> 3L, 5L -> 1L, 15L -> 1L)) + ) :: Nil + ) :: Nil, + Nil, + Nil + ) + ) + // basic sanity + mockService.exportMetricsServiceRequest should not be empty + mockService.exportMetricsServiceRequest.get should have size 1 + val exportedMetrics: Seq[MetricData] = mockService.exportMetricsServiceRequest.get.asScala.toSeq + exportedMetrics should have size 1 + val metricData = exportedMetrics.head + + // check value + metricData.getName should equal("test.histogram") + metricData.getDescription should equal("test") + val sumData = ExponentialHistogramData.fromMetricData(metricData) + val points = sumData.getPoints.asScala.toSeq + points should have size 1 + points.head.getAttributes should have size 1 + points.head.getAttributes.get(AttributeKey.stringKey("tag1")) should equal("value1") + points.head.getScale shouldEqual 5 + points.head.getNegativeBuckets.getTotalCount shouldEqual 0L + points.head.getZeroCount shouldEqual 2L + points.head.getPositiveBuckets.getTotalCount shouldEqual 7L + points.head.getSum shouldEqual 35L + points.head.getCount shouldEqual 9L + } + + "calculate sensible scales for values" in { + def randomDouble = Random.nextInt(10) match { + case 0 => 0d + case 1 => Random.nextDouble() * 1e-18 + case 2 => Random.nextDouble() * 1e-12 + case 3 => Random.nextDouble() * 1e-6 + case 4 => Random.nextDouble() * 1e-3 + case 5 => Random.nextDouble() + case 6 => Random.nextDouble() * 1e3 + case 7 => Random.nextDouble() * 1e6 + case 8 => Random.nextDouble() * 1e12 + case 9 => Random.nextDouble() * 1e18 + } + + for (i <- (0 to 100).map(_ => randomDouble); maxBucketCount <- (0 to 10).map(_ => Random.nextInt(320))) { + val scale = MetricsConverter.maxScale(maxBucketCount)(i) + val baseFromScale = Math.pow(2, Math.pow(2, -scale)) + val baseFromScale_plus_1 = Math.pow(2, Math.pow(2, -scale - 1)) + val maxFromBase = Math.pow(baseFromScale, maxBucketCount) + val minFromBase = 1d / Math.pow(baseFromScale, maxBucketCount) + val maxFromBase_plus_1 = Math.pow(baseFromScale_plus_1, maxBucketCount) + val minFromBase_plus_1 = 1d / Math.pow(baseFromScale_plus_1, maxBucketCount) + if (i >= 1) { + if (scale != -10) maxFromBase should be >= i + if (scale != 20) maxFromBase_plus_1 should be <= i + } else { + if (scale != -10) minFromBase should be <= i + if (scale != 20) minFromBase_plus_1 should be >= i + } + } + } + } + + private def buildHistogramDist(_buckets: Seq[(Long, Long)]): Distribution = { + + val distribution: Distribution = new Distribution() { + override def dynamicRange: DynamicRange = DynamicRange.Default + + override def min: Long = _buckets.minBy(_._1)._1 + + override def max: Long = _buckets.maxBy(_._1)._1 + + override def sum: Long = _buckets.foldLeft(0L) { case (a, (v, f)) => a + (v * f) } + + override def count: Long = _buckets.foldLeft(0L) { case (a, (_, f)) => a + f } + + override def percentile(rank: Double): Distribution.Percentile = + ??? 
// percentileValues.get(rank).map(r => r.toPercentile).orNull + + override def percentiles: Seq[Distribution.Percentile] = ??? // Seq(perc.toPercentile) + + override def percentilesIterator: Iterator[Distribution.Percentile] = null + + override def buckets: Seq[Distribution.Bucket] = bucketsIterator.toSeq + + override def bucketsIterator: Iterator[Distribution.Bucket] = _buckets.iterator.map { case (v, f) => + PureDistributionBucket(v, f) + } + } + distribution + } + + case class PureDistributionBucket(value: Long, frequency: Long) extends Distribution.Bucket + + private class MockMetricsService extends MetricsService { + var exportMetricsServiceRequest: Option[JCollection[MetricData]] = None + var hasBeenClosed = false + + override def exportMetrics(metrics: JCollection[MetricData]): Future[Unit] = { + exportMetricsServiceRequest = Option(metrics) + Future.successful(()) + } + + override def close(): Unit = hasBeenClosed = true + } +} diff --git a/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/TraceServiceSpec.scala b/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/TraceServiceSpec.scala index dc2c7da49..091f7ae7c 100644 --- a/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/TraceServiceSpec.scala +++ b/reporters/kamon-opentelemetry/src/test/scala/kamon/otel/TraceServiceSpec.scala @@ -17,6 +17,7 @@ package kamon.otel import com.typesafe.config.ConfigFactory import io.opentelemetry.sdk.resources.Resource +import kamon.otel.OpenTelemetryConfiguration.Component import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers @@ -24,8 +25,8 @@ import org.scalatest.time.{Millis, Seconds, Span} import org.scalatest.wordspec.AnyWordSpec /** - * Tests for the [[TraceService]] - */ + * Tests for the [[TraceService]] + */ class TraceServiceSpec extends AnyWordSpec with Matchers with ScalaFutures with Utils { private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = Span(2, Seconds), interval = Span(15, Millis)) @@ -41,18 +42,18 @@ class TraceServiceSpec extends AnyWordSpec with Matchers with ScalaFutures with "exporting traces" should { "fail in case the remote service is not operable" in { - val traceService = OtlpTraceService(config) + val traceService = OtlpTraceService(OpenTelemetryConfiguration(config, Component.Trace)) // the actual data does not really matter as this will fail due to connection issues val resources = SpanConverter.convert(false, resource, kamonVersion)(Seq(finishedSpan())) val f = traceService.exportSpans(resources) whenReady(f.failed, Timeout(Span.apply(12, Seconds))) { e => - e shouldEqual StatusRuntimeException + e shouldEqual SpanStatusRuntimeException } } } "closing service should execute without errors" in { - OtlpTraceService(config).close() + OtlpTraceService(OpenTelemetryConfiguration(config, Component.Trace)).close() } }
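A worked example of the explicit-bucket conversion asserted in OpenTelemetryMetricReporterSpec above: with the
test boundaries [1, 2, 3, 4, 10] and recorded value -> frequency pairs 1 -> 2, 2 -> 2, 3 -> 3, 5 -> 1, 15 -> 1,
the converter emits one count per boundary plus a final overflow bucket. A minimal standalone sketch of that
bucketing (names here are illustrative, not part of the patch):

object ExplicitBucketExample extends App {
  // Boundaries from the test application.conf and the (value -> frequency) pairs recorded in the spec
  val boundaries = Seq(1d, 2d, 3d, 4d, 10d)
  val recorded = Seq(1L -> 2L, 2L -> 2L, 3L -> 3L, 5L -> 1L, 15L -> 1L)

  // Bucket i counts frequencies with lower(i) < value <= upper(i); the last bucket catches the overflow
  val uppers = boundaries :+ Double.MaxValue
  val lowers = Double.MinValue +: boundaries
  val counts = lowers.zip(uppers).map { case (lo, hi) =>
    recorded.collect { case (v, f) if v > lo && v <= hi => f }.sum
  }

  println(counts) // List(2, 2, 3, 0, 1, 1) -- matches points.head.getCounts in the spec
}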