diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 90f88088630..9c5b6dc8a8b 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "fmt" + "runtime" "strconv" "testing" @@ -24,6 +26,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/trace" ) var viewBenchmarks = []struct { @@ -369,3 +372,89 @@ func benchCollectAttrs(setup func(attribute.Set) Reader) func(*testing.B) { b.Run("Attributes/10", run(setup(attribute.NewSet(attrs...)))) } } + +func BenchmarkExemplars(b *testing.B) { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + SpanID: trace.SpanID{01}, + TraceID: trace.TraceID{01}, + TraceFlags: trace.FlagsSampled, + }) + ctx := trace.ContextWithSpanContext(context.Background(), sc) + + attr := attribute.NewSet( + attribute.String("user", "Alice"), + attribute.Bool("admin", true), + ) + + setup := func(name string) (metric.Meter, Reader) { + r := NewManualReader() + v := NewView(Instrument{Name: "*"}, Stream{ + AttributeFilter: func(kv attribute.KeyValue) bool { + return kv.Key == attribute.Key("user") + }, + }) + mp := NewMeterProvider(WithReader(r), WithView(v)) + return mp.Meter(name), r + } + nCPU := runtime.NumCPU() + + b.Setenv("OTEL_GO_X_EXEMPLAR", "true") + + name := fmt.Sprintf("Int64Counter/%d", nCPU) + b.Run(name, func(b *testing.B) { + m, r := setup("Int64Counter") + i, err := m.Int64Counter("int64-counter") + assert.NoError(b, err) + + rm := newRM(metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Exemplars: make([]metricdata.Exemplar[int64], 0, nCPU)}, + }, + }) + e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Exemplars) + + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for j := 0; j < 2*nCPU; j++ { + i.Add(ctx, 1, 
metric.WithAttributeSet(attr)) + } + + r.Collect(ctx, rm) + assert.Len(b, *e, nCPU) + } + }) + + name = fmt.Sprintf("Int64Histogram/%d", nCPU) + b.Run(name, func(b *testing.B) { + m, r := setup("Int64Counter") + i, err := m.Int64Histogram("int64-histogram") + assert.NoError(b, err) + + rm := newRM(metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + {Exemplars: make([]metricdata.Exemplar[int64], 0, 1)}, + }, + }) + e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]).DataPoints[0].Exemplars) + + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for j := 0; j < 2*nCPU; j++ { + i.Record(ctx, 1, metric.WithAttributeSet(attr)) + } + + r.Collect(ctx, rm) + assert.Len(b, *e, 1) + } + }) +} + +func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics { + return &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + {Metrics: []metricdata.Metrics{{Data: a}}}, + }, + } +} diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go new file mode 100644 index 00000000000..7938f897198 --- /dev/null +++ b/sdk/metric/exemplar.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "os" + "runtime" + + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + "go.opentelemetry.io/otel/sdk/metric/internal/x" +) + +// reservoirFunc returns the appropriately configured exemplar reservoir +// creation func based on the passed InstrumentKind and user defined +// environment variables. +// +// Note: This will only return non-nil values when the experimental exemplar +// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable +// is not set to always_off. +func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] { + if !x.Exemplars.Enabled() { + return nil + } + + // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults + resF := func() func() exemplar.Reservoir[N] { + // Explicit bucket histogram aggregation with more than 1 bucket will + // use AlignedHistogramBucketExemplarReservoir. + a, ok := agg.(AggregationExplicitBucketHistogram) + if ok && len(a.Boundaries) > 1 { + cp := make([]float64, len(a.Boundaries)) + copy(cp, a.Boundaries) + return func() exemplar.Reservoir[N] { + bounds := cp + return exemplar.Histogram[N](bounds) + } + } + + var n int + if a, ok := agg.(AggregationBase2ExponentialHistogram); ok { + // Base2 Exponential Histogram Aggregation SHOULD use a + // SimpleFixedSizeExemplarReservoir with a reservoir equal to the + // smaller of the maximum number of buckets configured on the + // aggregation or twenty (e.g. min(20, max_buckets)). + n = int(a.MaxSize) + if n > 20 { + n = 20 + } + } else { + // https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir + // This Exemplar reservoir MAY take a configuration parameter for + // the size of the reservoir. 
If no size configuration is + // provided, the default size MAY be the number of possible + // concurrent threads (e.g. number of CPUs) to help reduce + // contention. Otherwise, a default size of 1 SHOULD be used. + n = runtime.NumCPU() + if n < 1 { + // Should never be the case, but be defensive. + n = 1 + } + } + + return func() exemplar.Reservoir[N] { + return exemplar.FixedSize[N](n) + } + } + + // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar + const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" + + switch os.Getenv(filterEnvKey) { + case "always_on": + return resF() + case "always_off": + return exemplar.Drop[N] + case "trace_based": + fallthrough + default: + newR := resF() + return func() exemplar.Reservoir[N] { + return exemplar.SampledFilter(newR()) + } + } +} diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index f41f6636be7..da93f6e68e5 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -9,12 +9,12 @@ require ( go.opentelemetry.io/otel v1.23.0-rc.1 go.opentelemetry.io/otel/metric v1.23.0-rc.1 go.opentelemetry.io/otel/sdk v1.23.0-rc.1 + go.opentelemetry.io/otel/trace v1.23.0-rc.1 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/otel/trace v1.23.0-rc.1 // indirect golang.org/x/sys v0.16.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index c61f8513789..4060a2f76d3 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -19,6 +19,7 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -44,6 +45,12 @@ type Builder[N int64 | float64] struct { // Filter is the attribute filter the 
aggregate function will use on the // input of measurements. Filter attribute.Filter + // ReservoirFunc is the factory function used by aggregate functions to + // create new exemplar reservoirs for a new seen attribute set. + // + // If this is not provided a default factory function that returns an + // exemplar.Drop reservoir will be used. + ReservoirFunc func() exemplar.Reservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -54,15 +61,27 @@ type Builder[N int64 | float64] struct { AggregationLimit int } -func (b Builder[N]) filter(f Measure[N]) Measure[N] { +func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] { + if b.ReservoirFunc != nil { + return b.ReservoirFunc + } + + return exemplar.Drop[N] +} + +type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) + +func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] { if b.Filter != nil { fltr := b.Filter // Copy to make it immutable after assignment. return func(ctx context.Context, n N, a attribute.Set) { - fAttr, _ := a.Filter(fltr) - f(ctx, n, fAttr) + fAttr, dropped := a.Filter(fltr) + f(ctx, n, fAttr, dropped) } } - return f + return func(ctx context.Context, n N, a attribute.Set) { + f(ctx, n, a, nil) + } } // LastValue returns a last-value aggregate function input and output. @@ -71,7 +90,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] { func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { // Delta temporality is the only temporality that makes semantic sense for // a last-value aggregate. - lv := newLastValue[N](b.AggregationLimit) + lv := newLastValue[N](b.AggregationLimit, b.resFunc()) return b.filter(lv.measure), func(dest *metricdata.Aggregation) int { // Ignore if dest is not a metricdata.Gauge. 
The chance for memory @@ -87,7 +106,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { // PrecomputedSum returns a sum aggregate function input and output. The // arguments passed to the input are expected to be the precomputed sum values. func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) { - s := newPrecomputedSum[N](monotonic, b.AggregationLimit) + s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(s.measure), s.delta @@ -98,7 +117,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati // Sum returns a sum aggregate function input and output. func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { - s := newSum[N](monotonic, b.AggregationLimit) + s := newSum[N](monotonic, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(s.measure), s.delta @@ -110,7 +129,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { // ExplicitBucketHistogram returns a histogram aggregate function input and // output. func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) { - h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit) + h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(h.measure), h.delta @@ -122,7 +141,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu // ExponentialBucketHistogram returns a histogram aggregate function input and // output. 
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) { - h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit) + h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(h.measure), h.delta diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index 384ca51c8cf..be568f14c04 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) @@ -59,6 +60,10 @@ var ( } ) +func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] { + return exemplar.Drop[N]() +} + func TestBuilderFilter(t *testing.T) { t.Run("Int64", testBuilderFilter[int64]()) t.Run("Float64", testBuilderFilter[float64]()) @@ -69,20 +74,21 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) { t.Helper() value, attr := N(1), alice - run := func(b Builder[N], wantA attribute.Set) func(*testing.T) { + run := func(b Builder[N], wantF attribute.Set, wantD []attribute.KeyValue) func(*testing.T) { return func(t *testing.T) { t.Helper() - meas := b.filter(func(_ context.Context, v N, a attribute.Set) { + meas := b.filter(func(_ context.Context, v N, f attribute.Set, d []attribute.KeyValue) { assert.Equal(t, value, v, "measured incorrect value") - assert.Equal(t, wantA, a, "measured incorrect attributes") + assert.Equal(t, wantF, f, "measured incorrect filtered attributes") + assert.ElementsMatch(t, wantD, d, "measured incorrect dropped attributes") }) meas(context.Background(), value, attr) } } - 
t.Run("NoFilter", run(Builder[N]{}, attr)) - t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice)) + t.Run("NoFilter", run(Builder[N]{}, attr, nil)) + t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue})) } } diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index e9c25980aa2..0d1798d4185 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -40,6 +41,8 @@ const ( // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { + res exemplar.Reservoir[N] + count uint64 min N max N @@ -288,13 +291,14 @@ func (b *expoBuckets) downscale(delta int) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. 
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, maxSize: int(maxSize), maxScale: int(maxScale), + newRes: r, limit: newLimiter[*expoHistogramDataPoint[N]](limit), values: make(map[attribute.Set]*expoHistogramDataPoint[N]), @@ -310,6 +314,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int + newRes func() exemplar.Reservoir[N] limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Set]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -317,22 +322,27 @@ type expoHistogram[N int64 | float64] struct { start time.Time } -func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) { +func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { // Ignore NaN and infinity. 
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) { return } + t := now() + e.valuesMu.Lock() defer e.valuesMu.Unlock() - attr = e.limit.Attributes(attr, e.values) + attr := e.limit.Attributes(fltrAttr, e.values) v, ok := e.values[attr] if !ok { v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum) + v.res = e.newRes() + e.values[attr] = v } v.record(value) + v.res.Offer(ctx, t, value, droppedAttr) } func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { @@ -374,6 +384,8 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { hDPts[i].Max = metricdata.NewExtrema(b.max) } + b.res.Flush(&hDPts[i].Exemplars) + delete(e.values, a) i++ } @@ -422,6 +434,8 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int { hDPts[i].Max = metricdata.NewExtrema(b.max) } + b.res.Collect(&hDPts[i].Exemplars) + i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. 
Attribute diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index 40af402636c..3fed4897ee2 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -183,9 +183,9 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) { restore := withHandler(t) defer restore() - h := newExponentialHistogram[int64](4, 20, false, false, 0) + h := newExponentialHistogram[int64](4, 20, false, false, 0, dropExemplars[int64]) for _, v := range tt.values { - h.measure(context.Background(), v, alice) + h.measure(context.Background(), v, alice, nil) } dp := h.values[alice] @@ -225,9 +225,9 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) { restore := withHandler(t) defer restore() - h := newExponentialHistogram[float64](4, 20, false, false, 0) + h := newExponentialHistogram[float64](4, 20, false, false, 0, dropExemplars[float64]) for _, v := range tt.values { - h.measure(context.Background(), v, alice) + h.measure(context.Background(), v, alice, nil) } dp := h.values[alice] diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 5d886360bae..0618639fb9e 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -21,10 +21,13 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) type buckets[N int64 | float64] struct { + res exemplar.Reservoir[N] + counts []uint64 count uint64 total N @@ -54,12 +57,13 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 + newRes func() exemplar.Reservoir[N] limit limiter[*buckets[N]] values map[attribute.Set]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *histValues[N] { +func 
newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -70,6 +74,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) * return &histValues[N]{ noSum: noSum, bounds: b, + newRes: r, limit: newLimiter[*buckets[N]](limit), values: make(map[attribute.Set]*buckets[N]), } @@ -77,7 +82,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) * // Aggregate records the measurement value, scoped by attr, and aggregates it // into a histogram. -func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) { +func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { // This search will return an index in the range [0, len(s.bounds)], where // it will return len(s.bounds) if value is greater than the last element // of s.bounds. This aligns with the buckets in that the length of buckets @@ -85,10 +90,12 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) // (s.bounds[len(s.bounds)-1], +∞). idx := sort.SearchFloat64s(s.bounds, float64(value)) + t := now() + s.valuesMu.Lock() defer s.valuesMu.Unlock() - attr = s.limit.Attributes(attr, s.values) + attr := s.limit.Attributes(fltrAttr, s.values) b, ok := s.values[attr] if !ok { // N+1 buckets. For example: @@ -99,6 +106,8 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) b = newBuckets[N](len(s.bounds) + 1) + b.res = s.newRes() + // Ensure min and max are recorded values (not zero), for new buckets. 
b.min, b.max = value, value s.values[attr] = b @@ -107,13 +116,14 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) if !s.noSum { b.sum(value) } + b.res.Offer(ctx, t, value, droppedAttr) } // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histogram[N] { return &histogram[N]{ - histValues: newHistValues[N](boundaries, noSum, limit), + histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, start: now(), } @@ -164,6 +174,8 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int { hDPts[i].Max = metricdata.NewExtrema(b.max) } + b.res.Flush(&hDPts[i].Exemplars) + // Unused attribute sets do not report. delete(s.values, a) i++ @@ -220,6 +232,9 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int { hDPts[i].Min = metricdata.NewExtrema(b.min) hDPts[i].Max = metricdata.NewExtrema(b.max) } + + b.res.Collect(&hDPts[i].Exemplars) + i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. 
Attribute diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index e51e9fc8f29..bd88b083492 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -313,13 +313,13 @@ func TestHistogramImmutableBounds(t *testing.T) { cpB := make([]float64, len(b)) copy(cpB, b) - h := newHistogram[int64](b, false, false, 0) + h := newHistogram[int64](b, false, false, 0, dropExemplars[int64]) require.Equal(t, cpB, h.bounds) b[0] = 10 assert.Equal(t, cpB, h.bounds, "modifying the bounds argument should not change the bounds") - h.measure(context.Background(), 5, alice) + h.measure(context.Background(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} h.cumulative(&data) @@ -329,8 +329,8 @@ func TestHistogramImmutableBounds(t *testing.T) { } func TestCumulativeHistogramImutableCounts(t *testing.T) { - h := newHistogram[int64](bounds, noMinMax, false, 0) - h.measure(context.Background(), 5, alice) + h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) + h.measure(context.Background(), 5, alice, nil) var data metricdata.Aggregation = metricdata.Histogram[int64]{} h.cumulative(&data) @@ -347,13 +347,13 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) { func TestDeltaHistogramReset(t *testing.T) { t.Cleanup(mockTime(now)) - h := newHistogram[int64](bounds, noMinMax, false, 0) + h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64]) var data metricdata.Aggregation = metricdata.Histogram[int64]{} require.Equal(t, 0, h.delta(&data)) require.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0) - h.measure(context.Background(), 1, alice) + h.measure(context.Background(), 1, alice, nil) expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)} @@ -366,7 +366,7 @@ func 
TestDeltaHistogramReset(t *testing.T) { assert.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0) // Aggregating another set should not affect the original (alice). - h.measure(context.Background(), 1, bob) + h.measure(context.Background(), 1, bob, nil) expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)} h.delta(&data) metricdatatest.AssertAggregationsEqual(t, expect, data) diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index b79e80a0c8d..5d3c27a376c 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -20,6 +20,7 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -27,10 +28,12 @@ import ( type datapoint[N int64 | float64] struct { timestamp time.Time value N + res exemplar.Reservoir[N] } -func newLastValue[N int64 | float64](limit int) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *lastValue[N] { return &lastValue[N]{ + newRes: r, limit: newLimiter[datapoint[N]](limit), values: make(map[attribute.Set]datapoint[N]), } @@ -40,16 +43,28 @@ func newLastValue[N int64 | float64](limit int) *lastValue[N] { type lastValue[N int64 | float64] struct { sync.Mutex + newRes func() exemplar.Reservoir[N] limit limiter[datapoint[N]] values map[attribute.Set]datapoint[N] } -func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) { - d := datapoint[N]{timestamp: now(), value: value} +func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { + t := now() + s.Lock() - attr = s.limit.Attributes(attr, s.values) + defer s.Unlock() + + attr := s.limit.Attributes(fltrAttr, s.values) + d, ok := s.values[attr] + if !ok { + d.res = s.newRes() + } + + d.timestamp = t + d.value = value + 
d.res.Offer(ctx, t, value, droppedAttr) + s.values[attr] = d - s.Unlock() } func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { @@ -66,6 +81,7 @@ func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { // ignored. (*dest)[i].Time = v.timestamp (*dest)[i].Value = v.value + v.res.Flush(&(*dest)[i].Exemplars) // Do not report stale values. delete(s.values, a) i++ diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index a0d26e1ddb9..35446e6a5e5 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -20,36 +20,55 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +type sumValue[N int64 | float64] struct { + n N + res exemplar.Reservoir[N] +} + // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - limit limiter[N] - values map[attribute.Set]N + newRes func() exemplar.Reservoir[N] + limit limiter[sumValue[N]] + values map[attribute.Set]sumValue[N] } -func newValueMap[N int64 | float64](limit int) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] { return &valueMap[N]{ - limit: newLimiter[N](limit), - values: make(map[attribute.Set]N), + newRes: r, + limit: newLimiter[sumValue[N]](limit), + values: make(map[attribute.Set]sumValue[N]), } } -func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) { +func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { + t := now() + s.Lock() - attr = s.limit.Attributes(attr, s.values) - s.values[attr] += value - s.Unlock() + defer s.Unlock() + + attr := s.limit.Attributes(fltrAttr, s.values) + v, ok := s.values[attr] + if !ok { + v.res = s.newRes() + } + + v.n += value + v.res.Offer(ctx, t, value, droppedAttr) + + 
s.values[attr] = v } // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *sum[N] { return &sum[N]{ - valueMap: newValueMap[N](limit), + valueMap: newValueMap[N](limit, r), monotonic: monotonic, start: now(), } @@ -79,11 +98,12 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { + for attr, val := range s.values { dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = value + dPts[i].Value = val.n + val.res.Flush(&dPts[i].Exemplars) // Do not report stale values. delete(s.values, attr) i++ @@ -117,7 +137,8 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = value + dPts[i].Value = value.n + value.res.Collect(&dPts[i].Exemplars) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -134,9 +155,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observatrions as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. 
-func newPrecomputedSum[N int64 | float64](monotonic bool, limit int) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ - valueMap: newValueMap[N](limit), + valueMap: newValueMap[N](limit, r), monotonic: monotonic, start: now(), } @@ -170,14 +191,15 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { var i int for attr, value := range s.values { - delta := value - s.reported[attr] + delta := value.n - s.reported[attr] dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = delta + value.res.Flush(&dPts[i].Exemplars) - newReported[attr] = value + newReported[attr] = value.n // Unused attribute sets do not report. delete(s.values, attr) i++ @@ -209,11 +231,12 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { + for attr, val := range s.values { dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = value + dPts[i].Value = val.n + val.res.Collect(&dPts[i].Exemplars) // Unused attribute sets do not report. delete(s.values, attr) diff --git a/sdk/metric/internal/exemplar/doc.go b/sdk/metric/internal/exemplar/doc.go new file mode 100644 index 00000000000..3caeb542c57 --- /dev/null +++ b/sdk/metric/internal/exemplar/doc.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// Package exemplar provides an implementation of the OpenTelemetry exemplar +// reservoir to be used in metric collection pipelines. +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" diff --git a/sdk/metric/internal/exemplar/drop.go b/sdk/metric/internal/exemplar/drop.go new file mode 100644 index 00000000000..62a95b55f9a --- /dev/null +++ b/sdk/metric/internal/exemplar/drop.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// Drop returns a [Reservoir] that drops all measurements it is offered. +func Drop[N int64 | float64]() Reservoir[N] { return &dropRes[N]{} } + +type dropRes[N int64 | float64] struct{} + +// Offer does nothing, all measurements offered will be dropped. +func (r *dropRes[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {} + +// Collect resets dest. No exemplars will ever be returned. +func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N]) { + *dest = (*dest)[:0] +} + +// Flush resets dest. No exemplars will ever be returned. 
+func (r *dropRes[N]) Flush(dest *[]metricdata.Exemplar[N]) { + *dest = (*dest)[:0] +} diff --git a/sdk/metric/internal/exemplar/drop_test.go b/sdk/metric/internal/exemplar/drop_test.go new file mode 100644 index 00000000000..5b02bf09437 --- /dev/null +++ b/sdk/metric/internal/exemplar/drop_test.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar + +import ( + "testing" +) + +func TestDrop(t *testing.T) { + t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir[int64], int) { + return Drop[int64](), 0 + })) + + t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir[float64], int) { + return Drop[float64](), 0 + })) +} diff --git a/sdk/metric/internal/exemplar/filter.go b/sdk/metric/internal/exemplar/filter.go new file mode 100644 index 00000000000..4f5946fb966 --- /dev/null +++ b/sdk/metric/internal/exemplar/filter.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// SampledFilter returns a [Reservoir] wrapping r that will only offer measurements +// to r if the passed context associated with the measurement contains a sampled +// [go.opentelemetry.io/otel/trace.SpanContext]. +func SampledFilter[N int64 | float64](r Reservoir[N]) Reservoir[N] { + return filtered[N]{Reservoir: r} +} + +type filtered[N int64 | float64] struct { + Reservoir[N] +} + +func (f filtered[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { + if trace.SpanContextFromContext(ctx).IsSampled() { + f.Reservoir.Offer(ctx, t, n, a) + } +} diff --git a/sdk/metric/internal/exemplar/filter_test.go b/sdk/metric/internal/exemplar/filter_test.go new file mode 100644 index 00000000000..ae1e276cb83 --- /dev/null +++ b/sdk/metric/internal/exemplar/filter_test.go @@ -0,0 +1,77 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/trace" +) + +func TestSampledFilter(t *testing.T) { + t.Run("Int64", testSampledFiltered[int64]) + t.Run("Float64", testSampledFiltered[float64]) +} + +func testSampledFiltered[N int64 | float64](t *testing.T) { + under := &res[N]{} + + r := SampledFilter[N](under) + + ctx := context.Background() + r.Offer(ctx, staticTime, 0, nil) + assert.False(t, under.OfferCalled, "underlying Reservoir Offer called") + r.Offer(sample(ctx), staticTime, 0, nil) + assert.True(t, under.OfferCalled, "underlying Reservoir Offer not called") + + r.Collect(nil) + assert.True(t, under.CollectCalled, "underlying Reservoir Collect not called") + + r.Flush(nil) + assert.True(t, under.FlushCalled, "underlying Reservoir Flush not called") +} + +func sample(parent context.Context) context.Context { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x01}, + SpanID: trace.SpanID{0x01}, + TraceFlags: trace.FlagsSampled, + }) + return trace.ContextWithSpanContext(parent, sc) +} + +type res[N int64 | float64] struct { + OfferCalled bool + CollectCalled bool + FlushCalled bool +} + +func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) { + r.OfferCalled = true +} + +func (r *res[N]) Collect(*[]metricdata.Exemplar[N]) { + r.CollectCalled = true +} + +func (r *res[N]) Flush(*[]metricdata.Exemplar[N]) { + r.FlushCalled = true +} diff --git a/sdk/metric/internal/exemplar/fixed.go b/sdk/metric/internal/exemplar/fixed.go new file mode 100644 index 00000000000..52c4ba13b74 --- /dev/null +++ b/sdk/metric/internal/exemplar/fixed.go @@ -0,0 +1,133 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may 
not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/trace" +) + +// storage is an exemplar storage for [Reservoir] implementations. +type storage[N int64 | float64] struct { + // store are the measurements sampled. + // + // This does not use []metricdata.Exemplar because it potentially would + // require an allocation for trace and span IDs in the hot path of Offer. + store []measurement[N] +} + +func newStorage[N int64 | float64](n int) *storage[N] { + return &storage[N]{store: make([]measurement[N], n)} +} + +// Collect returns all the held exemplars. +// +// The Reservoir state is preserved after this call. See Flush to +// copy-and-clear instead. +func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) { + *dest = reset(*dest, len(r.store), len(r.store)) + var n int + for _, m := range r.store { + if !m.Valid() { + continue + } + + m.Exemplar(&(*dest)[n]) + n++ + } + *dest = (*dest)[:n] +} + +// Flush returns all the held exemplars. +// +// The Reservoir state is reset after this call. See Collect to preserve the +// state instead. +func (r *storage[N]) Flush(dest *[]metricdata.Exemplar[N]) { + *dest = reset(*dest, len(r.store), len(r.store)) + var n int + for i, m := range r.store { + if !m.Valid() { + continue + } + + m.Exemplar(&(*dest)[n]) + n++ + + // Reset. 
+ r.store[i] = measurement[N]{} + } + *dest = (*dest)[:n] +} + +// measurement is a measurement made by a telemetry system. +type measurement[N int64 | float64] struct { + // FilteredAttributes are the attributes dropped during the measurement. + FilteredAttributes []attribute.KeyValue + // Time is the time when the measurement was made. + Time time.Time + // Value is the value of the measurement. + Value N + // SpanContext is the SpanContext active when a measurement was made. + SpanContext trace.SpanContext + + valid bool +} + +// newMeasurement returns a new non-empty Measurement. +func newMeasurement[N int64 | float64](ctx context.Context, ts time.Time, v N, droppedAttr []attribute.KeyValue) measurement[N] { + return measurement[N]{ + FilteredAttributes: droppedAttr, + Time: ts, + Value: v, + SpanContext: trace.SpanContextFromContext(ctx), + valid: true, + } +} + +// Valid returns true if m represents a measurement made by a telemetry +// system (created with newMeasurement), otherwise it returns false. +func (m measurement[N]) Valid() bool { return m.valid } + +// Exemplar returns m as a [metricdata.Exemplar]. 
+func (m measurement[N]) Exemplar(dest *metricdata.Exemplar[N]) {
+	dest.FilteredAttributes = m.FilteredAttributes
+	dest.Time = m.Time
+	dest.Value = m.Value
+
+	if m.SpanContext.HasTraceID() {
+		traceID := m.SpanContext.TraceID()
+		dest.TraceID = traceID[:]
+	} else {
+		dest.TraceID = dest.TraceID[:0]
+	}
+
+	if m.SpanContext.HasSpanID() {
+		spanID := m.SpanContext.SpanID()
+		dest.SpanID = spanID[:]
+	} else {
+		dest.SpanID = dest.SpanID[:0]
+	}
+}
+
+func reset[T any](s []T, length, capacity int) []T {
+	if cap(s) < capacity {
+		return make([]T, length, capacity)
+	}
+	return s[:length]
+}
diff --git a/sdk/metric/internal/exemplar/hist.go b/sdk/metric/internal/exemplar/hist.go
new file mode 100644
index 00000000000..4761cdbd18f
--- /dev/null
+++ b/sdk/metric/internal/exemplar/hist.go
@@ -0,0 +1,47 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+
+import (
+	"context"
+	"sort"
+	"time"
+
+	"go.opentelemetry.io/otel/attribute"
+)
+
+// Histogram returns a [Reservoir] that samples the last measurement that falls
+// within a histogram bucket. The histogram bucket upper-boundaries are defined
+// by bounds.
+//
+// The passed bounds will be sorted by this function.
+func Histogram[N int64 | float64](bounds []float64) Reservoir[N] {
+	sort.Float64s(bounds)
+	return &histRes[N]{
+		bounds:  bounds,
+		storage: newStorage[N](len(bounds) + 1),
+	}
+}
+
+type histRes[N int64 | float64] struct {
+	*storage[N]
+
+	// bounds are bucket bounds in ascending order.
+	bounds []float64
+}
+
+func (r *histRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) {
+	r.store[sort.SearchFloat64s(r.bounds, float64(n))] = newMeasurement(ctx, t, n, a)
+}
diff --git a/sdk/metric/internal/exemplar/hist_test.go b/sdk/metric/internal/exemplar/hist_test.go
new file mode 100644
index 00000000000..c85694db78c
--- /dev/null
+++ b/sdk/metric/internal/exemplar/hist_test.go
@@ -0,0 +1,28 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package exemplar + +import "testing" + +func TestHist(t *testing.T) { + bounds := []float64{0, 100} + t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir[int64], int) { + return Histogram[int64](bounds), len(bounds) + })) + + t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir[float64], int) { + return Histogram[float64](bounds), len(bounds) + })) +} diff --git a/sdk/metric/internal/exemplar/rand.go b/sdk/metric/internal/exemplar/rand.go new file mode 100644 index 00000000000..5195397aa24 --- /dev/null +++ b/sdk/metric/internal/exemplar/rand.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "math" + "math/rand" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// rng is used to make sampling decisions. +// +// Do not use crypto/rand. There is no reason for the decrease in performance +// given this is not a security sensitive decision. +var rng = rand.New(rand.NewSource(time.Now().UnixNano())) + +// FixedSize returns a [Reservoir] that samples at most n exemplars. If there +// are n or less measurements made, the Reservoir will sample each one. If +// there are more than n, the Reservoir will then randomly sample all +// additional measurement with a decreasing probability. 
+func FixedSize[N int64 | float64](n int) Reservoir[N] {
+	r := &randRes[N]{storage: newStorage[N](n)}
+	r.reset()
+	return r
+}
+
+type randRes[N int64 | float64] struct {
+	*storage[N]
+
+	// count is the number of measurements seen.
+	count int64
+	// next is the next count that will store a measurement at a random index
+	// once the reservoir has been filled.
+	next int64
+	// w is the largest random number in a distribution that is used to compute
+	// the next value of next.
+	w float64
+}
+
+func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) {
+	// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
+	// 1994). "Reservoir-Sampling Algorithms of Time Complexity
+	// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
+	// 481–493 (https://dl.acm.org/doi/10.1145/198429.198435).
+	//
+	// It is used because of its balance of simplicity and performance. In
+	// particular it has an asymptotic runtime of O(k(1 + log(n/k))) where n is
+	// the number of measurements offered and k is the reservoir size. This is
+	// much more optimal for large measurement sets than the algorithm
+	// recommended by the OTel specification ("Algorithm R" as described in
+	// Vitter, Jeffrey S. (1 March 1985). "Random sampling with a reservoir"
+	// (http://www.cs.umd.edu/~samir/498/vitter.pdf)) which has an asymptotic
+	// runtime of O(n).
+	//
+	// See https://github.com/MrAlias/reservoir-sampling for a comparison of
+	// reservoir sampling algorithms (including performance benchmarks).
+
+	if int(r.count) < cap(r.store) {
+		r.store[r.count] = newMeasurement(ctx, t, n, a)
+	} else {
+		if r.count == r.next {
+			idx := int(rng.Int63n(int64(cap(r.store))))
+			r.store[idx] = newMeasurement(ctx, t, n, a)
+			r.advance()
+		}
+	}
+	r.count++
+}
+
+func (r *randRes[N]) reset() {
+	r.count = 0
+	r.next = int64(cap(r.store))
+	r.w = math.Exp(math.Log(rng.Float64()) / float64(cap(r.store)))
+	r.advance()
+}
+
+func (r *randRes[N]) advance() {
+	r.w *= math.Exp(math.Log(rng.Float64()) / float64(cap(r.store)))
+	r.next += int64(math.Log(rng.Float64())/math.Log(1-r.w)) + 1
+}
+
+func (r *randRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
+	r.storage.Collect(dest)
+	r.reset()
+}
+
+func (r *randRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
+	r.storage.Flush(dest)
+	r.reset()
+}
diff --git a/sdk/metric/internal/exemplar/rand_test.go b/sdk/metric/internal/exemplar/rand_test.go
new file mode 100644
index 00000000000..775679db547
--- /dev/null
+++ b/sdk/metric/internal/exemplar/rand_test.go
@@ -0,0 +1,59 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package exemplar + +import ( + "context" + "math" + "sort" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFixedSize(t *testing.T) { + t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir[int64], int) { + return FixedSize[int64](n), n + })) + + t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir[float64], int) { + return FixedSize[float64](n), n + })) +} + +func TestFixedSizeSamplingCorrectness(t *testing.T) { + intensity := 0.1 + sampleSize := 1000 + + data := make([]float64, sampleSize*1000) + for i := range data { + data[i] = (-1.0 / intensity) * math.Log(rng.Float64()) + } + // Sort to avoid position bias. + sort.Float64s(data) + + r := FixedSize[float64](sampleSize) + for _, value := range data { + r.Offer(context.Background(), staticTime, value, nil) + } + + var sum float64 + for _, m := range r.(*randRes[float64]).store { + sum += m.Value + } + mean := sum / float64(sampleSize) + + assert.InDelta(t, 1/mean, intensity, 0.01) +} diff --git a/sdk/metric/internal/exemplar/reservoir.go b/sdk/metric/internal/exemplar/reservoir.go new file mode 100644 index 00000000000..b3654414ee7 --- /dev/null +++ b/sdk/metric/internal/exemplar/reservoir.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// Reservoir holds the sampled exemplar of measurements made. +type Reservoir[N int64 | float64] interface { + // Offer accepts the parameters associated with a measurement. The + // parameters will be stored as an exemplar if the Reservoir decides to + // sample the measurement. + // + // The passed ctx needs to contain any baggage or span that were active + // when the measurement was made. This information may be used by the + // Reservoir in making a sampling decision. + // + // The time t is the time when the measurement was made. The val and attr + // parameters are the value and dropped (filtered) attributes of the + // measurement respectively. + Offer(ctx context.Context, t time.Time, val N, attr []attribute.KeyValue) + + // Collect returns all the held exemplars. + // + // The Reservoir state is preserved after this call. See Flush to + // copy-and-clear instead. + Collect(dest *[]metricdata.Exemplar[N]) + + // Flush returns all the held exemplars. + // + // The Reservoir state is reset after this call. See Collect to preserve + // the state instead. + Flush(dest *[]metricdata.Exemplar[N]) +} diff --git a/sdk/metric/internal/exemplar/reservoir_test.go b/sdk/metric/internal/exemplar/reservoir_test.go new file mode 100644 index 00000000000..9ef8e964538 --- /dev/null +++ b/sdk/metric/internal/exemplar/reservoir_test.go @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/trace" +) + +// Sat Jan 01 2000 00:00:00 GMT+0000. +var staticTime = time.Unix(946684800, 0) + +type factory[N int64 | float64] func(requstedCap int) (r Reservoir[N], actualCap int) + +func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + + ctx := context.Background() + + t.Run("CaptureSpanContext", func(t *testing.T) { + t.Helper() + + r, n := f(1) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + + tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01} + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: tID, + SpanID: sID, + TraceFlags: trace.FlagsSampled, + }) + ctx := trace.ContextWithSpanContext(ctx, sc) + + r.Offer(ctx, staticTime, 10, nil) + + var dest []metricdata.Exemplar[N] + r.Collect(&dest) + + want := metricdata.Exemplar[N]{ + Time: staticTime, + Value: 10, + SpanID: []byte(sID[:]), + TraceID: []byte(tID[:]), + } + require.Len(t, dest, 1, "number of collected exemplars") + assert.Equal(t, want, dest[0]) + }) + + t.Run("FilterAttributes", func(t *testing.T) { + t.Helper() + + r, n := f(1) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + + adminTrue := attribute.Bool("admin", true) + r.Offer(ctx, staticTime, 10, []attribute.KeyValue{adminTrue}) + + var 
dest []metricdata.Exemplar[N]
+			r.Collect(&dest)
+
+			want := metricdata.Exemplar[N]{
+				FilteredAttributes: []attribute.KeyValue{adminTrue},
+				Time:               staticTime,
+				Value:              10,
+			}
+			require.Len(t, dest, 1, "number of collected exemplars")
+			assert.Equal(t, want, dest[0])
+		})
+
+		t.Run("CollectDoesNotFlush", func(t *testing.T) {
+			t.Helper()
+
+			r, n := f(1)
+			if n < 1 {
+				t.Skip("skipping, reservoir capacity less than 1:", n)
+			}
+
+			r.Offer(ctx, staticTime, 10, nil)
+
+			var dest []metricdata.Exemplar[N]
+			r.Collect(&dest)
+			require.Len(t, dest, 1, "number of collected exemplars")
+
+			dest = dest[:0]
+			r.Collect(&dest)
+			assert.Len(t, dest, 1, "Collect flushed reservoir")
+		})
+
+		t.Run("FlushFlushes", func(t *testing.T) {
+			t.Helper()
+
+			r, n := f(1)
+			if n < 1 {
+				t.Skip("skipping, reservoir capacity less than 1:", n)
+			}
+
+			r.Offer(ctx, staticTime, 10, nil)
+
+			var dest []metricdata.Exemplar[N]
+			r.Flush(&dest)
+			require.Len(t, dest, 1, "number of flushed exemplars")
+
+			r.Flush(&dest)
+			assert.Len(t, dest, 0, "Flush did not flush reservoir")
+		})
+
+		t.Run("MultipleOffers", func(t *testing.T) {
+			t.Helper()
+
+			r, n := f(3)
+			if n < 1 {
+				t.Skip("skipping, reservoir capacity less than 1:", n)
+			}
+
+			for i := 0; i < n+1; i++ {
+				v := N(i)
+				r.Offer(ctx, staticTime, v, nil)
+			}
+
+			var dest []metricdata.Exemplar[N]
+			r.Flush(&dest)
+			assert.Len(t, dest, n, "multiple offers did not fill reservoir")
+
+			// Ensure the flush reset also resets any counting state.
+			for i := 0; i < n+1; i++ {
+				v := N(2 * i)
+				r.Offer(ctx, staticTime, v, nil)
+			}
+
+			dest = dest[:0]
+			r.Flush(&dest)
+			assert.Len(t, dest, n, "internal count state not reset")
+		})
+
+		t.Run("DropAll", func(t *testing.T) {
+			t.Helper()
+
+			r, n := f(0)
+			if n > 0 {
+				t.Skip("skipping, reservoir capacity greater than 0:", n)
+			}
+
+			r.Offer(context.Background(), staticTime, 10, nil)
+
+			dest := []metricdata.Exemplar[N]{{}} // Should be reset to empty.
+ r.Collect(&dest) + assert.Len(t, dest, 0, "no exemplars should be collected") + + r.Offer(context.Background(), staticTime, 10, nil) + dest = []metricdata.Exemplar[N]{{}} // Should be reset to empty. + r.Flush(&dest) + assert.Len(t, dest, 0, "no exemplars should be flushed") + }) + } +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 47d9fe07ada..da39ab961c1 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -359,7 +359,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum normID := id.normalize() cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ - Temporality: i.pipeline.reader.temporality(kind), + Temporality: i.pipeline.reader.temporality(kind), + ReservoirFunc: reservoirFunc[N](stream.Aggregation), } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation