Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 2 additions & 26 deletions sdk/metric/internal/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand Down Expand Up @@ -44,9 +42,6 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// newFilter returns an filter Aggregator that wraps agg with the attribute
Expand All @@ -58,21 +53,13 @@ func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filte
return &filter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
fAttr, _ := attr.Filter(f.filter)
f.aggregator.Aggregate(measurement, fAttr)
}

Expand All @@ -90,9 +77,6 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation {
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator precomputeAggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
Expand All @@ -104,21 +88,13 @@ func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn att
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
fAttr, _ := attr.Filter(f.filter)
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
Expand Down