From 07ed0e2dbb2efd4d77d134675073976f7496b8dc Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 4 Feb 2025 09:04:51 +0000 Subject: [PATCH] fix: do not overwrite registries when publishing otel metrics to beats (#15517) (#15537) * fix: do not overwrite registries when publishing otel metrics to beats update apm-server registry mapping logic to avoid overwriting onkey handlers * test: add unit test for apm-server mapping logic (cherry picked from commit 9350ef565265895131d06cb3a2f4bcc268b8e56f) Co-authored-by: kruskall <99559985+kruskall@users.noreply.github.com> --- internal/beatcmd/beat.go | 36 +++++++++++++++++++++++------- internal/beatcmd/beat_test.go | 41 +++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/internal/beatcmd/beat.go b/internal/beatcmd/beat.go index 3970f85ec3..13e257c395 100644 --- a/internal/beatcmd/beat.go +++ b/internal/beatcmd/beat.go @@ -573,21 +573,41 @@ func getScalarInt64(data metricdata.Aggregation) (int64, bool) { } func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) { + beatsMetrics := make(map[string]any) for _, m := range sm.Metrics { if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok { if value, ok := getScalarInt64(m.Data); ok { - keys := strings.Split(suffix, ".") - for i := 0; i < len(keys)-1; i++ { - v.OnRegistryStart() - v.OnKey(keys[i]) - } - monitoring.ReportInt(v, keys[len(keys)-1], value) - for i := 0; i < len(keys)-1; i++ { - v.OnRegistryFinished() + current := beatsMetrics + suffixSlice := strings.Split(suffix, ".") + for i := 0; i < len(suffixSlice)-1; i++ { + k := suffixSlice[i] + if _, ok := current[k]; !ok { + current[k] = make(map[string]any) + } + if currentmap, ok := current[k].(map[string]any); ok { + current = currentmap + } } + current[suffixSlice[len(suffixSlice)-1]] = value } } } + + reportOnKey(v, beatsMetrics) +} + +func reportOnKey(v monitoring.Visitor, m map[string]any) { + for key, value := range m { + if valueMap, ok := value.(map[string]any); ok { + v.OnRegistryStart() + v.OnKey(key) + reportOnKey(v, valueMap) + v.OnRegistryFinished() + } + if valueMetric, ok := value.(int64); ok { + monitoring.ReportInt(v, key, valueMetric) + } + } } // Adapt go-docappender's OTel metrics to beats stack monitoring metrics, diff --git a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go index c31cfb251c..77d3d2e4fb 100644 --- a/internal/beatcmd/beat_test.go +++ b/internal/beatcmd/beat_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -266,6 +267,46 @@ func TestLibbeatMetrics(t *testing.T) { }, snapshot) } +func TestAddAPMServerMetrics(t *testing.T) { + r := monitoring.NewRegistry() + sm := metricdata.ScopeMetrics{ + Metrics: []metricdata.Metrics{ + { + Name: "apm-server.foo.request", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + { + Name: "apm-server.foo.response", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + }, + }, + }, + }, + }, + } + + monitoring.NewFunc(r, "apm-server", func(m monitoring.Mode, v monitoring.Visitor) { + addAPMServerMetrics(v, sm) + }) + + snapshot := monitoring.CollectStructSnapshot(r, monitoring.Full, false) + assert.Equal(t, map[string]any{ + "foo": map[string]any{ + "request": int64(1), + "response": int64(1), + }, + }, snapshot) +} + func TestUnmanagedOutputRequired(t *testing.T) { b := newBeat(t, "", func(args RunnerParams) (Runner, error) { panic("unreachable")