From 6f36b5fb3ecb08b3ea2c696cca444225370f083a Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Tue, 17 Dec 2024 21:58:35 +0100 Subject: [PATCH] refectory --- .../config/mimir.yaml | 9 +-- pkg/costattribution/tracker.go | 44 +++++++++---- pkg/costattribution/tracker_test.go | 11 +++- pkg/distributor/distributor.go | 47 +++++++------- pkg/distributor/validate.go | 1 + pkg/ingester/activeseries/active_series.go | 63 +++++++++---------- pkg/ingester/ingester.go | 30 ++++----- pkg/ingester/user_tsdb.go | 7 ++- 8 files changed, 116 insertions(+), 96 deletions(-) diff --git a/development/mimir-microservices-mode/config/mimir.yaml b/development/mimir-microservices-mode/config/mimir.yaml index 31702611891..5d245999115 100644 --- a/development/mimir-microservices-mode/config/mimir.yaml +++ b/development/mimir-microservices-mode/config/mimir.yaml @@ -1,6 +1,4 @@ multitenancy_enabled: false -cost_attribution_registry_path: "/usage-metrics" -cost_attribution_eviction_interval: 10m distributor: ha_tracker: @@ -186,10 +184,5 @@ limits: ha_replica_label: ha_replica ha_max_clusters: 10 - cost_attribution_labels: "container" - max_cost_attribution_labels_per_user: 2 - max_cost_attribution_cardinality_per_user: 100 - cost_attribution_cooldown: 20m - runtime_config: - file: ./config/runtime.yaml \ No newline at end of file + file: ./config/runtime.yaml diff --git a/pkg/costattribution/tracker.go b/pkg/costattribution/tracker.go index 0a232195848..eaefa3bd62e 100644 --- a/pkg/costattribution/tracker.go +++ b/pkg/costattribution/tracker.go @@ -42,6 +42,7 @@ type Tracker struct { activeSeriesPerUserAttribution *prometheus.Desc receivedSamplesAttribution *prometheus.Desc discardedSampleAttribution *prometheus.Desc + failedActiveSeriesDecrement *prometheus.Desc overflowLabels []string obseveredMtx sync.RWMutex observed map[string]*Observation @@ -49,6 +50,7 @@ type Tracker struct { state TrackerState overflowCounter *Observation cooldownUntil *atomic.Int64 + totalFailedActiveSeries *atomic.Float64 cooldownDuration int64 logger log.Logger } @@ -70,15 +72,16 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time. overflowLabels[len(trackedLabels)+1] = overflowValue tracker := &Tracker{ - userID: userID, - caLabels: trackedLabels, - caLabelMap: caLabelMap, - maxCardinality: limit, - observed: make(map[string]*Observation), - hashBuffer: make([]byte, 0, 1024), - cooldownDuration: int64(cooldown.Seconds()), - logger: logger, - overflowLabels: overflowLabels, + userID: userID, + caLabels: trackedLabels, + caLabelMap: caLabelMap, + maxCardinality: limit, + observed: make(map[string]*Observation), + hashBuffer: make([]byte, 0, 1024), + cooldownDuration: int64(cooldown.Seconds()), + logger: logger, + overflowLabels: overflowLabels, + totalFailedActiveSeries: atomic.NewFloat64(0), } tracker.discardedSampleAttribution = prometheus.NewDesc("cortex_discarded_attributed_samples_total", @@ -94,7 +97,9 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time. tracker.activeSeriesPerUserAttribution = prometheus.NewDesc("cortex_ingester_attributed_active_series", "The total number of active series per user and attribution.", append(trackedLabels, TenantLabel), prometheus.Labels{TrackerLabel: defaultTrackerName}) - + tracker.failedActiveSeriesDecrement = prometheus.NewDesc("cortex_ingester_attributed_active_series_failure", + "The total number of failed active series decrement per user and tracker.", []string{TenantLabel}, + prometheus.Labels{TrackerLabel: defaultTrackerName}) return tracker } @@ -149,11 +154,11 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) { t.updateCounters(lbs, now.Unix(), 1, 0, 0, nil) } -func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) { +func (t *Tracker) DecrementActiveSeries(lbs labels.Labels) { if t == nil { return } - t.updateCounters(lbs, now.Unix(), -1, 0, 0, nil) + t.updateCounters(lbs, 0, -1, 0, 0, nil) } func (t *Tracker) Collect(out chan<- prometheus.Metric) { @@ -182,6 +187,9 @@ func (t *Tracker) Collect(out chan<- prometheus.Metric) { o.discardSamplemtx.Unlock() } } + if t.totalFailedActiveSeries.Load() > 0 { + out <- prometheus.MustNewConstMetric(t.failedActiveSeriesDecrement, prometheus.CounterValue, t.totalFailedActiveSeries.Load(), t.userID) + } } func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) { @@ -198,6 +206,13 @@ func (t *Tracker) IncrementReceivedSamples(lbs labels.Labels, value float64, now t.updateCounters(lbs, now.Unix(), 0, value, 0, nil) } +func (t *Tracker) IncrementActiveSeriesFailure(value float64) { + if t == nil { + return + } + t.totalFailedActiveSeries.Add(value) +} + func (t *Tracker) updateCounters(lbls labels.Labels, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string) { labelValues := make([]string, len(t.caLabels)) lbls.Range(func(l labels.Label) { @@ -248,8 +263,11 @@ func (t *Tracker) handleObservation(stream string, ts int64, activeSeriesIncreme o.discardSamplemtx.Unlock() } } else if len(t.observed) < t.maxCardinality*2 { + // if the key is not know, and if the ts is not 0, it means that the method is called from DecrementActiveSeries, we should ignore the call // Create a new observation for the stream - t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason) + if ts != 0 { + t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason) + } } } diff --git a/pkg/costattribution/tracker_test.go b/pkg/costattribution/tracker_test.go index 82de4e8b64c..bd63bfae785 100644 --- a/pkg/costattribution/tracker_test.go +++ b/pkg/costattribution/tracker_test.go @@ -35,11 +35,11 @@ func Test_CreateCleanupTracker(t *testing.T) { cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), time.Unix(1, 0)) cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "2"), time.Unix(2, 0)) - cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"), time.Unix(3, 0)) + cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3")) cat.IncrementReceivedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 5, time.Unix(4, 0)) cat.IncrementDiscardedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 2, "sample-out-of-order", time.Unix(4, 0)) - cat.IncrementActiveSeries(labels.FromStrings("platform", "bar", "tenant", "user4", "team", "2"), time.Unix(6, 0)) + cat.IncrementActiveSeriesFailure(1) expectedMetrics := ` # HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution. @@ -49,6 +49,9 @@ func Test_CreateCleanupTracker(t *testing.T) { # TYPE cortex_ingester_attributed_active_series gauge cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1 cortex_ingester_attributed_active_series{platform="foo",tenant="user4",tracker="cost-attribution"} 1 + # HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrement per user and tracker. + # TYPE cortex_ingester_attributed_active_series_failure counter + cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1 # HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution. # TYPE cortex_received_attributed_samples_total counter cortex_received_attributed_samples_total{platform="foo",tenant="user4",tracker="cost-attribution"} 5 @@ -58,6 +61,7 @@ func Test_CreateCleanupTracker(t *testing.T) { "cortex_discarded_attributed_samples_total", "cortex_received_attributed_samples_total", "cortex_ingester_attributed_active_series", + "cortex_ingester_attributed_active_series_failure", } assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) assert.Equal(t, []string{"foo"}, cat.InactiveObservations(5)) @@ -67,6 +71,9 @@ func Test_CreateCleanupTracker(t *testing.T) { # HELP cortex_ingester_attributed_active_series The total number of active series per user and attribution. # TYPE cortex_ingester_attributed_active_series gauge cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1 + # HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrement per user and tracker. + # TYPE cortex_ingester_attributed_active_series_failure counter + cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1 ` assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) tManager.deleteUserTracker("user4") diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 3594123435d..a14bf4b52f4 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -34,6 +34,19 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" "github.com/grafana/dskit/user" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/scrape" + "go.uber.org/atomic" + "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" + "github.com/grafana/mimir/pkg/cardinality" "github.com/grafana/mimir/pkg/costattribution" ingester_client "github.com/grafana/mimir/pkg/ingester/client" @@ -48,18 +61,6 @@ import ( "github.com/grafana/mimir/pkg/util/pool" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" - "github.com/prometheus/prometheus/scrape" - "go.uber.org/atomic" - "golang.org/x/exp/slices" - "golang.org/x/sync/errgroup" ) func init() { @@ -745,15 +746,14 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica // May alter timeseries data in-place. // The returned error may retain the series labels. -func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error { +func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error { if len(ts.Samples) == 0 { return nil } cat := d.costAttributionMgr.TrackerForUser(userID) - if len(ts.Samples) == 1 { - return validateSample(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, ts.Samples[0], cat) + return validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, ts.Samples[0], cat) } timestamps := make(map[int64]struct{}, min(len(ts.Samples), 100)) @@ -767,7 +767,7 @@ func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimes } timestamps[s.TimestampMs] = struct{}{} - if err := validateSample(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, s, cat); err != nil { + if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s, cat); err != nil { return err } @@ -787,14 +787,14 @@ func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimes // Returns an error explaining the first validation finding. // May alter timeseries data in-place. // The returned error may retain the series labels. -func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error { +func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error { if len(ts.Histograms) == 0 { return nil } cat := d.costAttributionMgr.TrackerForUser(userID) if len(ts.Histograms) == 1 { - updated, err := validateSampleHistogram(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat) + updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat) if err != nil { return err } @@ -807,7 +807,6 @@ func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTi timestamps := make(map[int64]struct{}, min(len(ts.Histograms), 100)) currPos := 0 histogramsUpdated := false - for idx := range ts.Histograms { if _, ok := timestamps[ts.Histograms[idx].Timestamp]; ok { // A sample with the same timestamp has already been validated, so we skip it. @@ -816,7 +815,7 @@ func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTi } timestamps[ts.Histograms[idx].Timestamp] = struct{}{} - updated, err := validateSampleHistogram(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, &ts.Histograms[idx], cat) + updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[idx], cat) if err != nil { return err } @@ -884,6 +883,7 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation, cat, nowt); err != nil { return true, err } + now := model.TimeFromUnixNano(nowt.UnixNano()) totalSamplesAndHistograms := len(ts.Samples) + len(ts.Histograms) @@ -973,8 +973,8 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc { } numSamples := 0 - tnow := time.Now() - group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), tnow) + now := time.Now() + group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now) for _, ts := range req.Timeseries { numSamples += len(ts.Samples) + len(ts.Histograms) } @@ -988,7 +988,7 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc { if errors.As(err, &tooManyClustersError{}) { d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples)) - d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, tnow) + d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, now) } return err @@ -1829,7 +1829,6 @@ func tokenForMetadata(userID string, metricName string) uint32 { func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) { var receivedSamples, receivedExemplars, receivedMetadata int - for _, ts := range req.Timeseries { receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) diff --git a/pkg/distributor/validate.go b/pkg/distributor/validate.go index 8b9849ba730..5b6775cdf9f 100644 --- a/pkg/distributor/validate.go +++ b/pkg/distributor/validate.go @@ -443,6 +443,7 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI m.labelNameTooLong.WithLabelValues(userID, group).Inc() return fmt.Errorf(labelNameTooLongMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } else if !skipLabelValidation && !model.LabelValue(l.Value).IsValid() { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidLabelValue, ts) m.invalidLabelValue.WithLabelValues(userID, group).Inc() return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, strings.ToValidUTF8(l.Value, ""), unsafeMetricName) } else if len(l.Value) > maxLabelValueLength { diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index e7895404a22..6c06a62e162 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -49,9 +49,10 @@ type ActiveSeries struct { // configMutex protects matchers and lastMatchersUpdate. it used by both matchers and cat configMutex sync.RWMutex matchers *asmodel.Matchers - cat *costattribution.Tracker lastConfigUpdate time.Time + cat *costattribution.Tracker + // The duration after which series become inactive. // Also used to determine if enough time has passed since configuration reload for valid results. timeout time.Duration @@ -67,7 +68,6 @@ type seriesStripe struct { // Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated // without holding the lock -- hence the atomic). oldestEntryTs atomic.Int64 - cat *costattribution.Tracker mu sync.RWMutex refs map[storage.SeriesRef]seriesEntry active uint32 // Number of active entries in this stripe. Only decreased during purge or clear. @@ -76,6 +76,8 @@ type seriesStripe struct { activeMatchingNativeHistograms []uint32 // Number of active entries (only native histograms) in this stripe matching each matcher of the configured Matchers. activeNativeHistogramBuckets uint32 // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear. activeMatchingNativeHistogramBuckets []uint32 // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers. + + cat *costattribution.Tracker } // seriesEntry holds a timestamp for single series. @@ -87,14 +89,8 @@ type seriesEntry struct { deleted bool // This series was marked as deleted, so before purging we need to remove the refence to it from the deletedSeries. } -func NewActiveSeries( - asm *asmodel.Matchers, - timeout time.Duration, - cat *costattribution.Tracker, -) *ActiveSeries { - c := &ActiveSeries{ - matchers: asm, timeout: timeout, cat: cat, - } +func NewActiveSeries(asm *asmodel.Matchers, timeout time.Duration, cat *costattribution.Tracker) *ActiveSeries { + c := &ActiveSeries{matchers: asm, timeout: timeout, cat: cat} // Stripes are pre-allocated so that we only read on them and no lock is required. for i := 0; i < numStripes; i++ { @@ -112,8 +108,7 @@ func (c *ActiveSeries) CurrentMatcherNames() []string { func (c *ActiveSeries) ConfigDiffers(ctCfg asmodel.CustomTrackersConfig, caCfg *costattribution.Tracker) bool { currentCTC, currentCAT := c.CurrentConfig() - // TODO: I think here to check the pointer is not equal is already enough, if we recreate tracker, it is for a good reason, otherwise, nothing changed - return ctCfg.String() != currentCTC.String() || caCfg != currentCAT //|| !costattribution.CompareCALabels(caCfg.CALabels(), currentCAT.CALabels()) + return ctCfg.String() != currentCTC.String() || caCfg != currentCAT } func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) { @@ -137,6 +132,7 @@ func (c *ActiveSeries) CurrentConfig() (asmodel.CustomTrackersConfig, *costattri // Pass -1 in numNativeHistogramBuckets if the series is not a native histogram series. func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref storage.SeriesRef, now time.Time, numNativeHistogramBuckets int, idx tsdb.IndexReader) { stripeID := ref % numStripes + created := c.stripes[stripeID].updateSeriesTimestamp(now, series, ref, numNativeHistogramBuckets) if created { if deleted, ok := c.deleted.find(series); ok { @@ -408,8 +404,6 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef numNativeHistogramBuckets: numNativeHistogramBuckets, } - // here if we have a cost attribution label, we can split the serie count based on the value of the label - // we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly s.cat.IncrementActiveSeries(series, time.Unix(0, nowNanos)) s.refs[ref] = e return e.nanos, true @@ -432,11 +426,7 @@ func (s *seriesStripe) clear() { } // Reinitialize assigns new matchers and corresponding size activeMatching slices. -func (s *seriesStripe) reinitialize( - asm *asmodel.Matchers, - deleted *deletedSeries, - cat *costattribution.Tracker, -) { +func (s *seriesStripe) reinitialize(asm *asmodel.Matchers, deleted *deletedSeries, cat *costattribution.Tracker) { s.mu.Lock() defer s.mu.Unlock() s.deleted = deleted @@ -474,17 +464,20 @@ func (s *seriesStripe) purge(keepUntil time.Time, idx tsdb.IndexReader) { for ref, entry := range s.refs { ts := entry.nanos.Load() if ts < keepUntilNanos { + // cost attribution is enabled, if it's not nil, we need to decrement the active series count, otherwise means received error when get idx, + // we need to increment the active series failure count. + if s.cat != nil { + if idx == nil { + s.cat.IncrementActiveSeriesFailure(1) + } else if err := idx.Series(ref, &buf, nil); err != nil { + s.cat.IncrementActiveSeriesFailure(1) + } else { + s.cat.DecrementActiveSeries(buf.Labels()) + } + } if entry.deleted { s.deleted.purge(ref) } - - if idx != nil { - if err := idx.Series(ref, &buf, nil); err != nil { - //TODO: think about what to do here - _ = err - } - s.cat.DecrementActiveSeries(buf.Labels(), keepUntil) - } delete(s.refs, ref) continue } @@ -532,13 +525,17 @@ func (s *seriesStripe) remove(ref storage.SeriesRef, idx tsdb.IndexReader) { } s.active-- - if idx != nil { - buf := labels.NewScratchBuilder(10) - if err := idx.Series(ref, &buf, nil); err != nil { - //TODO: think about what to do here - _ = err + if s.cat != nil { + if idx == nil { + s.cat.IncrementActiveSeriesFailure(1) + } else { + buf := labels.NewScratchBuilder(128) + if err := idx.Series(ref, &buf, nil); err != nil { + s.cat.IncrementActiveSeriesFailure(1) + } else { + s.cat.DecrementActiveSeries(buf.Labels()) + } } - s.cat.DecrementActiveSeries(buf.Labels(), time.Now()) } if entry.numNativeHistogramBuckets >= 0 { s.activeNativeHistograms-- diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2b3561a3530..763ce527c5c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -371,9 +371,8 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus limits: limits, logger: logger, - tsdbs: make(map[string]*userTSDB), - usersMetadata: make(map[string]*userMetricsMetadata), - + tsdbs: make(map[string]*userTSDB), + usersMetadata: make(map[string]*userMetricsMetadata), bucket: bucketClient, tsdbMetrics: newTSDBMetrics(registerer, logger), shipperMetrics: newShipperMetrics(registerer), @@ -793,7 +792,12 @@ func (i *Ingester) updateActiveSeries(now time.Time) { i.replaceMatchers(asmodel.NewMatchers(newMatchersConfig), userDB, now) } - idx, _ := userDB.Head().Index() + // If the userDB idx is unavailable, pass nil pointer to Purge methode, and record it as a failure in metrics when decrementing active series. + idx, err := userDB.Head().Index() + if err != nil { + level.Warn(i.logger).Log("msg", "failed to get the index of the TSDB head", "user", userID, "err", err) + idx = nil + } valid := userDB.activeSeries.Purge(now, idx) if !valid { // Active series config has been reloaded, exposing loading metric until MetricsIdleTimeout passes. @@ -1167,6 +1171,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // Note that we don't .Finish() the span in this method on purpose spanlog := spanlogger.FromContext(ctx, i.logger) spanlog.DebugLog("event", "acquired append lock") + var ( startAppend = time.Now() @@ -1411,8 +1416,10 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre var nonCopiedLabels labels.Labels // idx is used to decrease active series count in case of error for cost attribution. - idx, _ := i.getTSDB(userID).Head().Index() - // TODO: deal with the error here + idx, err := i.getTSDB(userID).Head().Index() + if err != nil { + idx = nil + } for _, ts := range timeseries { // The labels must be sorted (in our case, it's guaranteed a write request @@ -1429,7 +1436,6 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre allOutOfBoundsHistograms(ts.Histograms, minAppendTime) { stats.failedSamplesCount += len(ts.Samples) + len(ts.Histograms) - stats.sampleTimestampTooOldCount += len(ts.Samples) + len(ts.Histograms) i.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), reasonSampleTimestampTooOld, startAppend) var firstTimestamp int64 @@ -2666,12 +2672,8 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD } userDB := &userTSDB{ - userID: userID, - activeSeries: activeseries.NewActiveSeries( - asmodel.NewMatchers(matchersConfig), - i.cfg.ActiveSeriesMetrics.IdleTimeout, - i.costAttributionMgr.TrackerForUser(userID), - ), + userID: userID, + activeSeries: activeseries.NewActiveSeries(asmodel.NewMatchers(matchersConfig), i.cfg.ActiveSeriesMetrics.IdleTimeout, i.costAttributionMgr.TrackerForUser(userID)), seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), @@ -3274,7 +3276,7 @@ func (i *Ingester) compactBlocksToReduceInMemorySeries(ctx context.Context, now idx, err := db.Head().Index() if err != nil { level.Warn(i.logger).Log("msg", "failed to get the index of the TSDB head", "user", userID, "err", err) - continue + idx = nil } db.activeSeries.Purge(now, idx) diff --git a/pkg/ingester/user_tsdb.go b/pkg/ingester/user_tsdb.go index 2f31f41892e..61c1aa244ee 100644 --- a/pkg/ingester/user_tsdb.go +++ b/pkg/ingester/user_tsdb.go @@ -619,8 +619,11 @@ func (u *userTSDB) computeOwnedSeries() int { } count := 0 - idx, _ := u.Head().Index() - // TODO: deal with the err here + idx, err := u.Head().Index() + if err != nil { + idx = nil + } + u.Head().ForEachSecondaryHash(func(refs []chunks.HeadSeriesRef, secondaryHashes []uint32) { for i, sh := range secondaryHashes { if u.ownedTokenRanges.IncludesKey(sh) {