Skip to content

Commit

Permalink
Poc: cost attribution proposal 2
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Dec 17, 2024
1 parent 3af22a8 commit e315ebb
Show file tree
Hide file tree
Showing 31 changed files with 1,670 additions and 347 deletions.
66 changes: 66 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -4368,6 +4368,50 @@
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_labels",
"required": false,
"desc": "Defines labels for cost attribution, applied to metrics like cortex_distributor_attributed_received_samples_total. Set to an empty string to disable. Example: 'team,service' will produce metrics such as cortex_distributor_attributed_received_samples_total{team='frontend', service='api'}.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "validation.cost-attribution-labels",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_labels_per_user",
"required": false,
"desc": "Maximum number of cost attribution labels allowed per user.",
"fieldValue": null,
"fieldDefaultValue": 2,
"fieldFlag": "validation.max-cost-attribution-labels-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_cardinality_per_user",
"required": false,
"desc": "Maximum cardinality of cost attribution labels allowed per user.",
"fieldValue": null,
"fieldDefaultValue": 10000,
"fieldFlag": "validation.max-cost-attribution-cardinality-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_cooldown",
"required": false,
"desc": "Cooldown period for cost attribution labels. Specifies the duration the cost attribution remains in overflow before attempting a reset. If the cardinality remains above the limit after this period, the system will stay in overflow mode and extend the cooldown. Setting this value to 0 disables the cooldown, causing the system to continuously check whether the cardinality has dropped below the limit. A reset will occur once the cardinality falls below the limit.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "validation.cost-attribution-cooldown",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "ruler_evaluation_delay_duration",
Expand Down Expand Up @@ -19639,6 +19683,28 @@
"fieldFlag": "timeseries-unmarshal-caching-optimization-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_eviction_interval",
"required": false,
"desc": "Time interval at which inactive cost attributions are evicted from the counter, ensuring they are not included in the cost attribution cardinality per user limit.",
"fieldValue": null,
"fieldDefaultValue": 1200000000000,
"fieldFlag": "cost-attribution.eviction-interval",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_registry_path",
"required": false,
"desc": "Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path, if not specified, cost attribution metrics won't be exposed.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "cost-attribution.registry-path",
"fieldType": "string",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
12 changes: 12 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,10 @@ Usage of ./cmd/mimir/mimir:
Expands ${var} or $var in config according to the values of the environment variables.
-config.file value
Configuration file to load.
-cost-attribution.eviction-interval duration
[experimental] Time interval at which inactive cost attributions are evicted from the counter, ensuring they are not included in the cost attribution cardinality per user limit. (default 20m0s)
-cost-attribution.registry-path string
[experimental] Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path, if not specified, cost attribution metrics won't be exposed.
-debug.block-profile-rate int
Fraction of goroutine blocking events that are reported in the blocking profile. 1 to include every blocking event in the profile, 0 to disable.
-debug.mutex-profile-fraction int
Expand Down Expand Up @@ -3317,10 +3321,18 @@ Usage of ./cmd/mimir/mimir:
Enable anonymous usage reporting. (default true)
-usage-stats.installation-mode string
Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
-validation.cost-attribution-cooldown duration
[experimental] Cooldown period for cost attribution labels. Specifies the duration the cost attribution remains in overflow before attempting a reset. If the cardinality remains above the limit after this period, the system will stay in overflow mode and extend the cooldown. Setting this value to 0 disables the cooldown, causing the system to continuously check whether the cardinality has dropped below the limit. A reset will occur once the cardinality falls below the limit.
-validation.cost-attribution-labels comma-separated-list-of-strings
[experimental] Defines labels for cost attribution, applied to metrics like cortex_distributor_attributed_received_samples_total. Set to an empty string to disable. Example: 'team,service' will produce metrics such as cortex_distributor_attributed_received_samples_total{team='frontend', service='api'}.
-validation.create-grace-period duration
Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
-validation.enforce-metadata-metric-name
Enforce every metadata has a metric name. (default true)
-validation.max-cost-attribution-cardinality-per-user int
[experimental] Maximum cardinality of cost attribution labels allowed per user. (default 10000)
-validation.max-cost-attribution-labels-per-user int
[experimental] Maximum number of cost attribution labels allowed per user. (default 2)
-validation.max-label-names-per-info-series int
Maximum number of label names per info series. Has no effect if less than the value of the maximum number of label names per series option (-validation.max-label-names-per-series) (default 80)
-validation.max-label-names-per-series int
Expand Down
9 changes: 8 additions & 1 deletion development/mimir-microservices-mode/config/mimir.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
multitenancy_enabled: false
cost_attribution_registry_path: "/usage-metrics"
cost_attribution_eviction_interval: 10m

distributor:
ha_tracker:
Expand Down Expand Up @@ -184,5 +186,10 @@ limits:
ha_replica_label: ha_replica
ha_max_clusters: 10

cost_attribution_labels: "container"
max_cost_attribution_labels_per_user: 2
max_cost_attribution_cardinality_per_user: 100
cost_attribution_cooldown: 20m

runtime_config:
file: ./config/runtime.yaml
file: ./config/runtime.yaml
6 changes: 6 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/grafana/mimir/pkg/alertmanager"
"github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb"
Expand Down Expand Up @@ -280,6 +281,11 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, true, "GET")
}

// RegisterCostAttribution registers a Prometheus HTTP handler for the cost attribution metrics.
func (a *API) RegisterCostAttribution(customRegistryPath string, reg *prometheus.Registry) {
a.RegisterRoute(customRegistryPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), false, false, "GET")
}

// Ingester is defined as an interface to allow for alternative implementations
// of ingesters to be passed into the API.RegisterIngester() method.
type Ingester interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/blockbuilder/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type TSDBBuilder struct {
var softErrProcessor = mimir_storage.NewSoftAppendErrorProcessor(
func() {}, func(int64, []mimirpb.LabelAdapter) {}, func(int64, []mimirpb.LabelAdapter) {},
func(int64, []mimirpb.LabelAdapter) {}, func(int64, []mimirpb.LabelAdapter) {}, func(int64, []mimirpb.LabelAdapter) {},
func() {}, func([]mimirpb.LabelAdapter) {}, func(error, int64, []mimirpb.LabelAdapter) {},
func([]mimirpb.LabelAdapter) {}, func([]mimirpb.LabelAdapter) {}, func(error, int64, []mimirpb.LabelAdapter) {},
func(error, int64, []mimirpb.LabelAdapter) {}, func(error, int64, []mimirpb.LabelAdapter) {}, func(error, int64, []mimirpb.LabelAdapter) {},
func(error, int64, []mimirpb.LabelAdapter) {}, func(error, int64, []mimirpb.LabelAdapter) {},
)
Expand Down
173 changes: 173 additions & 0 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// SPDX-License-Identifier: AGPL-3.0-only

package costattribution

import (
"context"
"sort"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/mimir/pkg/util/validation"
)

const (
TrackerLabel = "tracker"
TenantLabel = "tenant"
defaultTrackerName = "cost-attribution"
missingValue = "__missing__"
overflowValue = "__overflow__"
)

type Manager struct {
services.Service
logger log.Logger
inactiveTimeout time.Duration
limits *validation.Overrides

mtx sync.RWMutex
trackersByUserID map[string]*Tracker
reg *prometheus.Registry
cleanupInterval time.Duration
metricsExportInterval time.Duration
}

func NewManager(cleanupInterval, exportInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg *prometheus.Registry) (*Manager, error) {
m := &Manager{
trackersByUserID: make(map[string]*Tracker),
limits: limits,
mtx: sync.RWMutex{},
inactiveTimeout: inactiveTimeout,
logger: logger,
reg: reg,
cleanupInterval: cleanupInterval,
metricsExportInterval: exportInterval,
}

m.Service = services.NewBasicService(nil, m.running, nil).WithName("cost attribution manager")
if err := reg.Register(m); err != nil {
return nil, err
}
return m, nil
}

func (m *Manager) running(ctx context.Context) error {
t := time.NewTicker(m.cleanupInterval)
defer t.Stop()

for {
select {
case <-t.C:
if err := m.purgeInactiveAttributionsUntil(time.Now().Add(-m.inactiveTimeout).Unix()); err != nil {
return err
}
case <-ctx.Done():
return nil
}
}
}

func (m *Manager) EnabledForUser(userID string) bool {
if m == nil {
return false
}
return len(m.limits.CostAttributionLabels(userID)) > 0
}

func (m *Manager) TrackerForUser(userID string) *Tracker {
if !m.EnabledForUser(userID) {
return nil
}

m.mtx.Lock()
defer m.mtx.Unlock()

if tracker, exists := m.trackersByUserID[userID]; exists {
return tracker
}

tracker := newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger)
m.trackersByUserID[userID] = tracker
return tracker
}

func (m *Manager) Collect(out chan<- prometheus.Metric) {
m.mtx.RLock()
defer m.mtx.RUnlock()
for _, tracker := range m.trackersByUserID {
tracker.Collect(out)
}
}

func (m *Manager) Describe(chan<- *prometheus.Desc) {
}

func (m *Manager) deleteUserTracker(userID string) {
m.mtx.Lock()
defer m.mtx.Unlock()
delete(m.trackersByUserID, userID)
}

func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error {
m.mtx.RLock()
userIDs := make([]string, 0, len(m.trackersByUserID))
for userID := range m.trackersByUserID {
userIDs = append(userIDs, userID)
}
m.mtx.RUnlock()

for _, userID := range userIDs {
if !m.EnabledForUser(userID) {
m.deleteUserTracker(userID)
continue
}

invalidKeys := m.inactiveObservationsForUser(userID, deadline)
cat := m.TrackerForUser(userID)
for _, key := range invalidKeys {
cat.cleanupTrackerAttribution(key)
}

if cat != nil && cat.cooldownUntil != nil && cat.cooldownUntil.Load() < deadline {
if len(cat.observed) <= cat.MaxCardinality() {
cat.state = OverflowComplete
m.deleteUserTracker(userID)
} else {
cat.cooldownUntil.Store(deadline + cat.cooldownDuration)
}
}
}
return nil
}

func (m *Manager) inactiveObservationsForUser(userID string, deadline int64) []string {
cat := m.TrackerForUser(userID)
newTrackedLabels := m.limits.CostAttributionLabels(userID)
sort.Slice(newTrackedLabels, func(i, j int) bool {
return newTrackedLabels[i] < newTrackedLabels[j]
})

if !cat.CompareCALabels(newTrackedLabels) {
m.mtx.Lock()
cat = newTracker(userID, newTrackedLabels, m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger)
m.trackersByUserID[userID] = cat
m.mtx.Unlock()
return nil
} else {
maxCardinality := m.limits.MaxCostAttributionCardinalityPerUser(userID)
if cat.MaxCardinality() != maxCardinality {
cat.UpdateMaxCardinality(maxCardinality)
}

cooldown := int64(m.limits.CostAttributionCooldown(userID).Seconds())
if cooldown != cat.CooldownDuration() {
cat.UpdateCooldownDuration(cooldown)
}
}

return cat.InactiveObservations(deadline)
}
Loading

0 comments on commit e315ebb

Please sign in to comment.