Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Dec 17, 2024
1 parent e315ebb commit 077a94a
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 115 deletions.
9 changes: 1 addition & 8 deletions development/mimir-microservices-mode/config/mimir.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
multitenancy_enabled: false
cost_attribution_registry_path: "/usage-metrics"
cost_attribution_eviction_interval: 10m

distributor:
ha_tracker:
Expand Down Expand Up @@ -186,10 +184,5 @@ limits:
ha_replica_label: ha_replica
ha_max_clusters: 10

cost_attribution_labels: "container"
max_cost_attribution_labels_per_user: 2
max_cost_attribution_cardinality_per_user: 100
cost_attribution_cooldown: 20m

runtime_config:
file: ./config/runtime.yaml
file: ./config/runtime.yaml
17 changes: 8 additions & 9 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,15 @@ func (m *Manager) inactiveObservationsForUser(userID string, deadline int64) []s
m.trackersByUserID[userID] = cat
m.mtx.Unlock()
return nil
} else {
maxCardinality := m.limits.MaxCostAttributionCardinalityPerUser(userID)
if cat.MaxCardinality() != maxCardinality {
cat.UpdateMaxCardinality(maxCardinality)
}
}
maxCardinality := m.limits.MaxCostAttributionCardinalityPerUser(userID)
if cat.MaxCardinality() != maxCardinality {
cat.UpdateMaxCardinality(maxCardinality)
}

cooldown := int64(m.limits.CostAttributionCooldown(userID).Seconds())
if cooldown != cat.CooldownDuration() {
cat.UpdateCooldownDuration(cooldown)
}
cooldown := int64(m.limits.CostAttributionCooldown(userID).Seconds())
if cooldown != cat.CooldownDuration() {
cat.UpdateCooldownDuration(cooldown)
}

return cat.InactiveObservations(deadline)
Expand Down
21 changes: 13 additions & 8 deletions pkg/costattribution/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func Test_CreateDeleteTracker(t *testing.T) {
})

t.Run("Purge inactive attributions", func(t *testing.T) {
manager.purgeInactiveAttributionsUntil(time.Unix(10, 0).Unix())
err := manager.purgeInactiveAttributionsUntil(time.Unix(10, 0).Unix())
assert.NoError(t, err)
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
Expand All @@ -103,8 +104,10 @@ func Test_CreateDeleteTracker(t *testing.T) {
})

t.Run("Disabling user cost attribution", func(t *testing.T) {
manager.limits, _ = getMockLimits(1)
manager.purgeInactiveAttributionsUntil(time.Unix(11, 0).Unix())
var err error
manager.limits, err = getMockLimits(1)
assert.NoError(t, err)
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(11, 0).Unix()))
assert.Equal(t, 1, len(manager.trackersByUserID))

expectedMetrics := `
Expand All @@ -116,8 +119,10 @@ func Test_CreateDeleteTracker(t *testing.T) {
})

t.Run("Updating user cardinality and labels", func(t *testing.T) {
manager.limits, _ = getMockLimits(2)
manager.purgeInactiveAttributionsUntil(time.Unix(12, 0).Unix())
var err error
manager.limits, err = getMockLimits(2)
assert.NoError(t, err)
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(12, 0).Unix()))
assert.Equal(t, 1, len(manager.trackersByUserID))
assert.True(t, manager.TrackerForUser("user3").CompareCALabels([]string{"feature", "team"}))

Expand Down Expand Up @@ -151,7 +156,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
manager.TrackerForUser("user3").IncrementDiscardedSamples(labels.FromStrings("department", "foo", "service", "bar"), 1, "out-of-window", time.Unix(10, 0))

t.Run("Purge before inactive timeout", func(t *testing.T) {
manager.purgeInactiveAttributionsUntil(time.Unix(0, 0).Unix())
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(0, 0).Unix()))
assert.Equal(t, 2, len(manager.trackersByUserID))

expectedMetrics := `
Expand All @@ -166,7 +171,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
t.Run("Purge after inactive timeout", func(t *testing.T) {
// disable cost attribution for user1 to test purging
manager.limits, _ = getMockLimits(1)
manager.purgeInactiveAttributionsUntil(time.Unix(5, 0).Unix())
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(5, 0).Unix()))

// User3's tracker should remain since it's active, user1's tracker should be removed
assert.Equal(t, 1, len(manager.trackersByUserID), "Expected one active tracker after purging")
Expand All @@ -182,7 +187,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {

t.Run("Purge all trackers", func(t *testing.T) {
// Trigger a purge that should remove all inactive trackers
manager.purgeInactiveAttributionsUntil(time.Unix(20, 0).Unix())
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(20, 0).Unix()))

// Tracker would stay at 1 since user1's tracker is disabled
assert.Equal(t, 1, len(manager.trackersByUserID), "Expected one active tracker after full purge")
Expand Down
46 changes: 32 additions & 14 deletions pkg/costattribution/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ type Tracker struct {
activeSeriesPerUserAttribution *prometheus.Desc
receivedSamplesAttribution *prometheus.Desc
discardedSampleAttribution *prometheus.Desc
failedActiveSeriesDecrement *prometheus.Desc
overflowLabels []string
obseveredMtx sync.RWMutex
observed map[string]*Observation
hashBuffer []byte
state TrackerState
overflowCounter *Observation
cooldownUntil *atomic.Int64
totalFailedActiveSeries *atomic.Float64
cooldownDuration int64
logger log.Logger
}
Expand All @@ -70,15 +72,16 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
overflowLabels[len(trackedLabels)+1] = overflowValue

tracker := &Tracker{
userID: userID,
caLabels: trackedLabels,
caLabelMap: caLabelMap,
maxCardinality: limit,
observed: make(map[string]*Observation),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
logger: logger,
overflowLabels: overflowLabels,
userID: userID,
caLabels: trackedLabels,
caLabelMap: caLabelMap,
maxCardinality: limit,
observed: make(map[string]*Observation),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
logger: logger,
overflowLabels: overflowLabels,
totalFailedActiveSeries: atomic.NewFloat64(0),
}

tracker.discardedSampleAttribution = prometheus.NewDesc("cortex_discarded_attributed_samples_total",
Expand All @@ -94,7 +97,9 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
tracker.activeSeriesPerUserAttribution = prometheus.NewDesc("cortex_ingester_attributed_active_series",
"The total number of active series per user and attribution.", append(trackedLabels, TenantLabel),
prometheus.Labels{TrackerLabel: defaultTrackerName})

tracker.failedActiveSeriesDecrement = prometheus.NewDesc("cortex_ingester_attributed_active_series_failure",
"The total number of failed active series decrement per user and tracker.", []string{TenantLabel},
prometheus.Labels{TrackerLabel: defaultTrackerName})
return tracker
}

Expand Down Expand Up @@ -149,11 +154,11 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) {
t.updateCounters(lbs, now.Unix(), 1, 0, 0, nil)
}

func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) {
func (t *Tracker) DecrementActiveSeries(lbs labels.Labels) {
if t == nil {
return
}
t.updateCounters(lbs, now.Unix(), -1, 0, 0, nil)
t.updateCounters(lbs, -1, -1, 0, 0, nil)
}

func (t *Tracker) Collect(out chan<- prometheus.Metric) {
Expand Down Expand Up @@ -182,6 +187,9 @@ func (t *Tracker) Collect(out chan<- prometheus.Metric) {
o.discardSamplemtx.Unlock()
}
}
if t.totalFailedActiveSeries.Load() > 0 {
out <- prometheus.MustNewConstMetric(t.failedActiveSeriesDecrement, prometheus.CounterValue, t.totalFailedActiveSeries.Load(), t.userID)
}
}

func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) {
Expand All @@ -198,6 +206,13 @@ func (t *Tracker) IncrementReceivedSamples(lbs labels.Labels, value float64, now
t.updateCounters(lbs, now.Unix(), 0, value, 0, nil)
}

// IncrementActiveSeriesFailure adds value to the tracker's running total of
// failed active-series decrements. The accumulated total is exported by
// Collect as the cortex_ingester_attributed_active_series_failure counter
// (only once it is > 0). Safe to call on a nil receiver (no-op).
func (t *Tracker) IncrementActiveSeriesFailure(value float64) {
	if t == nil {
		return
	}
	t.totalFailedActiveSeries.Add(value)
}

func (t *Tracker) updateCounters(lbls labels.Labels, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string) {
labelValues := make([]string, len(t.caLabels))
lbls.Range(func(l labels.Label) {
Expand Down Expand Up @@ -248,8 +263,11 @@ func (t *Tracker) handleObservation(stream string, ts int64, activeSeriesIncreme
o.discardSamplemtx.Unlock()
}
} else if len(t.observed) < t.maxCardinality*2 {
// Create a new observation for the stream
t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason)
// If the ts is negative, it means that the method is called from DecrementActiveSeries, when key doesn't exist we should ignore the call
// Otherwise create a new observation for the stream
if ts >= 0 {
t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason)
}
}
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/costattribution/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func Test_CreateCleanupTracker(t *testing.T) {

cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), time.Unix(1, 0))
cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "2"), time.Unix(2, 0))
cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"), time.Unix(3, 0))
cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"))
cat.IncrementReceivedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 5, time.Unix(4, 0))
cat.IncrementDiscardedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 2, "sample-out-of-order", time.Unix(4, 0))

cat.IncrementActiveSeries(labels.FromStrings("platform", "bar", "tenant", "user4", "team", "2"), time.Unix(6, 0))
cat.IncrementActiveSeriesFailure(1)

expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
Expand All @@ -49,6 +49,9 @@ func Test_CreateCleanupTracker(t *testing.T) {
# TYPE cortex_ingester_attributed_active_series gauge
cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1
cortex_ingester_attributed_active_series{platform="foo",tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrement per user and tracker.
# TYPE cortex_ingester_attributed_active_series_failure counter
cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution.
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{platform="foo",tenant="user4",tracker="cost-attribution"} 5
Expand All @@ -58,15 +61,19 @@ func Test_CreateCleanupTracker(t *testing.T) {
"cortex_discarded_attributed_samples_total",
"cortex_received_attributed_samples_total",
"cortex_ingester_attributed_active_series",
"cortex_ingester_attributed_active_series_failure",
}
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
assert.Equal(t, []string{"foo"}, cat.InactiveObservations(5))
tManager.purgeInactiveAttributionsUntil(5)
assert.NoError(t, tManager.purgeInactiveAttributionsUntil(5))

expectedMetrics = `
# HELP cortex_ingester_attributed_active_series The total number of active series per user and attribution.
# TYPE cortex_ingester_attributed_active_series gauge
cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrement per user and tracker.
# TYPE cortex_ingester_attributed_active_series_failure counter
cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
tManager.deleteUserTracker("user4")
Expand Down
47 changes: 23 additions & 24 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/cardinality"
"github.com/grafana/mimir/pkg/costattribution"
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
Expand All @@ -48,18 +61,6 @@ import (
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

func init() {
Expand Down Expand Up @@ -745,15 +746,14 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// May alter timeseries data in-place.
// The returned error may retain the series labels.

func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
if len(ts.Samples) == 0 {
return nil
}

cat := d.costAttributionMgr.TrackerForUser(userID)

if len(ts.Samples) == 1 {
return validateSample(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, ts.Samples[0], cat)
return validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, ts.Samples[0], cat)
}

timestamps := make(map[int64]struct{}, min(len(ts.Samples), 100))
Expand All @@ -767,7 +767,7 @@ func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimes
}

timestamps[s.TimestampMs] = struct{}{}
if err := validateSample(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, s, cat); err != nil {
if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s, cat); err != nil {
return err
}

Expand All @@ -787,14 +787,14 @@ func (d *Distributor) validateSamples(tnow model.Time, ts *mimirpb.PreallocTimes
// Returns an error explaining the first validation finding.
// May alter timeseries data in-place.
// The returned error may retain the series labels.
func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
if len(ts.Histograms) == 0 {
return nil
}

cat := d.costAttributionMgr.TrackerForUser(userID)
if len(ts.Histograms) == 1 {
updated, err := validateSampleHistogram(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat)
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat)
if err != nil {
return err
}
Expand All @@ -807,7 +807,6 @@ func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTi
timestamps := make(map[int64]struct{}, min(len(ts.Histograms), 100))
currPos := 0
histogramsUpdated := false

for idx := range ts.Histograms {
if _, ok := timestamps[ts.Histograms[idx].Timestamp]; ok {
// A sample with the same timestamp has already been validated, so we skip it.
Expand All @@ -816,7 +815,7 @@ func (d *Distributor) validateHistograms(tnow model.Time, ts *mimirpb.PreallocTi
}

timestamps[ts.Histograms[idx].Timestamp] = struct{}{}
updated, err := validateSampleHistogram(d.sampleValidationMetrics, tnow, d.limits, userID, group, ts.Labels, &ts.Histograms[idx], cat)
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[idx], cat)
if err != nil {
return err
}
Expand Down Expand Up @@ -884,6 +883,7 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation, cat, nowt); err != nil {
return true, err
}

now := model.TimeFromUnixNano(nowt.UnixNano())
totalSamplesAndHistograms := len(ts.Samples) + len(ts.Histograms)

Expand Down Expand Up @@ -973,8 +973,8 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
}

numSamples := 0
tnow := time.Now()
group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), tnow)
now := time.Now()
group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now)
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples) + len(ts.Histograms)
}
Expand All @@ -988,7 +988,7 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {

if errors.As(err, &tooManyClustersError{}) {
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, tnow)
d.costAttributionMgr.TrackerForUser(userID).IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, now)
}

return err
Expand Down Expand Up @@ -1829,7 +1829,6 @@ func tokenForMetadata(userID string, metricName string) uint32 {

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedExemplars, receivedMetadata int

for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
Expand Down
Loading

0 comments on commit 077a94a

Please sign in to comment.