diff --git a/CHANGELOG.md b/CHANGELOG.md index 27a44e96f77..2f69a0ddfd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - ⚠️ Metrics SDK Breaking ⚠️ : the `AttributeFilter` fields of the `Stream` from `go.opentelemetry.io/otel/sdk/metric` is replaced by the `AttributeKeys` field. The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view. This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) +- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) ### Fixed diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 50a59697e16..1d1a7b0320c 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -255,10 +255,12 @@ type precomputedDeltaSum[N int64 | float64] struct { // collection cycle, and the unfiltered-sum is kept for the next collection // cycle. func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { + newReported := make(map[attribute.Set]N) s.Lock() defer s.Unlock() if len(s.values) == 0 { + s.reported = newReported return nil } @@ -277,16 +279,12 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { Time: t, Value: delta, }) - if delta != 0 { - s.reported[attr] = v - } - value.filtered = N(0) - s.values[attr] = value - // TODO (#3006): This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. + newReported[attr] = v + // Unused attribute sets do not report. + delete(s.values, attr) } + // Unused attribute sets are forgotten. + s.reported = newReported // The delta collection cycle resets. s.start = t return out @@ -349,12 +347,8 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { Time: t, Value: value.measured + value.filtered, }) - value.filtered = N(0) - s.values[attr] = value - // TODO (#3006): This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. + // Unused attribute sets do not report. + delete(s.values, attr) } return out } diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index 3d206df5412..4fff11b4128 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -180,10 +180,9 @@ func TestPreComputedDeltaSum(t *testing.T) { opt := metricdatatest.IgnoreTimestamp() metricdatatest.AssertAggregationsEqual(t, want, got, opt) - // Delta values should zero. + // No observation means no metric data got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) got = agg.Aggregation() @@ -193,13 +192,8 @@ func TestPreComputedDeltaSum(t *testing.T) { // Filtered values should not persist. got = agg.Aggregation() - // measured(+): 1, previous(-): 2, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - got = agg.Aggregation() - // measured(+): 1, previous(-): 1, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + // No observation means no metric data + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) // Override set value. agg.Aggregate(2, attrs) @@ -208,8 +202,8 @@ func TestPreComputedDeltaSum(t *testing.T) { agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() - // measured(+): 5, previous(-): 1, filtered(+): 13 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)} + // measured(+): 5, previous(-): 0, filtered(+): 13 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) // Filtered values should not persist. @@ -251,19 +245,18 @@ func TestPreComputedCumulativeSum(t *testing.T) { opt := metricdatatest.IgnoreTimestamp() metricdatatest.AssertAggregationsEqual(t, want, got, opt) - // Cumulative values should persist. + // Cumulative values should not persist. got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)} + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) // Filtered values should not persist. got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) // Override set value. agg.Aggregate(5, attrs) @@ -276,8 +269,7 @@ func TestPreComputedCumulativeSum(t *testing.T) { // Filtered values should not persist. got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, got, opt) // Order should not affect measure. // Filtered should add. @@ -287,9 +279,6 @@ func TestPreComputedCumulativeSum(t *testing.T) { got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) - got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) } func TestEmptySumNilAggregation(t *testing.T) { diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 50ea34e100d..64ff1d9e705 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -107,6 +107,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti // Int64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing int64 measurements once per a measurement collection cycle. +// Only the measurements recorded during the collection cycle are exported. func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { cfg := metric.NewInt64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter @@ -121,7 +122,8 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser // Int64ObservableUpDownCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record -// int64 measurements once per a measurement collection cycle. +// int64 measurements once per a measurement collection cycle. Only the +// measurements recorded during the collection cycle are exported. func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter @@ -137,6 +139,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 // Int64ObservableGauge returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // instantaneous int64 measurements once per a measurement collection cycle. +// Only the measurements recorded during the collection cycle are exported. func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { cfg := metric.NewInt64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge @@ -194,6 +197,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram // Float64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing float64 measurements once per a measurement collection cycle. +// Only the measurements recorded during the collection cycle are exported. func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { cfg := metric.NewFloat64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter @@ -208,7 +212,8 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O // Float64ObservableUpDownCounter returns a new instrument identified by name // and configured with options. The instrument is used to asynchronously record -// float64 measurements once per a measurement collection cycle. +// float64 measurements once per a measurement collection cycle. Only the +// measurements recorded during the collection cycle are exported. func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter @@ -224,6 +229,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl // Float64ObservableGauge returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // instantaneous float64 measurements once per a measurement collection cycle. +// Only the measurements recorded during the collection cycle are exported. func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { cfg := metric.NewFloat64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge @@ -272,6 +278,10 @@ func isAlphanumeric(c rune) bool { // Only instruments from this meter can be registered with f, an error is // returned if other instrument are provided. // +// Only observations made in the callback will be exported. Unlike synchronous +// instruments, asynchronous callbacks can "forget" attribute sets that are no +// longer relevant by omitting the observation during the callback. +// // The returned Registration can be used to unregister f. func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) { if len(insts) == 0 { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 7a3d5c0b8dd..185095e4f8d 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1687,8 +1687,7 @@ func TestObservableExample(t *testing.T) { Temporality: temporality, IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ - // Thread 1 remains at last measured value. - {Attributes: thread1, Value: 60}, + // Thread 1 is no longer exported. {Attributes: thread2, Value: 53}, {Attributes: thread3, Value: 5}, }, @@ -1762,8 +1761,7 @@ func TestObservableExample(t *testing.T) { Temporality: temporality, IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ - // Thread 1 remains at last measured value. - {Attributes: thread1, Value: 0}, + // Thread 1 is no longer exported. {Attributes: thread2, Value: 6}, {Attributes: thread3, Value: 5}, },