Skip to content

Commit 782f436

Browse files
authored
Merge pull request #22 from vinted/feature/add-series-match-limit
Add series match limit
2 parents 6e2c00a + 2b06a47 commit 782f436

File tree

4 files changed

+159
-16
lines changed

4 files changed

+159
-16
lines changed

cmd/thanos/sidecar.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func runSidecar(
246246
{
247247
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)
248248

249-
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
249+
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, conf.limitMaxMatchedSeries)
250250
if err != nil {
251251
return errors.Wrap(err, "create Prometheus store")
252252
}
@@ -469,15 +469,16 @@ func (s *promMetadata) Version() string {
469469
}
470470

471471
type sidecarConfig struct {
472-
http httpConfig
473-
grpc grpcConfig
474-
prometheus prometheusConfig
475-
tsdb tsdbConfig
476-
reloader reloaderConfig
477-
reqLogConfig *extflag.PathOrContent
478-
objStore extflag.PathOrContent
479-
shipper shipperConfig
480-
limitMinTime thanosmodel.TimeOrDurationValue
472+
http httpConfig
473+
grpc grpcConfig
474+
prometheus prometheusConfig
475+
tsdb tsdbConfig
476+
reloader reloaderConfig
477+
reqLogConfig *extflag.PathOrContent
478+
objStore extflag.PathOrContent
479+
shipper shipperConfig
480+
limitMinTime thanosmodel.TimeOrDurationValue
481+
limitMaxMatchedSeries int
481482
}
482483

483484
func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -491,4 +492,5 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
491492
sc.shipper.registerFlag(cmd)
492493
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
493494
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
495+
cmd.Flag("max-matched-series", "Maximum number of series can be matched before reading series data").Default("0").IntVar(&sc.limitMaxMatchedSeries)
494496
}

pkg/promclient/promclient.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,3 +838,33 @@ func (c *Client) TargetsInGRPC(ctx context.Context, base *url.URL, stateTargets
838838
}
839839
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_targets HTTP[client]", &u, &v)
840840
}
841+
842+
func (c *Client) SeriesMatchCount(ctx context.Context, base *url.URL, matchers []*labels.Matcher, start, end int64) (int, error) {
843+
u := *base
844+
u.Path = path.Join(u.Path, "/api/v1/series")
845+
q := u.Query()
846+
847+
q.Add("match[]", storepb.PromMatchersToString(matchers...))
848+
q.Add("start", formatTime(timestamp.Time(start)))
849+
q.Add("end", formatTime(timestamp.Time(end)))
850+
q.Add("only_count", "1")
851+
u.RawQuery = q.Encode()
852+
853+
body, _, err := c.req2xx(ctx, &u, http.MethodGet)
854+
if err != nil {
855+
return -1, errors.Wrap(err, "read query instant response")
856+
}
857+
858+
var m struct {
859+
Status string `json:"status"`
860+
Data struct {
861+
MetricsCount int `json:"metrics_count"`
862+
} `json:"data"`
863+
}
864+
865+
if err = json.Unmarshal(body, &m); err != nil {
866+
return -1, errors.Wrap(err, "unmarshal query instant response")
867+
}
868+
869+
return m.Data.MetricsCount, nil
870+
}

pkg/store/prometheus.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,13 @@ type PrometheusStore struct {
5757
remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType
5858

5959
framesRead prometheus.Histogram
60+
61+
limitMaxMatchedSeries int
6062
}
6163

64+
// ErrSeriesMatchLimitReached is an error returned by PrometheusStore when matched series limit is enabled and matched series count exceeds the limit.
65+
var ErrSeriesMatchLimitReached = errors.New("series match limit reached")
66+
6267
// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
6368
// https://github.com/prometheus/prometheus/commit/caa173d2aac4c390546b1f78302104b1ccae0878.
6469
var baseVer, _ = semver.Make("2.24.0")
@@ -77,6 +82,7 @@ func NewPrometheusStore(
7782
externalLabelsFn func() labels.Labels,
7883
timestamps func() (mint int64, maxt int64),
7984
promVersion func() string,
85+
limitMaxMatchedSeries int,
8086
) (*PrometheusStore, error) {
8187
if logger == nil {
8288
logger = log.NewNopLogger()
@@ -101,6 +107,7 @@ func NewPrometheusStore(
101107
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
102108
},
103109
),
110+
limitMaxMatchedSeries: limitMaxMatchedSeries,
104111
}
105112
return p, nil
106113
}
@@ -155,6 +162,17 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
155162
return status.Error(codes.InvalidArgument, "no matchers specified (excluding external labels)")
156163
}
157164

165+
if p.limitMaxMatchedSeries > 0 {
166+
matchedSeriesCount, err := p.client.SeriesMatchCount(s.Context(), p.base, matchers, r.MinTime, r.MaxTime)
167+
if err != nil {
168+
return errors.Wrap(err, "get series match count")
169+
}
170+
171+
if matchedSeriesCount > p.limitMaxMatchedSeries {
172+
return ErrSeriesMatchLimitReached
173+
}
174+
}
175+
158176
// Don't ask for more than available time. This includes potential `minTime` flag limit.
159177
availableMinTime, _ := p.timestamps()
160178
if r.MinTime < availableMinTime {

pkg/store/prometheus_test.go

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
6464
limitMinT := int64(0)
6565
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
6666
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
67-
func() (int64, int64) { return limitMinT, -1 }, nil) // Maxt does not matter.
67+
func() (int64, int64) { return limitMinT, -1 }, nil, 0) // Maxt does not matter.
6868
testutil.Ok(t, err)
6969

7070
// Query all three samples except for the first one. Since we round up queried data
@@ -191,7 +191,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) {
191191

192192
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
193193
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
194-
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil)
194+
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil, 0)
195195
testutil.Ok(t, err)
196196

197197
for _, tcase := range []struct {
@@ -361,7 +361,7 @@ func TestPrometheusStore_LabelAPIs(t *testing.T) {
361361

362362
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels {
363363
return extLset
364-
}, nil, func() string { return version })
364+
}, nil, func() string { return version }, 0)
365365
testutil.Ok(t, err)
366366

367367
return promStore
@@ -396,7 +396,7 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {
396396

397397
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
398398
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
399-
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
399+
func() (int64, int64) { return 0, math.MaxInt64 }, nil, 0)
400400
testutil.Ok(t, err)
401401
srv := newStoreSeriesServer(ctx)
402402

@@ -430,6 +430,99 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {
430430
testutil.Equals(t, 0, len(srv.SeriesSet))
431431
}
432432

433+
func TestPrometheusStore_Series_LimitMaxMatchedSeries(t *testing.T) {
434+
defer testutil.TolerantVerifyLeak(t)
435+
436+
p, err := e2eutil.NewPrometheus()
437+
testutil.Ok(t, err)
438+
defer func() { testutil.Ok(t, p.Stop()) }()
439+
440+
baseT := timestamp.FromTime(time.Now()) / 1000 * 1000
441+
442+
a := p.Appender()
443+
_, err = a.Append(0, labels.FromStrings("a", "b", "b", "d"), baseT+100, 1)
444+
testutil.Ok(t, err)
445+
_, err = a.Append(0, labels.FromStrings("a", "c", "b", "d", "job", "test"), baseT+200, 2)
446+
testutil.Ok(t, err)
447+
_, err = a.Append(0, labels.FromStrings("a", "d", "b", "d", "job", "test"), baseT+300, 3)
448+
testutil.Ok(t, err)
449+
_, err = a.Append(0, labels.FromStrings("b", "d", "job", "test"), baseT+400, 4)
450+
testutil.Ok(t, err)
451+
testutil.Ok(t, a.Commit())
452+
453+
ctx, cancel := context.WithCancel(context.Background())
454+
defer cancel()
455+
456+
testutil.Ok(t, p.Start())
457+
458+
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
459+
testutil.Ok(t, err)
460+
461+
req := &storepb.SeriesRequest{
462+
SkipChunks: true,
463+
Matchers: []storepb.LabelMatcher{
464+
{Type: storepb.LabelMatcher_EQ, Name: "job", Value: "test"},
465+
},
466+
MinTime: baseT,
467+
MaxTime: baseT + 300,
468+
}
469+
470+
expected2Series := []storepb.Series{
471+
{
472+
Labels: []labelpb.ZLabel{{Name: "a", Value: "c"}, {Name: "b", Value: "d"}, {Name: "job", Value: "test"}, {Name: "region", Value: "eu-west"}},
473+
},
474+
{
475+
Labels: []labelpb.ZLabel{{Name: "a", Value: "d"}, {Name: "b", Value: "d"}, {Name: "job", Value: "test"}, {Name: "region", Value: "eu-west"}},
476+
},
477+
}
478+
479+
for _, tcase := range []struct {
480+
req *storepb.SeriesRequest
481+
expected []storepb.Series
482+
expectedErr error
483+
limitMaxMatchedSeries int
484+
}{
485+
// limit is not active
486+
{
487+
limitMaxMatchedSeries: 0,
488+
req: req,
489+
expected: expected2Series,
490+
},
491+
// should return limit error as 'limit < matched series'
492+
{
493+
limitMaxMatchedSeries: 1,
494+
req: req,
495+
expected: expected2Series,
496+
expectedErr: ErrSeriesMatchLimitReached,
497+
},
498+
// should succeed as limit is not reached
499+
{
500+
limitMaxMatchedSeries: 2,
501+
req: req,
502+
expected: expected2Series,
503+
},
504+
} {
505+
t.Run("", func(t *testing.T) {
506+
promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
507+
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
508+
func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, nil,
509+
tcase.limitMaxMatchedSeries)
510+
testutil.Ok(t, err)
511+
512+
srv := newStoreSeriesServer(ctx)
513+
err = promStore.Series(tcase.req, srv)
514+
if tcase.expectedErr != nil {
515+
testutil.NotOk(t, err)
516+
testutil.Equals(t, tcase.expectedErr.Error(), err.Error())
517+
return
518+
}
519+
testutil.Ok(t, err)
520+
testutil.Equals(t, []string(nil), srv.Warnings)
521+
testutil.Equals(t, tcase.expected, srv.SeriesSet)
522+
})
523+
}
524+
}
525+
433526
func TestPrometheusStore_Info(t *testing.T) {
434527
defer testutil.TolerantVerifyLeak(t)
435528

@@ -438,7 +531,7 @@ func TestPrometheusStore_Info(t *testing.T) {
438531

439532
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar,
440533
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
441-
func() (int64, int64) { return 123, 456 }, nil)
534+
func() (int64, int64) { return 123, 456 }, nil, 0)
442535
testutil.Ok(t, err)
443536

444537
resp, err := proxy.Info(ctx, &storepb.InfoRequest{})
@@ -516,7 +609,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin
516609

517610
proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar,
518611
func() labels.Labels { return labels.FromStrings("region", "eu-west") },
519-
func() (int64, int64) { return 0, math.MaxInt64 }, nil)
612+
func() (int64, int64) { return 0, math.MaxInt64 }, nil, 0)
520613
testutil.Ok(t, err)
521614

522615
// We build chunks only for SAMPLES method. Make sure we ask for SAMPLES only.

0 commit comments

Comments
 (0)