Skip to content

Commit

Permalink
fix: always register beats registries (#15439)
Browse files Browse the repository at this point in the history
* fix: always recreate libbeat beats registry

match the logic we had before the otel translation

* ok then

* always register beats registries

* cleanup fix

* test: fix race condition in TestLibbeatMetrics

use atomic in http handler

* test: clear output and apm-server beats registry

before starting a beat, also clear output and apm-server registry

(cherry picked from commit cd0bc2d)
  • Loading branch information
kruskall authored and mergify[bot] committed Jan 31, 2025
1 parent fb729b8 commit c6670c4
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
6 changes: 1 addition & 5 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,6 @@ func (b *Beat) registerStateMetrics() {
}

func (b *Beat) registerStatsMetrics() {
if b.Config.Output.Name() != "elasticsearch" {
return
}

libbeatRegistry := monitoring.Default.GetRegistry("libbeat")
monitoring.NewFunc(libbeatRegistry, "output", func(_ monitoring.Mode, v monitoring.Visitor) {
var rm metricdata.ResourceMetrics
Expand All @@ -503,10 +499,10 @@ func (b *Beat) registerStatsMetrics() {
}
v.OnRegistryStart()
defer v.OnRegistryFinished()
monitoring.ReportString(v, "type", "elasticsearch")
for _, sm := range rm.ScopeMetrics {
switch {
case sm.Scope.Name == "github.com/elastic/go-docappender":
monitoring.ReportString(v, "type", "elasticsearch")
addDocappenderLibbeatOutputMetrics(context.Background(), v, sm)
}
}
Expand Down
11 changes: 5 additions & 6 deletions internal/beatcmd/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestLibbeatMetrics(t *testing.T) {
defer func() { assert.NoError(t, stop()) }()
args := <-runnerParamsChan

var requestIndex int
var requestIndex atomic.Int64
requestsChan := make(chan chan struct{})
defer close(requestsChan)
esClient := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -161,17 +161,16 @@ func TestLibbeatMetrics(t *testing.T) {
}
}
_, result := docappendertest.DecodeBulkRequest(r)
switch requestIndex {
case 1:
switch requestIndex.Add(1) {
case 2:
result.HasErrors = true
result.Items[0]["create"] = esutil.BulkIndexerResponseItem{Status: 400}
case 3:
case 4:
result.HasErrors = true
result.Items[0]["create"] = esutil.BulkIndexerResponseItem{Status: 429}
default:
// success
}
requestIndex++
json.NewEncoder(w).Encode(result)
})
appender, err := docappender.New(esClient, docappender.Config{
Expand Down Expand Up @@ -603,7 +602,7 @@ func resetGlobals() {
// Clear monitoring registries to allow the new Beat to populate them.
monitoring.GetNamespace("info").SetRegistry(nil)
monitoring.GetNamespace("state").SetRegistry(nil)
for _, name := range []string{"system", "beat", "libbeat"} {
for _, name := range []string{"system", "beat", "libbeat", "apm-server", "output"} {
registry := monitoring.Default.GetRegistry(name)
if registry != nil {
registry.Clear()
Expand Down

0 comments on commit c6670c4

Please sign in to comment.