diff --git a/cmd/apm-server/main.go b/cmd/apm-server/main.go
index dc17bc38a24..a6bdc4225cf 100644
--- a/cmd/apm-server/main.go
+++ b/cmd/apm-server/main.go
@@ -35,6 +35,9 @@ func main() {
 			return beater.NewRunner(beater.RunnerParams{
 				Config: args.Config,
 				Logger: args.Logger,
+
+				MeterProvider:   args.MeterProvider,
+				MetricsGatherer: args.MetricsGatherer,
 			})
 		},
 	})
diff --git a/internal/beatcmd/beat.go b/internal/beatcmd/beat.go
index b63d99d8f65..d5fa4a76f3a 100644
--- a/internal/beatcmd/beat.go
+++ b/internal/beatcmd/beat.go
@@ -33,6 +33,11 @@ import (
 	"time"
 
 	"github.com/gofrs/uuid/v5"
+	"go.elastic.co/apm/module/apmotel/v2"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.uber.org/zap"
 	"go.uber.org/zap/exp/zapslog"
 	"golang.org/x/sync/errgroup"
@@ -76,6 +81,10 @@ type Beat struct {
 	rawConfig *config.C
 	newRunner NewRunnerFunc
+
+	metricReader   *sdkmetric.ManualReader
+	meterProvider  *sdkmetric.MeterProvider
+	metricGatherer *apmotel.Gatherer
 }
 
 // BeatParams holds parameters for NewBeat.
@@ -109,6 +118,18 @@ func NewBeat(args BeatParams) (*Beat, error) {
 		beatName = hostname
 	}
 
+	exporter, err := apmotel.NewGatherer()
+	if err != nil {
+		return nil, err
+	}
+
+	metricReader := sdkmetric.NewManualReader()
+	meterProvider := sdkmetric.NewMeterProvider(
+		sdkmetric.WithReader(exporter),
+		sdkmetric.WithReader(metricReader),
+	)
+	otel.SetMeterProvider(meterProvider)
+
 	eid := uuid.FromStringOrNil(metricreport.EphemeralID().String())
 	b := &Beat{
 		Beat: beat.Beat{
@@ -127,9 +148,12 @@ func NewBeat(args BeatParams) (*Beat, error) {
 			BeatConfig: cfg.APMServer,
 			Registry:   reload.NewRegistry(),
 		},
-		Config:    cfg,
-		newRunner: args.NewRunner,
-		rawConfig: rawConfig,
+		Config:         cfg,
+		newRunner:      args.NewRunner,
+		rawConfig:      rawConfig,
+		metricReader:   metricReader,
+		meterProvider:  meterProvider,
+		metricGatherer: &exporter,
 	}
 
 	if err := b.init(); err != nil {
@@ -374,7 +398,7 @@ func (b *Beat) Run(ctx context.Context) error {
 	}
 
 	if b.Manager.Enabled() {
-		reloader, err := NewReloader(b.Info, b.Registry, b.newRunner)
+		reloader, err := NewReloader(b.Info, b.Registry, b.newRunner, b.meterProvider, b.metricGatherer)
 		if err != nil {
 			return err
 		}
@@ -390,9 +414,11 @@ func (b *Beat) Run(ctx context.Context) error {
 			return errors.New("no output defined, please define one under the output section")
 		}
 		runner, err := b.newRunner(RunnerParams{
-			Config: b.rawConfig,
-			Info:   b.Info,
-			Logger: logp.NewLogger(""),
+			Config:          b.rawConfig,
+			Info:            b.Info,
+			Logger:          logp.NewLogger(""),
+			MeterProvider:   b.meterProvider,
+			MetricsGatherer: b.metricGatherer,
 		})
 		if err != nil {
 			return err
@@ -410,7 +436,12 @@ func (b *Beat) Run(ctx context.Context) error {
 // is then exposed through the HTTP monitoring endpoint (e.g. /info and /state)
 // and/or pushed to Elasticsearch through the x-pack monitoring feature.
 func (b *Beat) registerMetrics() {
-	// info
+	b.registerInfoMetrics()
+	b.registerStateMetrics()
+	b.registerStatsMetrics()
+}
+
+func (b *Beat) registerInfoMetrics() {
 	infoRegistry := monitoring.GetNamespace("info").GetRegistry()
 	monitoring.NewString(infoRegistry, "version").Set(b.Info.Version)
 	monitoring.NewString(infoRegistry, "beat").Set(b.Info.Beat)
@@ -436,7 +467,9 @@ func (b *Beat) registerMetrics() {
 			monitoring.NewString(infoRegistry, "gid").Set(u.Gid)
 		}
 	}()
+}
 
+func (b *Beat) registerStateMetrics() {
 	stateRegistry := monitoring.GetNamespace("state").GetRegistry()
 
 	// state.service
@@ -457,6 +490,188 @@ func (b *Beat) registerMetrics() {
 	monitoring.NewBool(managementRegistry, "enabled").Set(b.Manager.Enabled())
 }
 
+func (b *Beat) registerStatsMetrics() {
+	if b.Config.Output.Name() != "elasticsearch" {
+		return
+	}
+
+	libbeatRegistry := monitoring.Default.GetRegistry("libbeat")
+	monitoring.NewFunc(libbeatRegistry, "output", func(_ monitoring.Mode, v monitoring.Visitor) {
+		var rm metricdata.ResourceMetrics
+		if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
+			return
+		}
+		v.OnRegistryStart()
+		defer v.OnRegistryFinished()
+		monitoring.ReportString(v, "type", "elasticsearch")
+		for _, sm := range rm.ScopeMetrics {
+			switch {
+			case sm.Scope.Name == "github.com/elastic/go-docappender":
+				addDocappenderLibbeatOutputMetrics(context.Background(), v, sm)
+			}
+		}
+	})
+	monitoring.NewFunc(libbeatRegistry, "pipeline", func(_ monitoring.Mode, v monitoring.Visitor) {
+		var rm metricdata.ResourceMetrics
+		if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
+			return
+		}
+		v.OnRegistryStart()
+		defer v.OnRegistryFinished()
+		for _, sm := range rm.ScopeMetrics {
+			switch {
+			case sm.Scope.Name == "github.com/elastic/go-docappender":
+				addDocappenderLibbeatPipelineMetrics(context.Background(), v, sm)
+			}
+		}
+	})
+	monitoring.NewFunc(monitoring.Default, "output.elasticsearch", func(_ monitoring.Mode, v monitoring.Visitor) {
+		var rm metricdata.ResourceMetrics
+		if err := b.metricReader.Collect(context.Background(), &rm); err != nil {
+			return
+		}
+		v.OnRegistryStart()
+		defer v.OnRegistryFinished()
+		for _, sm := range rm.ScopeMetrics {
+			switch {
+			case sm.Scope.Name == "github.com/elastic/go-docappender":
+				addDocappenderOutputElasticsearchMetrics(context.Background(), v, sm)
+			}
+		}
+	})
+}
+
+// getScalarInt64 returns a single-value, dimensionless
+// gauge or counter integer value, or (0, false) if the
+// data does not match these constraints.
+func getScalarInt64(data metricdata.Aggregation) (int64, bool) {
+	switch data := data.(type) {
+	case metricdata.Sum[int64]:
+		if len(data.DataPoints) != 1 || data.DataPoints[0].Attributes.Len() != 0 {
+			break
+		}
+		return data.DataPoints[0].Value, true
+	case metricdata.Gauge[int64]:
+		if len(data.DataPoints) != 1 || data.DataPoints[0].Attributes.Len() != 0 {
+			break
+		}
+		return data.DataPoints[0].Value, true
+	}
+	return 0, false
+}
+
+// Adapt go-docappender's OTel metrics to beats stack monitoring metrics,
+// with a mixture of libbeat-specific and apm-server specific metric names.
+func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
+	var writeBytes int64
+
+	v.OnRegistryStart()
+	v.OnKey("events")
+	for _, m := range sm.Metrics {
+		switch m.Name {
+		case "elasticsearch.events.processed":
+			var acked, toomany, failed int64
+			data, _ := m.Data.(metricdata.Sum[int64])
+			for _, dp := range data.DataPoints {
+				status, ok := dp.Attributes.Value(attribute.Key("status"))
+				if !ok {
+					continue
+				}
+				switch status.AsString() {
+				case "Success":
+					acked += dp.Value
+				case "TooMany":
+					toomany += dp.Value
+					fallthrough
+				default:
+					failed += dp.Value
+				}
+			}
+			monitoring.ReportInt(v, "acked", acked)
+			monitoring.ReportInt(v, "failed", failed)
+			monitoring.ReportInt(v, "toomany", toomany)
+		case "elasticsearch.events.count":
+			if value, ok := getScalarInt64(m.Data); ok {
+				monitoring.ReportInt(v, "total", value)
+			}
+		case "elasticsearch.events.queued":
+			if value, ok := getScalarInt64(m.Data); ok {
+				monitoring.ReportInt(v, "active", value)
+			}
+		case "elasticsearch.flushed.bytes":
+			if value, ok := getScalarInt64(m.Data); ok {
+				writeBytes = value
+			}
+		case "elasticsearch.bulk_requests.count":
+			if value, ok := getScalarInt64(m.Data); ok {
+				monitoring.ReportInt(v, "batches", value)
+			}
+		}
+	}
+	v.OnRegistryFinished()
+
+	if writeBytes > 0 {
+		v.OnRegistryStart()
+		v.OnKey("write")
+		monitoring.ReportInt(v, "bytes", writeBytes)
+		v.OnRegistryFinished()
+	}
+}
+
+func addDocappenderLibbeatPipelineMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
+	v.OnRegistryStart()
+	defer v.OnRegistryFinished()
+	v.OnKey("events")
+
+	for _, m := range sm.Metrics {
+		switch m.Name {
+		case "elasticsearch.events.count":
+			if value, ok := getScalarInt64(m.Data); ok {
+				monitoring.ReportInt(v, "total", value)
+			}
+		}
+	}
+}
+
+// Add non-libbeat Elasticsearch output metrics under "output.elasticsearch".
+func addDocappenderOutputElasticsearchMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
+	var bulkRequestsCount, bulkRequestsAvailable int64
+	var indexersCreated, indexersDestroyed int64
+	for _, m := range sm.Metrics {
+		switch m.Name {
+		case "elasticsearch.bulk_requests.count":
+			if value, ok := getScalarInt64(m.Data); ok {
+				bulkRequestsCount = value
+			}
+		case "elasticsearch.bulk_requests.available":
+			if value, ok := getScalarInt64(m.Data); ok {
+				bulkRequestsAvailable = value
+			}
+		case "elasticsearch.indexer.created":
+			if value, ok := getScalarInt64(m.Data); ok {
+				indexersCreated = value
+			}
+		case "elasticsearch.indexer.destroyed":
+			if value, ok := getScalarInt64(m.Data); ok {
+				indexersDestroyed = value
+			}
+		}
+	}
+
+	v.OnRegistryStart()
+	v.OnKey("bulk_requests")
+	monitoring.ReportInt(v, "completed", bulkRequestsCount)
+	monitoring.ReportInt(v, "available", bulkRequestsAvailable)
+	v.OnRegistryFinished()
+
+	v.OnRegistryStart()
+	v.OnKey("indexers")
+	monitoring.ReportInt(v, "created", indexersCreated)
+	monitoring.ReportInt(v, "destroyed", indexersDestroyed)
+	monitoring.ReportInt(v, "active", indexersCreated-indexersDestroyed+1)
+	v.OnRegistryFinished()
+}
+
 // registerElasticsearchVerfication registers a global callback to make sure
 // the Elasticsearch instance we are connecting to has a valid license, and is
 // at least on the same version as APM Server.
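Note (not part of the patch): the adapter above is pull-based. Metrics recorded through the shared MeterProvider are only materialized when one of the `monitoring.NewFunc` callbacks calls `Collect` on the ManualReader and walks the resulting data points by scope and instrument name. The standalone sketch below shows that collection pattern using the same OTel SDK packages the diff imports; the instrument name, scope, and values are illustrative only, not taken from go-docappender.

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// A ManualReader attached to the provider; nothing is exported until
	// Collect is called, mirroring how registerStatsMetrics reads on demand.
	reader := sdkmetric.NewManualReader()
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

	// Hypothetical instrument with a "status" attribute, recorded twice.
	meter := mp.Meter("example.com/hypothetical/scope")
	processed, _ := meter.Int64Counter("example.events.processed")
	processed.Add(ctx, 3, metric.WithAttributes(attribute.String("status", "Success")))
	processed.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "TooMany")))

	// Collect fills the ResourceMetrics value in place; the callbacks in the
	// patch allocate one per invocation and walk it immediately, as here.
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		panic(err)
	}
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			sum, ok := m.Data.(metricdata.Sum[int64])
			if !ok {
				continue
			}
			for _, dp := range sum.DataPoints {
				status, _ := dp.Attributes.Value(attribute.Key("status"))
				fmt.Printf("%s{status=%q} = %d\n", m.Name, status.AsString(), dp.Value)
			}
		}
	}
}
```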
diff --git a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go
index ddd34fdf031..af748529e72 100644
--- a/internal/beatcmd/beat_test.go
+++ b/internal/beatcmd/beat_test.go
@@ -19,10 +19,13 @@ package beatcmd
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"net/http"
 	"os"
 	"strconv"
+	"strings"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -45,6 +48,9 @@ import (
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
 	"github.com/elastic/elastic-agent-libs/paths"
+	"github.com/elastic/go-docappender/v2"
+	"github.com/elastic/go-docappender/v2/docappendertest"
+	"github.com/elastic/go-elasticsearch/v8/esutil"
 )
 
 // TestRunMaxProcs ensures Beat.Run calls the GOMAXPROCS adjustment code by looking for log messages.
@@ -125,6 +131,142 @@ func TestRunnerParams(t *testing.T) {
 	}, m)
 }
 
+// TestLibbeatMetrics tests the mapping of go-docappender OTel
+// metrics to legacy libbeat monitoring metrics.
+func TestLibbeatMetrics(t *testing.T) {
+	runnerParamsChan := make(chan RunnerParams, 1)
+	beat := newBeat(t, "output.elasticsearch.enabled: true", func(args RunnerParams) (Runner, error) {
+		runnerParamsChan <- args
+		return runnerFunc(func(ctx context.Context) error {
+			<-ctx.Done()
+			return ctx.Err()
+		}), nil
+	})
+	stop := runBeat(t, beat)
+	defer func() { assert.NoError(t, stop()) }()
+	args := <-runnerParamsChan
+
+	var requestIndex int
+	requestsChan := make(chan chan struct{})
+	defer close(requestsChan)
+	esClient := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+		select {
+		case <-r.Context().Done():
+			return
+		case ch := <-requestsChan:
+			select {
+			case <-r.Context().Done():
+				return
+			case <-ch:
+			}
+		}
+		_, result := docappendertest.DecodeBulkRequest(r)
+		switch requestIndex {
+		case 1:
+			result.HasErrors = true
+			result.Items[0]["create"] = esutil.BulkIndexerResponseItem{Status: 400}
+		case 3:
+			result.HasErrors = true
+			result.Items[0]["create"] = esutil.BulkIndexerResponseItem{Status: 429}
+		default:
+			// success
+		}
+		requestIndex++
+		json.NewEncoder(w).Encode(result)
+	})
+	appender, err := docappender.New(esClient, docappender.Config{
+		MeterProvider: args.MeterProvider,
+		FlushBytes:    1,
+		Scaling: docappender.ScalingConfig{
+			ActiveRatio:  10,
+			IdleInterval: 100 * time.Millisecond,
+			ScaleUp: docappender.ScaleActionConfig{
+				Threshold: 1,
+				CoolDown:  time.Minute,
+			},
+		},
+	})
+	require.NoError(t, err)
+	defer func() {
+		assert.NoError(t, appender.Close(context.Background()))
+	}()
+
+	require.NoError(t, appender.Add(context.Background(), "index", strings.NewReader("{}")))
+	require.NoError(t, appender.Add(context.Background(), "index", strings.NewReader("{}")))
+	require.NoError(t, appender.Add(context.Background(), "index", strings.NewReader("{}")))
+	require.NoError(t, appender.Add(context.Background(), "index", strings.NewReader("{}")))
+
+	libbeatRegistry := monitoring.Default.GetRegistry("libbeat")
+	snapshot := monitoring.CollectStructSnapshot(libbeatRegistry, monitoring.Full, false)
+	assert.Equal(t, map[string]any{
+		"output": map[string]any{
+			"type": "elasticsearch",
+			"events": map[string]any{
+				"active": int64(4),
+				"total":  int64(4),
+			},
+		},
+		"pipeline": map[string]any{
+			"events": map[string]any{
+				"total": int64(4),
+			},
+		},
+	}, snapshot)
+
+	assert.Eventually(t, func() bool {
+		return appender.Stats().IndexersActive > 1
+	}, 10*time.Second, 50*time.Millisecond)
+
+	for i := 0; i < 4; i++ {
+		unblockRequest := make(chan struct{})
+		requestsChan <- unblockRequest
+		unblockRequest <- struct{}{}
+	}
+
+	assert.Eventually(t, func() bool {
+		snapshot = monitoring.CollectStructSnapshot(libbeatRegistry, monitoring.Full, false)
+		output := snapshot["output"].(map[string]any)
+		events := output["events"].(map[string]any)
+		return events["active"] == int64(0)
+	}, 10*time.Second, 100*time.Millisecond)
+	assert.Equal(t, map[string]any{
+		"output": map[string]any{
+			"type": "elasticsearch",
+			"events": map[string]any{
+				"acked":   int64(2),
+				"failed":  int64(2),
+				"toomany": int64(1),
+				"active":  int64(0),
+				"total":   int64(4),
+				"batches": int64(4),
+			},
+			"write": map[string]any{
+				"bytes": int64(132),
+			},
+		},
+		"pipeline": map[string]any{
+			"events": map[string]any{
+				"total": int64(4),
+			},
+		},
+	}, snapshot)
+
+	snapshot = monitoring.CollectStructSnapshot(monitoring.Default.GetRegistry("output"), monitoring.Full, false)
+	assert.Equal(t, map[string]any{
+		"elasticsearch": map[string]any{
+			"bulk_requests": map[string]any{
+				"available": int64(10),
+				"completed": int64(4),
+			},
+			"indexers": map[string]any{
+				"active":    int64(2),
+				"created":   int64(1),
+				"destroyed": int64(0),
+			},
+		},
+	}, snapshot)
+}
+
 func TestUnmanagedOutputRequired(t *testing.T) {
 	b := newBeat(t, "", func(args RunnerParams) (Runner, error) {
 		panic("unreachable")
@@ -220,7 +362,7 @@ func TestRunManager_Reloader(t *testing.T) {
 			stopCount.Add(1)
 			return nil
 		}), nil
-	})
+	}, nil, nil)
 	require.NoError(t, err)
 
 	agentInfo := &proto.AgentInfo{
@@ -346,7 +488,7 @@ func TestRunManager_Reloader_newRunnerError(t *testing.T) {
 
 	_, err := NewReloader(beat.Info{}, registry, func(_ RunnerParams) (Runner, error) {
 		return nil, errors.New("newRunner error")
-	})
+	}, nil, nil)
 	require.NoError(t, err)
 
 	onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
diff --git a/internal/beatcmd/reloader.go b/internal/beatcmd/reloader.go
index 27da683e3dc..6a8cd9f208a 100644
--- a/internal/beatcmd/reloader.go
+++ b/internal/beatcmd/reloader.go
@@ -24,6 +24,8 @@ import (
 	"sync"
 
 	"github.com/joeshaw/multierror"
+	"go.elastic.co/apm/module/apmotel/v2"
+	"go.opentelemetry.io/otel/metric"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/elastic/beats/v7/libbeat/beat"
@@ -50,6 +52,19 @@ type RunnerParams struct {
 
 	// Logger holds a logger to use for logging throughout the APM Server.
 	Logger *logp.Logger
+
+	// MeterProvider holds a metric.MeterProvider that can be used for
+	// creating metrics. The same MeterProvider is expected to be used
+	// for each instance of the Runner, to ensure counter metrics are
+	// not reset.
+	//
+	// NOTE(axw) metrics registered through this provider are used for
+	// feeding into both Elastic APM (if enabled) and the libbeat
+	// monitoring framework. For the latter, only gauge and counter
+	// metrics are supported, and attributes (dimensions) are ignored.
+	MeterProvider metric.MeterProvider
+
+	MetricsGatherer *apmotel.Gatherer
 }
 
 // Runner is an interface returned by NewRunnerFunc.
@@ -60,12 +75,15 @@ type Runner interface {
 
 // NewReloader returns a new Reloader which creates Runners using the provided
 // beat.Info and NewRunnerFunc.
-func NewReloader(info beat.Info, registry *reload.Registry, newRunner NewRunnerFunc) (*Reloader, error) {
+func NewReloader(info beat.Info, registry *reload.Registry, newRunner NewRunnerFunc, meterProvider metric.MeterProvider, metricGatherer *apmotel.Gatherer) (*Reloader, error) {
 	r := &Reloader{
 		info:      info,
 		logger:    logp.NewLogger(""),
 		newRunner: newRunner,
 		stopped:   make(chan struct{}),
+
+		meterProvider:  meterProvider,
+		metricGatherer: metricGatherer,
 	}
 	if err := registry.RegisterList(reload.InputRegName, reloadableListFunc(r.reloadInputs)); err != nil {
 		return nil, fmt.Errorf("failed to register inputs reloader: %w", err)
@@ -86,6 +104,9 @@ type Reloader struct {
 	logger    *logp.Logger
 	newRunner NewRunnerFunc
 
+	meterProvider  metric.MeterProvider
+	metricGatherer *apmotel.Gatherer
+
 	runner     Runner
 	stopRunner func() error
diff --git a/internal/beatcmd/reloader_test.go b/internal/beatcmd/reloader_test.go
index 4f90d0d5f7f..f496772b104 100644
--- a/internal/beatcmd/reloader_test.go
+++ b/internal/beatcmd/reloader_test.go
@@ -72,7 +72,7 @@ func TestReloader(t *testing.T) {
 			<-ctx.Done()
 			return nil
 		}), nil
-	})
+	}, nil, nil)
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -156,7 +156,7 @@ func TestReloaderNewRunnerParams(t *testing.T) {
 			<-ctx.Done()
 			return nil
 		}), nil
-	})
+	}, nil, nil)
 	require.NoError(t, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/internal/beater/beater.go b/internal/beater/beater.go
index fe779bee8fe..1903fcdfa73 100644
--- a/internal/beater/beater.go
+++ b/internal/beater/beater.go
@@ -35,7 +35,7 @@ import (
 	"go.elastic.co/apm/module/apmotel/v2"
 	"go.elastic.co/apm/v2"
 	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/sync/semaphore"
@@ -89,7 +89,9 @@ type Runner struct {
 	outputConfig              agentconfig.Namespace
 	elasticsearchOutputConfig *agentconfig.C
 
-	listener net.Listener
+	meterProvider  metric.MeterProvider
+	metricGatherer *apmotel.Gatherer
+	listener       net.Listener
 }
 
 // RunnerParams holds parameters for NewRunner.
@@ -101,6 +103,13 @@ type RunnerParams struct {
 	// Logger holds a logger to use for logging throughout the APM Server.
 	Logger *logp.Logger
 
+	// MeterProvider holds a metric.MeterProvider that can be used for
+	// creating metrics.
+	MeterProvider metric.MeterProvider
+
+	// MetricsGatherer holds an apmotel.Gatherer
+	MetricsGatherer *apmotel.Gatherer
+
 	// WrapServer holds an optional WrapServerFunc, for wrapping the
 	// ServerParams and RunServerFunc used to run the APM Server.
 	//
@@ -149,7 +158,9 @@ func NewRunner(args RunnerParams) (*Runner, error) {
 		outputConfig:              unpackedConfig.Output,
 		elasticsearchOutputConfig: elasticsearchOutputConfig,
 
-		listener: listener,
+		meterProvider:  args.MeterProvider,
+		metricGatherer: args.MetricsGatherer,
+		listener:       listener,
 	}, nil
 }
 
@@ -280,15 +291,7 @@ func (s *Runner) Run(ctx context.Context) error {
 	}
 	otel.SetTracerProvider(tracerProvider)
 
-	exporter, err := apmotel.NewGatherer()
-	if err != nil {
-		return err
-	}
-	meterProvider := metric.NewMeterProvider(
-		metric.WithReader(exporter),
-	)
-	otel.SetMeterProvider(meterProvider)
-	tracer.RegisterMetricsGatherer(exporter)
+	tracer.RegisterMetricsGatherer(s.metricGatherer)
 
 	// Ensure the libbeat output and go-elasticsearch clients do not index
 	// any events to Elasticsearch before the integration is ready.
@@ -370,7 +373,7 @@ func (s *Runner) Run(ctx context.Context) error {
 		apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)),
 		interceptors.ClientMetadata(),
 		interceptors.Logging(gRPCLogger),
-		interceptors.Metrics(gRPCLogger, nil),
+		interceptors.Metrics(gRPCLogger, s.meterProvider),
 		interceptors.Timeout(),
 		interceptors.Auth(authenticator),
 		interceptors.AnonymousRateLimit(ratelimitStore),
@@ -677,9 +680,9 @@ func (s *Runner) newFinalBatchProcessor(
 	newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
 	memLimit float64,
 ) (modelpb.BatchProcessor, func(context.Context) error, error) {
-	monitoring.Default.Remove("libbeat")
-	libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
 	if s.elasticsearchOutputConfig == nil {
+		monitoring.Default.Remove("libbeat")
+		libbeatMonitoringRegistry := monitoring.Default.GetRegistry("libbeat")
 		return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry)
 	}
 
@@ -693,7 +696,7 @@ func (s *Runner) newFinalBatchProcessor(
 	monitoring.NewString(outputRegistry, "name").Set("elasticsearch")
 
 	// Create the docappender and Elasticsearch config
-	appenderCfg, esCfg, err := s.newDocappenderConfig(tracer, memLimit)
+	appenderCfg, esCfg, err := s.newDocappenderConfig(tracer, s.meterProvider, memLimit)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -706,65 +709,10 @@ func (s *Runner) newFinalBatchProcessor(
 		return nil, nil, err
 	}
 
-	// Install our own libbeat-compatible metrics callback which uses the docappender stats.
-	// All the metrics below are required to be reported to be able to display all relevant
-	// fields in the Stack Monitoring UI.
-	monitoring.NewFunc(libbeatMonitoringRegistry, "output.write", func(_ monitoring.Mode, v monitoring.Visitor) {
-		v.OnRegistryStart()
-		defer v.OnRegistryFinished()
-		v.OnKey("bytes")
-		v.OnInt(appender.Stats().BytesTotal)
-	})
-	outputType := monitoring.NewString(libbeatMonitoringRegistry.GetRegistry("output"), "type")
-	outputType.Set("elasticsearch")
-	monitoring.NewFunc(libbeatMonitoringRegistry, "output.events", func(_ monitoring.Mode, v monitoring.Visitor) {
-		v.OnRegistryStart()
-		defer v.OnRegistryFinished()
-		stats := appender.Stats()
-		v.OnKey("acked")
-		v.OnInt(stats.Indexed)
-		v.OnKey("active")
-		v.OnInt(stats.Active)
-		v.OnKey("batches")
-		v.OnInt(stats.BulkRequests)
-		v.OnKey("failed")
-		v.OnInt(stats.Failed)
-		v.OnKey("toomany")
-		v.OnInt(stats.TooManyRequests)
-		v.OnKey("total")
-		v.OnInt(stats.Added)
-	})
-	monitoring.NewFunc(libbeatMonitoringRegistry, "pipeline.events", func(_ monitoring.Mode, v monitoring.Visitor) {
-		v.OnRegistryStart()
-		defer v.OnRegistryFinished()
-		v.OnKey("total")
-		v.OnInt(appender.Stats().Added)
-	})
-	monitoring.Default.Remove("output")
-	monitoring.NewFunc(monitoring.Default, "output.elasticsearch.bulk_requests", func(_ monitoring.Mode, v monitoring.Visitor) {
-		v.OnRegistryStart()
-		defer v.OnRegistryFinished()
-		stats := appender.Stats()
-		v.OnKey("available")
-		v.OnInt(stats.AvailableBulkRequests)
-		v.OnKey("completed")
-		v.OnInt(stats.BulkRequests)
-	})
-	monitoring.NewFunc(monitoring.Default, "output.elasticsearch.indexers", func(_ monitoring.Mode, v monitoring.Visitor) {
-		v.OnRegistryStart()
-		defer v.OnRegistryFinished()
-		stats := appender.Stats()
-		v.OnKey("active")
-		v.OnInt(stats.IndexersActive)
-		v.OnKey("created")
-		v.OnInt(stats.IndexersCreated)
-		v.OnKey("destroyed")
-		v.OnInt(stats.IndexersDestroyed)
-	})
 	return newDocappenderBatchProcessor(appender), appender.Close, nil
 }
 
-func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
+func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, mp metric.MeterProvider, memLimit float64) (
 	docappender.Config,
 	*elasticsearch.Config,
 	error,
 ) {
 	esConfig := struct {
@@ -809,6 +757,7 @@ func (s *Runner) newDocappenderConfig(tracer *apm.Tracer, memLimit float64) (
 		FlushBytes:    flushBytes,
 		FlushInterval: esConfig.FlushInterval,
 		Tracer:        tracer,
+		MeterProvider: mp,
 		MaxRequests:   esConfig.MaxRequests,
 		Scaling:       scalingCfg,
 		Logger:        zap.New(s.logger.Core(), zap.WithCaller(true)),
diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go
index 15cd3d5f32e..a07188572c2 100644
--- a/internal/beater/beater_test.go
+++ b/internal/beater/beater_test.go
@@ -187,7 +187,7 @@ func TestRunnerNewDocappenderConfig(t *testing.T) {
 				elasticsearchOutputConfig: agentconfig.NewConfig(),
 				logger:                    logp.NewLogger("test"),
 			}
-			docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
+			docCfg, esCfg, err := r.newDocappenderConfig(nil, nil, c.memSize)
 			require.NoError(t, err)
 			assert.Equal(t, docappender.Config{
 				Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
@@ -219,7 +219,7 @@ func TestRunnerNewDocappenderConfig(t *testing.T) {
 				}),
 				logger: logp.NewLogger("test"),
 			}
-			docCfg, esCfg, err := r.newDocappenderConfig(nil, c.memSize)
+			docCfg, esCfg, err := r.newDocappenderConfig(nil, nil, c.memSize)
 			require.NoError(t, err)
 			assert.Equal(t, docappender.Config{
 				Logger: zap.New(r.logger.Core(), zap.WithCaller(true)),
diff --git a/internal/beater/beatertest/server.go b/internal/beater/beatertest/server.go
index 9fc64c8fdf0..10eba224a71 100644
--- a/internal/beater/beatertest/server.go
+++ b/internal/beater/beatertest/server.go
@@ -28,6 +28,8 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel/metric"
+	"go.opentelemetry.io/otel/metric/noop"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 	"go.uber.org/zap/zaptest/observer"
@@ -88,6 +90,7 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server {
 				"host": "localhost:0",
 			},
 		})},
+		meterProvider: noop.NewMeterProvider(),
 	}
 	for _, o := range opts {
 		o(&options)
@@ -111,9 +114,10 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server {
 	}
 
 	runner, err := beater.NewRunner(beater.RunnerParams{
-		Config:     cfg,
-		Logger:     logger,
-		WrapServer: options.wrapServer,
+		Config:        cfg,
+		Logger:        logger,
+		WrapServer:    options.wrapServer,
+		MeterProvider: options.meterProvider,
 	})
 	require.NoError(t, err)
 
@@ -190,8 +194,9 @@ func (s *Server) Close() error {
 }
 
 type options struct {
-	config     []*agentconfig.C
-	wrapServer beater.WrapServerFunc
+	config        []*agentconfig.C
+	wrapServer    beater.WrapServerFunc
+	meterProvider metric.MeterProvider
 }
 
 type option func(*options)
@@ -214,3 +219,10 @@ func WithWrapServer(wrapServer beater.WrapServerFunc) option {
 		opts.wrapServer = wrapServer
 	}
 }
+
+// WithMeterProvider is an option for setting a MeterProvider
+func WithMeterProvider(mp metric.MeterProvider) option {
+	return func(opts *options) {
+		opts.meterProvider = mp
+	}
+}
diff --git a/internal/beater/server_test.go b/internal/beater/server_test.go
index 3f757e6ea19..df3a1a2ce85 100644
--- a/internal/beater/server_test.go
+++ b/internal/beater/server_test.go
@@ -40,6 +40,8 @@ import (
 	"github.com/jaegertracing/jaeger/proto-gen/api_v2"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer" "google.golang.org/grpc" @@ -476,7 +478,14 @@ func TestServerElasticsearchOutput(t *testing.T) { monitoring.Default.Remove("libbeat.whatever") monitoring.NewInt(monitoring.Default, "libbeat.whatever") - srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{ + reader := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector( + func(ik sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + }, + )) + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + + srv := beatertest.NewServer(t, beatertest.WithMeterProvider(mp), beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{ "output.elasticsearch": map[string]interface{}{ "hosts": []string{elasticsearchServer.URL}, "flush_interval": "1ms", @@ -502,44 +511,11 @@ func TestServerElasticsearchOutput(t *testing.T) { t.Fatal("timed out waiting for bulk request") } - snapshot := monitoring.CollectStructSnapshot(monitoring.Default.GetRegistry("libbeat"), monitoring.Full, false) - assert.Equal(t, map[string]interface{}{ - "output": map[string]interface{}{ - "events": map[string]interface{}{ - "acked": int64(0), - "active": int64(5), - "batches": int64(0), - "failed": int64(0), - "toomany": int64(0), - "total": int64(5), - }, - "type": "elasticsearch", - "write": map[string]interface{}{ - // _bulk requests haven't completed, so bytes flushed won't have been updated. - "bytes": int64(0), - }, - }, - "pipeline": map[string]interface{}{ - "events": map[string]interface{}{ - "total": int64(5), - }, - }, - }, snapshot) - - snapshot = monitoring.CollectStructSnapshot(monitoring.Default.GetRegistry("output"), monitoring.Full, false) - assert.Equal(t, map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "bulk_requests": map[string]interface{}{ - "available": int64(9), - "completed": int64(0), - }, - "indexers": map[string]interface{}{ - "active": int64(1), - "destroyed": int64(0), - "created": int64(0), - }, - }, - }, snapshot) + monitoringtest.ExpectOtelMetrics(t, reader, map[string]any{ + "elasticsearch.events.count": 5, + "elasticsearch.events.queued": 5, + "elasticsearch.bulk_requests.available": 9, + }) } func TestServerPProf(t *testing.T) { diff --git a/systemtest/monitoring_test.go b/systemtest/monitoring_test.go index 354ffcc6ffb..622a4e94af0 100644 --- a/systemtest/monitoring_test.go +++ b/systemtest/monitoring_test.go @@ -77,7 +77,9 @@ func TestMonitoring(t *testing.T) { metrics.Output = nil getBeatsMonitoringStats(t, srv, &metrics) acked := gjson.GetBytes(metrics.Libbeat, "output.events.acked") - return acked.Int() == N + // There may be additional pre-aggregated metrics indexed, + // hence we check >= rather than ==. 
+		return acked.Int() >= N
 	}, 10*time.Second, 10*time.Millisecond)
 
 	// Assert the presence of output.write.bytes, and that it is non-zero;
@@ -93,8 +95,8 @@ func TestMonitoring(t *testing.T) {
 	assert.Equal(t, int64(0), gjson.GetBytes(metrics.Libbeat, "output.events.active").Int())
 	assert.Equal(t, int64(0), gjson.GetBytes(metrics.Libbeat, "output.events.failed").Int())
 	assert.Equal(t, int64(0), gjson.GetBytes(metrics.Libbeat, "output.events.toomany").Int())
-	assert.Equal(t, int64(N), gjson.GetBytes(metrics.Libbeat, "output.events.total").Int())
-	assert.Equal(t, int64(N), gjson.GetBytes(metrics.Libbeat, "pipeline.events.total").Int())
+	assert.GreaterOrEqual(t, gjson.GetBytes(metrics.Libbeat, "output.events.total").Int(), int64(N))
+	assert.GreaterOrEqual(t, gjson.GetBytes(metrics.Libbeat, "pipeline.events.total").Int(), int64(N))
 	assert.Equal(t, "elasticsearch", gjson.GetBytes(metrics.Libbeat, "output.type").Str)
 
 	bulkRequestsAvailable := gjson.GetBytes(metrics.Output, "elasticsearch.bulk_requests.available")
diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go
index f2e026c8aab..24cf1fe115b 100644
--- a/x-pack/apm-server/main.go
+++ b/x-pack/apm-server/main.go
@@ -274,6 +274,9 @@ func Main() error {
 				Config:     args.Config,
 				Logger:     args.Logger,
 				WrapServer: wrapServer,
+
+				MeterProvider:   args.MeterProvider,
+				MetricsGatherer: args.MetricsGatherer,
 			})
 		},
 	)
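For context (not part of the change): from the runner's side, the contract described by the RunnerParams.MeterProvider doc comment is simply to record through the injected provider. Only instruments emitted under the "github.com/elastic/go-docappender" scope are mapped onto the libbeat registries by the adapter in beat.go; other scopes are still visible to the apmotel Gatherer when self-instrumentation is enabled. The runner type, scope, and instrument name below are hypothetical.

```go
// Hypothetical runner in package beatcmd, sketching how a Runner built from
// RunnerParams is expected to consume the shared MeterProvider.
type exampleRunner struct {
	params RunnerParams
}

func (r exampleRunner) Run(ctx context.Context) error {
	// Instruments created here are registered with the provider owned by Beat,
	// so they survive config reloads and are collected by its ManualReader.
	meter := r.params.MeterProvider.Meter("example.com/hypothetical/scope")
	events, err := meter.Int64Counter("example.events.count") // hypothetical instrument
	if err != nil {
		return err
	}
	events.Add(ctx, 1)
	<-ctx.Done()
	return ctx.Err()
}
```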