Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Count the Collect time in the PeriodicReader timeout. (#4221)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- ⚠️ Metrics SDK Breaking ⚠️ : the `AttributeFilter` fields of the `Stream` from `go.opentelemetry.io/otel/sdk/metric` is replaced by the `AttributeKeys` field.
The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)

### Fixed

Expand Down
4 changes: 1 addition & 3 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ var viewBenchmarks = []struct {
"AttrFilterView",
[]View{NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("K")
}},
Stream{AllowAttributeKeys: []attribute.Key{"K"}},
)},
},
}
Expand Down
27 changes: 25 additions & 2 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,31 @@ type Stream struct {
Unit string
// Aggregation the stream uses for an instrument.
Aggregation aggregation.Aggregation
// AttributeFilter applied to all attributes recorded for an instrument.
AttributeFilter attribute.Filter
// AllowAttributeKeys are an allow-list of attribute keys that will be
// preserved for the stream. Any attribute recorded for the stream with a
// key not in this slice will be dropped.
//
// If this slice is empty, all attributes will be kept.
AllowAttributeKeys []attribute.Key
}

// attributeFilter returns an attribute.Filter that only allows attributes
// with keys in s.AttributeKeys.
//
// If s.AttributeKeys is empty an accept-all filter is returned.
func (s Stream) attributeFilter() attribute.Filter {
if len(s.AllowAttributeKeys) <= 0 {
return func(kv attribute.KeyValue) bool { return true }
}

allowed := make(map[attribute.Key]struct{})
for _, k := range s.AllowAttributeKeys {
allowed[k] = struct{}{}
}
return func(kv attribute.KeyValue) bool {
_, ok := allowed[kv.Key]
return ok
}
}

// streamID are the identifying properties of a stream.
Expand Down
11 changes: 3 additions & 8 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,9 +1516,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
Stream{AllowAttributeKeys: []attribute.Key{"foo"}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))
Expand Down Expand Up @@ -1565,11 +1563,8 @@ func TestObservableExample(t *testing.T) {
selector := func(InstrumentKind) metricdata.Temporality { return temp }
reader := NewManualReader(WithTemporalitySelector(selector))

noopFilter := func(kv attribute.KeyValue) bool { return true }
noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: noopFilter})

filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" }
filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter})
noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName})
filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AllowAttributeKeys: []attribute.Key{"pid"}})

mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered))
meter := mp.Meter(scopeName)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
if agg == nil { // Drop aggregator.
return aggVal[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = aggregate.NewFilter(agg, stream.AttributeFilter)
if len(stream.AllowAttributeKeys) > 0 {
agg = aggregate.NewFilter(agg, stream.attributeFilter())
}

i.pipeline.addSync(scope, instrumentSync{
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func NewView(criteria Instrument, mask Stream) View {
return func(i Instrument) (Stream, bool) {
if matchFunc(i) {
return Stream{
Name: nonZero(mask.Name, i.Name),
Description: nonZero(mask.Description, i.Description),
Unit: nonZero(mask.Unit, i.Unit),
Aggregation: agg,
AttributeFilter: mask.AttributeFilter,
Name: nonZero(mask.Name, i.Name),
Description: nonZero(mask.Description, i.Description),
Unit: nonZero(mask.Unit, i.Unit),
Aggregation: agg,
AllowAttributeKeys: mask.AllowAttributeKeys,
}, true
}
return Stream{}, false
Expand Down
29 changes: 12 additions & 17 deletions sdk/metric/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,18 @@ func TestNewViewReplace(t *testing.T) {
}
},
},
{
name: "AttributeKeys",
mask: Stream{AllowAttributeKeys: []attribute.Key{"test"}},
want: func(i Instrument) Stream {
return Stream{
Name: i.Name,
Description: i.Description,
Unit: i.Unit,
AllowAttributeKeys: []attribute.Key{"test"},
}
},
},
{
name: "Complete",
mask: Stream{
Expand All @@ -430,23 +442,6 @@ func TestNewViewReplace(t *testing.T) {
assert.Equal(t, test.want(completeIP), got)
})
}

// Go does not allow for the comparison of function values, even their
// addresses. Therefore, the AttributeFilter field needs an alternative
// testing strategy.
t.Run("AttributeFilter", func(t *testing.T) {
allowed := attribute.String("key", "val")
filter := func(kv attribute.KeyValue) bool {
return kv == allowed
}
mask := Stream{AttributeFilter: filter}
got, match := NewView(completeIP, mask)(completeIP)
require.True(t, match, "view did not match exact criteria")
require.NotNil(t, got.AttributeFilter, "AttributeFilter not set")
assert.True(t, got.AttributeFilter(allowed), "wrong AttributeFilter")
other := attribute.String("key", "other val")
assert.False(t, got.AttributeFilter(other), "wrong AttributeFilter")
})
}

type badAgg struct {
Expand Down