-
Notifications
You must be signed in to change notification settings - Fork 561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQE Absent function #10523
base: main
Are you sure you want to change the base?
MQE Absent function #10523
Changes from all commits
af60627
b130e6c
6e6a81a
eab4a79
f47d3ba
636af41
f31b7a1
29cd9f6
ebd3aa6
f179568
9245904
0cc993b
90982a4
fb168ea
32e4bbf
8f7e445
d1f95ac
fdfa4ba
732b24c
ec4175f
607c848
9af057a
a32b7b3
4b7839f
eb544c1
600248e
3765fcf
08a3a4a
985c08c
8850002
b931e2f
b4d60b3
37b88ea
da3e14c
ac1f1c9
c9de184
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
// SPDX-License-Identifier: AGPL-3.0-only | ||
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go | ||
// Provenance-includes-license: Apache-2.0 | ||
// Provenance-includes-copyright: The Prometheus Authors. | ||
|
||
package functions | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/promql" | ||
"github.com/prometheus/prometheus/promql/parser" | ||
"github.com/prometheus/prometheus/promql/parser/posrange" | ||
|
||
"github.com/grafana/mimir/pkg/streamingpromql/limiting" | ||
"github.com/grafana/mimir/pkg/streamingpromql/types" | ||
) | ||
|
||
// Absent is an operator that implements the absent() function. | ||
type Absent struct { | ||
timeRange types.QueryTimeRange | ||
argExpressions parser.Expr | ||
inner types.InstantVectorOperator | ||
expressionPosition posrange.PositionRange | ||
memoryConsumptionTracker *limiting.MemoryConsumptionTracker | ||
presence []bool | ||
} | ||
|
||
var _ types.InstantVectorOperator = &Absent{} | ||
|
||
// NewAbsent creates a new Absent. | ||
func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *Absent { | ||
return &Absent{ | ||
timeRange: timeRange, | ||
inner: inner, | ||
argExpressions: innerExpr, | ||
expressionPosition: expressionPosition, | ||
memoryConsumptionTracker: memoryConsumptionTracker, | ||
} | ||
} | ||
|
||
func (a *Absent) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { | ||
innerMetadata, err := a.inner.SeriesMetadata(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer types.PutSeriesMetadataSlice(innerMetadata) | ||
|
||
if a.presence == nil { | ||
a.presence, err = types.BoolSlicePool.Get(a.timeRange.StepCount, a.memoryConsumptionTracker) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Initialize presence slice | ||
for range a.timeRange.StepCount { | ||
a.presence = append(a.presence, false) | ||
} | ||
Comment on lines
+58
to
+60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not necessary: we should be able to reslice The pool guarantees that all values in the slice up to the capacity requested are already |
||
} | ||
|
||
metadata := types.GetSeriesMetadataSlice(1) | ||
metadata = append(metadata, types.SeriesMetadata{ | ||
Labels: createLabelsForAbsentFunction(a.argExpressions), | ||
}) | ||
|
||
for range innerMetadata { | ||
series, err := a.inner.NextSeries(ctx) | ||
if err != nil && errors.Is(err, types.EOS) { | ||
return metadata, err | ||
} | ||
Comment on lines
+70
to
+72
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why only return if the error is an |
||
defer types.PutInstantVectorSeriesData(series, a.memoryConsumptionTracker) | ||
|
||
for step := range a.timeRange.StepCount { | ||
t := a.timeRange.IndexTime(int64(step)) | ||
for _, s := range series.Floats { | ||
if t == s.T { | ||
a.presence[step] = true | ||
} | ||
} | ||
for _, s := range series.Histograms { | ||
if t == s.T { | ||
a.presence[step] = true | ||
} | ||
} | ||
} | ||
Comment on lines
+75
to
+87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems very inefficient - for every time step, we'll iterate through the floats and histograms slices. What if iterated through |
||
|
||
} | ||
|
||
return metadata, nil | ||
} | ||
|
||
func (a *Absent) NextSeries(_ context.Context) (types.InstantVectorSeriesData, error) { | ||
output := types.InstantVectorSeriesData{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to have an EOS error if this has been called already? |
||
|
||
var err error | ||
output.Floats, err = types.FPointSlicePool.Get(a.timeRange.StepCount, a.memoryConsumptionTracker) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] We could defer allocating this until we know we're returning at least one point in the |
||
if err != nil { | ||
return output, err | ||
} | ||
|
||
for step := range a.timeRange.StepCount { | ||
t := a.timeRange.IndexTime(int64(step)) | ||
if !a.presence[step] { | ||
output.Floats = append(output.Floats, promql.FPoint{T: t, F: 1}) | ||
} | ||
} | ||
return output, nil | ||
} | ||
|
||
func (a *Absent) ExpressionPosition() posrange.PositionRange { | ||
return a.expressionPosition | ||
} | ||
|
||
func (a *Absent) Close() { | ||
a.inner.Close() | ||
types.BoolSlicePool.Put(a.presence, a.memoryConsumptionTracker) | ||
} | ||
|
||
// createLabelsForAbsentFunction returns the labels that are uniquely and exactly matched | ||
// in a given expression. It is used in the absent functions. | ||
// This function is copied from Prometheus | ||
func createLabelsForAbsentFunction(expr parser.Expr) labels.Labels { | ||
b := labels.NewBuilder(labels.EmptyLabels()) | ||
|
||
var lm []*labels.Matcher | ||
switch n := expr.(type) { | ||
case *parser.VectorSelector: | ||
lm = n.LabelMatchers | ||
case *parser.MatrixSelector: | ||
lm = n.VectorSelector.(*parser.VectorSelector).LabelMatchers | ||
default: | ||
return labels.EmptyLabels() | ||
} | ||
|
||
// The 'has' map implements backwards-compatibility for historic behaviour: | ||
// e.g. in `absent(x{job="a",job="b",foo="bar"})` then `job` is removed from the output. | ||
// Note this gives arguably wrong behaviour for `absent(x{job="a",job="a",foo="bar"})`. | ||
has := make(map[string]bool, len(lm)) | ||
for _, ma := range lm { | ||
if ma.Name == labels.MetricName { | ||
continue | ||
} | ||
if ma.Type == labels.MatchEqual && !has[ma.Name] { | ||
b.Set(ma.Name, ma.Value) | ||
has[ma.Name] = true | ||
} else { | ||
b.Del(ma.Name) | ||
} | ||
} | ||
|
||
return b.Labels() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this check is needed. Won't this always be nil?