Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: add support for stddev_over_time and stdvar_over_time #10628

Merged
merged 7 commits into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,24 @@ func TestAnnotations(t *testing.T) {
},
}

for _, f := range []string{"min_over_time", "max_over_time", "stddev_over_time", "stdvar_over_time"} {
testCases[fmt.Sprintf("%v() over series with only floats", f)] = annotationTestCase{
data: `some_metric 1 2`,
expr: fmt.Sprintf(`%v(some_metric[1m1s])`, f),
}
testCases[fmt.Sprintf("%v() over series with only histograms", f)] = annotationTestCase{
data: `some_metric {{count:1}} {{count:2}}`,
expr: fmt.Sprintf(`%v(some_metric[1m1s])`, f),
}
testCases[fmt.Sprintf("%v() over series with both floats and histograms", f)] = annotationTestCase{
data: `some_metric 1 {{count:2}}`,
expr: fmt.Sprintf(`%v(some_metric[1m1s])`, f),
expectedInfoAnnotations: []string{
fmt.Sprintf(`PromQL info: ignored histograms in a range containing both floats and histograms for metric name "some_metric" (1:%v)`, len(f)+2),
},
}
}

runAnnotationTests(t, testCases)
}

Expand Down Expand Up @@ -2923,7 +2941,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta", "delta", "deriv"} {
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta", "delta", "deriv", "stddev_over_time", "stdvar_over_time"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"sort": SortOperatorFactory(false),
"sort_desc": SortOperatorFactory(true),
"sqrt": InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt),
"stddev_over_time": FunctionOverRangeVectorOperatorFactory("stddev_over_time", functions.StddevOverTime),
"stdvar_over_time": FunctionOverRangeVectorOperatorFactory("stdvar_over_time", functions.StdvarOverTime),
"sum_over_time": FunctionOverRangeVectorOperatorFactory("sum_over_time", functions.SumOverTime),
"tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan),
"tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh),
Expand Down
2 changes: 2 additions & 0 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ func TestFunctionDeduplicateAndMerge(t *testing.T) {
"sort": `<skip>`, // sort() and sort_desc() don't drop the metric name, so this test doesn't apply.
"sort_desc": `<skip>`, // sort() and sort_desc() don't drop the metric name, so this test doesn't apply.
"sqrt": `sqrt({__name__=~"float.*"})`,
"stddev_over_time": `stddev_over_time({__name__=~"float.*"}[1m])`,
"stdvar_over_time": `stdvar_over_time({__name__=~"float.*"}[1m])`,
"sum_over_time": `sum_over_time({__name__=~"float.*"}[1m])`,
"tan": `tan({__name__=~"float.*"})`,
"tanh": `tanh({__name__=~"float.*"})`,
Expand Down
52 changes: 52 additions & 0 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,3 +634,55 @@ func irateIdelta(isRate bool) RangeVectorStepFunction {
return resultValue, true, nil, nil
}
}

var StddevOverTime = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: stddevStdvarOverTime(true),
NeedsSeriesNamesForAnnotations: true,
}

var StdvarOverTime = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: stddevStdvarOverTime(false),
NeedsSeriesNamesForAnnotations: true,
}

func stddevStdvarOverTime(isStdDev bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
return 0, false, nil, nil
}

if step.Histograms.Any() {
emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo)
}

count := 0.0
mean := 0.0
meanC := 0.0
deviation := 0.0
deviationC := 0.0

accumulate := func(points []promql.FPoint) {
for _, p := range points {
count++
delta := p.F - (mean + meanC)
mean, meanC = floats.KahanSumInc(delta/count, mean, meanC)
deviation, deviationC = floats.KahanSumInc(delta*(p.F-(mean+meanC)), deviation, deviationC)
}
}

accumulate(head)
accumulate(tail)

result := (deviation + deviationC) / count

if isStdDev {
result = math.Sqrt(result)
}

return result, true, nil, nil
}
}
30 changes: 30 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -940,3 +940,33 @@ eval range from 0 to 4m step 1m sort_desc(test_metric)
test_metric{case="histogram with NaN"} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}} {{count:0 sum:NaN}}
test_metric{case="histogram with +Inf"} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}} {{count:0 sum:Inf}}
test_metric{case="histogram with -Inf"} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}} {{count:0 sum:-Inf}}

clear

# Test stddev_over_time and stdvar_over_time.
load 1m
metric{case="floats"} 1 2
metric{case="NaN at end"} 1 NaN
metric{case="NaN at start"} NaN 2
metric{case="all NaN"} NaN NaN
metric{case="Inf at start"} 1 Inf
metric{case="Inf at end"} Inf 2
metric{case="all Inf"} Inf Inf

eval instant at 1m stddev_over_time(metric[1m1s])
{case="floats"} 0.5
{case="NaN at end"} NaN
{case="NaN at start"} NaN
{case="all NaN"} NaN
{case="Inf at start"} NaN
{case="Inf at end"} NaN
{case="all Inf"} NaN

eval instant at 1m stdvar_over_time(metric[1m1s])
{case="floats"} 0.25
{case="NaN at end"} NaN
{case="NaN at start"} NaN
{case="all NaN"} NaN
{case="Inf at start"} NaN
{case="Inf at end"} NaN
{case="all Inf"} NaN
45 changes: 18 additions & 27 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -958,47 +958,38 @@ load 10s
metric_histogram{type="only_histogram"} {{schema:1 sum:2 count:3}}x5
metric_histogram{type="mix"} 1 1 1 {{schema:1 sum:2 count:3}} {{schema:1 sum:2 count:3}}

# Unsupported by streaming engine.
# eval instant at 1m stdvar_over_time(metric[2m])
# {} 10.56
eval instant at 1m stdvar_over_time(metric[2m])
{} 10.56

# Unsupported by streaming engine.
# eval instant at 1m stddev_over_time(metric[2m])
# {} 3.249615
eval instant at 1m stddev_over_time(metric[2m])
{} 3.249615

# Unsupported by streaming engine.
# eval instant at 1m stddev_over_time((metric[2m]))
# {} 3.249615
eval instant at 1m stddev_over_time((metric[2m]))
{} 3.249615

# Tests for stddev_over_time and stdvar_over_time with histograms.
# Unsupported by streaming engine.
# eval instant at 1m stddev_over_time(metric_histogram{type="only_histogram"}[2m])
# #empty
eval instant at 1m stddev_over_time(metric_histogram{type="only_histogram"}[2m])
#empty

# Unsupported by streaming engine.
# eval_info instant at 1m stddev_over_time(metric_histogram{type="mix"}[2m])
# {type="mix"} 0
eval_info instant at 1m stddev_over_time(metric_histogram{type="mix"}[2m])
{type="mix"} 0

# Unsupported by streaming engine.
# eval instant at 1m stdvar_over_time(metric_histogram{type="only_histogram"}[2m])
# #empty
eval instant at 1m stdvar_over_time(metric_histogram{type="only_histogram"}[2m])
#empty

# Unsupported by streaming engine.
# eval_info instant at 1m stdvar_over_time(metric_histogram{type="mix"}[2m])
# {type="mix"} 0
eval_info instant at 1m stdvar_over_time(metric_histogram{type="mix"}[2m])
{type="mix"} 0

# Tests for stddev_over_time and stdvar_over_time #4927.
clear
load 10s
metric 1.5990505637277868 1.5990505637277868 1.5990505637277868

# Unsupported by streaming engine.
# eval instant at 1m stdvar_over_time(metric[1m])
# {} 0
eval instant at 1m stdvar_over_time(metric[1m])
{} 0

# Unsupported by streaming engine.
# eval instant at 1m stddev_over_time(metric[1m])
# {} 0
eval instant at 1m stddev_over_time(metric[1m])
{} 0

# Tests for mad_over_time.
clear
Expand Down
Loading