Skip to content

Commit

Permalink
Don't panic re-registering monitoring functions (#5670)
Browse files Browse the repository at this point in the history
When the server's policy changes, the server will
internally stop and restart various workers, and
re-register monitoring related to those workers.
We must remove the existing monitoring vars first,
to avoid panicking on re-registration.
  • Loading branch information
axw authored Jul 12, 2021
1 parent cd8b93e commit 06bea7c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 9 deletions.
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
* Fix panic due to misaligned 64-bit access on 32-bit architectures {pull}5277[5277]
* Fixed tail-based sampling pubsub to use _seq_no correctly {pull}5126[5126]
* Removed service name from dataset {pull}5451[5451]
* Fix panic on Fleet policy change when transaction metrics or tail-based sampling are enabled {pull}5670[5670]

[float]
==== Intake API Changes
Expand Down
22 changes: 13 additions & 9 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func newProcessors(args beater.ServerParams) ([]namedProcessor, error) {
return nil, errors.Wrapf(err, "error creating %s", name)
}
processors = append(processors, namedProcessor{name: name, processor: agg})
aggregationMonitoringRegistry.Remove("txmetrics")
monitoring.NewFunc(aggregationMonitoringRegistry, "txmetrics", agg.CollectMonitoring, monitoring.Report)
}
if args.Config.Aggregation.ServiceDestinations.Enabled {
Expand All @@ -86,6 +87,7 @@ func newProcessors(args beater.ServerParams) ([]namedProcessor, error) {
if err != nil {
return nil, errors.Wrapf(err, "error creating %s", name)
}
samplingMonitoringRegistry.Remove("tail")
monitoring.NewFunc(samplingMonitoringRegistry, "tail", sampler.CollectMonitoring, monitoring.Report)
processors = append(processors, namedProcessor{name: name, processor: sampler})
}
Expand Down Expand Up @@ -232,16 +234,18 @@ func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc
return g.Wait()
}

var rootCmd = cmd.NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{
WrapRunServer: func(runServer beater.RunServerFunc) beater.RunServerFunc {
return func(ctx context.Context, args beater.ServerParams) error {
processors, err := newProcessors(args)
if err != nil {
return err
}
return runServerWithProcessors(ctx, runServer, args, processors...)
func wrapRunServer(runServer beater.RunServerFunc) beater.RunServerFunc {
return func(ctx context.Context, args beater.ServerParams) error {
processors, err := newProcessors(args)
if err != nil {
return err
}
},
return runServerWithProcessors(ctx, runServer, args, processors...)
}
}

var rootCmd = cmd.NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{
WrapRunServer: wrapRunServer,
}))

func main() {
Expand Down
54 changes: 54 additions & 0 deletions x-pack/apm-server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,23 @@ package main
// This file is mandatory as otherwise the apm-server.test binary is not generated correctly.

import (
"context"
"flag"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pkg/errors"
"go.elastic.co/apm/apmtest"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/paths"

"github.com/elastic/apm-server/beater"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/model/modelprocessor"
)

var systemTest *bool
Expand All @@ -28,3 +43,42 @@ func TestSystem(t *testing.T) {
main()
}
}

func TestMonitoring(t *testing.T) {
// samplingMonitoringRegistry will be nil, as under normal circumstances
// we rely on apm-server/sampling to create the registry.
samplingMonitoringRegistry = monitoring.NewRegistry()

var aggregationMonitoringSnapshot, tailSamplingMonitoringSnapshot monitoring.FlatSnapshot
runServerError := errors.New("runServer")
runServer := func(ctx context.Context, args beater.ServerParams) error {
aggregationMonitoringSnapshot = monitoring.CollectFlatSnapshot(aggregationMonitoringRegistry, monitoring.Full, false)
tailSamplingMonitoringSnapshot = monitoring.CollectFlatSnapshot(samplingMonitoringRegistry, monitoring.Full, false)
return runServerError
}
runServer = wrapRunServer(runServer)

home := t.TempDir()
err := paths.InitPaths(&paths.Path{Home: home})
require.NoError(t, err)

cfg := config.DefaultConfig()
cfg.Aggregation.Transactions.Enabled = true
cfg.Sampling.Tail.Enabled = true
cfg.Sampling.Tail.Policies = []config.TailSamplingPolicy{{SampleRate: 0.1}}

// Call the wrapped runServer twice, to ensure metric registration does not panic.
for i := 0; i < 2; i++ {
err := runServer(context.Background(), beater.ServerParams{
Config: cfg,
Logger: logp.NewLogger(""),
Tracer: apmtest.DiscardTracer,
BatchProcessor: modelprocessor.Nop{},
Managed: true,
Namespace: "default",
})
assert.Equal(t, runServerError, err)
assert.NotEqual(t, monitoring.MakeFlatSnapshot(), aggregationMonitoringSnapshot)
assert.NotEqual(t, monitoring.MakeFlatSnapshot(), tailSamplingMonitoringSnapshot)
}
}

0 comments on commit 06bea7c

Please sign in to comment.