-
Notifications
You must be signed in to change notification settings - Fork 543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MVP: Cost attribution #10269
base: main
Are you sure you want to change the base?
MVP: Cost attribution #10269
Conversation
5165a5b
to
6f36b5f
Compare
6f36b5f
to
077a94a
Compare
077a94a
to
f04c28f
Compare
@@ -502,6 +525,18 @@ func (s *seriesStripe) remove(ref storage.SeriesRef) { | |||
} | |||
|
|||
s.active-- | |||
if s.cat != nil { | |||
if idx == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, we should assume this isn't nil. Just skipping the removal will break the numbers forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
vendor and update in commit 4706bde
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the active series tracker tests with the costattribution.Tracker, otherwise the new code isn't tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in 17b64a9
pkg/ingester/ingester.go
Outdated
idx, err := db.Head().Index() | ||
if err != nil { | ||
level.Warn(i.logger).Log("msg", "failed to get the index of the TSDB head", "user", userID, "err", err) | ||
idx = nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As commented previously, we should never proceed without an index.
If you check the implementation of db.Head().Index()
it never returns an error. We have three options here:
- Skip tenants if they don't have index: this is the least effort one.
- Panic if err is not nil, this is ugly
- Update mimir-prometheus to add a
MustIndex() IndexReader
method that does not return an error, and use that one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR in mimir-prometheus grafana/mimir-prometheus#811
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
vendor and update in commit 4706bde
pkg/mimir/modules.go
Outdated
if t.Cfg.CostAttributionRegistryPath != "" { | ||
reg := prometheus.NewRegistry() | ||
var err error | ||
t.CostAttributionManager, err = costattribution.NewManager(3*time.Minute, time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, reg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these values should not be hardcoded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed unused parameter b27e379
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the docs! I left a few suggestions.
pkg/costattribution/tracker.go
Outdated
} | ||
} | ||
|
||
func (t *Tracker) shouldDelete(deadline int64) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rename this method to "recovered from overflow". The fact that it should be deleted is a decision of the Manager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in 1ab1f00
pkg/costattribution/tracker.go
Outdated
} | ||
|
||
func (t *Tracker) shouldDelete(deadline int64) bool { | ||
if t.cooldownUntil != nil && t.cooldownUntil.Load() < deadline { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't check whether t.cooldownUntil
without holding the mutex here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in fe8a1e5
pkg/costattribution/manager.go
Outdated
newTrackedLabels := m.limits.CostAttributionLabels(userID) | ||
|
||
// sort the labels to ensure the order is consistent | ||
sort.Slice(newTrackedLabels, func(i, j int) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to modify the slice returned by m.limits.CostAttributionLabels(userID)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
modify the copy instead, addressed in 888d8b0
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241219104229-b50052711673 | ||
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241224134504-460b7be5bce8 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not update mimir-prometheus here. It should be updated in main
, and then you merge main
into your branch.
return m.purgeInactiveAttributionsUntil(time.Now().Add(-m.inactiveTimeout).Unix()) | ||
} | ||
|
||
func (m *Manager) EnabledForUser(userID string) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this doesn't need to be exported.
if tracker, exists = m.trackersByUserID[userID]; exists { | ||
return tracker | ||
} | ||
tracker = newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but I would collect all the information needed to build a tracker (all the m.limits.*
calls before we take m.mtx.Lock
, to spend less time locked.
lbls := m.limits.CostAttributionLabels(userID) | ||
|
||
newTrackedLabels := make([]string, len(lbls)) | ||
copy(newTrackedLabels, lbls) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't call these newTrackedLabels
unless we know they're different from the previous ones. I'd just stick to lbls
for the entire method.
Also, you can leverage slices.Clone
to make it shorter.
lbls := m.limits.CostAttributionLabels(userID) | |
newTrackedLabels := make([]string, len(lbls)) | |
copy(newTrackedLabels, lbls) | |
lbls := slices.Clone(m.limits.CostAttributionLabels(userID)) |
func (t *Tracker) createNewObservation(key []byte, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string) { | ||
t.observed[string(key)] = &observation{ | ||
lastUpdate: atomic.NewInt64(ts), | ||
activeSerie: *atomic.NewFloat64(activeSeriesIncrement), | ||
receivedSample: *atomic.NewFloat64(receivedSampleIncrement), | ||
discardedSample: map[string]atomic.Float64{}, | ||
discardedSampleMtx: sync.Mutex{}, | ||
} | ||
if discardedSampleIncrement > 0 && reason != nil { | ||
t.observed[string(key)].discardedSampleMtx.Lock() | ||
t.observed[string(key)].discardedSample[*reason] = *atomic.NewFloat64(discardedSampleIncrement) | ||
t.observed[string(key)].discardedSampleMtx.Unlock() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're creating a new observation here, I would pass a key string
here, allocated once.
I'm not sure if this method isn't allocating 4 strings here (not a big deal, since it's not a hot path, but still..)
|
||
// updateObservations updates or creates a new observation in the 'observed' map. | ||
func (t *Tracker) updateObservations(key []byte, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string, createIfDoesNotExist bool) { | ||
if o, known := t.observed[string(key)]; known && o.lastUpdate != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can o.lastUpdate
be nil?
} else if len(t.observed) < t.maxCardinality*2 && createIfDoesNotExist { | ||
// When createIfDoesNotExist is false, it means that the method is called from DecrementActiveSeries, when key doesn't exist we should ignore the call | ||
// Otherwise create a new observation for the key | ||
t.createNewObservation(key, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move this to the beginning of the method to un-indent the most common path.
i.e.
o, known := t.observed[string(key)]
if !known {
if len(t.observed) < t.maxCardinality*2 && createIfDoesNotExist {
// When createIfDoesNotExist is false, it means that the method is called from DecrementActiveSeries, when key doesn't exist we should ignore the call
// Otherwise create a new observation for the key
t.createNewObservation(key, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason)
}
return
}
// ...
func (t *Tracker) IncrementActiveSeriesFailure() { | ||
if t == nil { | ||
return | ||
} | ||
t.totalFailedActiveSeries.Add(1) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not convinced this belongs here.
What did fail here? costattribution.Tracker
didn't fail. It's the caller (activeseries.ActiveSeries
) who failed to retrieve labels for a series it should know.
So it's the caller who should register the failure.
costattribution.Tracker
wasn't even called here, so it can't fail.
for _, ts := range req.Timeseries { | ||
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) | ||
receivedExemplars += len(ts.TimeSeries.Exemplars) | ||
d.costAttributionMgr.Tracker(userID).IncrementReceivedSamples(ts.Labels, float64(receivedSamples), mtime.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for _, ts := range req.Timeseries { | |
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) | |
receivedExemplars += len(ts.TimeSeries.Exemplars) | |
d.costAttributionMgr.Tracker(userID).IncrementReceivedSamples(ts.Labels, float64(receivedSamples), mtime.Now()) | |
cat := d.costAttributionMgr.Tracker(userID) | |
for _, ts := range req.Timeseries { | |
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) | |
receivedExemplars += len(ts.TimeSeries.Exemplars) | |
cat.IncrementReceivedSamples(ts.Labels, float64(receivedSamples), mtime.Now()) |
Co-authored-by: Oleg Zaytsev <[email protected]>
What this PR does
This is the follow up of #9733,
The PR intent to export extra attributed metrics in distributor and ingester, in order to get sample received, sample discarded and active_series attributed by cost attribution label.
Which issue(s) this PR fixes or relates to
Fixes #
Checklist
CHANGELOG.md
updated - the order of entries should be[CHANGE]
,[FEATURE]
,[ENHANCEMENT]
,[BUGFIX]
.about-versioning.md
updated with experimental features.