Skip to content

Commit

Permalink
Ruler: Short-circuit constant queries
Browse files Browse the repository at this point in the history
Instead of sending those to the query-frontend + scheduler + querier, we can stop at the ruler
  • Loading branch information
julienduchesne committed Feb 10, 2025
1 parent 23162f4 commit ea1da1f
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
if err != nil {
return nil, err
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.MaxRetriesRate, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.MaxRetriesRate, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.Ruler.QueryFrontend.ShortCircuitConstantQueries, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)

embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient(
remoteQuerier,
Expand Down
51 changes: 50 additions & 1 deletion pkg/ruler/remotequerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -70,6 +71,10 @@ type QueryFrontendConfig struct {
QueryResultResponseFormat string `yaml:"query_result_response_format"`

MaxRetriesRate float64 `yaml:"max_retries_rate"`

// ShortCircuitConstantQueries enables short-circuiting of constant queries.
// If enabled, constant queries are evaluated locally and not sent to the query-frontend.
ShortCircuitConstantQueries bool `yaml:"short_circuit_constant_queries"`
}

func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -84,6 +89,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {

f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", ")))
f.Float64Var(&c.MaxRetriesRate, "ruler.query-frontend.max-retries-rate", 170, "Maximum number of retries for failed queries per second.")
f.BoolVar(&c.ShortCircuitConstantQueries, "ruler.query-frontend.short-circuit-constant-queries", false, "Enable short-circuiting of constant queries.")
}

func (c *QueryFrontendConfig) Validate() error {
Expand Down Expand Up @@ -126,6 +132,8 @@ type RemoteQuerier struct {
logger log.Logger
preferredQueryResultResponseFormat string
decoders map[string]decoder
shortCircuitConstantQueries bool
engine *promql.Engine
}

var jsonDecoderInstance = jsonDecoder{}
Expand All @@ -137,11 +145,12 @@ func NewRemoteQuerier(
timeout time.Duration,
maxRetryRate float64, // maxRetryRate is the maximum number of retries for failed queries per second.
preferredQueryResultResponseFormat string,
shortCircuitConstantQueries bool,
prometheusHTTPPrefix string,
logger log.Logger,
middlewares ...Middleware,
) *RemoteQuerier {
return &RemoteQuerier{
q := &RemoteQuerier{
client: client,
timeout: timeout,
retryLimiter: rate.NewLimiter(rate.Limit(maxRetryRate), 1),
Expand All @@ -153,7 +162,17 @@ func NewRemoteQuerier(
jsonDecoderInstance.ContentType(): jsonDecoderInstance,
protobufDecoderInstance.ContentType(): protobufDecoderInstance,
},
shortCircuitConstantQueries: shortCircuitConstantQueries,
}

if shortCircuitConstantQueries {
q.engine = promql.NewEngine(promql.EngineOpts{
MaxSamples: 1e6,
Timeout: 5 * time.Second,
})
}

return q
}

// Read satisfies Prometheus remote.ReadClient.
Expand Down Expand Up @@ -236,11 +255,41 @@ func (q *RemoteQuerier) Read(ctx context.Context, query *prompb.Query, sortSerie
return remote.FromQueryResult(sortSeries, res), nil
}

func hasSelector(expr parser.Node) bool {
switch e := expr.(type) {
case *parser.VectorSelector, *parser.MatrixSelector:
return true
default:
for _, child := range parser.Children(e) {
if hasSelector(child) {
return true
}
}
return false
}
}

// Query performs a query for the given time.
func (q *RemoteQuerier) Query(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
logger, ctx := spanlogger.NewWithLogger(ctx, q.logger, "ruler.RemoteQuerier.Query")
defer logger.Span.Finish()

if q.shortCircuitConstantQueries {
parsed, err := parser.ParseExpr(qs)
if err != nil {
return promql.Vector{}, err
}
if !hasSelector(parsed) {
level.Debug(logger).Log("msg", "short-circuiting constant query evaluation", "qs", qs, "tm", t)
query, err := q.engine.NewInstantQuery(ctx, &remote.Storage{}, &promql.PrometheusQueryOpts{}, qs, t)
if err != nil {
return promql.Vector{}, err
}
res := query.Exec(ctx)
return res.Vector()
}
}

return q.query(ctx, qs, t, logger)
}

Expand Down
50 changes: 37 additions & 13 deletions pkg/ruler/remotequerier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should issue a remote read request", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -76,7 +76,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -86,7 +86,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Read(ctx, &prompb.Query{}, false)
Expand All @@ -101,7 +101,7 @@ func TestRemoteQuerier_ReadReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, false, "/prometheus", log.NewNopLogger())

_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.Error(t, err)
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run(fmt.Sprintf("format = %s", format), func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, format, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, format, false, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -165,7 +165,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -175,14 +175,38 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Query(ctx, "qs", tm)
require.NoError(t, err)

require.Equal(t, api.ReadConsistencyStrong, getHeader(inReq.Headers, api.ReadConsistencyHeader))
})

t.Run("should be able to short-circuit constant queries", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, true, "/prometheus", log.NewNopLogger())
vec, err := q.Query(context.Background(), "vector(1 - 0.90)", tm)
require.NoError(t, err)
require.InEpsilon(t, 0.1, vec[0].F, 0.0001)
require.InEpsilon(t, float64(tm.UnixMilli()), vec[0].T, 1)
require.Len(t, vec, 1)

require.Empty(t, inReq)
})

t.Run("will not short-circuit non-constant queries", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, true, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "foo", tm)
require.NoError(t, err)

require.NotEmpty(t, inReq)
})

}

func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
Expand Down Expand Up @@ -276,7 +300,7 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
}
return testCase.response, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())
require.Equal(t, int64(0), count.Load())
_, err := q.Query(ctx, "qs", time.Now())
if testCase.err == nil {
Expand Down Expand Up @@ -405,7 +429,7 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) {
Body: []byte(scenario.body),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -678,7 +702,7 @@ func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) {
Body: b,
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatProtobuf, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatProtobuf, false, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -701,7 +725,7 @@ func TestRemoteQuerier_QueryUnknownResponseContentType(t *testing.T) {
Body: []byte("some body content"),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, false, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -713,7 +737,7 @@ func TestRemoteQuerier_QueryReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, false, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -771,7 +795,7 @@ func TestRemoteQuerier_StatusErrorResponses(t *testing.T) {
return testCase.resp, testCase.err
}
logger := newLoggerWithCounter()
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", logger)
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, false, "/prometheus", logger)

tm := time.Unix(1649092025, 515834)

Expand Down

0 comments on commit ea1da1f

Please sign in to comment.