Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
593a45b
Add internal/exemplar pkg
MrAlias Aug 16, 2023
c37db57
Update internal/aggregate to support exemplars
MrAlias Aug 16, 2023
becb3a0
Add experimental feature functionality
MrAlias Aug 16, 2023
141afc0
Update metric SDK to support exemplars
MrAlias Aug 16, 2023
778c62e
Add exemplar pkg tests
MrAlias Aug 17, 2023
99f6b14
Use Algorithm L for rand res
MrAlias Aug 18, 2023
0486576
Move experimental support to internal pkg
MrAlias Aug 18, 2023
c423127
Merge branch 'main' into exemplar-x
MrAlias Dec 15, 2023
1d571d5
Update default fixed bucket exemplar sizes
MrAlias Dec 15, 2023
b997bd3
Merge branch 'main' into exemplar-x
MrAlias Dec 15, 2023
ac0cf70
Merge branch 'main' into exemplar-x
MrAlias Jan 17, 2024
ebe1807
Revert name changes unrelated to PR
MrAlias Jan 17, 2024
e694fb8
Document Offer params in Reservoir iface
MrAlias Jan 17, 2024
623ed1b
Add end-to-end benchmark for rand and hist
MrAlias Jan 17, 2024
6a94b22
Move to recording dropped at Offer
MrAlias Jan 17, 2024
c4d8381
Remove the unneeded exemplar Filter
MrAlias Jan 19, 2024
0741f43
Merge branch 'main' into exemplar-x
MrAlias Jan 19, 2024
f77205f
Comment drop
MrAlias Jan 19, 2024
11bace0
Reorder filter_test.go
MrAlias Jan 19, 2024
d2171e7
Rename measurement.Empty to Valid
MrAlias Jan 19, 2024
27bc3e7
Doc fixed.go
MrAlias Jan 19, 2024
b044f34
Doc rand.go
MrAlias Jan 19, 2024
b729954
Fix doc for Reservoir
MrAlias Jan 19, 2024
6d2dfaa
Move adminTrue to only use scope
MrAlias Jan 19, 2024
3b4a1b5
Add internal/exemplar/doc.go
MrAlias Jan 19, 2024
4460389
Add link to res sampling comparison repo
MrAlias Jan 19, 2024
925675f
Rename fixedRes to storage
MrAlias Jan 19, 2024
bb1c495
Rename testReservoir to ReservoirTest
MrAlias Jan 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"os"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
)

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and user defined
// environment variables.
//
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
if !x.Enabled(x.Exemplars) {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
Copy link
Copy Markdown

@jsuereth jsuereth Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI - #2421 - I think we're going to recommend that the filter is a configuration setting on the SDK (likely meter provider).

IIUC the architecture, this would still be feasible in Go?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me, but we will have to wait for exemplars to become stable to add it to the stable API interface of the SDK.

I was talking with @dashpole and I was also wondering if we even need exemplar filters to start here. Might be something to consider at the spec level.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open-telemetry/opentelemetry-specification#3820 <- The interface no longer needs to be exposed to users, but the configuration needs to be SDK-wide.


var fltr exemplar.Filter[N]
switch os.Getenv(filterEnvKey) {
case "always_on":
fltr = exemplar.AlwaysSample[N]
case "always_off":
fltr = exemplar.NeverSample[N]
case "trace_based":
fallthrough
default:
fltr = exemplar.TraceBasedSample[N]
}

// TODO: This is not defined by the specification, nor is the mechanism to
// configure it.
const defaultFixedSize = 1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI - I'm planning to update the specification here with a MAY. 1 is the fallback default, but we found in Java that a simple optimisation is to set this = the number of available CPUs for the runtime (if that's easy to grab). It can dramatically reduce contention on writing exemplars in extreme cases for not a huge investment of memory.

I'm not sure if Go suffers from the same contention issues, but might be worth benchmarking.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's interesting. I need to look into it.

My guess is this would also benefit from using that value (which is pretty accessible).


// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir[N] {
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 1 {
cp := make([]float64, len(a.Boundaries))
copy(cp, a.Boundaries)
return func() exemplar.Reservoir[N] {
bounds := cp
return exemplar.Histogram[N](bounds)
}
}

return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](defaultFixedSize)
}
}()

return func() exemplar.Reservoir[N] {
return exemplar.Filtered[N](resF(), fltr)
}
}
2 changes: 1 addition & 1 deletion sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ require (
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/sys v0.11.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
35 changes: 27 additions & 8 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -44,17 +45,35 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return exemplar.Drop[N]
}

// fltrMeasure is the internal measurement function signature used by
// aggregate functions. It carries both the original and the filtered
// attribute sets so exemplars can be offered the original attributes while
// aggregation keys on the filtered ones.
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, origAttr, fltrAttr attribute.Set)

// filter adapts f to the exported Measure signature. When the Builder has an
// attribute Filter configured, f receives the original set alongside the
// filtered set; otherwise the original set is passed for both.
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
	if b.Filter != nil {
		fltr := b.Filter // Copy to make it immutable after assignment.
		return func(ctx context.Context, n N, a attribute.Set) {
			fAttr, _ := a.Filter(fltr)
			f(ctx, n, a, fAttr)
		}
	}
	return func(ctx context.Context, n N, a attribute.Set) {
		f(ctx, n, a, a)
	}
}

// LastValue returns a last-value aggregate function input and output.
Expand All @@ -63,7 +82,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N]()
lv := newLastValue[N](b.resFunc())

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
Expand All @@ -79,7 +98,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic)
s := newPrecomputedSum[N](monotonic, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -90,7 +109,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic)
s := newSum[N](monotonic, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -102,7 +121,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum)
h := newHistogram[N](boundaries, noMinMax, noSum, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand All @@ -114,7 +133,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand Down
16 changes: 11 additions & 5 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
Expand Down Expand Up @@ -55,6 +56,10 @@ var (
}
)

// dropExemplars returns an exemplar.Drop reservoir, the same default used by
// Builder when no ReservoirFunc is configured.
func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] {
	var r exemplar.Reservoir[N] = exemplar.Drop[N]()
	return r
}

func TestBuilderFilter(t *testing.T) {
t.Run("Int64", testBuilderFilter[int64]())
t.Run("Float64", testBuilderFilter[float64]())
Expand All @@ -65,20 +70,21 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
t.Helper()

value, attr := N(1), alice
run := func(b Builder[N], wantA attribute.Set) func(*testing.T) {
run := func(b Builder[N], wantO, wantF attribute.Set) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

meas := b.filter(func(_ context.Context, v N, a attribute.Set) {
meas := b.filter(func(_ context.Context, v N, o, f attribute.Set) {
assert.Equal(t, value, v, "measured incorrect value")
assert.Equal(t, wantA, a, "measured incorrect attributes")
assert.Equal(t, wantO, o, "measured incorrect original attributes")
assert.Equal(t, wantF, f, "measured incorrect filtered attributes")
})
meas(context.Background(), value, attr)
}
}

t.Run("NoFilter", run(Builder[N]{}, attr))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
t.Run("NoFilter", run(Builder[N]{}, attr, attr))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, alice, fltrAlice))
}
}

Expand Down
83 changes: 50 additions & 33 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -40,6 +41,9 @@ const (

// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attr attribute.Set
res exemplar.Reservoir[N]

count uint64
min N
max N
Expand Down Expand Up @@ -288,14 +292,15 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, r func() exemplar.Reservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: int(maxScale),

values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
newRes: r,
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),

start: now(),
}
Expand All @@ -309,27 +314,35 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

values map[attribute.Set]*expoHistogramDataPoint[N]
newRes func() exemplar.Reservoir[N]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

start time.Time
}

func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) {
func (e *expoHistogram[N]) measure(ctx context.Context, value N, origAttr, fltrAttr attribute.Set) {
// Ignore NaN and infinity.
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
return
}

t := now()
key := fltrAttr.Equivalent()

e.valuesMu.Lock()
defer e.valuesMu.Unlock()

v, ok := e.values[attr]
v, ok := e.values[key]
if !ok {
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
e.values[attr] = v
v.attr = fltrAttr
v.res = e.newRes()

e.values[key] = v
}
v.record(value)
v.res.Offer(ctx, t, value, origAttr)
}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
Expand All @@ -347,31 +360,33 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts := reset(h.DataPoints, n, n)

var i int
for a, b := range e.values {
hDPts[i].Attributes = a
for key, val := range e.values {
hDPts[i].Attributes = val.attr
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Scale = int32(b.scale)
hDPts[i].ZeroCount = b.zeroCount
hDPts[i].Count = val.count
hDPts[i].Scale = int32(val.scale)
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0

hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin)
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)

hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin)
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))

if !e.noSum {
hDPts[i].Sum = b.sum
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

delete(e.values, a)
val.res.Flush(&hDPts[i].Exemplars, val.attr)

delete(e.values, key)
i++
}
e.start = t
Expand All @@ -395,30 +410,32 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts := reset(h.DataPoints, n, n)

var i int
for a, b := range e.values {
hDPts[i].Attributes = a
for _, val := range e.values {
hDPts[i].Attributes = val.attr
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = b.count
hDPts[i].Scale = int32(b.scale)
hDPts[i].ZeroCount = b.zeroCount
hDPts[i].Count = val.count
hDPts[i].Scale = int32(val.scale)
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0

hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin)
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)

hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin)
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))

if !e.noSum {
hDPts[i].Sum = b.sum
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(b.min)
hDPts[i].Max = metricdata.NewExtrema(b.max)
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars, val.attr)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
Expand Down
Loading