Skip to content

Commit

Permalink
Approximate quantile_over_time (#10417)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
This change shards `quantile_over_time` queries using t-digest or
DDSketch approximations. It can be enabled by setting the `shard_aggregations` option (CLI flag `-querier.shard-aggregation`) to `quantile_over_time`.

Outstanding
- [x] Replace generic return type of `StepEvaluator` with interface
`StepResult`.
- [x] Send mapped query with quantile sketch expression from frontend to
querier over the wire.
- [x] Serialize sketches. See
influxdata/tdigest#34
- [x] Add feature flag.

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

---------

Signed-off-by: Callum Styan <[email protected]>
Co-authored-by: Callum Styan <[email protected]>
  • Loading branch information
jeschkies and cstyan authored Dec 11, 2023
1 parent 5e34967 commit f67fff3
Show file tree
Hide file tree
Showing 57 changed files with 2,901 additions and 735 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [10727](https://github.com/grafana/loki/pull/10727) **sandeepsukhani** Native otlp ingestion support
* [11051](https://github.com/grafana/loki/pull/11051) Refactor to not use global logger in modules
* [10956](https://github.com/grafana/loki/pull/10956) **jeschkies** do not wrap requests but send pure Protobuf from frontend v2 via scheduler to querier when `-frontend.encoding=protobuf`.
* [10417](https://github.com/grafana/loki/pull/10417) **jeschkies** shard `quantile_over_time` range queries using probabilistic data structures.
* [11284](https://github.com/grafana/loki/pull/11284) **ashwanthgoli** Config: Adds `frontend.max-query-capacity` to tune per-tenant query capacity.

##### Fixes
Expand Down
5 changes: 5 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,11 @@ results_cache:
# CLI flag: -querier.parallelise-shardable-queries
[parallelise_shardable_queries: <boolean> | default = true]
# A comma-separated list of LogQL vector and range aggregations that should be
# sharded.
# CLI flag: -querier.shard-aggregation
[shard_aggregations: <string> | default = ""]
# Cache index stats query results.
# CLI flag: -querier.cache-index-stats-results
[cache_index_stats_results: <boolean> | default = false]
Expand Down
42 changes: 41 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/plan"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
Expand Down Expand Up @@ -851,6 +852,16 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())

if req.Plan == nil {
parsed, err := syntax.ParseLogSelector(req.Selector, true)
if err != nil {
return err
}
req.Plan = &plan.QueryPlan{
AST: parsed,
}
}

instanceID, err := tenant.TenantID(ctx)
if err != nil {
return err
Expand All @@ -874,6 +885,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
Limit: req.Limit,
Shards: req.Shards,
Deletes: req.Deletes,
Plan: req.Plan,
}}
storeItr, err := i.store.SelectLogs(ctx, storeReq)
if err != nil {
Expand All @@ -900,6 +912,17 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
_, ctx := stats.NewContext(queryServer.Context())
sp := opentracing.SpanFromContext(ctx)

// If the plan is empty we want all series to be returned.
if req.Plan == nil {
parsed, err := syntax.ParseSampleExpr(req.Selector)
if err != nil {
return err
}
req.Plan = &plan.QueryPlan{
AST: parsed,
}
}

instanceID, err := tenant.TenantID(ctx)
if err != nil {
return err
Expand All @@ -925,6 +948,7 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
Selector: req.Selector,
Shards: req.Shards,
Deletes: req.Deletes,
Plan: req.Plan,
}}
storeItr, err := i.store.SelectSamples(ctx, storeReq)
if err != nil {
Expand Down Expand Up @@ -1234,6 +1258,16 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
default:
}

if req.Plan == nil {
parsed, err := syntax.ParseLogSelector(req.Query, true)
if err != nil {
return err
}
req.Plan = &plan.QueryPlan{
AST: parsed,
}
}

instanceID, err := tenant.TenantID(queryServer.Context())
if err != nil {
return err
Expand All @@ -1243,7 +1277,13 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
if err != nil {
return err
}
tailer, err := newTailer(instanceID, req.Query, queryServer, i.cfg.MaxDroppedStreams)

expr, ok := req.Plan.AST.(syntax.LogSelectorExpr)
if !ok {
return fmt.Errorf("unsupported query expression: want (LogSelectorExpr), got (%T)", req.Plan.AST)
}

tailer, err := newTailer(instanceID, expr, queryServer, i.cfg.MaxDroppedStreams)
if err != nil {
return err
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/plan"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
Expand Down Expand Up @@ -812,6 +814,9 @@ func Test_DedupeIngester(t *testing.T) {
End: time.Unix(0, requests+1),
Limit: uint32(requests * streamCount),
Direction: logproto.BACKWARD,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{foo="bar"} | label_format bar=""`),
},
})
require.NoError(t, err)
iterators = append(iterators, iter.NewQueryClientIterator(stream, logproto.BACKWARD))
Expand Down Expand Up @@ -870,6 +875,9 @@ func Test_DedupeIngester(t *testing.T) {
Selector: `sum(rate({foo="bar"}[1m])) by (bar)`,
Start: time.Unix(0, 0),
End: time.Unix(0, requests+1),
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(rate({foo="bar"}[1m])) by (bar)`),
},
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
Expand Down Expand Up @@ -905,6 +913,9 @@ func Test_DedupeIngester(t *testing.T) {
Selector: `sum(rate({foo="bar"}[1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, requests+1),
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(rate({foo="bar"}[1m]))`),
},
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
Expand Down Expand Up @@ -965,6 +976,9 @@ func Test_DedupeIngesterParser(t *testing.T) {
End: time.Unix(0, int64(requests+1)),
Limit: uint32(requests * streamCount * 2),
Direction: logproto.BACKWARD,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{foo="bar"} | json`),
},
})
require.NoError(t, err)
iterators = append(iterators, iter.NewQueryClientIterator(stream, logproto.BACKWARD))
Expand Down Expand Up @@ -992,6 +1006,9 @@ func Test_DedupeIngesterParser(t *testing.T) {
End: time.Unix(0, int64(requests+1)),
Limit: uint32(requests * streamCount * 2),
Direction: logproto.FORWARD,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{foo="bar"} | json`),
},
})
require.NoError(t, err)
iterators = append(iterators, iter.NewQueryClientIterator(stream, logproto.FORWARD))
Expand All @@ -1016,6 +1033,9 @@ func Test_DedupeIngesterParser(t *testing.T) {
Selector: `rate({foo="bar"} | json [1m])`,
Start: time.Unix(0, 0),
End: time.Unix(0, int64(requests+1)),
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`rate({foo="bar"} | json [1m])`),
},
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
Expand All @@ -1041,6 +1061,9 @@ func Test_DedupeIngesterParser(t *testing.T) {
Selector: `sum by (c,d,e,foo) (rate({foo="bar"} | json [1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, int64(requests+1)),
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum by (c,d,e,foo) (rate({foo="bar"} | json [1m]))`),
},
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
Expand Down
17 changes: 16 additions & 1 deletion pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/querier/plan"
loki_runtime "github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
Expand Down Expand Up @@ -537,7 +538,9 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()

inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10)
expr, err := syntax.ParseLogSelector(`{namespace="foo",pod="bar",instance=~"10.*"}`, true)
require.NoError(b, err)
t, err := newTailer("foo", expr, nil, 10)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
require.NoError(b, inst.Push(ctx, &logproto.PushRequest{
Expand Down Expand Up @@ -596,6 +599,9 @@ func Test_Iterator(t *testing.T) {
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: logproto.BACKWARD,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{job="3"} | logfmt`),
},
},
},
)
Expand Down Expand Up @@ -648,6 +654,9 @@ func Test_ChunkFilter(t *testing.T) {
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: logproto.BACKWARD,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{job="3"}`),
},
},
},
)
Expand Down Expand Up @@ -690,6 +699,9 @@ func Test_QueryWithDelete(t *testing.T) {
End: 10 * 1e6,
},
},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{job="3"}`),
},
},
},
)
Expand Down Expand Up @@ -730,6 +742,9 @@ func Test_QuerySampleWithDelete(t *testing.T) {
End: 10 * 1e6,
},
},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`count_over_time({job="3"}[5m])`),
},
},
},
)
Expand Down
5 changes: 4 additions & 1 deletion pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/validation"
)
Expand Down Expand Up @@ -524,7 +525,9 @@ func Benchmark_PushStream(b *testing.B) {
chunkfmt, headfmt := defaultChunkFormat(b)

s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
expr, err := syntax.ParseLogSelector(`{namespace="loki-dev"}`, true)
require.NoError(b, err)
t, err := newTailer("foo", expr, &fakeTailServer{}, 10)
require.NoError(b, err)

go t.loop()
Expand Down
8 changes: 2 additions & 6 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ type tailer struct {
conn TailServer
}

func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*tailer, error) {
expr, err := syntax.ParseLogSelector(query, true)
if err != nil {
return nil, err
}
func newTailer(orgID string, expr syntax.LogSelectorExpr, conn TailServer, maxDroppedStreams int) (*tailer, error) {
// Make sure we can build a pipeline. The stream processing code doesn't have a place to handle
// this error so make sure we handle it here.
pipeline, err := expr.Pipeline()
Expand All @@ -66,7 +62,7 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
conn: conn,
droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
maxDroppedStreams: maxDroppedStreams,
id: generateUniqueID(orgID, query),
id: generateUniqueID(orgID, expr.String()),
closeChan: make(chan struct{}),
pipeline: pipeline,
}, nil
Expand Down
17 changes: 13 additions & 4 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
Expand All @@ -26,7 +27,9 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
}

for run := 0; run < runs; run++ {
tailer, err := newTailer("org-id", stream.Labels, nil, 10)
expr, err := syntax.ParseLogSelector(stream.Labels, true)
require.NoError(t, err)
tailer, err := newTailer("org-id", expr, nil, 10)
require.NoError(t, err)
require.NotNil(t, tailer)

Expand Down Expand Up @@ -78,7 +81,9 @@ func Test_dropstream(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, maxDroppedStreams)
expr, err := syntax.ParseLogSelector(`{app="foo"} |= "foo"`, true)
require.NoError(t, err)
tail, err := newTailer("foo", expr, &fakeTailServer{}, maxDroppedStreams)
require.NoError(t, err)

for i := 0; i < c.drop; i++ {
Expand Down Expand Up @@ -114,7 +119,9 @@ func (f *fakeTailServer) Reset() {
}

func Test_TailerSendRace(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10)
expr, err := syntax.ParseLogSelector(`{app="foo"} |= "foo"`, true)
require.NoError(t, err)
tail, err := newTailer("foo", expr, &fakeTailServer{}, 10)
require.NoError(t, err)

var wg sync.WaitGroup
Expand Down Expand Up @@ -250,7 +257,9 @@ func Test_StructuredMetadata(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
var server fakeTailServer
tail, err := newTailer("foo", tc.query, &server, 10)
expr, err := syntax.ParseLogSelector(tc.query, true)
require.NoError(t, err)
tail, err := newTailer("foo", expr, &server, 10)
require.NoError(t, err)

var wg sync.WaitGroup
Expand Down
12 changes: 11 additions & 1 deletion pkg/loghttp/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/grafana/dskit/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/plan"
)

const (
Expand Down Expand Up @@ -67,8 +69,16 @@ func (s *DroppedStream) UnmarshalJSON(data []byte) error {
// ParseTailQuery parses a TailRequest request from an http request.
func ParseTailQuery(r *http.Request) (*logproto.TailRequest, error) {
var err error
qs := query(r)
parsed, err := syntax.ParseExpr(qs)
if err != nil {
return nil, err
}
req := logproto.TailRequest{
Query: query(r),
Query: qs,
Plan: &plan.QueryPlan{
AST: parsed,
},
}

req.Query, err = parseRegexQuery(r)
Expand Down
5 changes: 5 additions & 0 deletions pkg/loghttp/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/plan"
)

func TestParseTailQuery(t *testing.T) {
Expand Down Expand Up @@ -38,6 +40,9 @@ func TestParseTailQuery(t *testing.T) {
DelayFor: 5,
Start: time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC),
Limit: 1000,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{foo="bar"}`),
},
}, false},
}
for _, tt := range tests {
Expand Down
Loading

0 comments on commit f67fff3

Please sign in to comment.