From b387afd1cf82579d7b9b89a2c6764cd2003b5cd7 Mon Sep 17 00:00:00 2001
From: Nicolas Takashi
Date: Thu, 19 Dec 2024 16:13:43 +0000
Subject: [PATCH] [CHORE] processing discovered targets async (#3517)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Revert "Support configuring java runtime from configmap or secret (env.valueFrom)" (#3510)

* Revert "Support configuring java runtime from configmap or secret (env.valueF…"

This reverts commit 2b36f0d6f9498e3c82185a4a18f0c855c4da4a57.

* chlog (#3511)

* [CHORE] changing log level

Signed-off-by: Nicolas Takashi

* [CHORE] renaming method

Signed-off-by: Nicolas Takashi

* [CHORE] adding change log entry

Signed-off-by: Nicolas Takashi

* [CHORE] locking targets per job

Signed-off-by: Nicolas Takashi

* Update .chloggen/discovering-target-async.yaml

Co-authored-by: Mikołaj Świątek

* [REFACTOR] applying comments

Signed-off-by: Nicolas Takashi

* [CHORE] adding mutex back

Signed-off-by: Nicolas Takashi

---------

Signed-off-by: Nicolas Takashi
Co-authored-by: Jacob Aronoff
Co-authored-by: Mikołaj Świątek
---
 .chloggen/discovering-target-async.yaml     |  21 +++
 cmd/otel-allocator/benchmark_test.go        |  28 ++--
 cmd/otel-allocator/main.go                  |   4 +-
 cmd/otel-allocator/target/discovery.go      | 177 +++++++++++++++-----
 cmd/otel-allocator/target/discovery_test.go |  24 +--
 5 files changed, 187 insertions(+), 67 deletions(-)
 create mode 100755 .chloggen/discovering-target-async.yaml

diff --git a/.chloggen/discovering-target-async.yaml b/.chloggen/discovering-target-async.yaml
new file mode 100755
index 0000000000..e2291628c6
--- /dev/null
+++ b/.chloggen/discovering-target-async.yaml
@@ -0,0 +1,21 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
+component: target allocator
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Process discovered targets asynchronously
+
+# One or more tracking issues related to the change
+issues: [1842]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  This change enables the target allocator to process discovered targets asynchronously.
+  This is a significant performance improvement for the target allocator, as it allows targets to be processed in parallel rather than sequentially.
+  This change also introduces new metrics to track the performance of the target allocator.
+  - opentelemetry_allocator_process_targets_duration_seconds: The duration of the process targets operation.
+  - opentelemetry_allocator_process_target_groups_duration_seconds: The duration of the process target groups operation.
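Before the code diffs, here is a minimal standalone sketch of the per-job fan-out that this patch introduces in Reload: each job's discovered target groups are processed in their own goroutine, the per-job results are merged into a single map under a mutex, and the merged map is handed to the SetTargets callback once all jobs finish. The target type, processJob helper, and reload function below are simplified stand-ins invented for illustration, not the operator's actual Item, processTargetGroups, or Discoverer types.

package main

import (
	"fmt"
	"sync"
)

// target is a simplified stand-in for the allocator's Item type.
type target struct {
	jobName string
	address string
}

// processJob stands in for processTargetGroups: it turns one job's raw
// addresses into targets keyed by a unique identifier.
func processJob(jobName string, addresses []string) map[string]target {
	out := make(map[string]target, len(addresses))
	for _, addr := range addresses {
		out[jobName+"/"+addr] = target{jobName: jobName, address: addr}
	}
	return out
}

// reload fans out one goroutine per job and merges the per-job results,
// mirroring the WaitGroup-plus-mutex structure of Discoverer.Reload.
func reload(targetSets map[string][]string, setTargets func(map[string]target)) {
	var (
		wg         sync.WaitGroup
		mtxTargets sync.Mutex
		merged     = map[string]target{}
	)
	for jobName, addresses := range targetSets {
		wg.Add(1)
		go func(jobName string, addresses []string) {
			defer wg.Done()
			processed := processJob(jobName, addresses)
			mtxTargets.Lock()
			for k, v := range processed {
				merged[k] = v
			}
			mtxTargets.Unlock()
		}(jobName, addresses)
	}
	wg.Wait()
	setTargets(merged)
}

func main() {
	tsets := map[string][]string{
		"job-a": {"10.0.0.1:8080", "10.0.0.2:8080"},
		"job-b": {"10.0.1.1:9090"},
	}
	reload(tsets, func(targets map[string]target) {
		fmt.Printf("received %d targets\n", len(targets))
	})
}

The actual patch additionally debounces these reloads with a ticker and a buffered trigger channel (see the reloader and run functions in discovery.go below), so chatty service discovery updates do not cause a reload per update.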
diff --git a/cmd/otel-allocator/benchmark_test.go b/cmd/otel-allocator/benchmark_test.go index 7b6c644347..0fc486d5f6 100644 --- a/cmd/otel-allocator/benchmark_test.go +++ b/cmd/otel-allocator/benchmark_test.go @@ -28,7 +28,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/require" ctrl "sigs.k8s.io/controller-runtime" @@ -45,18 +44,17 @@ import ( // the HTTP server afterward. Test data is chosen to be reasonably representative of what the Prometheus service discovery // outputs in the real world. func BenchmarkProcessTargets(b *testing.B) { - numTargets := 10000 + numTargets := 800000 targetsPerGroup := 5 groupsPerJob := 20 tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob) - labelsBuilder := labels.NewBuilder(labels.EmptyLabels()) - - b.ResetTimer() for _, strategy := range allocation.GetRegisteredAllocatorNames() { b.Run(strategy, func(b *testing.B) { - targetDiscoverer, allocator := createTestDiscoverer(strategy, map[string][]*relabel.Config{}) + targetDiscoverer := createTestDiscoverer(strategy, map[string][]*relabel.Config{}) + targetDiscoverer.UpdateTsets(tsets) + b.ResetTimer() for i := 0; i < b.N; i++ { - targetDiscoverer.ProcessTargets(labelsBuilder, tsets, allocator.SetTargets) + targetDiscoverer.Reload() } }) } @@ -65,11 +63,10 @@ func BenchmarkProcessTargets(b *testing.B) { // BenchmarkProcessTargetsWithRelabelConfig is BenchmarkProcessTargets with a relabel config set. The relabel config // does not actually modify any records, but does force the prehook to perform any necessary conversions along the way. 
func BenchmarkProcessTargetsWithRelabelConfig(b *testing.B) { - numTargets := 10000 + numTargets := 800000 targetsPerGroup := 5 groupsPerJob := 20 tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob) - labelsBuilder := labels.NewBuilder(labels.EmptyLabels()) prehookConfig := make(map[string][]*relabel.Config, len(tsets)) for jobName := range tsets { // keep all targets in half the jobs, drop the rest @@ -91,12 +88,13 @@ func BenchmarkProcessTargetsWithRelabelConfig(b *testing.B) { } } - b.ResetTimer() for _, strategy := range allocation.GetRegisteredAllocatorNames() { b.Run(strategy, func(b *testing.B) { - targetDiscoverer, allocator := createTestDiscoverer(strategy, prehookConfig) + targetDiscoverer := createTestDiscoverer(strategy, prehookConfig) + targetDiscoverer.UpdateTsets(tsets) + b.ResetTimer() for i := 0; i < b.N; i++ { - targetDiscoverer.ProcessTargets(labelsBuilder, tsets, allocator.SetTargets) + targetDiscoverer.Reload() } }) } @@ -172,7 +170,7 @@ func prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob int) map[str return tsets } -func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]*relabel.Config) (*target.Discoverer, allocation.Allocator) { +func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]*relabel.Config) *target.Discoverer { ctx := context.Background() logger := ctrl.Log.WithName(fmt.Sprintf("bench-%s", allocationStrategy)) ctrl.SetLogger(logr.New(log.NullLogSink{})) @@ -187,6 +185,6 @@ func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][] registry := prometheus.NewRegistry() sdMetrics, _ := discovery.CreateAndRegisterSDMetrics(registry) discoveryManager := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics) - targetDiscoverer := target.NewDiscoverer(logger, discoveryManager, allocatorPrehook, srv) - return targetDiscoverer, allocator + targetDiscoverer := target.NewDiscoverer(logger, discoveryManager, allocatorPrehook, srv, allocator.SetTargets) + return targetDiscoverer } diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index be2418902e..eff7502dcd 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -112,7 +112,7 @@ func main() { } discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics) - targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv) + targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv, allocator.SetTargets) collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig) if collectorWatcherErr != nil { setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher") @@ -175,7 +175,7 @@ func main() { setupLog.Info("Prometheus config empty, skipping initial discovery configuration") } - err := targetDiscoverer.Watch(allocator.SetTargets) + err := targetDiscoverer.Run() setupLog.Info("Target discoverer exited") return err }, diff --git a/cmd/otel-allocator/target/discovery.go b/cmd/otel-allocator/target/discovery.go index eb7498e5ad..6f2ddc931b 100644 --- a/cmd/otel-allocator/target/discovery.go +++ b/cmd/otel-allocator/target/discovery.go @@ -17,6 +17,8 @@ package target import ( "hash" "hash/fnv" + "sync" + "time" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" @@ -27,6 +29,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" 
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + "go.uber.org/zap/zapcore" "gopkg.in/yaml.v3" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" @@ -37,16 +40,33 @@ var ( Name: "opentelemetry_allocator_targets", Help: "Number of targets discovered.", }, []string{"job_name"}) + + processTargetsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "opentelemetry_allocator_process_targets_duration_seconds", + Help: "Duration of processing targets.", + Buckets: []float64{1, 5, 10, 30, 60, 120}, + }) + + processTargetGroupsDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "opentelemetry_allocator_process_target_groups_duration_seconds", + Help: "Duration of processing target groups.", + Buckets: []float64{1, 5, 10, 30, 60, 120}, + }, []string{"job_name"}) ) type Discoverer struct { - log logr.Logger - manager *discovery.Manager - close chan struct{} - configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig - hook discoveryHook - scrapeConfigsHash hash.Hash - scrapeConfigsUpdater scrapeConfigsUpdater + log logr.Logger + manager *discovery.Manager + close chan struct{} + mtxScrape sync.Mutex // Guards the fields below. + configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig + hook discoveryHook + scrapeConfigsHash hash.Hash + scrapeConfigsUpdater scrapeConfigsUpdater + targetSets map[string][]*targetgroup.Group + triggerReload chan struct{} + processTargetsCallBack func(targets map[string]*Item) + mtxTargets sync.Mutex } type discoveryHook interface { @@ -57,15 +77,17 @@ type scrapeConfigsUpdater interface { UpdateScrapeConfigResponse(map[string]*promconfig.ScrapeConfig) error } -func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater) *Discoverer { +func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater, setTargets func(targets map[string]*Item)) *Discoverer { return &Discoverer{ - log: log, - manager: manager, - close: make(chan struct{}), - configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig), - hook: hook, - scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty - scrapeConfigsUpdater: scrapeConfigsUpdater, + log: log, + manager: manager, + close: make(chan struct{}), + triggerReload: make(chan struct{}, 1), + configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig), + hook: hook, + scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty + scrapeConfigsUpdater: scrapeConfigsUpdater, + processTargetsCallBack: setTargets, } } @@ -105,43 +127,122 @@ func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, scrapeConf return m.manager.ApplyConfig(discoveryCfg) } -func (m *Discoverer) Watch(fn func(targets map[string]*Item)) error { - labelsBuilder := labels.NewBuilder(labels.EmptyLabels()) +func (m *Discoverer) Run() error { + err := m.run(m.manager.SyncCh()) + if err != nil { + m.log.Error(err, "Service Discovery watch event failed") + return err + } + <-m.close + m.log.Info("Service Discovery watch event stopped: discovery manager closed") + return nil +} + +// UpdateTsets updates the target sets to be scraped. 
+func (m *Discoverer) UpdateTsets(tsets map[string][]*targetgroup.Group) { + m.mtxScrape.Lock() + m.targetSets = tsets + m.mtxScrape.Unlock() +} + +// reloader triggers a reload of the scrape configs at regular intervals. +// The time between reloads is defined by reloadIntervalDuration to avoid overloading the system +// with too many reloads, because some service discovery mechanisms can be quite chatty. +func (m *Discoverer) reloader() { + reloadIntervalDuration := model.Duration(5 * time.Second) + ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) + + defer ticker.Stop() + for { select { case <-m.close: - m.log.Info("Service Discovery watch event stopped: discovery manager closed") - return nil - case tsets := <-m.manager.SyncCh(): - m.ProcessTargets(labelsBuilder, tsets, fn) + return + case <-ticker.C: + select { + case <-m.triggerReload: + m.Reload() + case <-m.close: + return + } } } } -func (m *Discoverer) ProcessTargets(builder *labels.Builder, tsets map[string][]*targetgroup.Group, fn func(targets map[string]*Item)) { +// Reload triggers a reload of the scrape configs. +// This will process the target groups and update the targets concurrently. +func (m *Discoverer) Reload() { + m.mtxScrape.Lock() + var wg sync.WaitGroup targets := map[string]*Item{} + timer := prometheus.NewTimer(processTargetsDuration) + defer timer.ObserveDuration() + + for jobName, groups := range m.targetSets { + wg.Add(1) + // Run the sync in parallel as these take a while and at high load can't catch up. + go func(jobName string, groups []*targetgroup.Group) { + processedTargets := m.processTargetGroups(jobName, groups) + m.mtxTargets.Lock() + for k, v := range processedTargets { + targets[k] = v + } + m.mtxTargets.Unlock() + wg.Done() + }(jobName, groups) + } + m.mtxScrape.Unlock() + wg.Wait() + m.processTargetsCallBack(targets) +} - for jobName, tgs := range tsets { - var count float64 = 0 - for _, tg := range tgs { - builder.Reset(labels.EmptyLabels()) - for ln, lv := range tg.Labels { +// processTargetGroups processes the target groups and returns a map of targets. +func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.Group) map[string]*Item { + builder := labels.NewBuilder(labels.Labels{}) + timer := prometheus.NewTimer(processTargetGroupsDuration.WithLabelValues(jobName)) + targets := map[string]*Item{} + defer timer.ObserveDuration() + var count float64 = 0 + for _, tg := range groups { + builder.Reset(labels.EmptyLabels()) + for ln, lv := range tg.Labels { + builder.Set(string(ln), string(lv)) + } + groupLabels := builder.Labels() + for _, t := range tg.Targets { + count++ + builder.Reset(groupLabels) + for ln, lv := range t { builder.Set(string(ln), string(lv)) } - groupLabels := builder.Labels() - for _, t := range tg.Targets { - count++ - builder.Reset(groupLabels) - for ln, lv := range t { - builder.Set(string(ln), string(lv)) - } - item := NewItem(jobName, string(t[model.AddressLabel]), builder.Labels(), "") - targets[item.Hash()] = item + item := NewItem(jobName, string(t[model.AddressLabel]), builder.Labels(), "") + targets[item.Hash()] = item + } + } + targetsDiscovered.WithLabelValues(jobName).Set(count) + return targets +} + +// Run receives and saves target set updates and triggers the scraping loops reloading. +// Reloading happens in the background so that it doesn't block receiving targets updates. 
+func (m *Discoverer) run(tsets <-chan map[string][]*targetgroup.Group) error { + go m.reloader() + for { + select { + case ts := <-tsets: + m.log.V(int(zapcore.DebugLevel)).Info("Service Discovery watch event received", "targets groups", len(ts)) + m.UpdateTsets(ts) + + select { + case m.triggerReload <- struct{}{}: + default: } + + case <-m.close: + m.log.Info("Service Discovery watch event stopped: discovery manager closed") + return nil } - targetsDiscovered.WithLabelValues(jobName).Set(count) } - fn(targets) } func (m *Discoverer) Close() { diff --git a/cmd/otel-allocator/target/discovery_test.go b/cmd/otel-allocator/target/discovery_test.go index 7eb2883ee9..c53228f072 100644 --- a/cmd/otel-allocator/target/discovery_test.go +++ b/cmd/otel-allocator/target/discovery_test.go @@ -73,24 +73,24 @@ func TestDiscovery(t *testing.T) { sdMetrics, err := discovery.CreateAndRegisterSDMetrics(registry) require.NoError(t, err) d := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics) - manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu) + results := make(chan []string) + manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu, func(targets map[string]*Item) { + var result []string + for _, t := range targets { + result = append(result, t.TargetURL) + } + results <- result + }) defer func() { manager.Close() }() defer cancelFunc() - results := make(chan []string) go func() { err := d.Run() assert.Error(t, err) }() go func() { - err := manager.Watch(func(targets map[string]*Item) { - var result []string - for _, t := range targets { - result = append(result, t.TargetURL) - } - results <- result - }) + err := manager.Run() assert.NoError(t, err) }() for _, tt := range tests { @@ -321,7 +321,7 @@ func TestDiscovery_ScrapeConfigHashing(t *testing.T) { sdMetrics, err := discovery.CreateAndRegisterSDMetrics(registry) require.NoError(t, err) d := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics) - manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu) + manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu, nil) for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { @@ -360,7 +360,7 @@ func TestDiscovery_NoConfig(t *testing.T) { sdMetrics, err := discovery.CreateAndRegisterSDMetrics(registry) require.NoError(t, err) d := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics) - manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu) + manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu, nil) defer close(manager.close) defer cancelFunc() @@ -410,7 +410,7 @@ func BenchmarkApplyScrapeConfig(b *testing.B) { sdMetrics, err := discovery.CreateAndRegisterSDMetrics(registry) require.NoError(b, err) d := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics) - manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu) + manager := NewDiscoverer(ctrl.Log.WithName("test"), d, nil, scu, nil) b.ResetTimer() for i := 0; i < b.N; i++ {