querier: adjust requested limit parameter for downstream requests to series endpoint #10652

Merged (9 commits) on Feb 20, 2025
Changes from 3 commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -47,7 +47,7 @@
* [ENHANCEMENT] Query-frontend: Allow adjustment of queries looking into the future to a maximum duration with experimental `-query-frontend.max-future-query-window` flag. #10547
* [ENHANCEMENT] Ruler: Adds support for filtering results from rule status endpoint by `file[]`, `rule_group[]` and `rule_name[]`. #10589
* [ENHANCEMENT] Query-frontend: Add option to "spin off" subqueries as actual range queries, so that they benefit from query acceleration techniques such as sharding, splitting and caching. To enable this, set the `-query-frontend.spin-off-instant-subqueries-to-url=<url>` option on the frontend and the `instant_queries_with_subquery_spin_off` per-tenant override with regular expressions matching the queries to enable. #10460 #10603 #10621
* [ENHANCEMENT] Querier, ingester: The series API respects passed `limit` parameter. #10620
* [ENHANCEMENT] Querier, ingester: The series API respects passed `limit` parameter. #10620 #10652
* [ENHANCEMENT] Store-gateway: Add experimental settings under `-store-gateway.dynamic-replication` to allow more than the default of 3 store-gateways to own recent blocks. #10382 #10637
* [ENHANCEMENT] Ingester: Add reactive concurrency limiters to protect push and read operations from overload. #10574
* [ENHANCEMENT] Compactor: Add experimental `-compactor.max-lookback` option to limit blocks considered in each compaction cycle. Blocks uploaded prior to the lookback period aren't processed. This option helps reduce CPU utilization in tenants with large block metadata files that are processed before each compaction. #10585
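For context on the `limit` parameter referenced in the changelog entry above, here is a minimal client-side sketch of how a request to the series endpoint might pass it; the base URL, tenant ID, and selector are placeholders, not values from this PR:

// Sketch: passing a limit to the series endpoint (all values are placeholders).
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	params := url.Values{}
	params.Add("match[]", `up{job="example"}`) // placeholder series selector
	params.Set("limit", "100")                 // return at most 100 series

	req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/series?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Scope-OrgID", "tenant-1") // tenant header for a multi-tenant setup

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}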
47 changes: 42 additions & 5 deletions pkg/distributor/distributor.go
@@ -2754,23 +2754,25 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
return nil, err
}

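// The per-ingester request limit can be lower than the overall limit because series are sharded across ingesters:
// assuming an even spread, asking each shard for roughly limit/shardSize series is enough to reach the requested limit after merging.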
resultLimit := math.MaxInt
if hints != nil && hints.Limit > 0 {
resultLimit = hints.Limit
req.Limit = int64(adjustRequestLimitForReplicationSets(ctx, d, replicationSets, resultLimit))
}

resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsForLabelMatchersResponse, error) {
return client.MetricsForLabelMatchers(ctx, req)
})
if err != nil {
return nil, err
}

metricsLimit := math.MaxInt
if hints != nil && hints.Limit > 0 {
metricsLimit = hints.Limit
}
metrics := map[uint64]labels.Labels{}
respsLoop:
for _, resp := range resps {
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
for _, m := range ms {
if len(metrics) >= metricsLimit {
if len(metrics) >= resultLimit {
break respsLoop
}
metrics[labels.StableHash(m)] = m
Expand All @@ -2789,6 +2791,41 @@ respsLoop:
return result, nil
}

// adjustRequestLimitForReplicationSets recalculates the limit with respect to replication sets.
// The returned value is an approximation of the limit that should be applied to a query sent to an individual instance in the replication sets.
func adjustRequestLimitForReplicationSets(ctx context.Context, d *Distributor, replicationSets []ring.ReplicationSet, limit int) int {
if limit == 0 {
return limit
}

var shardSize int
if d.cfg.IngestStorageConfig.Enabled {
// When ingest storage is enabled, each partition, represented by one replication set, is owned by only one ingester.
// So we use the number of replication sets to count the number of shards.
shardSize = len(replicationSets)
Collaborator:
len(replicationSets) includes both ACTIVE and INACTIVE partitions. To be more accurate, I think we should only consider ACTIVE partitions (the shard size on the write path, where series are written, is made only of ACTIVE partitions). It may not be a big deal: if we decide to not address it, then I would at least leave a comment here about it.
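A minimal sketch of that suggestion (not code from this PR; the state type and constants are local stand-ins for whatever the partition ring exposes):

// PartitionState and its constants are stand-ins for the partition lifecycle
// states tracked by the partition ring.
type PartitionState int

const (
	PartitionActive PartitionState = iota
	PartitionInactive
)

// activeShardSize derives the shard size from ACTIVE partitions only, instead
// of len(replicationSets), which also counts INACTIVE partitions.
func activeShardSize(partitionStates map[int32]PartitionState) int {
	active := 0
	for _, state := range partitionStates {
		if state == PartitionActive {
			active++
		}
	}
	return active
}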

} else if len(replicationSets) == 1 {
// We expect to always have exactly 1 replication set when ingest storage is disabled.
// In classic Mimir the total number of shards (ingestion-tenant-shard-size) is the number of ingesters in the shard across all zones.
shardSize = len(replicationSets[0].Instances) / d.ingestersRing.ReplicationFactor()
Collaborator:
Similar comment about read-only instances here. Another case where the number of instances is higher than the real shard size is when the "lookback" threshold triggers (e.g. for 12h after a scale up). The number of instances we look up is higher than the real shard size. I think what we really want here is the shard size as computed on the write path, and not the read path. It would be more accurate to compute the actual shard size by looking at the min between "configured tenant shard size" and "writeable instances/partitions in the ring".
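A rough sketch of that calculation (illustrative inputs rather than the real ring API):

// writePathShardSize caps the configured tenant shard size by the number of
// writeable instances in the ring, then divides by the replication factor to
// get the number of distinct series shards.
func writePathShardSize(configuredShardSize, writeableInstances, replicationFactor int) int {
	shardInstances := writeableInstances
	if configuredShardSize > 0 && configuredShardSize < shardInstances {
		shardInstances = configuredShardSize
	}
	return shardInstances / replicationFactor
}

// For example: 30 writeable ingesters, RF=3, tenant shard size 12 -> min(12, 30)/3 = 4 shards.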

Contributor Author:
I've updated the PR. The idea makes sense to me, although I'm not fully sure whether I've picked the right methods in the ring to calculate what you'd suggested. PTAL

}
if shardSize == 0 || limit <= shardSize {
return limit
}

// Always round the adjusted limit up so the approximation overcounts rather than undercounts.
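// For example, limit=100 and shardSize=3 gives ceil(100/3)=34 per instance, and 3*34=102 >= 100.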
newLimit := int(math.Ceil(float64(limit) / float64(shardSize)))

spanLog := spanlogger.FromContext(ctx, d.log)
spanLog.DebugLog(
"msg", "the limit of query is adjusted to account for sharding",
"original", limit,
"updated", newLimit,
"shard_size (before replication)", shardSize,
)

return newLimit
}

// MetricsMetadata returns the metrics metadata based on the provided req.
func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) {
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
12 changes: 9 additions & 3 deletions pkg/querier/querier.go
@@ -316,7 +316,7 @@ func (mq multiQuerier) getQueriers(ctx context.Context, matchers ...*labels.Matc
}

// Select implements storage.Querier interface.
// The bool passed is ignored because the series is always sorted.
// The bool passed is ignored because the series are always sorted.
func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
spanLog, ctx := spanlogger.NewWithLogger(ctx, mq.logger, "querier.Select")
defer spanLog.Span.Finish()
@@ -341,8 +341,14 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin
}
now := time.Now()

level.Debug(spanLog).Log("hint.func", sp.Func, "start", util.TimeFromMillis(sp.Start).UTC().String(), "end",
util.TimeFromMillis(sp.End).UTC().String(), "step", sp.Step, "matchers", util.MatchersStringer(matchers))
level.Debug(spanLog).Log(
"hint.func", sp.Func,
"start", util.TimeFromMillis(sp.Start).UTC().String(),
"end", util.TimeFromMillis(sp.End).UTC().String(),
"limit", sp.Limit,
"step", sp.Step,
"matchers", util.MatchersStringer(matchers),
)

userID, err := tenant.TenantID(ctx)
if err != nil {