From 7070128f443f251726ccdbc1c1b41309e43b1c7e Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 31 Jan 2025 20:20:59 +0000 Subject: [PATCH] fix: always register beats registries (#15439) (#15514) * fix: always recreate libbeat beats registry match the logic we had before the otel translation * ok then * always register beats registries * cleanup fix * test: fix race condition in TestLibbeatMetrics use atomic in http handler * test: clear output and apm-server beats registry before starting a beat, also clear output and apm-server registry (cherry picked from commit cd0bc2d0cc26b4054d3d9fdc70babab115cc03ae) Co-authored-by: kruskall <99559985+kruskall@users.noreply.github.com> --- internal/beatcmd/beat.go | 6 +----- internal/beatcmd/beat_test.go | 11 +++++------ 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/internal/beatcmd/beat.go b/internal/beatcmd/beat.go index 177d3b421f..13e257c395 100644 --- a/internal/beatcmd/beat.go +++ b/internal/beatcmd/beat.go @@ -491,10 +491,6 @@ func (b *Beat) registerStateMetrics() { } func (b *Beat) registerStatsMetrics() { - if b.Config.Output.Name() != "elasticsearch" { - return - } - libbeatRegistry := monitoring.Default.GetRegistry("libbeat") monitoring.NewFunc(libbeatRegistry, "output", func(_ monitoring.Mode, v monitoring.Visitor) { var rm metricdata.ResourceMetrics @@ -503,10 +499,10 @@ func (b *Beat) registerStatsMetrics() { } v.OnRegistryStart() defer v.OnRegistryFinished() - monitoring.ReportString(v, "type", "elasticsearch") for _, sm := range rm.ScopeMetrics { switch { case sm.Scope.Name == "github.com/elastic/go-docappender": + monitoring.ReportString(v, "type", "elasticsearch") addDocappenderLibbeatOutputMetrics(context.Background(), v, sm) } } diff --git a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go index af748529e7..abb5822da6 100644 --- a/internal/beatcmd/beat_test.go +++ b/internal/beatcmd/beat_test.go @@ -146,7 +146,7 @@ func TestLibbeatMetrics(t *testing.T) { defer func() { assert.NoError(t, stop()) }() args := <-runnerParamsChan - var requestIndex int + var requestIndex atomic.Int64 requestsChan := make(chan chan struct{}) defer close(requestsChan) esClient := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { @@ -161,17 +161,16 @@ func TestLibbeatMetrics(t *testing.T) { } } _, result := docappendertest.DecodeBulkRequest(r) - switch requestIndex { - case 1: + switch requestIndex.Add(1) { + case 2: result.HasErrors = true result.Items[0]["create"] = esutil.BulkIndexerResponseItem{Status: 400} - case 3: + case 4: result.HasErrors = true result.Items[0]["create"] = esutil.BulkIndexerResponseItem{Status: 429} default: // success } - requestIndex++ json.NewEncoder(w).Encode(result) }) appender, err := docappender.New(esClient, docappender.Config{ @@ -603,7 +602,7 @@ func resetGlobals() { // Clear monitoring registries to allow the new Beat to populate them. monitoring.GetNamespace("info").SetRegistry(nil) monitoring.GetNamespace("state").SetRegistry(nil) - for _, name := range []string{"system", "beat", "libbeat"} { + for _, name := range []string{"system", "beat", "libbeat", "apm-server", "output"} { registry := monitoring.Default.GetRegistry(name) if registry != nil { registry.Clear()