Skip to content

Commit 3520a9e

Browse files
mariomacMrAlias
andauthored
Limit cardinality of Span Metrics (open-telemetry#545)
* first untested implementation of spannamelimiter * add testable LRU cache * expirable LRU: extra tests + eviction callback * unit tested implementation * some improvements in thelimiter. pipe tests not yet working * Fixed pipeline and some changes ready for review * Update pkg/components/helpers/cache/expirable_lru_test.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru_test.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru_test.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru_test.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru_test.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru_test.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/app/request/span.go Co-authored-by: Tyler Yahn <[email protected]> * Update pkg/components/helpers/cache/expirable_lru.go Co-authored-by: Tyler Yahn <[email protected]> * fix linter complaint * Fix pipeline --------- Co-authored-by: Tyler Yahn <[email protected]>
1 parent 07de41f commit 3520a9e

File tree

14 files changed

+789
-40
lines changed

14 files changed

+789
-40
lines changed

devdocs/pipeline-map.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@ flowchart TD
3232
KD:::optional --> NR
3333
NR(Name resolver):::optional --> AF
3434
35-
AF(Attributes filter):::optional --> OTELM(OTEL<br/> metrics<br/> exporter):::optional
36-
AF --> OTELT(OTEL<br/> traces<br/> exporter):::optional
37-
AF --> PROM(Prometheus<br/>HTTP<br/>endpoint):::optional
38-
AF --> ALLOY(Alloy<br/>connector):::optional
35+
AF(Attributes filter):::optional --> OTELT(OTEL/ALLOY<br/> traces<br/> exporter):::optional
36+
37+
38+
AF --> IPD(Unknown IP<br/>dropper):::optional
39+
IPD --> SNCL(Span Name<br/>cardinality<br/>limiter)
40+
SNCL --> OTELRM(OTEL<br/>RED metrics<br/> exporter):::optional
41+
SNCL --> OTELSM(OTEL<br/>span/svc graph<br/>metrics<br/> exporter):::optional
42+
SNCL --> PROM(Prometheus<br/>HTTP<br/>endpoint):::optional
3943
end
4044
CU -.-> |New PIDs| KDB
4145
KDB(KubeDatabase):::optional <-.- | Aggregated & indexed Pod info | KD

pkg/app/request/span.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ type Span struct {
181181
SQLCommand string `json:"-"`
182182
SQLError *SQLError `json:"-"`
183183
MessagingInfo *MessagingInfo `json:"-"`
184+
185+
// OverrideTraceName is set under some conditions, like spanmetrics reaching the maximum
186+
// cardinality for trace names.
187+
OverrideTraceName string `json:"-"`
184188
}
185189

186190
func (s *Span) Inside(parent *Span) bool {
@@ -533,6 +537,9 @@ func (s *Span) ServiceGraphKind() string {
533537
}
534538

535539
func (s *Span) TraceName() string {
540+
if s.OverrideTraceName != "" {
541+
return s.OverrideTraceName
542+
}
536543
switch s.Type {
537544
case EventTypeHTTP, EventTypeHTTPClient:
538545
name := s.Method
@@ -561,11 +568,11 @@ func (s *Span) TraceName() string {
561568
if s.Path == "" {
562569
return s.Method
563570
}
564-
return fmt.Sprintf("%s %s", s.Method, s.Path)
571+
return s.Method + " " + s.Path
565572
case EventTypeMongoClient:
566573
if s.Path != "" && s.Method != "" {
567574
// TODO for database operations like listCollections, we need to use s.DbNamespace instead of s.Path
568-
return fmt.Sprintf("%s %s", s.Method, s.Path)
575+
return s.Method + " " + s.Path
569576
}
570577
if s.Path != "" {
571578
return s.Path
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package cache
5+
6+
import (
7+
"container/list"
8+
"time"
9+
)
10+
11+
// ExpirableLRU cache. It is not safe for concurrent access.
12+
type ExpirableLRU[K comparable, V any] struct {
13+
ttl time.Duration
14+
ll *list.List
15+
cache map[K]*list.Element
16+
evictCallBack func(K, V)
17+
}
18+
19+
type expirableEntry[K comparable, V any] struct {
20+
key K
21+
value V
22+
lastAccess time.Time
23+
}
24+
25+
// ExpirableCacheOpt defines a functional option for configuring an ExpirableLRU cache.
26+
type ExpirableCacheOpt[K comparable, V any] func(*ExpirableLRU[K, V])
27+
28+
// WithEvictCallBack sets a callback function to be called whenever an entry is evicted
29+
// from the [ExpirableLRU] cache.
30+
func WithEvictCallBack[K comparable, V any](cb func(K, V)) ExpirableCacheOpt[K, V] {
31+
return func(c *ExpirableLRU[K, V]) {
32+
c.evictCallBack = cb
33+
}
34+
}
35+
36+
// NewExpirableLRU creates a new [ExpirableLRU] whose entries older than the provided TTL are removed
37+
// only when the ExpireAll method is explicitly called.
38+
func NewExpirableLRU[K comparable, V any](ttl time.Duration, opts ...ExpirableCacheOpt[K, V]) *ExpirableLRU[K, V] {
39+
lru := &ExpirableLRU[K, V]{
40+
ttl: ttl,
41+
ll: list.New(),
42+
cache: map[K]*list.Element{},
43+
evictCallBack: func(K, V) {},
44+
}
45+
for _, opt := range opts {
46+
opt(lru)
47+
}
48+
return lru
49+
}
50+
51+
// Put a value into the cache.
52+
func (c *ExpirableLRU[K, V]) Put(key K, value V) {
53+
timeNow := time.Now()
54+
if ee, ok := c.cache[key]; ok {
55+
c.ll.MoveToFront(ee)
56+
entry := ee.Value.(*expirableEntry[K, V])
57+
entry.value = value
58+
entry.lastAccess = timeNow
59+
return
60+
}
61+
ele := c.ll.PushFront(&expirableEntry[K, V]{key: key, value: value, lastAccess: timeNow})
62+
c.cache[key] = ele
63+
}
64+
65+
// Get looks up a key's value from the cache.
66+
func (c *ExpirableLRU[K, V]) Get(key K) (value V, ok bool) {
67+
if ele, hit := c.cache[key]; hit {
68+
c.ll.MoveToFront(ele)
69+
ele.Value.(*expirableEntry[K, V]).lastAccess = time.Now()
70+
return ele.Value.(*expirableEntry[K, V]).value, true
71+
}
72+
return
73+
}
74+
75+
// Remove the provided key from the cache.
76+
func (c *ExpirableLRU[K, V]) Remove(key K) {
77+
if ele, hit := c.cache[key]; hit {
78+
c.removeElement(ele)
79+
}
80+
}
81+
82+
func (c *ExpirableLRU[K, V]) removeElement(e *list.Element) {
83+
kv := e.Value.(*expirableEntry[K, V])
84+
c.evictCallBack(kv.key, kv.value)
85+
c.ll.Remove(e)
86+
delete(c.cache, kv.key)
87+
}
88+
89+
// ExpireAll removes all the entries that are older than the cache TTL.
90+
// Returns the number of entries removed.
91+
func (c *ExpirableLRU[K, V]) ExpireAll() int {
92+
now := time.Now()
93+
removed := 0
94+
for older := c.ll.Back(); c.expired(older, now); older = c.ll.Back() {
95+
removed++
96+
c.removeElement(older)
97+
}
98+
return removed
99+
}
100+
101+
func (c *ExpirableLRU[K, V]) expired(elem *list.Element, now time.Time) bool {
102+
return elem != nil &&
103+
elem.Value.(*expirableEntry[K, V]).lastAccess.Add(c.ttl).Before(now)
104+
}
105+
106+
// Len returns the number of elements in the cache.
107+
func (c *ExpirableLRU[K, V]) Len() int {
108+
return c.ll.Len()
109+
}
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package cache
5+
6+
import (
7+
"testing"
8+
"testing/synctest"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestExpirableLRU_PutGetRemove(t *testing.T) {
16+
lru := NewExpirableLRU[string, string](5 * time.Minute)
17+
18+
lru.Put("foo", "bar")
19+
v, ok := lru.Get("foo")
20+
assert.True(t, ok)
21+
assert.Equal(t, "bar", v)
22+
assert.Equal(t, 1, lru.Len())
23+
24+
v, ok = lru.Get("baz")
25+
assert.False(t, ok)
26+
assert.Empty(t, v)
27+
28+
lru.Put("baz", "bae")
29+
v, ok = lru.Get("baz")
30+
assert.True(t, ok)
31+
assert.Equal(t, "bae", v)
32+
assert.Equal(t, 2, lru.Len())
33+
34+
lru.Remove("foo")
35+
_, ok = lru.Get("foo")
36+
assert.False(t, ok)
37+
assert.Equal(t, 1, lru.Len())
38+
}
39+
40+
func TestExpirableLRU_ExpireAll(t *testing.T) {
41+
synctest.Test(t, func(t *testing.T) {
42+
cache := NewExpirableLRU[string, string](5 * time.Minute)
43+
44+
// Add items at different times
45+
cache.Put("key1", "value1")
46+
// Advance time by 2 minutes
47+
time.Sleep(2 * time.Minute)
48+
cache.Put("key2", "value2")
49+
// Advance time by 2 more minutes (key1 is now 4 minutes old, key2 is 2 minutes old)
50+
time.Sleep(2 * time.Minute)
51+
cache.Put("key3", "value3")
52+
53+
assert.Zero(t, cache.ExpireAll())
54+
55+
// All items should be there(none have exceeded 5-minute TTL)
56+
assert.Equal(t, 3, cache.Len())
57+
58+
v, ok := cache.Get("key1")
59+
require.True(t, ok, "Expected to find key1")
60+
assert.Equal(t, "value1", v)
61+
time.Sleep(2 * time.Minute)
62+
63+
v, ok = cache.Get("key2")
64+
require.True(t, ok, "Expected to find key2")
65+
assert.Equal(t, "value2", v)
66+
time.Sleep(2 * time.Minute)
67+
68+
v, ok = cache.Get("key3")
69+
require.True(t, ok, "Expected to find key3")
70+
assert.Equal(t, "value3", v)
71+
time.Sleep(2 * time.Minute)
72+
73+
assert.Equal(t, 1, cache.ExpireAll())
74+
75+
// key1 is 6 minutes old, should expire while others are still valid
76+
assert.Equal(t, 2, cache.Len())
77+
78+
_, ok = cache.Get("key1")
79+
assert.False(t, ok, "Expected key1 to be expired")
80+
v, ok = cache.Get("key2")
81+
require.True(t, ok, "Expected to find key2")
82+
assert.Equal(t, "value2", v)
83+
v, ok = cache.Get("key3")
84+
require.True(t, ok, "Expected to find key3")
85+
assert.Equal(t, "value3", v)
86+
87+
// Advance age by two minute:
88+
// - "key1": removed (expired)
89+
// - "key2": 6 minutes old
90+
// - "key3": 4 minutes old
91+
time.Sleep(2 * time.Minute)
92+
93+
// add key 4
94+
cache.Put("key4", "value4")
95+
// Advance age by four minute:
96+
// - "key1": removed (expired)
97+
// - "key2": 10 minutes old
98+
// - "key3": 8 minutes old
99+
// - "key4": 2 minutes old
100+
time.Sleep(4 * time.Minute)
101+
102+
// all the keys but key4 should be expired
103+
assert.Equal(t, 2, cache.ExpireAll())
104+
assert.Equal(t, 1, cache.Len())
105+
106+
_, ok = cache.Get("key1")
107+
assert.False(t, ok, "Expected key1 to be expired")
108+
_, ok = cache.Get("key2")
109+
assert.False(t, ok, "Expected key2 to be expired")
110+
_, ok = cache.Get("key3")
111+
assert.False(t, ok, "Expected key3 to be expired")
112+
v, ok = cache.Get("key4")
113+
require.True(t, ok, "Expected to find key4")
114+
assert.Equal(t, "value4", v)
115+
116+
// Advance age by four minute:
117+
// - "key1": removed (expired)
118+
// - "key2": removed (expired)
119+
// - "key3": removed (expired)
120+
// - "key4": 6 minutes old
121+
time.Sleep(4 * time.Minute)
122+
123+
// a re-added key should not expire
124+
cache.Put("key1", "value1")
125+
// Advance age by two minute:
126+
// - "key1": two minutes old
127+
// - "key2": removed (expired)
128+
// - "key3": removed (expired)
129+
// - "key4": 10 minutes old
130+
time.Sleep(2 * time.Minute)
131+
132+
assert.Equal(t, 1, cache.ExpireAll())
133+
134+
assert.Equal(t, 1, cache.Len())
135+
v, ok = cache.Get("key1")
136+
require.True(t, ok, "Expected to find key1")
137+
assert.Equal(t, "value1", v)
138+
_, ok = cache.Get("key2")
139+
assert.False(t, ok, "Expected key2 to be expired")
140+
_, ok = cache.Get("key3")
141+
assert.False(t, ok, "expected key 3 to be expired")
142+
_, ok = cache.Get("key4")
143+
assert.False(t, ok, "expected key 4 to be expired")
144+
})
145+
}
146+
147+
func TestWithEvictCallBack(t *testing.T) {
148+
synctest.Test(t, func(t *testing.T) {
149+
var evictedKeys []int
150+
var evictedVals []string
151+
cache := NewExpirableLRU[int, string](5*time.Minute,
152+
WithEvictCallBack(func(k int, v string) {
153+
evictedKeys = append(evictedKeys, k)
154+
evictedVals = append(evictedVals, v)
155+
}))
156+
157+
cache.Put(1, "one")
158+
time.Sleep(1 * time.Minute)
159+
cache.Put(2, "two")
160+
time.Sleep(2 * time.Minute)
161+
cache.Put(3, "three")
162+
163+
assert.Zero(t, cache.ExpireAll())
164+
assert.Empty(t, evictedKeys)
165+
assert.Empty(t, evictedVals)
166+
167+
time.Sleep(4 * time.Minute)
168+
169+
assert.Equal(t, 2, cache.ExpireAll())
170+
assert.Equal(t, []int{1, 2}, evictedKeys)
171+
assert.Equal(t, []string{"one", "two"}, evictedVals)
172+
})
173+
}
174+
175+
func TestPutAlsoUpdatesTTL(t *testing.T) {
176+
synctest.Test(t, func(t *testing.T) {
177+
cache := NewExpirableLRU[int, string](5 * time.Minute)
178+
cache.Put(1, "one")
179+
180+
time.Sleep(3 * time.Minute)
181+
cache.Put(2, "two")
182+
183+
assert.Zero(t, cache.ExpireAll())
184+
185+
time.Sleep(3 * time.Minute)
186+
cache.Put(1, "another")
187+
time.Sleep(3 * time.Minute)
188+
189+
assert.Equal(t, 1, cache.ExpireAll())
190+
assert.Equal(t, 1, cache.Len())
191+
_, ok := cache.Get(1)
192+
assert.True(t, ok, "Expected to find key1")
193+
_, ok = cache.Get(2)
194+
assert.False(t, ok, "Expected key2 to be expired")
195+
})
196+
}

0 commit comments

Comments
 (0)