Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE Absent function #10523

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
af60627
Add UniqueSeriesLabel SeriesMetadataFunctionDefinition
lamida Jan 22, 2025
b130e6c
Implement absent function
lamida Jan 23, 2025
6e6a81a
Pass inner expr arg for absent
lamida Jan 27, 2025
eab4a79
Remove println
lamida Jan 27, 2025
f47d3ba
Simplify factory function
lamida Jan 27, 2025
636af41
Add innerExpression length check too
lamida Jan 27, 2025
f31b7a1
Add comment on copied function
lamida Jan 27, 2025
29cd9f6
Remove unused function
lamida Jan 27, 2025
ebd3aa6
Fix godoc
lamida Jan 27, 2025
f179568
Fix functions.test whitespace
lamida Jan 27, 2025
9245904
Rename constructor for consistency
lamida Jan 27, 2025
0cc993b
Move absent to function package
lamida Jan 28, 2025
90982a4
Rename AbsentOperator to just Absent
lamida Jan 28, 2025
fb168ea
Rename to argExpressions
lamida Jan 28, 2025
32e4bbf
Fix after rebase
lamida Feb 10, 2025
8f7e445
Small progress on absent
lamida Feb 11, 2025
d1f95ac
More absent changes
lamida Feb 11, 2025
fdfa4ba
Fix absent implementation
lamida Feb 13, 2025
732b24c
Add more absent tests
lamida Feb 13, 2025
ec4175f
Fix update receiver name
lamida Feb 13, 2025
607c848
Rename innerExpressions to argExpressions
lamida Feb 13, 2025
9af057a
Update more arg names
lamida Feb 13, 2025
a32b7b3
Make lint happy
lamida Feb 13, 2025
4b7839f
Add dedup and merge test
lamida Feb 13, 2025
eb544c1
Remove absent lookback test
lamida Feb 14, 2025
600248e
Change data loading to 6m
lamida Feb 14, 2025
3765fcf
Add more tests that handles multiple input series
lamida Feb 14, 2025
08a3a4a
Absent don't need dedup
lamida Feb 14, 2025
985c08c
Fix missing absent function call in test
lamida Feb 14, 2025
8850002
Show range selector result for multiseries in the test
lamida Feb 14, 2025
b931e2f
Fix wrong test
lamida Feb 14, 2025
b4d60b3
Track sample presence
lamida Feb 14, 2025
37b88ea
Fix lint
lamida Feb 14, 2025
da3e14c
Replace presence map with slice
lamida Feb 14, 2025
ac1f1c9
Use 6m step and load in our tests
lamida Feb 14, 2025
c9de184
Remove range evaluation from absent test
lamida Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"

Expand All @@ -23,6 +24,7 @@ type InstantVectorFunctionOperatorFactory func(
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
timeRange types.QueryTimeRange,
argExpressions parser.Expressions,
) (types.InstantVectorOperator, error)

type ScalarFunctionOperatorFactory func(
Expand All @@ -40,7 +42,7 @@ type ScalarFunctionOperatorFactory func(
// - name: The name of the function
// - f: The function implementation
func SingleInputVectorFunctionOperatorFactory(name string, f functions.FunctionOverInstantVectorDefinition) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand Down Expand Up @@ -77,13 +79,30 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF
return SingleInputVectorFunctionOperatorFactory(name, f)
}

func AbsentFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, argExpressions parser.Expressions) (types.InstantVectorOperator, error) {
functionName := "absent"
if len(args) != 1 && len(argExpressions) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args))
}
inner, ok := args[0].(types.InstantVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected an instant vector argument for %s, got %T", functionName, args[0])
}

var o types.InstantVectorOperator = functions.NewAbsent(inner, argExpressions[0], timeRange, expressionPosition, memoryConsumptionTracker)

return o, nil
}

func TimeTransformationFunctionOperatorFactory(name string, seriesDataFunc functions.InstantVectorSeriesFunction) InstantVectorFunctionOperatorFactory {
f := functions.FunctionOverInstantVectorDefinition{
SeriesDataFunc: seriesDataFunc,
SeriesMetadataFunction: functions.DropSeriesName,
}

return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
var inner types.InstantVectorOperator
if len(args) == 0 {
// if the argument is not provided, it will default to vector(time())
Expand Down Expand Up @@ -137,7 +156,7 @@ func FunctionOverRangeVectorOperatorFactory(
name string,
f functions.FunctionOverRangeVectorDefinition,
) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand All @@ -159,7 +178,7 @@ func FunctionOverRangeVectorOperatorFactory(
}
}

func PredictLinearFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func PredictLinearFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
f := functions.PredictLinear

if len(args) != 2 {
Expand Down Expand Up @@ -188,7 +207,7 @@ func PredictLinearFactory(args []types.Operator, memoryConsumptionTracker *limit
return o, nil
}

func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for vector, got %v", len(args))
Expand All @@ -203,7 +222,7 @@ func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.Mem
return scalars.NewScalarToInstantVector(inner, expressionPosition), nil
}

func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 5 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 5 arguments for label_replace, got %v", len(args))
Expand Down Expand Up @@ -252,7 +271,7 @@ func LabelReplaceFunctionOperatorFactory(args []types.Operator, memoryConsumptio
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 arguments for clamp, got %v", len(args))
Expand Down Expand Up @@ -286,7 +305,7 @@ func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
}

func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 arguments for %s, got %v", functionName, len(args))
Expand Down Expand Up @@ -314,7 +333,7 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant
}
}

func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 && len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected 1 or 2 arguments for round, got %v", len(args))
Expand Down Expand Up @@ -346,7 +365,7 @@ func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 arguments for histogram_quantile, got %v", len(args))
Expand All @@ -368,7 +387,7 @@ func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsu
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 3 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 3 arguments for histogram_fraction, got %v", len(args))
Expand Down Expand Up @@ -401,7 +420,7 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func TimestampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
func TimestampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for timestamp, got %v", len(args))
Expand Down Expand Up @@ -432,7 +451,7 @@ func SortOperatorFactory(descending bool) InstantVectorFunctionOperatorFactory {
functionName = "sort_desc"
}

return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange, _ parser.Expressions) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", functionName, len(args))
Expand All @@ -457,6 +476,7 @@ func SortOperatorFactory(descending bool) InstantVectorFunctionOperatorFactory {
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
"abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs),
"absent": AbsentFunctionOperatorFactory,
"acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos),
"acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh),
"asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin),
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestFunctionDeduplicateAndMerge(t *testing.T) {
expressions := map[string]string{
// Please keep this list sorted alphabetically.
"abs": `abs({__name__=~"float.*"})`,
"absent": `<skip>`,
"acos": `acos({__name__=~"float.*"})`,
"acosh": `acosh({__name__=~"float.*"})`,
"asin": `asin({__name__=~"float.*"})`,
Expand Down
137 changes: 137 additions & 0 deletions pkg/streamingpromql/operators/functions/absent.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't handle the case where there are multiple inner series correctly.

I think we need to do something like this:

In SeriesMetadata:

  • call Inner.SeriesMetadata() to get the list of inner series
  • create a slice to keep track of whether we've seen a point at each time step
  • for each inner series:
    • call Inner.NextSeries() to get the data for that series
    • update the presence slice based on the series' data
  • if there is a point present at every time step, return no series
  • otherwise, if there are any points absent:
    • store the presence slice for use in NextSeries() later
    • return a single series (as it does currently)

In NextSeries:

  • if there is a point present at every time step, or if NextSeries() has been called before, return EOS
  • otherwise:
    • construct the slice of FPoints based on the presence slice created in SeriesMetadata()
    • return the FPoints

Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors.

package functions

import (
"context"
"errors"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// Absent is an operator that implements the absent() function.
type Absent struct {
timeRange types.QueryTimeRange
argExpressions parser.Expr
inner types.InstantVectorOperator
expressionPosition posrange.PositionRange
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
}

var _ types.InstantVectorOperator = &Absent{}

// NewAbsent creates a new Absent.
func NewAbsent(inner types.InstantVectorOperator, innerExpr parser.Expr, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *Absent {
return &Absent{
timeRange: timeRange,
inner: inner,
argExpressions: innerExpr,
expressionPosition: expressionPosition,
memoryConsumptionTracker: memoryConsumptionTracker,
}
}

func (a *Absent) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
innerMetadata, err := a.inner.SeriesMetadata(ctx)
if err != nil {
return nil, err
}
defer types.PutSeriesMetadataSlice(innerMetadata)

metadata := types.GetSeriesMetadataSlice(1)
metadata = append(metadata, types.SeriesMetadata{
Labels: createLabelsForAbsentFunction(a.argExpressions),
})

return metadata, nil
}

func (a *Absent) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
output := types.InstantVectorSeriesData{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to have an EOS error if this has been called already?


var err error
output.Floats, err = types.FPointSlicePool.Get(a.timeRange.StepCount, a.memoryConsumptionTracker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] We could defer allocating this until we know we're returning at least one point in the if below

if err != nil {
return output, err
}

series, err := a.inner.NextSeries(ctx)
defer types.PutInstantVectorSeriesData(series, a.memoryConsumptionTracker)

for step := range a.timeRange.StepCount {
t := a.timeRange.IndexTime(int64(step))
if err != nil && errors.Is(err, types.EOS) {
output.Floats = append(output.Floats, promql.FPoint{T: t, F: 1})
} else {
found := false
for _, s := range series.Floats {
if t == s.T {
found = true
break
}
}
for _, s := range series.Histograms {
if t == s.T {
found = true
break
}
}
if !found {
output.Floats = append(output.Floats, promql.FPoint{T: t, F: 1})
}
}
}
return output, nil
}

func (a *Absent) ExpressionPosition() posrange.PositionRange {
return a.expressionPosition
}

func (a *Absent) Close() {
a.inner.Close()
}

// createLabelsForAbsentFunction returns the labels that are uniquely and exactly matched
// in a given expression. It is used in the absent functions.
// This function is copied from Prometheus
func createLabelsForAbsentFunction(expr parser.Expr) labels.Labels {
b := labels.NewBuilder(labels.EmptyLabels())

var lm []*labels.Matcher
switch n := expr.(type) {
case *parser.VectorSelector:
lm = n.LabelMatchers
case *parser.MatrixSelector:
lm = n.VectorSelector.(*parser.VectorSelector).LabelMatchers
default:
return labels.EmptyLabels()
}

// The 'has' map implements backwards-compatibility for historic behaviour:
// e.g. in `absent(x{job="a",job="b",foo="bar"})` then `job` is removed from the output.
// Note this gives arguably wrong behaviour for `absent(x{job="a",job="a",foo="bar"})`.
has := make(map[string]bool, len(lm))
for _, ma := range lm {
if ma.Name == labels.MetricName {
continue
}
if ma.Type == labels.MatchEqual && !has[ma.Name] {
b.Set(ma.Name, ma.Value)
has[ma.Name] = true
} else {
b.Del(ma.Name)
}
}

return b.Labels()
}
2 changes: 1 addition & 1 deletion pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (q *Query) convertFunctionCallToInstantVectorOperator(e *parser.Call, timeR
args[i] = a
}

return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange)
return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, timeRange, e.Args)
}

func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.QueryTimeRange) (types.RangeVectorOperator, error) {
Expand Down
Loading
Loading