Skip to content

Commit f715c21

Browse files
fix: same target from two different jobs missing in targetallocator (#4066)
* initial changes * fix test * remove print statements * address feedback * cleanup * Fix linter warnings * Fix more linter warnings --------- Co-authored-by: Mikołaj Świątek <[email protected]>
1 parent ba12c33 commit f715c21

File tree

3 files changed

+160
-5
lines changed

3 files changed

+160
-5
lines changed

.chloggen/labelshash.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: 'bug_fix'
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
5+
component: 'target allocator'
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: 'Fixes an issue where the same target from two different jobs was being allocated for only one job'
9+
10+
# One or more tracking issues related to the change
11+
issues: [4044]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

cmd/otel-allocator/internal/target/discovery_test.go

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ func TestDiscovery(t *testing.T) {
7474
defer cancelFunc()
7575

7676
go func() {
77-
err := d.Run()
78-
assert.Error(t, err)
77+
runErr := d.Run()
78+
assert.Error(t, runErr)
7979
}()
8080
go func() {
81-
err := manager.Run()
82-
assert.NoError(t, err)
81+
runErr := manager.Run()
82+
assert.NoError(t, runErr)
8383
}()
8484
for _, tt := range tests {
8585
t.Run(tt.name, func(t *testing.T) {
@@ -341,6 +341,106 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) {
341341
}
342342
}
343343

344+
func TestDiscoveryTargetHashing(t *testing.T) {
345+
tests := []struct {
346+
description string
347+
cfg *promconfig.Config
348+
}{
349+
{
350+
description: "same targets in two different jobs",
351+
cfg: &promconfig.Config{
352+
ScrapeConfigs: []*promconfig.ScrapeConfig{
353+
{
354+
JobName: "prometheus",
355+
HonorTimestamps: true,
356+
ScrapeInterval: model.Duration(30 * time.Second),
357+
ScrapeProtocols: defaultScrapeProtocols,
358+
ScrapeTimeout: model.Duration(30 * time.Second),
359+
MetricsPath: "/metrics",
360+
Scheme: "http",
361+
ServiceDiscoveryConfigs: discovery.Configs{
362+
discovery.StaticConfig{
363+
{
364+
Targets: []model.LabelSet{
365+
{"__address__": "prom.domain:9001"},
366+
{"__address__": "prom.domain:9002"},
367+
{"__address__": "prom.domain:9003"},
368+
},
369+
},
370+
},
371+
},
372+
},
373+
{
374+
JobName: "prometheus2",
375+
HonorTimestamps: true,
376+
ScrapeInterval: model.Duration(30 * time.Second),
377+
ScrapeProtocols: defaultScrapeProtocols,
378+
ScrapeTimeout: model.Duration(30 * time.Second),
379+
MetricsPath: "/metrics2",
380+
Scheme: "http",
381+
ServiceDiscoveryConfigs: discovery.Configs{
382+
discovery.StaticConfig{
383+
{
384+
Targets: []model.LabelSet{
385+
{"__address__": "prom.domain:9001"},
386+
{"__address__": "prom.domain:9002"},
387+
},
388+
},
389+
},
390+
},
391+
},
392+
},
393+
},
394+
},
395+
}
396+
scu := &mockScrapeConfigUpdater{}
397+
ctx, cancelFunc := context.WithCancel(context.Background())
398+
registry := prometheus.NewRegistry()
399+
sdMetrics, err := discovery.CreateAndRegisterSDMetrics(registry)
400+
require.NoError(t, err)
401+
d := discovery.NewManager(ctx, config.NopLogger, registry, sdMetrics)
402+
results := make(chan []*Item)
403+
manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu, func(targets []*Item) {
404+
var result []*Item
405+
result = append(result, targets...)
406+
results <- result
407+
})
408+
409+
defer manager.Close()
410+
defer cancelFunc()
411+
412+
go func() {
413+
runErr := d.Run()
414+
assert.Error(t, runErr)
415+
}()
416+
go func() {
417+
runErr := manager.Run()
418+
assert.NoError(t, runErr)
419+
}()
420+
421+
for _, tt := range tests {
422+
t.Run(tt.description, func(t *testing.T) {
423+
assert.NoError(t, err)
424+
assert.True(t, len(tt.cfg.ScrapeConfigs) > 0)
425+
err = manager.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, tt.cfg.ScrapeConfigs)
426+
assert.NoError(t, err)
427+
428+
gotTargets := <-results
429+
430+
// Verify that all targets have different hashes
431+
targetHashes := make(map[ItemHash]bool)
432+
for _, target := range gotTargets {
433+
hash := target.Hash()
434+
if _, exists := targetHashes[hash]; exists {
435+
t.Errorf("Duplicate hash %d found for target %s (%s)", hash, target.TargetURL, target.JobName)
436+
}
437+
targetHashes[hash] = true
438+
}
439+
assert.Equal(t, len(gotTargets), len(targetHashes), "Number of unique hashes should match number of targets")
440+
})
441+
}
442+
}
443+
344444
func TestDiscovery_NoConfig(t *testing.T) {
345445
scu := &mockScrapeConfigUpdater{mockCfg: map[string]*promconfig.ScrapeConfig{}}
346446
ctx, cancelFunc := context.WithCancel(context.Background())

cmd/otel-allocator/internal/target/target.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package target
55

66
import (
7+
"github.com/cespare/xxhash/v2"
78
"github.com/prometheus/prometheus/model/labels"
89
)
910

@@ -33,7 +34,7 @@ type Item struct {
3334

3435
func (t *Item) Hash() ItemHash {
3536
if t.hash == 0 {
36-
t.hash = ItemHash(t.Labels.Hash())
37+
t.hash = ItemHash(LabelsHashWithJobName(t.Labels, t.JobName))
3738
}
3839
return t.hash
3940
}
@@ -64,3 +65,41 @@ func NewItem(jobName string, targetURL string, labels labels.Labels, collectorNa
6465
CollectorName: collectorName,
6566
}
6667
}
68+
69+
// LabelsHashWithJobName computes a hash of the labels and the job name.
70+
// Same logic as Prometheus labels.Hash: https://github.com/prometheus/prometheus/blob/8fd46f74aa0155e4d5aa30654f9c02e564e03743/model/labels/labels.go#L72
71+
// but adds in the job name since this is not in the labelset from the discovery manager.
72+
// The scrape manager adds it later. Address is already included in the labels, so it is not needed here.
73+
func LabelsHashWithJobName(ls labels.Labels, jobName string) uint64 {
74+
var sep byte = '\xff'
75+
var seps = []byte{sep}
76+
77+
// Use xxhash.Sum64(b) for fast path as it's faster.
78+
b := make([]byte, 0, 1024)
79+
80+
// Differs from Prometheus implementation by adding job name.
81+
b = append(b, jobName...)
82+
b = append(b, sep)
83+
84+
for i, v := range ls {
85+
if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
86+
// If labels entry is 1KB+ do not allocate whole entry.
87+
h := xxhash.New()
88+
_, _ = h.Write(b)
89+
for _, v := range ls[i:] {
90+
_, _ = h.WriteString(v.Name)
91+
_, _ = h.Write(seps)
92+
_, _ = h.WriteString(v.Value)
93+
_, _ = h.Write(seps)
94+
}
95+
return h.Sum64()
96+
}
97+
98+
b = append(b, v.Name...)
99+
b = append(b, sep)
100+
b = append(b, v.Value...)
101+
b = append(b, sep)
102+
}
103+
104+
return xxhash.Sum64(b)
105+
}

0 commit comments

Comments
 (0)