-
Notifications
You must be signed in to change notification settings - Fork 543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MVP: Cost attribution #10269
base: main
Are you sure you want to change the base?
MVP: Cost attribution #10269
Changes from 28 commits
e315ebb
f04c28f
2c422d1
d2eab6b
1f39282
9b4337d
1a523e1
f10f787
cc0e939
c020be0
71e4666
9dd101b
7d4ea9a
6754666
fffc5b3
2cf8c3e
f994034
b060c09
e35a8d9
5cc0b5d
389dff0
116a69e
9c30445
88ef49e
130636a
b701ba7
dccd9c8
eebd028
8386503
d8f1e9b
b9efb94
a37e6de
211b3a2
f697e6f
fe8a1e5
8b5836f
888d8b0
b15b487
87209d6
4706bde
1ab1f00
8111b6c
17b64a9
a191044
2bb1845
ddd507d
b27e379
a79fac7
37901b7
f7115f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
|
||
package costattribution | ||
|
||
import ( | ||
"context" | ||
"sort" | ||
"sync" | ||
"time" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/grafana/dskit/services" | ||
"github.com/prometheus/client_golang/prometheus" | ||
|
||
"github.com/grafana/mimir/pkg/util/validation" | ||
) | ||
|
||
const ( | ||
trackerLabel = "tracker" | ||
tenantLabel = "tenant" | ||
defaultTrackerName = "cost-attribution" | ||
missingValue = "__missing__" | ||
overflowValue = "__overflow__" | ||
) | ||
|
||
type Manager struct { | ||
services.Service | ||
logger log.Logger | ||
inactiveTimeout time.Duration | ||
limits *validation.Overrides | ||
|
||
mtx sync.RWMutex | ||
trackersByUserID map[string]*Tracker | ||
reg *prometheus.Registry | ||
cleanupInterval time.Duration | ||
metricsExportInterval time.Duration | ||
} | ||
|
||
func NewManager(cleanupInterval, exportInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg *prometheus.Registry) (*Manager, error) { | ||
m := &Manager{ | ||
trackersByUserID: make(map[string]*Tracker), | ||
limits: limits, | ||
mtx: sync.RWMutex{}, | ||
inactiveTimeout: inactiveTimeout, | ||
logger: logger, | ||
reg: reg, | ||
cleanupInterval: cleanupInterval, | ||
metricsExportInterval: exportInterval, | ||
} | ||
|
||
m.Service = services.NewTimerService(cleanupInterval, nil, m.iteration, nil).WithName("cost attribution manager") | ||
if err := reg.Register(m); err != nil { | ||
return nil, err | ||
} | ||
return m, nil | ||
} | ||
|
||
func (m *Manager) iteration(_ context.Context) error { | ||
return m.purgeInactiveAttributionsUntil(time.Now().Add(-m.inactiveTimeout).Unix()) | ||
} | ||
|
||
func (m *Manager) EnabledForUser(userID string) bool { | ||
if m == nil { | ||
return false | ||
} | ||
return len(m.limits.CostAttributionLabels(userID)) > 0 | ||
} | ||
|
||
func (m *Manager) Tracker(userID string) *Tracker { | ||
if !m.EnabledForUser(userID) { | ||
return nil | ||
} | ||
|
||
// Check if the tracker already exists, if exists return it. Otherwise lock and create a new tracker. | ||
m.mtx.RLock() | ||
tracker, exists := m.trackersByUserID[userID] | ||
m.mtx.RUnlock() | ||
if exists { | ||
return tracker | ||
} | ||
|
||
m.mtx.Lock() | ||
defer m.mtx.Unlock() | ||
if tracker, exists = m.trackersByUserID[userID]; exists { | ||
return tracker | ||
} | ||
tracker = newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit, but I would collect all the information needed to build a tracker (all the |
||
m.trackersByUserID[userID] = tracker | ||
return tracker | ||
} | ||
|
||
func (m *Manager) Collect(out chan<- prometheus.Metric) { | ||
m.mtx.RLock() | ||
defer m.mtx.RUnlock() | ||
for _, tracker := range m.trackersByUserID { | ||
tracker.Collect(out) | ||
} | ||
} | ||
|
||
func (m *Manager) Describe(chan<- *prometheus.Desc) { | ||
colega marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Describe is not implemented because the metrics include dynamic labels. The Manager functions as an unchecked exporter. | ||
// For more details, refer to the documentation: https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-Custom_Collectors_and_constant_Metrics | ||
} | ||
|
||
func (m *Manager) deleteTracker(userID string) { | ||
m.mtx.Lock() | ||
defer m.mtx.Unlock() | ||
delete(m.trackersByUserID, userID) | ||
} | ||
|
||
func (m *Manager) updateTracker(userID string) *Tracker { | ||
t := m.Tracker(userID) | ||
|
||
if t == nil { | ||
m.deleteTracker(userID) | ||
return nil | ||
} | ||
|
||
newTrackedLabels := m.limits.CostAttributionLabels(userID) | ||
|
||
// sort the labels to ensure the order is consistent | ||
sort.Slice(newTrackedLabels, func(i, j int) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it safe to modify the slice returned by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. modify the copy instead, addressed in 888d8b0 |
||
return newTrackedLabels[i] < newTrackedLabels[j] | ||
}) | ||
ying-jeanne marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if !t.hasSameLabels(newTrackedLabels) { | ||
m.mtx.Lock() | ||
t = newTracker(userID, newTrackedLabels, m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger) | ||
m.trackersByUserID[userID] = t | ||
m.mtx.Unlock() | ||
return t | ||
} | ||
|
||
maxCardinality := m.limits.MaxCostAttributionCardinalityPerUser(userID) | ||
if t.maxCardinality != maxCardinality { | ||
t.maxCardinality = maxCardinality | ||
} | ||
|
||
cooldown := int64(m.limits.CostAttributionCooldown(userID).Seconds()) | ||
if cooldown != t.cooldownDuration { | ||
t.cooldownDuration = cooldown | ||
} | ||
return t | ||
} | ||
|
||
func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error { | ||
m.mtx.RLock() | ||
userIDs := make([]string, 0, len(m.trackersByUserID)) | ||
for userID := range m.trackersByUserID { | ||
userIDs = append(userIDs, userID) | ||
} | ||
m.mtx.RUnlock() | ||
|
||
for _, userID := range userIDs { | ||
t := m.updateTracker(userID) | ||
if t == nil { | ||
continue | ||
} | ||
|
||
invalidKeys := t.inactiveObservations(deadline) | ||
for _, key := range invalidKeys { | ||
t.cleanupTrackerAttribution(key) | ||
} | ||
|
||
if t.shouldDelete(deadline) { | ||
m.deleteTracker(userID) | ||
ying-jeanne marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this doesn't need to be exported.