
Commit

Refactor
ying-jeanne committed Dec 17, 2024
1 parent e315ebb commit 6f36b5f
Showing 8 changed files with 116 additions and 96 deletions.
9 changes: 1 addition & 8 deletions development/mimir-microservices-mode/config/mimir.yaml
@@ -1,6 +1,4 @@
multitenancy_enabled: false
cost_attribution_registry_path: "/usage-metrics"
cost_attribution_eviction_interval: 10m

distributor:
ha_tracker:
@@ -186,10 +184,5 @@ limits:
ha_replica_label: ha_replica
ha_max_clusters: 10

cost_attribution_labels: "container"
max_cost_attribution_labels_per_user: 2
max_cost_attribution_cardinality_per_user: 100
cost_attribution_cooldown: 20m

runtime_config:
file: ./config/runtime.yaml
file: ./config/runtime.yaml
44 changes: 31 additions & 13 deletions pkg/costattribution/tracker.go
@@ -42,13 +42,15 @@ type Tracker struct {
activeSeriesPerUserAttribution *prometheus.Desc
receivedSamplesAttribution *prometheus.Desc
discardedSampleAttribution *prometheus.Desc
failedActiveSeriesDecrement *prometheus.Desc
overflowLabels []string
obseveredMtx sync.RWMutex
observed map[string]*Observation
hashBuffer []byte
state TrackerState
overflowCounter *Observation
cooldownUntil *atomic.Int64
totalFailedActiveSeries *atomic.Float64
cooldownDuration int64
logger log.Logger
}
@@ -70,15 +72,16 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
overflowLabels[len(trackedLabels)+1] = overflowValue

tracker := &Tracker{
userID: userID,
caLabels: trackedLabels,
caLabelMap: caLabelMap,
maxCardinality: limit,
observed: make(map[string]*Observation),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
logger: logger,
overflowLabels: overflowLabels,
userID: userID,
caLabels: trackedLabels,
caLabelMap: caLabelMap,
maxCardinality: limit,
observed: make(map[string]*Observation),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
logger: logger,
overflowLabels: overflowLabels,
totalFailedActiveSeries: atomic.NewFloat64(0),
}

tracker.discardedSampleAttribution = prometheus.NewDesc("cortex_discarded_attributed_samples_total",
@@ -94,7 +97,9 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
tracker.activeSeriesPerUserAttribution = prometheus.NewDesc("cortex_ingester_attributed_active_series",
"The total number of active series per user and attribution.", append(trackedLabels, TenantLabel),
prometheus.Labels{TrackerLabel: defaultTrackerName})

tracker.failedActiveSeriesDecrement = prometheus.NewDesc("cortex_ingester_attributed_active_series_failure",
"The total number of failed active series decrement per user and tracker.", []string{TenantLabel},
prometheus.Labels{TrackerLabel: defaultTrackerName})
return tracker
}

@@ -149,11 +154,11 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) {
t.updateCounters(lbs, now.Unix(), 1, 0, 0, nil)
}

func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) {
func (t *Tracker) DecrementActiveSeries(lbs labels.Labels) {
if t == nil {
return
}
t.updateCounters(lbs, now.Unix(), -1, 0, 0, nil)
t.updateCounters(lbs, 0, -1, 0, 0, nil)
}

func (t *Tracker) Collect(out chan<- prometheus.Metric) {
@@ -182,6 +187,9 @@ func (t *Tracker) Collect(out chan<- prometheus.Metric) {
o.discardSamplemtx.Unlock()
}
}
if t.totalFailedActiveSeries.Load() > 0 {
out <- prometheus.MustNewConstMetric(t.failedActiveSeriesDecrement, prometheus.CounterValue, t.totalFailedActiveSeries.Load(), t.userID)
}
}

func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) {
@@ -198,6 +206,13 @@ func (t *Tracker) IncrementReceivedSamples(lbs labels.Labels, value float64, now
t.updateCounters(lbs, now.Unix(), 0, value, 0, nil)
}

func (t *Tracker) IncrementActiveSeriesFailure(value float64) {
if t == nil {
return
}
t.totalFailedActiveSeries.Add(value)
}

func (t *Tracker) updateCounters(lbls labels.Labels, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string) {
labelValues := make([]string, len(t.caLabels))
lbls.Range(func(l labels.Label) {
@@ -248,8 +263,11 @@ func (t *Tracker) handleObservation(stream string, ts int64, activeSeriesIncreme
o.discardSamplemtx.Unlock()
}
} else if len(t.observed) < t.maxCardinality*2 {
// If the key is not known and ts is 0, the call came from DecrementActiveSeries, so we must not create a new observation for it.
// Otherwise, create a new observation for the stream.
t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason)
if ts != 0 {
t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason)
}
}
}

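Stepping back from the tracker.go diff: `DecrementActiveSeries` no longer takes a timestamp and instead forwards a zero timestamp, which `handleObservation` treats as a sentinel so that decrements for unknown streams never create new observations; failed decrements are reported separately through `IncrementActiveSeriesFailure`. The following is a minimal, self-contained sketch of that sentinel pattern with simplified types — it is not the Mimir code, just an illustration of the control flow:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// miniTracker is a simplified stand-in for costattribution.Tracker,
// keeping only what is needed to illustrate the ts==0 sentinel.
type miniTracker struct {
	mtx                     sync.Mutex
	observed                map[string]*atomic.Float64 // stream -> active series count
	maxCardinality          int
	totalFailedActiveSeries *atomic.Float64
}

func newMiniTracker(maxCardinality int) *miniTracker {
	return &miniTracker{
		observed:                make(map[string]*atomic.Float64),
		maxCardinality:          maxCardinality,
		totalFailedActiveSeries: atomic.NewFloat64(0),
	}
}

// IncrementActiveSeries passes a real timestamp, so unknown streams may be created.
func (t *miniTracker) IncrementActiveSeries(stream string, ts int64) {
	t.updateCounters(stream, ts, 1)
}

// DecrementActiveSeries passes ts=0: decrements must never create new observations.
func (t *miniTracker) DecrementActiveSeries(stream string) {
	t.updateCounters(stream, 0, -1)
}

// IncrementActiveSeriesFailure records decrements that could not be attributed.
func (t *miniTracker) IncrementActiveSeriesFailure(v float64) {
	t.totalFailedActiveSeries.Add(v)
}

func (t *miniTracker) updateCounters(stream string, ts int64, activeSeriesIncrement float64) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	if o, ok := t.observed[stream]; ok {
		o.Add(activeSeriesIncrement)
		return
	}
	// Unknown stream: only create it when ts != 0, i.e. the call did not
	// come from DecrementActiveSeries, and cardinality still allows it.
	if ts != 0 && len(t.observed) < t.maxCardinality*2 {
		o := atomic.NewFloat64(0)
		o.Add(activeSeriesIncrement)
		t.observed[stream] = o
	}
}

func main() {
	t := newMiniTracker(10)
	t.IncrementActiveSeries(`{team="1"}`, 1)
	t.DecrementActiveSeries(`{team="2"}`) // unknown stream, silently ignored
	t.IncrementActiveSeriesFailure(1)     // caller records the failed decrement
	fmt.Println("failed decrements:", t.totalFailedActiveSeries.Load())
}
```

The key point is that only callers holding a real timestamp (increments, received or discarded samples) can grow the observation map, so stale decrements cannot inflate it.
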
11 changes: 9 additions & 2 deletions pkg/costattribution/tracker_test.go
@@ -35,11 +35,11 @@ func Test_CreateCleanupTracker(t *testing.T) {

cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), time.Unix(1, 0))
cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "2"), time.Unix(2, 0))
cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"), time.Unix(3, 0))
cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"))
cat.IncrementReceivedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 5, time.Unix(4, 0))
cat.IncrementDiscardedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 2, "sample-out-of-order", time.Unix(4, 0))

cat.IncrementActiveSeries(labels.FromStrings("platform", "bar", "tenant", "user4", "team", "2"), time.Unix(6, 0))
cat.IncrementActiveSeriesFailure(1)

expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
@@ -49,6 +49,9 @@ func Test_CreateCleanupTracker(t *testing.T) {
# TYPE cortex_ingester_attributed_active_series gauge
cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1
cortex_ingester_attributed_active_series{platform="foo",tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrements per user and tracker.
# TYPE cortex_ingester_attributed_active_series_failure counter
cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution.
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{platform="foo",tenant="user4",tracker="cost-attribution"} 5
@@ -58,6 +61,7 @@ func Test_CreateCleanupTracker(t *testing.T) {
"cortex_discarded_attributed_samples_total",
"cortex_received_attributed_samples_total",
"cortex_ingester_attributed_active_series",
"cortex_ingester_attributed_active_series_failure",
}
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
assert.Equal(t, []string{"foo"}, cat.InactiveObservations(5))
@@ -67,6 +71,9 @@ func Test_CreateCleanupTracker(t *testing.T) {
# HELP cortex_ingester_attributed_active_series The total number of active series per user and attribution.
# TYPE cortex_ingester_attributed_active_series gauge
cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrements per user and tracker.
# TYPE cortex_ingester_attributed_active_series_failure counter
cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
tManager.deleteUserTracker("user4")
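The expected metrics in the test above also exercise the new guard in `Collect`: `cortex_ingester_attributed_active_series_failure` is only emitted once the internal counter is non-zero, so tenants without failures never get a zero-valued series. A hedged, stand-alone sketch of that export pattern — hypothetical collector and metric name, not the Mimir type:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/atomic"
)

// failureCollector exports a per-tenant counter, but only once it has a value,
// mirroring the "emit only when > 0" guard used in Tracker.Collect above.
type failureCollector struct {
	userID string
	desc   *prometheus.Desc
	failed *atomic.Float64
}

func newFailureCollector(userID string) *failureCollector {
	return &failureCollector{
		userID: userID,
		desc: prometheus.NewDesc(
			"example_attributed_active_series_failure", // hypothetical metric name
			"Failed active series decrements per tenant.",
			[]string{"tenant"}, nil,
		),
		failed: atomic.NewFloat64(0),
	}
}

func (c *failureCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *failureCollector) Collect(ch chan<- prometheus.Metric) {
	if v := c.failed.Load(); v > 0 {
		ch <- prometheus.MustNewConstMetric(c.desc, prometheus.CounterValue, v, c.userID)
	}
}

func main() {
	reg := prometheus.NewRegistry()
	c := newFailureCollector("user4")
	reg.MustRegister(c)

	families, _ := reg.Gather()
	fmt.Println("families before any failure:", len(families)) // 0

	c.failed.Add(1)
	families, _ = reg.Gather()
	fmt.Println("families after a failure:", len(families)) // 1
}
```
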
47 changes: 23 additions & 24 deletions pkg/distributor/distributor.go
@@ -34,6 +34,19 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/cardinality"
"github.com/grafana/mimir/pkg/costattribution"
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
@@ -48,18 +61,6 @@ import (
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

func init() {
@@ -745,15 +746,14 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// May alter timeseries data in-place.
// The returned error may retain the series labels.

func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
if len(ts.Samples) == 0 {
return nil
}

cat := d.costAttributionMgr.TrackerForUser(userID)

if len(ts.Samples) == 1 {
return validateSample(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, ts.Samples[0], cat)
return validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, ts.Samples[0], cat)
}

timestamps := make(map[int64]struct{}, min(len(ts.Samples), 100))
@@ -767,7 +767,7 @@ func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimes
}

timestamps[s.TimestampMs] = struct{}{}
if err := validateSample(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, s, cat); err != nil {
if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s, cat); err != nil {
return err
}

@@ -787,14 +787,14 @@ func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimes
// Returns an error explaining the first validation finding.
// May alter timeseries data in-place.
// The returned error may retain the series labels.
func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
if len(ts.Histograms) == 0 {
return nil
}

cat := d.costAttributionMgr.TrackerForUser(userID)
if len(ts.Histograms) == 1 {
updated, err := validateSampleHistogram(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat)
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat)
if err != nil {
return err
}
@@ -807,7 +807,6 @@ func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTi
timestamps := make(map[int64]struct{}, min(len(ts.Histograms), 100))
currPos := 0
histogramsUpdated := false

for idx := range ts.Histograms {
if _, ok := timestamps[ts.Histograms[idx].Timestamp]; ok {
// A sample with the same timestamp has already been validated, so we skip it.
@@ -816,7 +815,7 @@ func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTi
}

timestamps[ts.Histograms[idx].Timestamp] = struct{}{}
updated, err := validateSampleHistogram(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, &ts.Histograms[idx], cat)
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[idx], cat)
if err != nil {
return err
}
@@ -884,6 +883,7 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation, cat, nowt); err != nil {
return true, err
}

now := model.TimeFromUnixNano(nowt.UnixNano())
totalSamplesAndHistograms := len(ts.Samples) + len(ts.Histograms)

@@ -973,8 +973,8 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
}

numSamples := 0
tnow := time.Now()
group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), tnow)
now := time.Now()
group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now)
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples) + len(ts.Histograms)
}
@@ -988,7 +988,7 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {

if errors.As(err, &tooManyClustersError{}) {
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, tnow)
d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, now)
}

return err
@@ -1829,7 +1829,6 @@ func tokenForMetadata(userID string, metricName string) uint32 {

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedExemplars, receivedMetadata int

for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
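Aside from the `tnow` → `now` renames and the import regrouping, `validateSamples` and `validateHistograms` keep their in-place de-duplication by timestamp: a set pre-sized to at most 100 entries skips repeated timestamps while compacting the slice in place. A simplified sketch of that compaction, with a plain `int64` slice standing in for the sample and histogram types:

```go
package main

import "fmt"

// dedupeByTimestamp removes entries whose timestamp was already seen,
// compacting the slice in place the way validateSamples does with samples.
func dedupeByTimestamp(timestampsMs []int64) []int64 {
	seen := make(map[int64]struct{}, min(len(timestampsMs), 100))
	currPos := 0
	for _, ts := range timestampsMs {
		if _, ok := seen[ts]; ok {
			// A sample with this timestamp was already validated; skip it.
			continue
		}
		seen[ts] = struct{}{}
		timestampsMs[currPos] = ts
		currPos++
	}
	return timestampsMs[:currPos]
}

func main() {
	fmt.Println(dedupeByTimestamp([]int64{1000, 1000, 2000, 1000, 3000}))
	// Output: [1000 2000 3000]
}
```

In the real code the kept elements are the samples and histograms themselves, and each kept histogram is also re-validated; the sketch keeps only the dedup mechanics.
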
1 change: 1 addition & 0 deletions pkg/distributor/validate.go
@@ -443,6 +443,7 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI
m.labelNameTooLong.WithLabelValues(userID, group).Inc()
return fmt.Errorf(labelNameTooLongMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls))
} else if !skipLabelValidation && !model.LabelValue(l.Value).IsValid() {
cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidLabelValue, ts)
m.invalidLabelValue.WithLabelValues(userID, group).Inc()
return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, strings.ToValidUTF8(l.Value, ""), unsafeMetricName)
} else if len(l.Value) > maxLabelValueLength {
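The single added line in `validateLabels` brings the invalid-label-value branch in line with the other rejection paths by reporting the discard to the cost-attribution tracker before returning the error. A hedged sketch of that call shape — the interface, constant value, and helper below are illustrative assumptions, not Mimir's actual validation code:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"
)

// discardTracker is the minimal surface this path needs from the
// cost-attribution tracker (hypothetical interface, for illustration only;
// the real code passes a *costattribution.Tracker).
type discardTracker interface {
	IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time)
}

type logTracker struct{}

func (logTracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) {
	fmt.Printf("discarded %.0f sample(s) for %s, reason=%q\n", value, lbs.String(), reason)
}

// reasonInvalidLabelValue is assumed here; the real constant lives in the distributor package.
const reasonInvalidLabelValue = "invalid_label_value"

// rejectInvalidLabelValue mirrors the shape of the new branch: report the
// discard to cost attribution before returning the validation error.
func rejectInvalidLabelValue(cat discardTracker, ls labels.Labels, now time.Time) error {
	cat.IncrementDiscardedSamples(ls, 1, reasonInvalidLabelValue, now)
	return fmt.Errorf("invalid label value in series %s", ls.String())
}

func main() {
	ls := labels.FromStrings("__name__", "up", "instance", "bad\xffvalue")
	fmt.Println(rejectInvalidLabelValue(logTracker{}, ls, time.Now()))
}
```
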
