diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index e556905885..f778ce733d 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -13,6 +13,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] * Fix panic due to misaligned 64-bit access on 32-bit architectures {pull}5277[5277] * Fixed tail-based sampling pubsub to use _seq_no correctly {pull}5126[5126] * Removed service name from dataset {pull}5451[5451] +* Fix panic on Fleet policy change when transaction metrics or tail-based sampling are enabled {pull}5670[5670] [float] ==== Intake API Changes diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index 0cf2df7369..f50ef4e79a 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -65,6 +65,7 @@ func newProcessors(args beater.ServerParams) ([]namedProcessor, error) { return nil, errors.Wrapf(err, "error creating %s", name) } processors = append(processors, namedProcessor{name: name, processor: agg}) + aggregationMonitoringRegistry.Remove("txmetrics") monitoring.NewFunc(aggregationMonitoringRegistry, "txmetrics", agg.CollectMonitoring, monitoring.Report) } if args.Config.Aggregation.ServiceDestinations.Enabled { @@ -86,6 +87,7 @@ func newProcessors(args beater.ServerParams) ([]namedProcessor, error) { if err != nil { return nil, errors.Wrapf(err, "error creating %s", name) } + samplingMonitoringRegistry.Remove("tail") monitoring.NewFunc(samplingMonitoringRegistry, "tail", sampler.CollectMonitoring, monitoring.Report) processors = append(processors, namedProcessor{name: name, processor: sampler}) } @@ -232,16 +234,18 @@ func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc return g.Wait() } -var rootCmd = cmd.NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{ - WrapRunServer: func(runServer beater.RunServerFunc) beater.RunServerFunc { - return func(ctx context.Context, args beater.ServerParams) error { - processors, err := newProcessors(args) - if err != nil { - return err - } - return runServerWithProcessors(ctx, runServer, args, processors...) +func wrapRunServer(runServer beater.RunServerFunc) beater.RunServerFunc { + return func(ctx context.Context, args beater.ServerParams) error { + processors, err := newProcessors(args) + if err != nil { + return err } - }, + return runServerWithProcessors(ctx, runServer, args, processors...) + } +} + +var rootCmd = cmd.NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{ + WrapRunServer: wrapRunServer, })) func main() { diff --git a/x-pack/apm-server/main_test.go b/x-pack/apm-server/main_test.go index 967130b069..261ece10ca 100644 --- a/x-pack/apm-server/main_test.go +++ b/x-pack/apm-server/main_test.go @@ -7,8 +7,23 @@ package main // This file is mandatory as otherwise the apm-server.test binary is not generated correctly. import ( + "context" "flag" "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pkg/errors" + "go.elastic.co/apm/apmtest" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/libbeat/paths" + + "github.com/elastic/apm-server/beater" + "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/model/modelprocessor" ) var systemTest *bool @@ -28,3 +43,42 @@ func TestSystem(t *testing.T) { main() } } + +func TestMonitoring(t *testing.T) { + // samplingMonitoringRegistry will be nil, as under normal circumstances + // we rely on apm-server/sampling to create the registry. + samplingMonitoringRegistry = monitoring.NewRegistry() + + var aggregationMonitoringSnapshot, tailSamplingMonitoringSnapshot monitoring.FlatSnapshot + runServerError := errors.New("runServer") + runServer := func(ctx context.Context, args beater.ServerParams) error { + aggregationMonitoringSnapshot = monitoring.CollectFlatSnapshot(aggregationMonitoringRegistry, monitoring.Full, false) + tailSamplingMonitoringSnapshot = monitoring.CollectFlatSnapshot(samplingMonitoringRegistry, monitoring.Full, false) + return runServerError + } + runServer = wrapRunServer(runServer) + + home := t.TempDir() + err := paths.InitPaths(&paths.Path{Home: home}) + require.NoError(t, err) + + cfg := config.DefaultConfig() + cfg.Aggregation.Transactions.Enabled = true + cfg.Sampling.Tail.Enabled = true + cfg.Sampling.Tail.Policies = []config.TailSamplingPolicy{{SampleRate: 0.1}} + + // Call the wrapped runServer twice, to ensure metric registration does not panic. + for i := 0; i < 2; i++ { + err := runServer(context.Background(), beater.ServerParams{ + Config: cfg, + Logger: logp.NewLogger(""), + Tracer: apmtest.DiscardTracer, + BatchProcessor: modelprocessor.Nop{}, + Managed: true, + Namespace: "default", + }) + assert.Equal(t, runServerError, err) + assert.NotEqual(t, monitoring.MakeFlatSnapshot(), aggregationMonitoringSnapshot) + assert.NotEqual(t, monitoring.MakeFlatSnapshot(), tailSamplingMonitoringSnapshot) + } +}