Commit bac0985: implement histograms

hughsimpson committed Aug 22, 2023
1 parent 7c0ab66 commit bac0985
Showing 5 changed files with 392 additions and 41 deletions.
42 changes: 42 additions & 0 deletions reporters/kamon-opentelemetry/src/main/resources/reference.conf
@@ -68,6 +68,48 @@ kamon.otel {
# standard attribute names; enable for fuller compliance with the otel standard
include-error-event = false
}

explicit-histo-boundaries {
# Same as defaults from https://opentelemetry.io/docs/specs/otel/metrics/sdk/#explicit-bucket-histogram-aggregation
default-buckets = [
0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000
]
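    # Per the OTel spec, each boundary is the inclusive upper bound of its bucket,
    # i.e. buckets are (bound[i-1], bound[i]], plus an implicit (last bound, +Inf) bucket.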

    # The following match the default bucket values of the prometheus reporter
time-buckets = [
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10
]

information-buckets = [
512, 1024, 2048, 4096, 16384, 65536, 524288, 1048576
]

percentage-buckets = [
20, 40, 60, 70, 80, 90, 95
]

    # Per-metric overrides are possible by specifying the metric name and its histogram buckets here
custom {
// example:
// "akka.actor.processing-time" = [0.1, 1.0, 10.0]
}
}

exponential-histo-boundaries {
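    # Each *-bucket-count caps the number of buckets on either side of zero; the converter
    # then picks the largest scale at which the observed min and max still fit within
    # that many buckets (see MetricsConverter.maxScale in the second file below).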
default-bucket-count = 160

time-bucket-count = 160

information-bucket-count = 160

percentage-bucket-count = 100

    # Per-metric overrides are possible by specifying the metric name and its max bucket count here
custom {
// example:
// "akka.actor.processing-time" = 3
}
}
}

# Arbitrary key-value pairs that further identify the environment where this service instance is running.
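As an aside, here is a minimal sketch (not code from this commit; the object name and
loading style are illustrative assumptions) of how a per-metric override in the custom
block can be resolved with Typesafe Config, falling back to default-buckets:

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

object BucketConfigDemo extends App {
  private val histoConf = ConfigFactory.load().getConfig("kamon.otel.explicit-histo-boundaries")

  // Hypothetical helper: metric names contain dots, so they are quoted to form
  // a single HOCON path segment rather than a nested path.
  def bucketsFor(metricName: String): Seq[Double] = {
    val custom = histoConf.getConfig("custom")
    val key = "\"" + metricName + "\""
    if (custom.hasPath(key)) custom.getDoubleList(key).asScala.map(_.doubleValue).toVector
    else histoConf.getDoubleList("default-buckets").asScala.map(_.doubleValue).toVector
  }

  println(bucketsFor("akka.actor.processing-time"))
}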

@@ -18,20 +18,24 @@ package kamon.otel
import io.opentelemetry.sdk.common.InstrumentationScopeInfo
import io.opentelemetry.sdk.metrics.data._
import io.opentelemetry.sdk.metrics.internal.data._
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.{ExponentialHistogramData, ExponentialHistogramPointData, ImmutableExponentialHistogramData}
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.{ExponentialHistogramBuckets, ExponentialHistogramData, ExponentialHistogramPointData, ImmutableExponentialHistogramData}
import io.opentelemetry.sdk.resources.Resource
import kamon.metric.Instrument.Snapshot
import kamon.metric.{Distribution, MeasurementUnit, MetricSnapshot, PeriodSnapshot}
import kamon.otel.HistogramFormat.{Explicit, Exponential, HistogramFormat}
import kamon.otel.MetricsConverter.{ExplBucketFn, ExpoBucketFn}
import org.slf4j.LoggerFactory

import java.lang.{Double => JDouble, Long => JLong}
import java.time.Instant
import java.util.{Collection => JCollection, ArrayList => JArrayList}
import java.util
import java.util.{ArrayList => JArrayList, Collection => JCollection}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, from: Instant, to: Instant) {
class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, from: Instant, to: Instant,
explBucketConfig: ExplBucketFn, expoBucketConfig: ExpoBucketFn) {
private val maxDouble: JDouble = JDouble.valueOf(JDouble.MAX_VALUE)
private val logger = LoggerFactory.getLogger(getClass)
private val fromNs = from.toEpochMilli * 1000000
private val toNs = to.toEpochMilli * 1000000
@@ -54,71 +58,212 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
toString(gauge.settings.unit),
toGaugeData(gauge.instruments))

private def toExplicitHistogramDatum(s: Snapshot[Distribution]): HistogramPointData = {
val boundaries = ArrayBuffer.newBuilder[JDouble]
private def getExplBucketCounts(bucketConfiguration: Seq[JDouble])(s: Snapshot[Distribution]) = {
val counts = ArrayBuffer.newBuilder[JLong]
val boundaryIterator: Iterator[JDouble] = (bucketConfiguration :+ maxDouble).iterator
var nextBoundary = boundaryIterator.next()
var inBucketCount = 0L
for (el <- s.value.bucketsIterator) {
counts += el.frequency
boundaries += el.value.toDouble
while (el.value > nextBoundary) {
nextBoundary = boundaryIterator.next()
counts += inBucketCount
inBucketCount = 0L
}
inBucketCount += el.frequency
}
while (boundaryIterator.hasNext) {
counts += inBucketCount
boundaryIterator.next()
inBucketCount = 0L
}
counts += inBucketCount
counts
}
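  // Worked example (illustrative, not from the commit): with bucketConfiguration = [10, 50],
  // the target buckets are (-Inf, 10], (10, 50] and (50, +Inf). A snapshot whose Kamon
  // buckets are {value 5, freq 2}, {value 30, freq 1}, {value 100, freq 4} yields [2, 1, 4].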

private def toExplicitHistogramDatum(bucketConfiguration: Seq[JDouble])(s: Snapshot[Distribution]): HistogramPointData = {
val counts = getExplBucketCounts(bucketConfiguration)(s)
ImmutableHistogramPointData.create(
fromNs,
toNs,
SpanConverter.toAttributes(s.tags),
JDouble valueOf s.value.sum.toDouble,
JDouble valueOf s.value.min.toDouble,
JDouble valueOf s.value.max.toDouble,
boundaries.result().dropRight(1).asJava,
bucketConfiguration.asJava,
counts.result().asJava
)
}

private def toExplicitHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[HistogramData] =
private def toExplicitHistogramData(bucketConfiguration: Seq[JDouble], distributions: Seq[Snapshot[Distribution]]): Option[HistogramData] =
distributions.filter(_.value.buckets.nonEmpty) match {
case Nil => None
case nonEmpty => Some(ImmutableHistogramData.create(AggregationTemporality.DELTA, nonEmpty.map(toExplicitHistogramDatum).asJava))
case nonEmpty => Some(ImmutableHistogramData.create(AggregationTemporality.DELTA, nonEmpty.map(toExplicitHistogramDatum(bucketConfiguration)).asJava))
}

def convertExplicitHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] =
toExplicitHistogramData(histogram.instruments).map(d =>
def convertExplicitHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] = {
val bucketConfiguration = explBucketConfig(histogram.name, histogram.settings.unit)
toExplicitHistogramData(bucketConfiguration, histogram.instruments).map(d =>
ImmutableMetricData.createDoubleHistogram(
resource,
instrumentationScopeInfo(histogram),
histogram.name,
histogram.description,
toString(histogram.settings.unit),
d))
}

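  // Iterator that yields every element of `it`, then `last` exactly once as a trailing
  // sentinel; getExpoBucketCounts uses it to append a Long.MaxValue bucket so the merge
  // loop below can flush its final counts.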
class ItWithLast[T](it: Iterator[T], last: T) extends Iterator[T] {
private var showedLast: Boolean = false

def hasNext: Boolean = it.hasNext || !showedLast

def next(): T = if (it.hasNext) it.next() else if (!showedLast) {
showedLast = true
last
} else throw new RuntimeException("Next on empty Iterator")
}

private def toExponentialHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
private def getExpoBucketCounts(scale: Int, maxBucketCount: Int)(s: Snapshot[Distribution]) = {
val base = Math.pow(2, Math.pow(2, -scale))
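    // Per the OTel spec, base = 2^(2^-scale) and bucket i covers (base^i, base^(i+1)];
    // this iterator enumerates candidate lower bounds from base^-maxBucketCount up to
    // base^maxBucketCount, with Double.MaxValue appended as a final sentinel.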
val lowerBoundaryIterator: Iterator[Double] = ((-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)) :+ Double.MaxValue).iterator
val valuesIterator = new ItWithLast[Distribution.Bucket](s.value.bucketsIterator, new Distribution.Bucket {
def value: Long = Long.MaxValue

def frequency: Long = 0
})
var fromLowerBound = valuesIterator.next()
var fromUpperBound = valuesIterator.next()
var toLowerBound = lowerBoundaryIterator.next()
var toUpperBound = lowerBoundaryIterator.next()
var zeroCount: JLong = 0L
var countInBucket = 0L

val negativeCounts = ArrayBuffer.newBuilder[JLong]
val positiveCounts = ArrayBuffer.newBuilder[JLong]

def iterFrom: JLong = {
val d = fromLowerBound.frequency
fromLowerBound = fromUpperBound
fromUpperBound = valuesIterator.next()
d
}

def iterTo: JLong = {
toLowerBound = toUpperBound
toUpperBound = lowerBoundaryIterator.next()
val res = countInBucket
countInBucket = 0
res
}
    // normal case: walk the source (Kamon) buckets and the target exponential buckets in lockstep, merging counts as we go
while (lowerBoundaryIterator.hasNext && valuesIterator.hasNext) {
if (fromUpperBound.value <= toLowerBound) {
countInBucket += iterFrom // Or drop?
} else if (fromLowerBound.value >= toUpperBound) toLowerBound match {
case 1 => zeroCount += iterTo
case b if b < 1 => negativeCounts += iterTo
case b if b > 1 => positiveCounts += iterTo
} else if (fromUpperBound.value == toUpperBound) toLowerBound match {
case 1 =>
zeroCount += iterFrom
iterTo
case b if b < 1 =>
countInBucket += iterFrom
negativeCounts += iterTo
case b if b > 1 =>
countInBucket += iterFrom
positiveCounts += iterTo
} else if (fromUpperBound.value > toUpperBound) {
val firstBonus: JLong = countInBucket
var negBuckets = 0
var zeroBuckets = 0
var posBuckets = 0
while (fromUpperBound.value > toUpperBound && lowerBoundaryIterator.hasNext) {
if (toLowerBound < 1) negBuckets += 1
else if (toLowerBound == 1) zeroBuckets += 1
else if (toLowerBound >= 1) posBuckets += 1
toLowerBound = toUpperBound
toUpperBound = lowerBoundaryIterator.next()
}
val total = iterFrom
// Not sure about this... everything's going into the first bucket, even though we might be spanning multiple target buckets.
// Might be better to do something like push the avg.floor into each bucket, interpolating the remainder.
// OTOH it may not really come up much in practice, since the internal histos are likely to have similar or finer granularity
negativeCounts ++= (if (negBuckets > 0) JLong.valueOf(firstBonus + total) +: Array.fill(negBuckets - 1)(JLong.valueOf(0)) else Nil)
zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JLong.valueOf(firstBonus + total) else JLong.valueOf(0))
positiveCounts ++= (
if (negBuckets == 0 && zeroBuckets == 0 && posBuckets > 0)
JLong.valueOf(firstBonus + total) +: Array.fill(posBuckets - 1)(JLong.valueOf(0))
else Array.fill(posBuckets)(JLong.valueOf(0)))
} else /*if (fromUpperBound.value < toUpperBound) */ toLowerBound match {
case 1 => zeroCount += iterFrom
case _ => countInBucket += iterFrom
}
}
var usedLastValue = false
// more buckets left to fill but only one unused value, sitting in fromLowerBound.
while (lowerBoundaryIterator.hasNext) {
if (fromLowerBound.value > toLowerBound && fromLowerBound.value < toUpperBound) {
usedLastValue = true
countInBucket += fromLowerBound.frequency
}
toLowerBound match {
case 1 => zeroCount += iterTo
case b if b < 1 => negativeCounts += iterTo
case b if b > 1 => positiveCounts += iterTo
}
}
// more values left, but only one unfilled bucket, sitting in toLowerBound
while (valuesIterator.hasNext) {
countInBucket += iterFrom
}
if (!usedLastValue) countInBucket += fromLowerBound.frequency
positiveCounts += countInBucket

val negBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets {
val getOffset: Int = -maxBucketCount
private val longs: ArrayBuffer[JLong] = negativeCounts.result()
val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava)
val getTotalCount: Long = longs.foldLeft(0L)(_ + _)
}
val posBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets {
val getOffset: Int = 1
private val longs: ArrayBuffer[JLong] = positiveCounts.result()
val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava)
val getTotalCount: Long = longs.foldLeft(0L)(_ + _)
}
(negBucket, zeroCount, posBucket)
}

private def toExponentialHistogramData(maxBucketCount: Int, distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
distributions.filter(_.value.buckets.nonEmpty) match {
case Nil => None
case nonEmpty =>
val mapped = nonEmpty.flatMap { s =>
s.value match {
case zigZag: Distribution.ZigZagCounts =>
logger.error("Unable to construct exponential histogram data - Unimplemented")
None
// Some(ExponentialHistogramPointData.create(
// ???, zigZag.sum, ???, ???, ???, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
// ))
case _ =>
logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted")
None
}
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)

// Could also calculate an 'offset' here, but defaulting to offset = 1 for simplicity
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
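            // Taking the min of the two candidate scales guarantees that both the smallest
            // and the largest observed value fit within +/- maxBucketCount buckets.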
val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s)
Some(ExponentialHistogramPointData.create(
scale, s.value.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
))
}
if (mapped.nonEmpty) Some(ImmutableExponentialHistogramData.create(AggregationTemporality.DELTA, mapped.asJava))
else None
}

def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] =
toExponentialHistogramData(histogram.instruments).map(d =>
def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] = {
val maxBucketCount = expoBucketConfig(histogram.name, histogram.settings.unit)
toExponentialHistogramData(maxBucketCount, histogram.instruments).map(d =>
ImmutableMetricData.createExponentialHistogram(
resource,
instrumentationScopeInfo(histogram),
histogram.name,
histogram.description,
toString(histogram.settings.unit),
d))
}

def convertHistogram(histogramFormat: HistogramFormat)(histogram: MetricSnapshot.Distributions): Option[MetricData] = histogramFormat match {
case Explicit => convertExplicitHistogram(histogram)
@@ -146,13 +291,26 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
* Converts Kamon metrics to OpenTelemetry [[MetricData]]s
*/
private[otel] object MetricsConverter {
def convert(resource: Resource, kamonVersion: String, histogramFormat: HistogramFormat)(metrics: PeriodSnapshot): JCollection[MetricData] = {
val converter = new WithResourceMetricsConverter(resource, kamonVersion, metrics.from, metrics.to)
type ExplBucketFn = (String, MeasurementUnit) => Seq[JDouble]
type ExpoBucketFn = (String, MeasurementUnit) => Int
private val minScale = -10
  private val maxScaleBound = 20

def convert(resource: Resource, kamonVersion: String, histogramFormat: HistogramFormat,
explicitBucketConfig: ExplBucketFn, exponentialBucketConfig: ExpoBucketFn)(metrics: PeriodSnapshot): JCollection[MetricData] = {
val converter = new WithResourceMetricsConverter(resource, kamonVersion, metrics.from, metrics.to, explicitBucketConfig, exponentialBucketConfig)
val gauges = metrics.gauges.filter(_.instruments.nonEmpty).map(converter.convertGauge)
val histograms = (metrics.histograms ++ metrics.timers ++ metrics.rangeSamplers).filter(_.instruments.nonEmpty)
.flatMap(converter.convertHistogram(histogramFormat))
val counters = metrics.counters.filter(_.instruments.nonEmpty).map(converter.convertCounter)

(gauges ++ histograms ++ counters).asJava
}

  private val bases = (maxScaleBound to minScale by -1).map(scale => (scale, Math.pow(2, Math.pow(2, -scale)))).toArray

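  // Scans from the finest scale (20) down to the coarsest (-10), returning the first
  // (largest) scale whose maxBucketCount buckets reach v. For example, with
  // maxBucketCount = 160 and v = 10000: at scale 4, base ~= 1.0443 and base^160 ~= 1.0e3,
  // which falls short, while at scale 3, base ~= 1.0905 and base^160 ~= 1.1e6 >= 10000, so 3 is returned.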
def maxScale(maxBucketCount: Int)(v: JDouble): Int = {
if (v >= 1) bases.collectFirst { case (scale, base) if Math.pow(base, maxBucketCount) >= v => scale }.getOrElse(minScale)
else bases.collectFirst { case (scale, base) if Math.pow(base, -maxBucketCount) <= v => scale }.getOrElse(minScale)
}
}
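To make the scale-to-base relationship concrete, here is a small self-contained sketch
(not part of the commit) that prints the first few positive bucket upper bounds implied
by a given scale:

object ExpoScaleDemo extends App {
  // base = 2^(2^-scale); larger scales give finer-grained buckets
  def base(scale: Int): Double = math.pow(2, math.pow(2, -scale))

  for (scale <- Seq(0, 3)) {
    val b = base(scale)
    // bucket i covers (base^i, base^(i+1)]; print the first five upper bounds
    val bounds = (1 to 5).map(i => f"${math.pow(b, i)}%.4f").mkString(", ")
    println(f"scale=$scale base=$b%.4f upper bounds: $bounds")
  }
}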