From ea035a454765cef9adaa3e85f17b044eb19f360d Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Fri, 31 Jan 2025 22:12:02 +0000
Subject: [PATCH] fix: always use background context to update metrics (#15516) (#15518)

propagating the request context means the metric might not be updated if the context is done

(cherry picked from commit b62fd62bd5f6249f7c06eadbf7fc89f2edc902d0)

Co-authored-by: kruskall <99559985+kruskall@users.noreply.github.com>
---
 internal/agentcfg/elasticsearch.go            | 14 +++++-----
 internal/beater/interceptors/metrics.go       | 20 ++++++-------
 .../middleware/monitoring_middleware.go       | 20 ++++++-------
 internal/model/modelprocessor/eventcounter.go |  2 +-
 x-pack/apm-server/sampling/processor.go       | 28 +++++++++----------
 5 files changed, 41 insertions(+), 43 deletions(-)

diff --git a/internal/agentcfg/elasticsearch.go b/internal/agentcfg/elasticsearch.go
index bf01ddf711..75c2583fb8 100644
--- a/internal/agentcfg/elasticsearch.go
+++ b/internal/agentcfg/elasticsearch.go
@@ -125,22 +125,22 @@ func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result,
 		// Happy path: serve fetch requests using an initialized cache.
 		f.mu.RLock()
 		defer f.mu.RUnlock()
-		f.esFetchCount.Add(ctx, 1)
+		f.esFetchCount.Add(context.Background(), 1)
 		return matchAgentConfig(query, f.cache), nil
 	}
 
 	if f.fallbackFetcher != nil {
-		f.esFetchFallbackCount.Add(ctx, 1)
+		f.esFetchFallbackCount.Add(context.Background(), 1)
 		return f.fallbackFetcher.Fetch(ctx, query)
 	}
 
 	if f.invalidESCfg.Load() {
-		f.esFetchInvalidCount.Add(ctx, 1)
+		f.esFetchInvalidCount.Add(context.Background(), 1)
 		f.rateLimitedLogger.Errorf("rejecting fetch request: no valid elasticsearch config")
 		return Result{}, errors.New(ErrNoValidElasticsearchConfig)
 	}
 
-	f.esFetchUnavailableCount.Add(ctx, 1)
+	f.esFetchUnavailableCount.Add(context.Background(), 1)
 	f.rateLimitedLogger.Warnf("rejecting fetch request: infrastructure is not ready")
 	return Result{}, errors.New(ErrInfrastructureNotReady)
 }
@@ -233,9 +233,9 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
 
 	defer func() {
 		if err != nil {
-			f.esCacheRefreshFailures.Add(ctx, 1)
+			f.esCacheRefreshFailures.Add(context.Background(), 1)
 		} else {
-			f.esCacheRefreshSuccesses.Add(ctx, 1)
+			f.esCacheRefreshSuccesses.Add(context.Background(), 1)
 		}
 	}()
 
@@ -267,7 +267,7 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
 	f.cache = buffer
 	f.mu.Unlock()
 	f.cacheInitialized.Store(true)
-	f.esCacheEntriesCount.Record(ctx, int64(len(f.cache)))
+	f.esCacheEntriesCount.Record(context.Background(), int64(len(f.cache)))
 	f.last = time.Now()
 	return nil
 }
diff --git a/internal/beater/interceptors/metrics.go b/internal/beater/interceptors/metrics.go
index c49d303944..7cce9a3c2b 100644
--- a/internal/beater/interceptors/metrics.go
+++ b/internal/beater/interceptors/metrics.go
@@ -69,13 +69,13 @@ func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor {
 			return handler(ctx, req)
 		}
 
-		m.inc(ctx, legacyMetricsPrefix, request.IDRequestCount)
-		defer m.inc(ctx, legacyMetricsPrefix, request.IDResponseCount)
+		m.inc(legacyMetricsPrefix, request.IDRequestCount)
+		defer m.inc(legacyMetricsPrefix, request.IDResponseCount)
 
 		start := time.Now()
 		resp, err := handler(ctx, req)
 		duration := time.Since(start)
-		m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(ctx, duration.Milliseconds())
+		m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(context.Background(), duration.Milliseconds())
 
 		responseID := request.IDResponseValidCount
 		if err != nil {
@@ -83,22 +83,22 @@ func (m *metricsInterceptor) Interceptor() grpc.UnaryServerInterceptor {
 			if s, ok := status.FromError(err); ok {
 				switch s.Code() {
 				case codes.Unauthenticated:
-					m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsUnauthorized)
+					m.inc(legacyMetricsPrefix, request.IDResponseErrorsUnauthorized)
 				case codes.DeadlineExceeded, codes.Canceled:
-					m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsTimeout)
+					m.inc(legacyMetricsPrefix, request.IDResponseErrorsTimeout)
 				case codes.ResourceExhausted:
-					m.inc(ctx, legacyMetricsPrefix, request.IDResponseErrorsRateLimit)
+					m.inc(legacyMetricsPrefix, request.IDResponseErrorsRateLimit)
 				}
 			}
 		}
-		m.inc(ctx, legacyMetricsPrefix, responseID)
+		m.inc(legacyMetricsPrefix, responseID)
 		return resp, err
 	}
 }
 
-func (m *metricsInterceptor) inc(ctx context.Context, legacyMetricsPrefix string, id request.ResultID) {
-	m.getCounter("grpc.server.", string(id)).Add(ctx, 1)
-	m.getCounter(legacyMetricsPrefix, string(id)).Add(ctx, 1)
+func (m *metricsInterceptor) inc(legacyMetricsPrefix string, id request.ResultID) {
+	m.getCounter("grpc.server.", string(id)).Add(context.Background(), 1)
+	m.getCounter(legacyMetricsPrefix, string(id)).Add(context.Background(), 1)
 }
 
 func (m *metricsInterceptor) getCounter(prefix, n string) metric.Int64Counter {
diff --git a/internal/beater/middleware/monitoring_middleware.go b/internal/beater/middleware/monitoring_middleware.go
index 9dd06eec4d..95bc315aec 100644
--- a/internal/beater/middleware/monitoring_middleware.go
+++ b/internal/beater/middleware/monitoring_middleware.go
@@ -43,30 +43,28 @@ type monitoringMiddleware struct {
 func (m *monitoringMiddleware) Middleware() Middleware {
 	return func(h request.Handler) (request.Handler, error) {
 		return func(c *request.Context) {
-			ctx := context.Background()
-
-			m.inc(ctx, request.IDRequestCount)
+			m.inc(request.IDRequestCount)
 
 			start := time.Now()
 			h(c)
 			duration := time.Since(start)
-			m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(ctx, duration.Milliseconds())
+			m.getHistogram(requestDurationHistogram, metric.WithUnit("ms")).Record(context.Background(), duration.Milliseconds())
 
-			m.inc(ctx, request.IDResponseCount)
+			m.inc(request.IDResponseCount)
 			if c.Result.StatusCode >= http.StatusBadRequest {
-				m.inc(ctx, request.IDResponseErrorsCount)
+				m.inc(request.IDResponseErrorsCount)
 			} else {
-				m.inc(ctx, request.IDResponseValidCount)
+				m.inc(request.IDResponseValidCount)
 			}
 
-			m.inc(ctx, c.Result.ID)
+			m.inc(c.Result.ID)
 		}, nil
 	}
 }
 
-func (m *monitoringMiddleware) inc(ctx context.Context, id request.ResultID) {
-	m.getCounter("http.server.", string(id)).Add(ctx, 1)
-	m.getCounter(m.legacyMetricsPrefix, string(id)).Add(ctx, 1)
+func (m *monitoringMiddleware) inc(id request.ResultID) {
+	m.getCounter("http.server.", string(id)).Add(context.Background(), 1)
+	m.getCounter(m.legacyMetricsPrefix, string(id)).Add(context.Background(), 1)
 }
 
 func (m *monitoringMiddleware) getCounter(prefix, name string) metric.Int64Counter {
diff --git a/internal/model/modelprocessor/eventcounter.go b/internal/model/modelprocessor/eventcounter.go
index 62bc21d7cb..0452bd27db 100644
--- a/internal/model/modelprocessor/eventcounter.go
+++ b/internal/model/modelprocessor/eventcounter.go
@@ -55,7 +55,7 @@ func NewEventCounter(mp metric.MeterProvider) *EventCounter {
 // ProcessBatch counts events in b, grouping by APMEvent.Processor.Event.
 func (c *EventCounter) ProcessBatch(ctx context.Context, b *modelpb.Batch) error {
 	for _, event := range *b {
-		c.eventCounters[event.Type()].Add(ctx, 1)
+		c.eventCounters[event.Type()].Add(context.Background(), 1)
 	}
 	return nil
 }
diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go
index 32eea0e273..efbadea72d 100644
--- a/x-pack/apm-server/sampling/processor.go
+++ b/x-pack/apm-server/sampling/processor.go
@@ -105,11 +105,11 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *modelpb.Batch) erro
 		var err error
 		switch event.Type() {
 		case modelpb.TransactionEventType:
-			p.eventMetrics.processed.Add(ctx, 1)
-			report, stored, err = p.processTransaction(ctx, event)
+			p.eventMetrics.processed.Add(context.Background(), 1)
+			report, stored, err = p.processTransaction(event)
 		case modelpb.SpanEventType:
-			p.eventMetrics.processed.Add(ctx, 1)
-			report, stored, err = p.processSpan(ctx, event)
+			p.eventMetrics.processed.Add(context.Background(), 1)
+			report, stored, err = p.processSpan(event)
 		default:
 			continue
 		}
@@ -136,18 +136,18 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *modelpb.Batch) erro
 			i--
 		}
 
-		p.updateProcessorMetrics(ctx, report, stored, failed)
+		p.updateProcessorMetrics(report, stored, failed)
 	}
 	*batch = events
 	return nil
 }
 
-func (p *Processor) updateProcessorMetrics(ctx context.Context, report, stored, failedWrite bool) {
+func (p *Processor) updateProcessorMetrics(report, stored, failedWrite bool) {
 	if failedWrite {
-		p.eventMetrics.failedWrites.Add(ctx, 1)
+		p.eventMetrics.failedWrites.Add(context.Background(), 1)
 	}
 	if stored {
-		p.eventMetrics.stored.Add(ctx, 1)
+		p.eventMetrics.stored.Add(context.Background(), 1)
 	} else if !report {
 		// We only increment the "dropped" counter if
 		// we neither reported nor stored the event, so
@@ -158,15 +158,15 @@ func (p *Processor) updateProcessorMetrics(ctx context.Context, report, stored,
 		// The counter does not include events that are
 		// implicitly dropped, i.e. stored and never
 		// indexed.
-		p.eventMetrics.dropped.Add(ctx, 1)
+		p.eventMetrics.dropped.Add(context.Background(), 1)
 	}
 }
 
-func (p *Processor) processTransaction(ctx context.Context, event *modelpb.APMEvent) (report, stored bool, _ error) {
+func (p *Processor) processTransaction(event *modelpb.APMEvent) (report, stored bool, _ error) {
 	if !event.Transaction.Sampled {
 		// (Head-based) unsampled transactions are passed through
 		// by the tail sampler.
-		p.eventMetrics.headUnsampled.Add(ctx, 1)
+		p.eventMetrics.headUnsampled.Add(context.Background(), 1)
 		return true, false, nil
 	}
 
@@ -177,7 +177,7 @@ func (p *Processor) processTransaction(ctx context.Context, event *modelpb.APMEv
 		// if it was sampled.
 		report := traceSampled
 		if report {
-			p.eventMetrics.sampled.Add(ctx, 1)
+			p.eventMetrics.sampled.Add(context.Background(), 1)
 		}
 		return report, false, nil
 	case eventstorage.ErrNotFound:
@@ -229,7 +229,7 @@ sampling policies without service name specified.
 	return false, true, p.eventStore.WriteTraceEvent(event.Trace.Id, event.Transaction.Id, event)
 }
 
-func (p *Processor) processSpan(ctx context.Context, event *modelpb.APMEvent) (report, stored bool, _ error) {
+func (p *Processor) processSpan(event *modelpb.APMEvent) (report, stored bool, _ error) {
 	traceSampled, err := p.eventStore.IsTraceSampled(event.Trace.Id)
 	if err != nil {
 		if err == eventstorage.ErrNotFound {
@@ -240,7 +240,7 @@ func (p *Processor) processSpan(ctx context.Context, event *modelpb.APMEvent) (r
 	}
 	// Tail-sampling decision has been made, report or drop the event.
 	if traceSampled {
-		p.eventMetrics.sampled.Add(ctx, 1)
+		p.eventMetrics.sampled.Add(context.Background(), 1)
 	}
 	return traceSampled, false, nil
 }
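
Note on the pattern (not part of the patch): the commit message states that propagating the request context means a metric might not be updated if that context is done. The sketch below is a minimal, hypothetical illustration of the same idea outside apm-server; the handler, meter name, and instrument name are made up, and only the standard OpenTelemetry calls (otel.Meter, Int64Counter, Add) are real API. The metric update takes context.Background() so that an already-cancelled request context, for example in a deferred update that runs after the handler returns, cannot interfere with recording, while the request context still flows to the request-scoped work.

// sketch_background_metrics.go - illustrative only; names are hypothetical.
package main

import (
	"context"
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

var requestCount metric.Int64Counter

func init() {
	// With no SDK configured, otel.Meter returns a no-op meter; wire up a
	// real MeterProvider in a real program. Error ignored for brevity.
	requestCount, _ = otel.Meter("example").Int64Counter("http.server.request.count")
}

func handle(w http.ResponseWriter, r *http.Request) {
	// The request context may already be done by the time deferred
	// bookkeeping runs (client disconnected, deadline exceeded), so the
	// metric update deliberately uses context.Background() rather than
	// r.Context(), mirroring the change applied throughout this patch.
	defer requestCount.Add(context.Background(), 1)

	// Request-scoped work keeps the request context and its cancellation.
	doWork(r.Context())
	w.WriteHeader(http.StatusOK)
}

func doWork(ctx context.Context) {
	// Placeholder for request-scoped work that honours cancellation.
	select {
	case <-ctx.Done():
	default:
	}
}

func main() {
	http.HandleFunc("/", handle)
	_ = http.ListenAndServe(":8080", nil)
}

This mirrors the change made in every file above: the ctx parameter is dropped from the metric helpers and context.Background() is passed to Add and Record, while the request context continues to be propagated to the actual request handling (handler, fallback fetcher, event store).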