diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index ac9dda5cbad..76ea3636a27 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -88,39 +88,23 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { // PrecomputedSum returns a sum aggregate function input and output. The // arguments passed to the input are expected to be the precomputed sum values. func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) { - var s aggregator[N] + s := newPrecomputedSum[N](monotonic) switch b.Temporality { case metricdata.DeltaTemporality: - s = newPrecomputedDeltaSum[N](monotonic) + return b.filter(s.measure), s.delta default: - s = newPrecomputedCumulativeSum[N](monotonic) - } - - return b.input(s), func(dest *metricdata.Aggregation) int { - // TODO (#4220): optimize memory reuse here. - *dest = s.Aggregation() - - sData, _ := (*dest).(metricdata.Sum[N]) - return len(sData.DataPoints) + return b.filter(s.measure), s.cumulative } } // Sum returns a sum aggregate function input and output. func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { - var s aggregator[N] + s := newSum[N](monotonic) switch b.Temporality { case metricdata.DeltaTemporality: - s = newDeltaSum[N](monotonic) + return b.filter(s.measure), s.delta default: - s = newCumulativeSum[N](monotonic) - } - - return b.input(s), func(dest *metricdata.Aggregation) int { - // TODO (#4220): optimize memory reuse here. - *dest = s.Aggregation() - - sData, _ := (*dest).(metricdata.Sum[N]) - return len(sData.DataPoints) + return b.filter(s.measure), s.cumulative } } diff --git a/sdk/metric/internal/aggregate/aggregator_test.go b/sdk/metric/internal/aggregate/aggregator_test.go index 0bca6b01b4b..4e16175159b 100644 --- a/sdk/metric/internal/aggregate/aggregator_test.go +++ b/sdk/metric/internal/aggregate/aggregator_test.go @@ -38,10 +38,6 @@ func monoIncr[N int64 | float64]() setMap[N] { return setMap[N]{alice: 1, bob: 10, carol: 2} } -func nonMonoIncr[N int64 | float64]() setMap[N] { - return setMap[N]{alice: 1, bob: -1, carol: 2} -} - // setMap maps attribute sets to a number. type setMap[N int64 | float64] map[attribute.Set]N diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 594068c4354..1e52ff0d1e5 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -15,6 +15,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( + "context" "sync" "time" @@ -32,257 +33,190 @@ func newValueMap[N int64 | float64]() *valueMap[N] { return &valueMap[N]{values: make(map[attribute.Set]N)} } -func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) { +func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) { s.Lock() s.values[attr] += value s.Unlock() } -// newDeltaSum returns an Aggregator that summarizes a set of measurements as -// their arithmetic sum. Each sum is scoped by attributes and the aggregation -// cycle the measurements were made in. -// -// The monotonic value is used to communicate the produced Aggregation is -// monotonic or not. The returned Aggregator does not make any guarantees this -// value is accurate. It is up to the caller to ensure it. -// -// Each aggregation cycle is treated independently. When the returned -// Aggregator's Aggregation method is called it will reset all sums to zero. -func newDeltaSum[N int64 | float64](monotonic bool) aggregator[N] { - return &deltaSum[N]{ +// newSum returns an aggregator that summarizes a set of measurements as their +// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle +// the measurements were made in. +func newSum[N int64 | float64](monotonic bool) *sum[N] { + return &sum[N]{ valueMap: newValueMap[N](), monotonic: monotonic, start: now(), } } -// deltaSum summarizes a set of measurements made in a single aggregation -// cycle as their arithmetic sum. -type deltaSum[N int64 | float64] struct { +// sum summarizes a set of measurements made as their arithmetic sum. +type sum[N int64 | float64] struct { *valueMap[N] monotonic bool start time.Time } -func (s *deltaSum[N]) Aggregation() metricdata.Aggregation { +func (s *sum[N]) delta(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.DeltaTemporality + sData.IsMonotonic = s.monotonic + s.Lock() defer s.Unlock() - if len(s.values) == 0 { - return nil - } + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) - t := now() - out := metricdata.Sum[N]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: s.monotonic, - DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), - } + var i int for attr, value := range s.values { - out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ - Attributes: attr, - StartTime: s.start, - Time: t, - Value: value, - }) - // Unused attribute sets do not report. + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = value + // Do not report stale values. delete(s.values, attr) + i++ } // The delta collection cycle resets. s.start = t - return out -} -// newCumulativeSum returns an Aggregator that summarizes a set of -// measurements as their arithmetic sum. Each sum is scoped by attributes and -// the aggregation cycle the measurements were made in. -// -// The monotonic value is used to communicate the produced Aggregation is -// monotonic or not. The returned Aggregator does not make any guarantees this -// value is accurate. It is up to the caller to ensure it. -// -// Each aggregation cycle is treated independently. When the returned -// Aggregator's Aggregation method is called it will reset all sums to zero. -func newCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] { - return &cumulativeSum[N]{ - valueMap: newValueMap[N](), - monotonic: monotonic, - start: now(), - } + sData.DataPoints = dPts + *dest = sData + + return n } -// cumulativeSum summarizes a set of measurements made over all aggregation -// cycles as their arithmetic sum. -type cumulativeSum[N int64 | float64] struct { - *valueMap[N] +func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() - monotonic bool - start time.Time -} + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.CumulativeTemporality + sData.IsMonotonic = s.monotonic -func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { s.Lock() defer s.Unlock() - if len(s.values) == 0 { - return nil - } + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) - t := now() - out := metricdata.Sum[N]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: s.monotonic, - DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), - } + var i int for attr, value := range s.values { - out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ - Attributes: attr, - StartTime: s.start, - Time: t, - Value: value, - }) + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = value // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. + i++ } - return out + + sData.DataPoints = dPts + *dest = sData + + return n } -// newPrecomputedDeltaSum returns an Aggregator that summarizes a set of -// pre-computed sums. Each sum is scoped by attributes and the aggregation -// cycle the measurements were made in. -// -// The monotonic value is used to communicate the produced Aggregation is -// monotonic or not. The returned Aggregator does not make any guarantees this -// value is accurate. It is up to the caller to ensure it. -// -// The output Aggregation will report recorded values as delta temporality. -func newPrecomputedDeltaSum[N int64 | float64](monotonic bool) aggregator[N] { - return &precomputedDeltaSum[N]{ +// newPrecomputedSum returns an aggregator that summarizes a set of +// observatrions as their arithmetic sum. Each sum is scoped by attributes and +// the aggregation cycle the measurements were made in. +func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] { + return &precomputedSum[N]{ valueMap: newValueMap[N](), - reported: make(map[attribute.Set]N), monotonic: monotonic, start: now(), } } -// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all -// aggregation cycles as the delta of these sums. -type precomputedDeltaSum[N int64 | float64] struct { +// precomputedSum summarizes a set of observatrions as their arithmetic sum. +type precomputedSum[N int64 | float64] struct { *valueMap[N] - reported map[attribute.Set]N - monotonic bool start time.Time + + reported map[attribute.Set]N } -// Aggregation returns the recorded pre-computed sums as an Aggregation. The -// sum values are expressed as the delta between what was measured this -// collection cycle and the previous. -// -// All pre-computed sums that were recorded for attributes sets reduced by an -// attribute filter (filtered-sums) are summed together and added to any -// pre-computed sum value recorded directly for the resulting attribute set -// (unfiltered-sum). The filtered-sums are reset to zero for the next -// collection cycle, and the unfiltered-sum is kept for the next collection -// cycle. -func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { +func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { + t := now() newReported := make(map[attribute.Set]N) + + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.DeltaTemporality + sData.IsMonotonic = s.monotonic + s.Lock() defer s.Unlock() - if len(s.values) == 0 { - s.reported = newReported - return nil - } + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) - t := now() - out := metricdata.Sum[N]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: s.monotonic, - DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), - } + var i int for attr, value := range s.values { delta := value - s.reported[attr] - out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ - Attributes: attr, - StartTime: s.start, - Time: t, - Value: delta, - }) + + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = delta + newReported[attr] = value // Unused attribute sets do not report. delete(s.values, attr) + i++ } // Unused attribute sets are forgotten. s.reported = newReported // The delta collection cycle resets. s.start = t - return out -} -// newPrecomputedCumulativeSum returns an Aggregator that summarizes a set of -// pre-computed sums. Each sum is scoped by attributes and the aggregation -// cycle the measurements were made in. -// -// The monotonic value is used to communicate the produced Aggregation is -// monotonic or not. The returned Aggregator does not make any guarantees this -// value is accurate. It is up to the caller to ensure it. -// -// The output Aggregation will report recorded values as cumulative -// temporality. -func newPrecomputedCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] { - return &precomputedCumulativeSum[N]{ - valueMap: newValueMap[N](), - monotonic: monotonic, - start: now(), - } + sData.DataPoints = dPts + *dest = sData + + return n } -// precomputedCumulativeSum directly records and reports a set of pre-computed sums. -type precomputedCumulativeSum[N int64 | float64] struct { - *valueMap[N] +func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() - monotonic bool - start time.Time -} + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.CumulativeTemporality + sData.IsMonotonic = s.monotonic -// Aggregation returns the recorded pre-computed sums as an Aggregation. The -// sum values are expressed directly as they are assumed to be recorded as the -// cumulative sum of a some measured phenomena. -// -// All pre-computed sums that were recorded for attributes sets reduced by an -// attribute filter (filtered-sums) are summed together and added to any -// pre-computed sum value recorded directly for the resulting attribute set -// (unfiltered-sum). The filtered-sums are reset to zero for the next -// collection cycle, and the unfiltered-sum is kept for the next collection -// cycle. -func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { s.Lock() defer s.Unlock() - if len(s.values) == 0 { - return nil - } + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) - t := now() - out := metricdata.Sum[N]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: s.monotonic, - DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), - } + var i int for attr, value := range s.values { - out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ - Attributes: attr, - StartTime: s.start, - Time: t, - Value: value, - }) + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = value + // Unused attribute sets do not report. delete(s.values, attr) + i++ } - return out + + sData.DataPoints = dPts + *dest = sData + + return n } diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index 0843bcf8429..3ac675a096b 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -15,250 +15,425 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( + "context" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) func TestSum(t *testing.T) { t.Cleanup(mockTime(now)) - t.Run("Int64", testSum[int64]) - t.Run("Float64", testSum[float64]) -} - -func testSum[N int64 | float64](t *testing.T) { - tester := &aggregatorTester[N]{ - GoroutineN: defaultGoroutines, - MeasurementN: defaultMeasurements, - CycleN: defaultCycles, - } - totalMeasurements := defaultGoroutines * defaultMeasurements - - t.Run("Delta", func(t *testing.T) { - incr, mono := monoIncr[N](), true - eFunc := deltaExpecter[N](incr, mono) - t.Run("Monotonic", tester.Run(newDeltaSum[N](mono), incr, eFunc)) - - incr, mono = nonMonoIncr[N](), false - eFunc = deltaExpecter[N](incr, mono) - t.Run("NonMonotonic", tester.Run(newDeltaSum[N](mono), incr, eFunc)) - }) - - t.Run("Cumulative", func(t *testing.T) { - incr, mono := monoIncr[N](), true - eFunc := cumuExpecter[N](incr, mono) - t.Run("Monotonic", tester.Run(newCumulativeSum[N](mono), incr, eFunc)) - - incr, mono = nonMonoIncr[N](), false - eFunc = cumuExpecter[N](incr, mono) - t.Run("NonMonotonic", tester.Run(newCumulativeSum[N](mono), incr, eFunc)) - }) - - t.Run("PreComputedDelta", func(t *testing.T) { - incr, mono := monoIncr[N](), true - eFunc := preDeltaExpecter[N](incr, mono, N(totalMeasurements)) - t.Run("Monotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc)) - - incr, mono = nonMonoIncr[N](), false - eFunc = preDeltaExpecter[N](incr, mono, N(totalMeasurements)) - t.Run("NonMonotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc)) - }) - - t.Run("PreComputedCumulative", func(t *testing.T) { - incr, mono := monoIncr[N](), true - eFunc := preCumuExpecter[N](incr, mono, N(totalMeasurements)) - t.Run("Monotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc)) - incr, mono = nonMonoIncr[N](), false - eFunc = preCumuExpecter[N](incr, mono, N(totalMeasurements)) - t.Run("NonMonotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc)) - }) -} + t.Run("Int64/DeltaSum", testDeltaSum[int64]()) + t.Run("Float64/DeltaSum", testDeltaSum[float64]()) -func deltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { - sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono} - return func(m int) metricdata.Aggregation { - sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) - for a, v := range incr { - sum.DataPoints = append(sum.DataPoints, point(a, v*N(m))) - } - return sum - } -} + t.Run("Int64/CumulativeSum", testCumulativeSum[int64]()) + t.Run("Float64/CumulativeSum", testCumulativeSum[float64]()) -func cumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { - var cycle N - sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono} - return func(m int) metricdata.Aggregation { - cycle++ - sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) - for a, v := range incr { - sum.DataPoints = append(sum.DataPoints, point(a, v*cycle*N(m))) - } - return sum - } -} + t.Run("Int64/DeltaPrecomputedSum", testDeltaPrecomputedSum[int64]()) + t.Run("Float64/DeltaPrecomputedSum", testDeltaPrecomputedSum[float64]()) -func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc { - sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono} - last := make(map[attribute.Set]N) - return func(int) metricdata.Aggregation { - sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) - for a, v := range incr { - l := last[a] - sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*(N(v)-l))) - last[a] = N(v) - } - return sum - } + t.Run("Int64/CumulativePrecomputedSum", testCumulativePrecomputedSum[int64]()) + t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSum[float64]()) } -func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc { - sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono} - return func(int) metricdata.Aggregation { - sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) - for a, v := range incr { - sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*N(v))) - } - return sum - } -} - -// point returns a DataPoint that started and ended now. -func point[N int64 | float64](a attribute.Set, v N) metricdata.DataPoint[N] { - return metricdata.DataPoint[N]{ - Attributes: a, - StartTime: now(), - Time: now(), - Value: N(v), - } -} - -func testDeltaSumReset[N int64 | float64](t *testing.T) { - t.Cleanup(mockTime(now)) - - a := newDeltaSum[N](false) - assert.Nil(t, a.Aggregation()) - - a.Aggregate(1, alice) - expect := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality} - expect.DataPoints = []metricdata.DataPoint[N]{point[N](alice, 1)} - metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) - - // The attr set should be forgotten once Aggregations is called. - expect.DataPoints = nil - assert.Nil(t, a.Aggregation()) - - // Aggregating another set should not affect the original (alice). - a.Aggregate(1, bob) - expect.DataPoints = []metricdata.DataPoint[N]{point[N](bob, 1)} - metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) +func testDeltaSum[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + }.Sum(mono) + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + input: []arg[N]{}, + expect: output{ + n: 0, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{}, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, alice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 4, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -11, + }, + }, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 10, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 3, + }, + }, + }, + }, + }, + { + input: []arg[N]{}, + // Delta sums are expected to reset. + expect: output{ + n: 0, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{}, + }, + }, + }, + }) } -func TestDeltaSumReset(t *testing.T) { - t.Run("Int64", testDeltaSumReset[int64]) - t.Run("Float64", testDeltaSumReset[float64]) +func testCumulativeSum[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + }.Sum(mono) + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + input: []arg[N]{}, + expect: output{ + n: 0, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{}, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, alice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 4, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -11, + }, + }, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 14, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -8, + }, + }, + }, + }, + }, + }) } -func TestPreComputedDeltaSum(t *testing.T) { - var mono bool - agg := newPrecomputedDeltaSum[int64](mono) - require.Implements(t, (*aggregator[int64])(nil), agg) - - attrs := attribute.NewSet(attribute.String("key", "val")) - agg.Aggregate(1, attrs) - want := metricdata.Sum[int64]{ - IsMonotonic: mono, +func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ Temporality: metricdata.DeltaTemporality, - DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, - } - opt := metricdatatest.IgnoreTimestamp() - metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - - // No observation results in an empty aggregation, and causes previous - // observations to be forgotten. - metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) - - agg.Aggregate(1, attrs) - // measured(+): 1, previous(-): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - - // Duplicate observations add - agg.Aggregate(2, attrs) - agg.Aggregate(5, attrs) - agg.Aggregate(3, attrs) - agg.Aggregate(10, attrs) - // measured(+): 20, previous(-): 1 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 19)} - metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) + Filter: attrFltr, + }.PrecomputedSum(mono) + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + input: []arg[N]{}, + expect: output{ + n: 0, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{}, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 4, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -11, + }, + }, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 1, fltrAlice}, + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 7, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 14, + }, + }, + }, + }, + }, + { + input: []arg[N]{}, + // Precomputed sums are expected to reset. + expect: output{ + n: 0, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[N]{}, + }, + }, + }, + }) } -func TestPreComputedCumulativeSum(t *testing.T) { - var mono bool - agg := newPrecomputedCumulativeSum[int64](mono) - require.Implements(t, (*aggregator[int64])(nil), agg) - - attrs := attribute.NewSet(attribute.String("key", "val")) - agg.Aggregate(1, attrs) - want := metricdata.Sum[int64]{ - IsMonotonic: mono, +func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ Temporality: metricdata.CumulativeTemporality, - DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, - } - opt := metricdatatest.IgnoreTimestamp() - metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - - // Cumulative values should not persist. - metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) - - agg.Aggregate(1, attrs) - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - - // Duplicate measurements add - agg.Aggregate(5, attrs) - agg.Aggregate(3, attrs) - agg.Aggregate(10, attrs) - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} - metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) -} - -func TestEmptySumNilAggregation(t *testing.T) { - assert.Nil(t, newCumulativeSum[int64](true).Aggregation()) - assert.Nil(t, newCumulativeSum[int64](false).Aggregation()) - assert.Nil(t, newCumulativeSum[float64](true).Aggregation()) - assert.Nil(t, newCumulativeSum[float64](false).Aggregation()) - assert.Nil(t, newDeltaSum[int64](true).Aggregation()) - assert.Nil(t, newDeltaSum[int64](false).Aggregation()) - assert.Nil(t, newDeltaSum[float64](true).Aggregation()) - assert.Nil(t, newDeltaSum[float64](false).Aggregation()) - assert.Nil(t, newPrecomputedCumulativeSum[int64](true).Aggregation()) - assert.Nil(t, newPrecomputedCumulativeSum[int64](false).Aggregation()) - assert.Nil(t, newPrecomputedCumulativeSum[float64](true).Aggregation()) - assert.Nil(t, newPrecomputedCumulativeSum[float64](false).Aggregation()) - assert.Nil(t, newPrecomputedDeltaSum[int64](true).Aggregation()) - assert.Nil(t, newPrecomputedDeltaSum[int64](false).Aggregation()) - assert.Nil(t, newPrecomputedDeltaSum[float64](true).Aggregation()) - assert.Nil(t, newPrecomputedDeltaSum[float64](false).Aggregation()) + Filter: attrFltr, + }.PrecomputedSum(mono) + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + input: []arg[N]{}, + expect: output{ + n: 0, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{}, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 4, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -11, + }, + }, + }, + }, + }, + { + input: []arg[N]{ + {ctx, 1, fltrAlice}, + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 11, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 3, + }, + }, + }, + }, + }, + { + input: []arg[N]{}, + // Precomputed sums are expected to reset. + expect: output{ + n: 0, + agg: metricdata.Sum[N]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[N]{}, + }, + }, + }, + }) } func BenchmarkSum(b *testing.B) { - b.Run("Int64", benchmarkSum[int64]) - b.Run("Float64", benchmarkSum[float64]) -} - -func benchmarkSum[N int64 | float64](b *testing.B) { // The monotonic argument is only used to annotate the Sum returned from // the Aggregation method. It should not have an effect on operational // performance, therefore, only monotonic=false is benchmarked here. - factory := func() aggregator[N] { return newDeltaSum[N](false) } - b.Run("Delta", benchmarkAggregator(factory)) - factory = func() aggregator[N] { return newCumulativeSum[N](false) } - b.Run("Cumulative", benchmarkAggregator(factory)) + b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { + return Builder[int64]{ + Temporality: metricdata.CumulativeTemporality, + }.Sum(false) + })) + b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { + return Builder[int64]{ + Temporality: metricdata.DeltaTemporality, + }.Sum(false) + })) + b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { + return Builder[float64]{ + Temporality: metricdata.CumulativeTemporality, + }.Sum(false) + })) + b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { + return Builder[float64]{ + Temporality: metricdata.DeltaTemporality, + }.Sum(false) + })) + + b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { + return Builder[int64]{ + Temporality: metricdata.CumulativeTemporality, + }.PrecomputedSum(false) + })) + b.Run("Precomputed/Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { + return Builder[int64]{ + Temporality: metricdata.DeltaTemporality, + }.PrecomputedSum(false) + })) + b.Run("Precomputed/Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { + return Builder[float64]{ + Temporality: metricdata.CumulativeTemporality, + }.PrecomputedSum(false) + })) + b.Run("Precomputed/Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { + return Builder[float64]{ + Temporality: metricdata.DeltaTemporality, + }.PrecomputedSum(false) + })) }