Skip to content

Commit e9cecc7

Browse files
committed
Implement delayed name removal
Signed-off-by: SungJin1212 <[email protected]>
1 parent 576bf7b commit e9cecc7

31 files changed

+318
-244
lines changed

engine/engine.go

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,15 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
192192

193193
disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks,
194194

195-
logger: opts.Logger,
196-
lookbackDelta: opts.LookbackDelta,
197-
enablePerStepStats: opts.EnablePerStepStats,
198-
logicalOptimizers: opts.LogicalOptimizers,
199-
timeout: opts.Timeout,
200-
metrics: metrics,
201-
extLookbackDelta: opts.ExtLookbackDelta,
202-
enableAnalysis: opts.EnableAnalysis,
195+
logger: opts.Logger,
196+
lookbackDelta: opts.LookbackDelta,
197+
enablePerStepStats: opts.EnablePerStepStats,
198+
logicalOptimizers: opts.LogicalOptimizers,
199+
timeout: opts.Timeout,
200+
metrics: metrics,
201+
extLookbackDelta: opts.ExtLookbackDelta,
202+
enableAnalysis: opts.EnableAnalysis,
203+
enableDelayedNameRemoval: opts.EnableDelayedNameRemoval,
203204
noStepSubqueryIntervalFn: func(d time.Duration) time.Duration {
204205
return time.Duration(opts.NoStepSubqueryIntervalFn(d.Milliseconds()) * 1000000)
205206
},
@@ -233,6 +234,7 @@ type Engine struct {
233234
decodingConcurrency int
234235
selectorBatchSize int64
235236
enableAnalysis bool
237+
enableDelayedNameRemoval bool
236238
noStepSubqueryIntervalFn func(time.Duration) time.Duration
237239
}
238240

@@ -275,17 +277,18 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
275277
}
276278
e.metrics.totalQueries.Inc()
277279
return &compatibilityQuery{
278-
Query: &Query{exec: exec, opts: opts},
279-
engine: e,
280-
plan: lplan,
281-
warns: warns,
282-
ts: ts,
283-
t: InstantQuery,
284-
resultSort: resultSort,
285-
scanners: scanners,
286-
start: ts,
287-
end: ts,
288-
step: 0,
280+
Query: &Query{exec: exec, opts: opts},
281+
engine: e,
282+
plan: lplan,
283+
ts: ts,
284+
warns: warns,
285+
t: InstantQuery,
286+
resultSort: resultSort,
287+
scanners: scanners,
288+
enableDelayedNameRemoval: e.enableDelayedNameRemoval,
289+
start: ts,
290+
end: ts,
291+
step: 0,
289292
}, nil
290293
}
291294

@@ -327,11 +330,12 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
327330
ts: ts,
328331
t: InstantQuery,
329332
// TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function.
330-
resultSort: noSortResultSort{},
331-
scanners: scnrs,
332-
start: ts,
333-
end: ts,
334-
step: 0,
333+
resultSort: noSortResultSort{},
334+
scanners: scnrs,
335+
enableDelayedNameRemoval: e.enableDelayedNameRemoval,
336+
start: ts,
337+
end: ts,
338+
step: 0,
335339
}, nil
336340
}
337341

@@ -374,15 +378,16 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
374378
e.metrics.totalQueries.Inc()
375379

376380
return &compatibilityQuery{
377-
Query: &Query{exec: exec, opts: opts},
378-
engine: e,
379-
plan: lplan,
380-
warns: warns,
381-
t: RangeQuery,
382-
scanners: scnrs,
383-
start: start,
384-
end: end,
385-
step: step,
381+
Query: &Query{exec: exec, opts: opts},
382+
engine: e,
383+
plan: lplan,
384+
warns: warns,
385+
t: RangeQuery,
386+
scanners: scnrs,
387+
enableDelayedNameRemoval: e.enableDelayedNameRemoval,
388+
start: start,
389+
end: end,
390+
step: step,
386391
}, nil
387392
}
388393

@@ -450,6 +455,7 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
450455
EnablePerStepStats: e.enablePerStepStats,
451456
ExtLookbackDelta: e.extLookbackDelta,
452457
EnableAnalysis: e.enableAnalysis,
458+
EnableDelayedNameRemoval: e.enableDelayedNameRemoval,
453459
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
454460
DecodingConcurrency: e.decodingConcurrency,
455461
}
@@ -520,9 +526,10 @@ type compatibilityQuery struct {
520526
end time.Time
521527
step time.Duration
522528

523-
t QueryType
524-
resultSort resultSorter
525-
cancel context.CancelFunc
529+
t QueryType
530+
resultSort resultSorter
531+
cancel context.CancelFunc
532+
enableDelayedNameRemoval bool
526533

527534
scanners engstorage.Scanners
528535
}
@@ -564,7 +571,10 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
564571

565572
series := make([]promql.Series, len(resultSeries))
566573
for i, s := range resultSeries {
567-
series[i].Metric = s
574+
if s.DropName && q.enableDelayedNameRemoval {
575+
s.Metric = s.Metric.DropMetricName()
576+
}
577+
series[i] = s
568578
}
569579
loop:
570580
for {

engine/engine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ func TestPromqlAcceptance(t *testing.T) {
8686
MaxSamples: 5e10,
8787
Timeout: 1 * time.Hour,
8888
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() },
89+
EnableDelayedNameRemoval: true,
8990
}})
9091

9192
st := &skipTest{
9293
skipTests: []string{
93-
"testdata/name_label_dropping.test", // feature unsupported
94-
"testdata/limit.test", // limitk, limit_ratio
94+
"testdata/limit.test", // limitk, limit_ratio
9595
}, // TODO(sungjin1212): change to test whole cases
9696
TBRun: t,
9797
}

engine/existing_test.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ func TestRangeQuery(t *testing.T) {
3434
Query: `sum_over_time(bar[30s])`,
3535
Result: promql.Matrix{
3636
promql.Series{
37-
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 10, T: 60000}, {F: 1000, T: 120000}},
38-
Metric: labels.Labels{},
37+
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 10, T: 60000}, {F: 1000, T: 120000}},
38+
Metric: labels.Labels{},
39+
DropName: true,
3940
},
4041
},
4142
Start: time.Unix(0, 0),
@@ -49,8 +50,9 @@ func TestRangeQuery(t *testing.T) {
4950
Query: `sum_over_time(bar[45s])`,
5051
Result: promql.Matrix{
5152
promql.Series{
52-
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
53-
Metric: labels.Labels{},
53+
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
54+
Metric: labels.Labels{},
55+
DropName: true,
5456
},
5557
},
5658
Start: time.Unix(0, 0),
@@ -64,8 +66,9 @@ func TestRangeQuery(t *testing.T) {
6466
Query: `sum_over_time(bar[45s])`,
6567
Result: promql.Matrix{
6668
promql.Series{
67-
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
68-
Metric: labels.Labels{},
69+
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
70+
Metric: labels.Labels{},
71+
DropName: true,
6972
},
7073
},
7174
Start: time.Unix(0, 0),
@@ -79,8 +82,9 @@ func TestRangeQuery(t *testing.T) {
7982
Query: `sum_over_time(bar[45s])`,
8083
Result: promql.Matrix{
8184
promql.Series{
82-
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}, {F: 110000, T: 180000}, {F: 11000000, T: 240000}},
83-
Metric: labels.Labels{},
85+
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}, {F: 110000, T: 180000}, {F: 11000000, T: 240000}},
86+
Metric: labels.Labels{},
87+
DropName: true,
8488
},
8589
},
8690
Start: time.Unix(0, 0),
@@ -94,8 +98,9 @@ func TestRangeQuery(t *testing.T) {
9498
Query: `sum_over_time(bar[45s])`,
9599
Result: promql.Matrix{
96100
promql.Series{
97-
Floats: []promql.FPoint{{F: 5, T: 0}, {F: 59, T: 60000}, {F: 9, T: 120000}, {F: 956, T: 180000}},
98-
Metric: labels.Labels{},
101+
Floats: []promql.FPoint{{F: 5, T: 0}, {F: 59, T: 60000}, {F: 9, T: 120000}, {F: 956, T: 180000}},
102+
Metric: labels.Labels{},
103+
DropName: true,
99104
},
100105
},
101106
Start: time.Unix(0, 0),

engine/user_defined_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ load 30s
4949

5050
expected := promql.Matrix{
5151
promql.Series{
52-
Metric: labels.EmptyLabels(),
5352
Floats: []promql.FPoint{{T: 0, F: 14}, {T: 30000, F: 14}, {T: 60000, F: 14}, {T: 90000, F: 14}},
5453
},
5554
}
@@ -120,10 +119,10 @@ func (c *vectorSelectorOperator) Next(ctx context.Context) ([]model.StepVector,
120119
return vectors, nil
121120
}
122121

123-
func (c *vectorSelectorOperator) Series(ctx context.Context) ([]labels.Labels, error) {
124-
return []labels.Labels{
125-
labels.FromStrings(labels.MetricName, "http_requests_total", "container", "a"),
126-
labels.FromStrings(labels.MetricName, "http_requests_total", "container", "b"),
122+
func (c *vectorSelectorOperator) Series(ctx context.Context) ([]promql.Series, error) {
123+
return []promql.Series{
124+
{Metric: labels.FromStrings(labels.MetricName, "http_requests_total", "container", "a")},
125+
{Metric: labels.FromStrings(labels.MetricName, "http_requests_total", "container", "b")},
127126
}, nil
128127
}
129128

execution/aggregate/count_values.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ import (
1111
"sync"
1212
"time"
1313

14-
"github.com/thanos-io/promql-engine/execution/telemetry"
15-
1614
"github.com/prometheus/prometheus/model/labels"
15+
"github.com/prometheus/prometheus/promql"
1716

1817
"github.com/thanos-io/promql-engine/execution/model"
18+
"github.com/thanos-io/promql-engine/execution/telemetry"
1919
"github.com/thanos-io/promql-engine/query"
2020
)
2121

@@ -34,7 +34,7 @@ type countValuesOperator struct {
3434

3535
ts []int64
3636
counts []map[int]int
37-
series []labels.Labels
37+
series []promql.Series
3838

3939
once sync.Once
4040
}
@@ -72,7 +72,7 @@ func (c *countValuesOperator) String() string {
7272
return fmt.Sprintf("[countValues] without (%v) - param (%v)", c.grouping, c.param)
7373
}
7474

75-
func (c *countValuesOperator) Series(ctx context.Context) ([]labels.Labels, error) {
75+
func (c *countValuesOperator) Series(ctx context.Context) ([]promql.Series, error) {
7676
start := time.Now()
7777
defer func() { c.AddExecutionTimeTaken(time.Since(start)) }()
7878

@@ -143,7 +143,7 @@ func (c *countValuesOperator) initSeriesOnce(ctx context.Context) error {
143143

144144
ts := make([]int64, 0)
145145
counts := make([]map[int]int, 0)
146-
series := make([]labels.Labels, 0)
146+
series := make([]promql.Series, 0)
147147

148148
b := labels.NewBuilder(labels.EmptyLabels())
149149
for {
@@ -192,7 +192,7 @@ func (c *countValuesOperator) initSeriesOnce(ctx context.Context) error {
192192
hash := lbls.Hash()
193193
outputId, ok := hashToOutputId[hash]
194194
if !ok {
195-
series = append(series, lbls)
195+
series = append(series, promql.Series{Metric: lbls})
196196
outputId = len(series) - 1
197197
hashToOutputId[hash] = outputId
198198
}

0 commit comments

Comments
 (0)