diff --git a/promql/engine_test.go b/promql/engine_test.go index 09f102bed2..01879956f6 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -205,6 +205,10 @@ func (*errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s return nil, nil, nil } +func (*errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + return storage.EmptyLabelValues() +} + func (*errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 913e2fe24e..a6156e1a11 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -240,6 +240,10 @@ func (errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]st return nil, nil, errors.New("label values error") } +func (errQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + return storage.ErrLabelValues(errors.New("label values stream error")) +} + func (errQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, errors.New("label names error") } diff --git a/storage/interface.go b/storage/interface.go index 166501873f..94126da715 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -126,6 +126,10 @@ func (q *MockQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ( return nil, nil, nil } +func (q *MockQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues { + return EmptyLabelValues() +} + func (q *MockQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } @@ -163,6 +167,12 @@ type LabelQuerier interface { // to label values of metrics matching the matchers. LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) + // LabelValuesStream returns an iterator over all potential values for a label name. + // It is not safe to use the strings beyond the lifetime of the querier. + // If matchers are specified the returned result set is reduced + // to label values of metrics matching the matchers. + LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues + // LabelNames returns all the unique label names present in the block in sorted order. // If matchers are specified the returned result set is reduced // to label names of metrics matching the matchers. diff --git a/storage/labelvalues.go b/storage/labelvalues.go index b14a69b9e3..c3dd8ec594 100644 --- a/storage/labelvalues.go +++ b/storage/labelvalues.go @@ -35,13 +35,15 @@ func EmptyLabelValues() LabelValues { // ListLabelValues is an iterator over a slice of label values. 
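+// It can optionally carry warnings, which are returned from Warnings().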
type ListLabelValues struct { - cur string - values []string + cur string + values []string + warnings annotations.Annotations } -func NewListLabelValues(values []string) *ListLabelValues { +func NewListLabelValues(values []string, warnings annotations.Annotations) *ListLabelValues { return &ListLabelValues{ - values: values, + values: values, + warnings: warnings, } } @@ -63,8 +65,8 @@ func (*ListLabelValues) Err() error { return nil } -func (*ListLabelValues) Warnings() annotations.Annotations { - return nil +func (l *ListLabelValues) Warnings() annotations.Annotations { + return l.warnings } func (*ListLabelValues) Close() error { diff --git a/storage/merge.go b/storage/merge.go index d64388e884..c08e367493 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "sync" + "unicode/utf8" "golang.org/x/exp/slices" @@ -29,6 +30,8 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/util/annotations" + + "github.com/bboreham/go-loser" ) type mergeGenericQuerier struct { @@ -221,6 +224,72 @@ func mergeStrings(a, b []string) []string { return res } +// LabelValuesStream implements LabelQuerier. +func (q *mergeGenericQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues { + if len(q.queriers) == 0 { + return EmptyLabelValues() + } + if len(q.queriers) == 1 { + return q.queriers[0].LabelValuesStream(ctx, name, matchers...) + } + + its := make([]LabelValues, 0, len(q.queriers)) + for _, sq := range q.queriers { + its = append(its, sq.LabelValuesStream(ctx, name, matchers...)) + } + + lt := loser.New(its, string(utf8.MaxRune)) + return &mergedLabelValues{ + lt: lt, + lvs: its, + } +} + +// mergedLabelValues is a label values iterator merging a collection of sub-iterators. +type mergedLabelValues struct { + lt *loser.Tree[string, LabelValues] + lvs []LabelValues + cur string +} + +func (m *mergedLabelValues) Next() bool { + for m.lt.Next() { + // Remove duplicate entries + at := m.lt.At() + if at != m.cur { + m.cur = at + return true + } + } + + return false +} + +func (m *mergedLabelValues) At() string { + return m.cur +} + +func (m *mergedLabelValues) Err() error { + for _, lv := range m.lvs { + if err := lv.Err(); err != nil { + return err + } + } + return nil +} + +func (m *mergedLabelValues) Warnings() annotations.Annotations { + var warnings annotations.Annotations + for _, lv := range m.lvs { + warnings = warnings.Merge(lv.Warnings()) + } + return warnings +} + +func (m *mergedLabelValues) Close() error { + return nil +} + // LabelNames returns all the unique label names present in all queriers in sorted order. 
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { var ( diff --git a/storage/merge_test.go b/storage/merge_test.go index 4632b170da..fda832554c 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1387,6 +1387,23 @@ func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, matcher return m.resp, m.warnings, m.err } +func (m *mockGenericQuerier) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) LabelValues { + m.mtx.Lock() + m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{ + name: name, + matchers: matchers, + }) + m.mtx.Unlock() + + if m.err == nil { + return NewListLabelValues(m.resp, m.warnings) + } + return errLabelValues{ + err: m.err, + warnings: m.warnings, + } +} + func (m *mockGenericQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { m.mtx.Lock() m.labelNamesCalls++ @@ -1456,8 +1473,9 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedLabels []string expectedWarnings annotations.Annotations - expectedErrs [4]error + expectedErrs [5]error }{ + {}, { name: "one successful primary querier", queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}}, @@ -1483,7 +1501,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { { name: "one failed primary querier", queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}}, - expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, + expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with successful secondaries", @@ -1519,7 +1537,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}}, &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}}, }, - expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, + expectedErrs: [5]error{errStorage, errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with failed secondaries", @@ -1605,11 +1623,36 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested) } }) + t.Run("LabelValuesStream", func(t *testing.T) { + it := q.LabelValuesStream(ctx, "test2") + require.NotNil(t, it) + var res []string + for it.Next() { + res = append(res, it.At()) + } + require.Equal(t, tcase.expectedWarnings, it.Warnings()) + err := it.Err() + require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match") + if err != nil { + return + } + + require.Equal(t, tcase.expectedLabels, res) + + for _, qr := range q.queriers { + m := unwrapMockGenericQuerier(t, qr) + + require.Equal(t, []labelNameRequest{ + {name: "test"}, + {name: "test2"}, + }, m.labelNamesRequested) + } + }) t.Run("LabelValuesWithMatchers", func(t *testing.T) { matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue") - res, w, err := q.LabelValues(ctx, "test2", matcher) + res, w, err := q.LabelValues(ctx, "test3", matcher) require.Subset(t, tcase.expectedWarnings, w) - require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match") + require.ErrorIs(t, err, tcase.expectedErrs[4], 
"expected error doesn't match") require.Equal(t, tcase.expectedLabels, res) if err != nil { @@ -1620,7 +1663,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.Equal(t, []labelNameRequest{ {name: "test"}, - {name: "test2", matchers: []*labels.Matcher{matcher}}, + {name: "test2"}, + {name: "test3", matchers: []*labels.Matcher{matcher}}, }, m.labelNamesRequested) } }) diff --git a/storage/noop.go b/storage/noop.go index be5741ddd8..1dd485573d 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -35,6 +35,10 @@ func (noopQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]s return nil, nil, nil } +func (noopQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues { + return EmptyLabelValues() +} + func (noopQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } @@ -58,6 +62,10 @@ func (noopChunkQuerier) LabelValues(context.Context, string, ...*labels.Matcher) return nil, nil, nil } +func (noopChunkQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) LabelValues { + return EmptyLabelValues() +} + func (noopChunkQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index 723030091a..2f41e5faf2 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -209,12 +209,18 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, []s return ms, names } -// LabelValues implements storage.Querier and is a noop. +// LabelValues implements storage.LabelQuerier and is a noop. func (q *querier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) { // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 return nil, nil, errors.New("not implemented") } +// LabelValuesStream implements storage.LabelQuerier and is a noop. +func (q *querier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues { + // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 + return storage.ErrLabelValues(errors.New("not implemented")) +} + // LabelNames implements storage.Querier and is a noop. func (q *querier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) { // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 diff --git a/storage/secondary.go b/storage/secondary.go index 44d9781835..f63160db86 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -57,6 +57,28 @@ func (s *secondaryQuerier) LabelValues(ctx context.Context, name string, matcher return vals, w, nil } +func (s *secondaryQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) LabelValues { + return &secondaryLabelValues{ + s.genericQuerier.LabelValuesStream(ctx, name, matchers...), + } +} + +type secondaryLabelValues struct { + LabelValues +} + +func (s *secondaryLabelValues) Err() error { + return nil +} + +func (s *secondaryLabelValues) Warnings() annotations.Annotations { + ws := s.LabelValues.Warnings() + if s.LabelValues.Err() != nil { + ws.Add(s.LabelValues.Err()) + } + return ws +} + func (s *secondaryQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { names, w, err := s.genericQuerier.LabelNames(ctx, matchers...) 
if err != nil { diff --git a/tsdb/block.go b/tsdb/block.go index 9458852579..982bebe775 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -61,6 +61,8 @@ type IndexWriter interface { // IndexReader provides reading access of serialized index data. type IndexReader interface { + index.LabelsGetter + // Symbols return an iterator over sorted string symbols that may occur in // series' labels and indices. It is not safe to use the returned strings // beyond the lifetime of the index reader. @@ -72,12 +74,18 @@ type IndexReader interface { // LabelValues returns possible label values which may not be sorted. LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) + // LabelValuesStream returns an iterator over matching label values. + LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues + // Postings returns the postings list iterator for the label pairs. // The Postings here contain the offsets to the series inside the index. // Found IDs are not strictly required to point to a valid Series, e.g. // during background garbage collections. Postings(ctx context.Context, name string, values ...string) (index.Postings, error) + // PostingsForRegexp returns an iterator over postings matching the provided regexp label matcher. + PostingsForRegexp(ctx context.Context, m *labels.Matcher) index.Postings + // PostingsForMatchers assembles a single postings iterator based on the given matchers. // The resulting postings are not ordered by series. // If concurrent hint is set to true, call will be optimized for a (most likely) concurrent call with same matchers, @@ -526,6 +534,21 @@ func (r blockIndexReader) LabelValues(ctx context.Context, name string, matchers return labelValuesWithMatchers(ctx, r.ir, name, matchers...) } +func (r blockIndexReader) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return r.ir.LabelValuesStream(ctx, name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(ctx, r.ir, name, matchers) +} + func (r blockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { if len(matchers) == 0 { return r.b.LabelNames(ctx) @@ -542,6 +565,10 @@ func (r blockIndexReader) Postings(ctx context.Context, name string, values ...s return p, nil } +func (r blockIndexReader) PostingsForRegexp(ctx context.Context, m *labels.Matcher) index.Postings { + return r.ir.PostingsForRegexp(ctx, m) +} + func (r blockIndexReader) PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { return r.ir.PostingsForMatchers(ctx, concurrent, ms...) 
} @@ -572,6 +599,13 @@ func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB return nil } +func (r blockIndexReader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + if err := r.ir.Labels(ref, builder); err != nil { + return fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err) + } + return nil +} + func (r blockIndexReader) Close() error { r.b.pendingReaders.Done() return nil diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 0d69998d10..157e32bfb3 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -306,6 +306,87 @@ func TestLabelValuesWithMatchers(t *testing.T) { } } +func TestLabelValuesStream_WithMatchers(t *testing.T) { + var seriesEntries []storage.Series + for i := 0; i < 100; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), []chunks.Sample{sample{100, 0, nil, nil}})) + } + + ctx := context.Background() + + blockDir := createBlock(t, t.TempDir(), seriesEntries) + files, err := sequenceFiles(chunkDir(blockDir)) + require.NoError(t, err) + require.NotEmpty(t, files, "No chunk created.") + + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, block.Close()) }) + + indexReader, err := block.Index() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, indexReader.Close()) }) + + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) + testCases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique IDs based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for presence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, { + name: "get unique IDs based on tens not being equal to a certain value, while not empty", + labelName: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expectedValues: uniqueWithout30s, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + it := indexReader.LabelValuesStream(ctx, tt.labelName, tt.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, tt.expectedValues, values) + }) + } +} + // TestBlockSize ensures that the block size is calculated correctly. 
func TestBlockSize(t *testing.T) { tmpdir := t.TempDir() diff --git a/tsdb/db_bench_test.go b/tsdb/db_bench_test.go new file mode 100644 index 0000000000..498d0cacd7 --- /dev/null +++ b/tsdb/db_bench_test.go @@ -0,0 +1,167 @@ +package tsdb + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" +) + +func BenchmarkQuerier_LabelValuesStream(b *testing.B) { + db := openTestDB(b, nil, []int64{1000}) + b.Cleanup(func() { + require.NoError(b, db.Close()) + }) + + ctx := context.Background() + + app := db.head.Appender(ctx) + // var seriesEntries []storage.Series + addSeries := func(l labels.Labels) { + app.Append(0, l, 0, 0) + // seriesEntries = append(seriesEntries, storage.NewListSeries(l, tsdbutil.GenerateSamples(1, 1))) + } + + for n := 0; n < 10; n++ { + for i := 0; i < 100000; i++ { + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n))) + // Have some series that won't be matched, to properly test inverted matchers. + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + } + } + require.NoError(b, app.Commit()) + // createBlock(b, db.Dir(), seriesEntries) + require.NoError(b, db.reloadBlocks()) + + querier, err := db.Querier(0, 1) + require.NoError(b, err) + b.Cleanup(func() { require.NoError(b, querier.Close()) }) + + i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1") + iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$") + jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") + jXXXYYY := labels.MustNewMatcher(labels.MatchRegexp, "j", "XXX|YYY") + jXplus := labels.MustNewMatcher(labels.MatchRegexp, "j", "X.+") + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix) + nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, ie. multiplication of primes. It will match single i*n combination. + nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") // 1*20, 2*10, 4*5, 5*4 + times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*") + + cases := []struct { + name string + labelName string + matchers []*labels.Matcher + }{ + {`i with i="1"`, "i", []*labels.Matcher{i1}}, + // i has 100k values. 
+ {`i with n="1"`, "i", []*labels.Matcher{n1}}, + {`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}}, + {`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}}, + {`i with n="1",j=~"X.+"`, "i", []*labels.Matcher{n1, jXplus}}, + {`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}}, + {`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}}, + {`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}}, + {`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}}, + {`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}}, + {`i with i_times_n=~"12.*""`, "i", []*labels.Matcher{times12}}, + // n has 10 values. + {`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}}, + {`n with i="1"`, "n", []*labels.Matcher{i1}}, + } + for _, c := range cases { + b.Run(c.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + it := querier.LabelValuesStream(ctx, c.labelName, c.matchers...) + for it.Next() { + } + require.NoError(b, it.Err()) + require.Empty(b, it.Warnings()) + } + }) + } +} + +func BenchmarkQuerier_LabelValues(b *testing.B) { + db := openTestDB(b, nil, []int64{1000}) + b.Cleanup(func() { + require.NoError(b, db.Close()) + }) + + ctx := context.Background() + + app := db.head.Appender(ctx) + // var seriesEntries []storage.Series + addSeries := func(l labels.Labels) { + app.Append(0, l, 0, 0) + // seriesEntries = append(seriesEntries, storage.NewListSeries(l, tsdbutil.GenerateSamples(1, 1))) + } + + for n := 0; n < 10; n++ { + for i := 0; i < 100000; i++ { + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n))) + // Have some series that won't be matched, to properly test inverted matchers. + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + } + } + require.NoError(b, app.Commit()) + // createBlock(b, db.Dir(), seriesEntries) + require.NoError(b, db.reloadBlocks()) + + querier, err := db.Querier(0, 1) + require.NoError(b, err) + b.Cleanup(func() { require.NoError(b, querier.Close()) }) + + i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1") + iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$") + jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") + jXXXYYY := labels.MustNewMatcher(labels.MatchRegexp, "j", "XXX|YYY") + jXplus := labels.MustNewMatcher(labels.MatchRegexp, "j", "X.+") + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix) + nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$") + primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, ie. multiplication of primes. It will match single i*n combination. 
+ nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20") // 1*20, 2*10, 4*5, 5*4 + times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*") + + cases := []struct { + name string + labelName string + matchers []*labels.Matcher + }{ + {`i with i="1"`, "i", []*labels.Matcher{i1}}, + // i has 100k values. + {`i with n="1"`, "i", []*labels.Matcher{n1}}, + {`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}}, + {`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}}, + {`i with n="1",j=~"X.+"`, "i", []*labels.Matcher{n1, jXplus}}, + {`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}}, + {`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}}, + {`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}}, + {`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}}, + {`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}}, + {`i with i_times_n=~"12.*""`, "i", []*labels.Matcher{times12}}, + // n has 10 values. + {`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}}, + {`n with i="1"`, "n", []*labels.Matcher{i1}}, + } + for _, c := range cases { + b.Run(c.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, warnings, err := querier.LabelValues(ctx, c.labelName, c.matchers...) + require.NoError(b, err) + require.Empty(b, warnings) + } + }) + } +} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index f2293e8277..51489eb57b 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6957,3 +6957,123 @@ Outer: require.NoError(t, writerErr) } + +func TestQuerier_LabelValuesStream(t *testing.T) { + db := openTestDB(t, nil, nil) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + ctx := context.Background() + + var seriesEntries []storage.Series + // Add a block of 70 series with timestamp 1 + for i := 0; i < 70; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), chunks.GenerateSamples(1, 1))) + } + createBlock(t, db.Dir(), seriesEntries) + + // Add a block of 50 series with timestamp 2 + // Since "tens" start at 50, two of the label values ("value5", "value6") will overlap with the + // previous block + seriesEntries = seriesEntries[:0] + for i := 50; i < 100; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), chunks.GenerateSamples(2, 1))) + } + createBlock(t, db.Dir(), seriesEntries) + + require.NoError(t, db.reloadBlocks()) + + querier, err := db.Querier(1, 2) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, querier.Close()) }) + + t.Run("without matchers", func(t *testing.T) { + it := querier.LabelValuesStream(ctx, "tens") + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + + require.Equal(t, []string{ + "value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", + }, values) + }) + + t.Run("with matchers", func(t *testing.T) { + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) + testCases := []struct { + name string + label string + matchers []*labels.Matcher + expLabels []string + }{ + { + name: "matching on requested label", + label: "tens", + matchers: []*labels.Matcher{ + 
labels.MustNewMatcher(labels.MatchEqual, "tens", "value1"), + }, + expLabels: []string{"value1"}, + }, + { + name: "unsuccessful matching on requested label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "tens", "value10"), + }, + expLabels: nil, + }, + { + name: "matching on other label", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "unique", "value51"), + }, + expLabels: []string{"value5"}, + }, + { + name: "tens for empty unique ID", + label: "tens", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "unique", ""), + }, + expLabels: nil, + }, + { + name: "get unique IDs based on tens not being equal to a certain value, while not empty", + label: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expLabels: uniqueWithout30s, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + it := querier.LabelValuesStream(ctx, tc.label, tc.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + + require.Equal(t, tc.expLabels, values) + }) + } + }) +} diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 20d7c7780c..375704d14b 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -87,6 +87,30 @@ func (h *headIndexReader) LabelValues(ctx context.Context, name string, matchers return labelValuesWithMatchers(ctx, h, name, matchers...) } +// LabelValuesStream returns an iterator over label values present in +// the head for the specific label name that are within the time range +// mint to maxt. +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (h *headIndexReader) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { + return storage.EmptyLabelValues() + } + + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return h.head.postings.LabelValuesStream(ctx, name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(ctx, h, name, matchers) +} + // LabelNames returns all the unique label names present in the head // that are within the time range mint to maxt. func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { @@ -121,6 +145,10 @@ func (h *headIndexReader) Postings(ctx context.Context, name string, values ...s } } +func (h *headIndexReader) PostingsForRegexp(ctx context.Context, m *labels.Matcher) index.Postings { + return h.head.postings.PostingsForRegexp(ctx, m) +} + func (h *headIndexReader) PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { return h.head.pfmc.PostingsForMatchers(ctx, h, concurrent, ms...) } @@ -176,7 +204,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou // LabelValuesFor returns LabelValues for the given label name in the series referred to by postings. 
func (h *headIndexReader) LabelValuesFor(postings index.Postings, name string) storage.LabelValues { - return h.head.postings.LabelValuesFor(postings, name) + return h.head.postings.LabelValuesFor(postings, name, h) } // LabelValuesExcluding returns LabelValues for the given label name in all other series than those referred to by postings. @@ -185,6 +213,17 @@ func (h *headIndexReader) LabelValuesExcluding(postings index.Postings, name str return h.head.postings.LabelValuesExcluding(postings, name) } +// Labels reads the series with the given ref and writes its labels into builder. +func (h *headIndexReader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + s := h.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + h.head.metrics.seriesNotFound.Inc() + return storage.ErrNotFound + } + builder.Assign(s.lset) + return nil +} + // Series returns the series for the given reference. func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s := h.head.series.getByID(chunks.HeadSeriesRef(ref)) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2f8d1f848e..030430a927 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2837,6 +2837,89 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { } } +func TestHeadLabelValuesStream_WithMatchers(t *testing.T) { + head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) + t.Cleanup(func() { require.NoError(t, head.Close()) }) + + ctx := context.Background() + + app := head.Appender(ctx) + for i := 0; i < 100; i++ { + _, err := app.Append(0, labels.FromStrings( + "tens", fmt.Sprintf("value%d", i/10), + "unique", fmt.Sprintf("value%d", i), + ), 100, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + indexReader := head.indexRange(0, 200) + t.Cleanup(func() { + require.NoError(t, indexReader.Close()) + }) + + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) + testCases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique IDs based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for presence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, { + name: "get unique IDs based on tens not being equal to a certain value, while not empty", + labelName: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expectedValues: 
uniqueWithout30s, + }, { + name: "get tens for empty unique ID", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "")}, + expectedValues: nil, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + it := indexReader.LabelValuesStream(ctx, tt.labelName, tt.matchers...) + var values []string + for it.Next() { + values = append(values, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, tt.expectedValues, values) + }) + } +} + func TestHeadLabelNamesWithMatchers(t *testing.T) { head, _ := newTestHead(t, 1000, wlog.CompressionNone, false) defer func() { diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 181e80570a..7b4d8c140d 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -25,6 +25,7 @@ import ( "math" "os" "path/filepath" + "reflect" "sort" "unsafe" @@ -1266,6 +1267,10 @@ func newReader(b ByteSlice, c io.Closer, cacheProvider ReaderCacheProvider) (*Re return r, nil } +func (r *Reader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + return r.Series(ref, builder, nil) +} + // Version returns the file format version of the underlying index. func (r *Reader) Version() int { return r.version @@ -1561,6 +1566,22 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labe return values, ctx.Err() } +func (r *Reader) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + if r.version == FormatV1 { + p := r.postingsV1[name] + if len(p) == 0 { + return storage.EmptyLabelValues() + } + return &labelValuesV1{ + matchers: matchers, + it: reflect.ValueOf(p).MapRange(), + name: name, + } + } + + return r.newLabelValuesV2(name, matchers) +} + // LabelNamesFor returns all the label names for the series referred to by IDs. // The names returned are sorted. func (r *Reader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error) { @@ -1757,6 +1778,77 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P return Merge(ctx, res...), nil } +func (r *Reader) PostingsForRegexp(ctx context.Context, m *labels.Matcher) Postings { + if r.version == FormatV1 { + e := r.postingsV1[m.Name] + if len(e) == 0 { + return EmptyPostings() + } + + var its []Postings + for val, offset := range e { + if !m.Matches(val) { + continue + } + + // Read from the postings table. + d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable) + _, p, err := r.dec.Postings(d.Get()) + if err != nil { + return ErrPostings(fmt.Errorf("decode postings: %w", err)) + } + + its = append(its, p) + } + + return Merge(ctx, its...) + } + + e := r.postings[m.Name] + if len(e) == 0 { + return EmptyPostings() + } + + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + d.Skip(e[0].off) + lastVal := e[len(e)-1].value + + var its []Postings + skip := 0 + for d.Err() == nil && ctx.Err() == nil { + if skip == 0 { + // These are always the same number of bytes, + // and it's faster to skip than to parse. + skip = d.Len() + d.Uvarint() // Keycount. + d.UvarintBytes() // Label name. + skip -= d.Len() + } else { + d.Skip(skip) + } + s := yoloString(d.UvarintBytes()) // Label value. + postingsOff := d.Uvarint64() // Offset. 
+ if m.Matches(s) { + // We want this postings iterator since the value is a match + d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) + _, p, err := r.dec.Postings(d2.Get()) + if err != nil { + return ErrPostings(fmt.Errorf("decode postings: %w", err)) + } + its = append(its, p) + } + + if s == lastVal { + break + } + } + if d.Err() != nil { + return ErrPostings(fmt.Errorf("get postings offset entry: %w", d.Err())) + } + + return Merge(ctx, its...) +} + // SortedPostings returns the given postings list reordered so that the backing series // are sorted. func (r *Reader) SortedPostings(p Postings) Postings { diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index f68657637e..88ebe96975 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -135,6 +135,15 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, return nil } +func (m mockIndex) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error { + s, ok := m.series[ref] + if !ok { + return errors.New("not found") + } + builder.Assign(s.l) + return nil +} + func TestIndexRW_Create_Open(t *testing.T) { dir := t.TempDir() diff --git a/tsdb/index/labelvalues.go b/tsdb/index/labelvalues.go index 9680e5d695..88c8c4e9f0 100644 --- a/tsdb/index/labelvalues.go +++ b/tsdb/index/labelvalues.go @@ -1,15 +1,163 @@ package index import ( + "context" + "errors" "fmt" + "reflect" "golang.org/x/exp/slices" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/util/annotations" ) +type labelValuesV2 struct { + name string + cur string + dec encoding.Decbuf + matchers []*labels.Matcher + skip int + lastVal string + exhausted bool + err error +} + +// newLabelValuesV2 returns an iterator over label values in a v2 index. 
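+// It seeks to the start of the name's entries in the postings offset table, then
+// lazily decodes values as the iterator advances, applying the provided matchers.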
+func (r *Reader) newLabelValuesV2(name string, matchers []*labels.Matcher) storage.LabelValues { + p := r.postings[name] + if len(p) == 0 { + return storage.EmptyLabelValues() + } + + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + d.Skip(p[0].off) + // These are always the same number of bytes, and it's faster to skip than to parse + skip := d.Len() + // Key count + d.Uvarint() + // Label name + d.UvarintBytes() + skip -= d.Len() + + return &labelValuesV2{ + name: name, + matchers: matchers, + dec: d, + lastVal: p[len(p)-1].value, + skip: skip, + } +} + +func (l *labelValuesV2) Next() bool { + if l.err != nil || l.exhausted { + return false + } + + // Pick the first matching label value + for l.dec.Err() == nil { + // Label value + val := yoloString(l.dec.UvarintBytes()) + isMatch := true + for _, m := range l.matchers { + if m.Name != l.name { + // This should not happen + continue + } + + if !m.Matches(val) { + isMatch = false + break + } + } + + if isMatch { + l.cur = val + } + if val == l.lastVal { + l.exhausted = true + return isMatch + } + + // Offset + l.dec.Uvarint64() + // Skip forward to next entry + l.dec.Skip(l.skip) + + if isMatch { + break + } + } + if l.dec.Err() != nil { + // An error occurred decoding + l.err = fmt.Errorf("get postings offset entry: %w", l.dec.Err()) + return false + } + + return true +} + +func (l *labelValuesV2) At() string { + return l.cur +} + +func (l *labelValuesV2) Err() error { + return l.err +} + +func (l *labelValuesV2) Warnings() annotations.Annotations { + return nil +} + +func (l *labelValuesV2) Close() error { + return nil +} + +type labelValuesV1 struct { + it *reflect.MapIter + matchers []*labels.Matcher + name string +} + +func (l *labelValuesV1) Next() bool { +loop: + for l.it.Next() { + for _, m := range l.matchers { + if m.Name != l.name { + // This should not happen + continue + } + + if !m.Matches(l.At()) { + continue loop + } + } + + // This entry satisfies all matchers + return true + } + + return false +} + +func (l *labelValuesV1) At() string { + return yoloString(l.it.Value().Bytes()) +} + +func (*labelValuesV1) Err() error { + return nil +} + +func (*labelValuesV1) Warnings() annotations.Annotations { + return nil +} + +func (*labelValuesV1) Close() error { + return nil +} + // LabelValuesFor returns LabelValues for the given label name in the series referred to by postings. func (r *Reader) LabelValuesFor(postings Postings, name string) storage.LabelValues { return r.labelValuesFor(postings, name, true) @@ -208,9 +356,89 @@ func (it *intersectLabelValues) Close() error { return nil } +type LabelsGetter interface { + // Labels reads the series with the given ref and writes its labels into builder. + Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error +} + // LabelValuesFor returns LabelValues for the given label name in the series referred to by postings. -func (p *MemPostings) LabelValuesFor(postings Postings, name string) storage.LabelValues { - return p.labelValuesFor(postings, name, true) +// lg is used to get labels from series in case the ratio of postings vs label values is sufficiently low, +// as an optimization. 
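+// If the postings expand to at most 1/100th of the label's value count, the values are
+// read directly from the matched series' labels; otherwise each value's postings are
+// intersected with the given postings.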
+func (p *MemPostings) LabelValuesFor(postings Postings, name string, lg LabelsGetter) storage.LabelValues { + p.mtx.RLock() + + e := p.m[name] + if len(e) == 0 { + p.mtx.RUnlock() + return storage.EmptyLabelValues() + } + + // With thread safety in mind and due to random key ordering in map, we have to construct the array in memory + vals := make([]string, 0, len(e)) + for val := range e { + vals = append(vals, val) + } + + // Let's see if expanded postings for matchers have smaller cardinality than label values. + // Since computing label values from series is expensive, we apply a limit on number of expanded + // postings (and series). + const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series. + maxExpandedPostings := len(vals) / maxExpandedPostingsFactor + if maxExpandedPostings > 0 { + // Add space for one extra posting when checking if we expanded all postings. + expanded := make([]storage.SeriesRef, 0, maxExpandedPostings+1) + + // Call postings.Next() even if len(expanded) == maxExpandedPostings. This tells us if there are more postings or not. + for len(expanded) <= maxExpandedPostings && postings.Next() { + expanded = append(expanded, postings.At()) + } + + if len(expanded) <= maxExpandedPostings { + // When we're here, postings.Next() must have returned false, so we need to check for errors. + p.mtx.RUnlock() + + if err := postings.Err(); err != nil { + return storage.ErrLabelValues(fmt.Errorf("expanding postings for matchers: %w", err)) + } + + // We have expanded all the postings -- all returned label values will be from these series only. + // (We supply vals as a buffer for storing results. It should be big enough already, since it holds all possible label values.) + vals, err := LabelValuesFromSeries(lg, name, expanded, vals) + if err != nil { + return storage.ErrLabelValues(err) + } + + slices.Sort(vals) + return storage.NewListLabelValues(vals, nil) + } + + // If we haven't reached end of postings, we prepend our expanded postings to "postings", and continue. + postings = NewPrependPostings(expanded, postings) + } + + candidates := make([]Postings, 0, len(e)) + vals = vals[:0] + for val, srs := range e { + vals = append(vals, val) + candidates = append(candidates, NewListPostings(srs)) + } + indexes, err := FindIntersectingPostings(postings, candidates) + p.mtx.RUnlock() + if err != nil { + return storage.ErrLabelValues(err) + } + + // Filter the values, keeping only those with intersecting postings + if len(vals) != len(indexes) { + slices.Sort(indexes) + for i, index := range indexes { + vals[i] = vals[index] + } + vals = vals[:len(indexes)] + } + + slices.Sort(vals) + return storage.NewListLabelValues(vals, nil) } // LabelValuesExcluding returns LabelValues for the given label name in all other series than those referred to by postings. @@ -312,3 +540,62 @@ func contains(p, subp Postings) bool { // Couldn't find any value in subp which is not in p. return true } + +// LabelValuesStream returns an iterator over sorted label values for the given name. +// The matchers should only be for the name in question. +// LabelValues iterators need to be sorted, to enable merging of them. 
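+// Values are collected under a read lock and returned as a sorted in-memory list.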
+func (p *MemPostings) LabelValuesStream(_ context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + p.mtx.RLock() + + values := make([]string, 0, len(p.m[name])) +loop: + for v := range p.m[name] { + for _, m := range matchers { + if m.Name != name { + // This should not happen + continue + } + + if !m.Matches(v) { + continue loop + } + } + values = append(values, v) + } + p.mtx.RUnlock() + + slices.Sort(values) + return storage.NewListLabelValues(values, nil) +} + +func (p *MemPostings) PostingsForRegexp(ctx context.Context, m *labels.Matcher) Postings { + p.mtx.RLock() + + e := p.m[m.Name] + if len(e) == 0 { + p.mtx.RUnlock() + return EmptyPostings() + } + + values := make([]string, 0, len(e)) + for v := range e { + if m.Matches(v) { + values = append(values, v) + } + } + + its := make([]Postings, 0, len(values)) + for _, val := range values { + srs := e[val] + if len(srs) > 0 { + // Make a copy with thread safety in mind + srsCpy := make([]storage.SeriesRef, len(srs)) + copy(srsCpy, srs) + its = append(its, NewListPostings(srsCpy)) + } + } + p.mtx.RUnlock() + + return Merge(ctx, its...) +} + diff --git a/tsdb/index/labelvalues_test.go b/tsdb/index/labelvalues_test.go index 1c9cb4c42e..40f3c8ee79 100644 --- a/tsdb/index/labelvalues_test.go +++ b/tsdb/index/labelvalues_test.go @@ -358,3 +358,62 @@ func TestMemPostings_LabelValuesExcluding(t *testing.T) { require.Equal(t, []string{"1", "2"}, vals) }) } + +func TestMemPostings_LabelValuesStream(t *testing.T) { + ctx := context.Background() + mp := NewMemPostings() + mp.Add(1, labels.FromStrings("a", "1")) + mp.Add(1, labels.FromStrings("b", "1")) + mp.Add(2, labels.FromStrings("a", "1")) + mp.Add(2, labels.FromStrings("b", "2")) + mp.Add(3, labels.FromStrings("a", "1")) + mp.Add(3, labels.FromStrings("b", "3")) + mp.Add(4, labels.FromStrings("a", "1")) + mp.Add(4, labels.FromStrings("b", "4")) + mp.Add(5, labels.FromStrings("a", "2")) + mp.Add(5, labels.FromStrings("b", "5")) + + t.Run("without matchers", func(t *testing.T) { + it := mp.LabelValuesStream(context.Background(), "b") + + var vals []string + for it.Next() { + vals = append(vals, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, []string{"1", "2", "3", "4", "5"}, vals) + }) + + t.Run("with matchers", func(t *testing.T) { + it := mp.LabelValuesStream(context.Background(), "b", labels.MustNewMatcher(labels.MatchRegexp, "b", "[2,3]")) + + var vals []string + for it.Next() { + vals = append(vals, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, []string{"2", "3"}, vals) + }) + + // Matchers for other labels should be ignored. 
+ t.Run("with matchers for another label", func(t *testing.T) { + it := mp.LabelValuesStream(context.Background(), "b", labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + + var vals []string + for it.Next() { + vals = append(vals, it.At()) + } + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + require.Equal(t, []string{"1", "2", "3", "4", "5"}, vals) + }) + + t.Run("non-existent label", func(t *testing.T) { + it := mp.LabelValuesStream(ctx, "c") + require.False(t, it.Next()) + require.NoError(t, it.Err()) + require.Empty(t, it.Warnings()) + }) +} diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 9f37b62b35..cb72717cfc 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -956,3 +956,51 @@ func (h *postingsWithIndexHeap) Pop() interface{} { *h = old[0 : n-1] return x } + +func NewPrependPostings(a []storage.SeriesRef, b Postings) Postings { + return &prependPostings{ + ix: -1, + prefix: a, + rest: b, + } +} + +// prependPostings returns series references from "prefix" before using "rest" postings. +type prependPostings struct { + ix int + prefix []storage.SeriesRef + rest Postings +} + +func (p *prependPostings) Next() bool { + p.ix++ + if p.ix < len(p.prefix) { + return true + } + return p.rest.Next() +} + +func (p *prependPostings) Seek(v storage.SeriesRef) bool { + for p.ix < len(p.prefix) { + if p.ix >= 0 && p.prefix[p.ix] >= v { + return true + } + p.ix++ + } + + return p.rest.Seek(v) +} + +func (p *prependPostings) At() storage.SeriesRef { + if p.ix >= 0 && p.ix < len(p.prefix) { + return p.prefix[p.ix] + } + return p.rest.At() +} + +func (p *prependPostings) Err() error { + if p.ix >= 0 && p.ix < len(p.prefix) { + return nil + } + return p.rest.Err() +} diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 95f3aa2c1e..fd05c6b29d 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1385,6 +1385,61 @@ func TestListPostings(t *testing.T) { }) } +func TestPrependPostings(t *testing.T) { + t.Run("empty", func(t *testing.T) { + p := NewPrependPostings(nil, NewListPostings(nil)) + require.False(t, p.Next()) + }) + + t.Run("next+At", func(t *testing.T) { + p := NewPrependPostings([]storage.SeriesRef{10, 20, 30}, NewListPostings([]storage.SeriesRef{200, 300, 500})) + + for _, s := range []storage.SeriesRef{10, 20, 30, 200, 300, 500} { + require.True(t, p.Next()) + require.Equal(t, s, p.At()) + require.Equal(t, s, p.At()) // Multiple calls return same value. + } + require.False(t, p.Next()) + }) + + t.Run("seek+At", func(t *testing.T) { + p := NewPrependPostings([]storage.SeriesRef{10, 20, 30}, NewListPostings([]storage.SeriesRef{200, 300, 500})) + + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.Equal(t, storage.SeriesRef(10), p.At()) + + require.True(t, p.Seek(15)) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.Equal(t, storage.SeriesRef(20), p.At()) + + require.True(t, p.Seek(20)) // Seeking to "current" value doesn't move postings iterator. 
+ require.Equal(t, storage.SeriesRef(20), p.At()) + require.Equal(t, storage.SeriesRef(20), p.At()) + + require.True(t, p.Seek(50)) + require.Equal(t, storage.SeriesRef(200), p.At()) + require.Equal(t, storage.SeriesRef(200), p.At()) + + require.False(t, p.Seek(1000)) + require.False(t, p.Next()) + }) + + t.Run("err", func(t *testing.T) { + err := fmt.Errorf("error") + p := NewPrependPostings([]storage.SeriesRef{10, 20, 30}, ErrPostings(err)) + + for _, s := range []storage.SeriesRef{10, 20, 30} { + require.True(t, p.Next()) + require.Equal(t, s, p.At()) + require.NoError(t, p.Err()) + } + // Advancing after prepended values returns false, and gives us access to error. + require.False(t, p.Next()) + require.Equal(t, err, p.Err()) + }) +} + // BenchmarkListPostings benchmarks ListPostings by iterating Next/At sequentially. // See also BenchmarkIntersect as it performs more `At` calls than `Next` calls when intersecting. func BenchmarkListPostings(b *testing.B) { diff --git a/tsdb/labelvalues.go b/tsdb/labelvalues.go new file mode 100644 index 0000000000..2ef7139271 --- /dev/null +++ b/tsdb/labelvalues.go @@ -0,0 +1,117 @@ +package tsdb + +import ( + "context" + "fmt" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" +) + +func labelValuesForMatchersStream(ctx context.Context, r IndexReader, name string, matchers []*labels.Matcher) storage.LabelValues { + // See which labels must be non-empty. + // Optimization for case like {l=~".", l!="1"}. + labelMustBeSet := make(map[string]bool, len(matchers)) + for _, m := range matchers { + if !m.Matches("") { + labelMustBeSet[m.Name] = true + } + } + + var its, notIts []index.Postings + for _, m := range matchers { + switch { + case labelMustBeSet[m.Name]: + // If this matcher must be non-empty, we can be smarter. + matchesEmpty := m.Matches("") + isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp + switch { + case isNot && matchesEmpty: // l!="foo" + // If the label can't be empty and is a Not and the inner matcher + // doesn't match empty, then subtract it out at the end. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it, err := postingsForMatcher(ctx, r, inverse) + if err != nil { + return storage.ErrLabelValues(err) + } + notIts = append(notIts, it) + case isNot && !matchesEmpty: // l!="" + // If the label can't be empty and is a Not, but the inner matcher can + // be empty we need to use inversePostingsForMatcher. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it, err := inversePostingsForMatcher(ctx, r, inverse) + if err != nil { + return storage.ErrLabelValues(err) + } + if index.IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + default: // l="a" + // Non-Not matcher, use normal postingsForMatcher. + it, err := postingsForMatcher(ctx, r, m) + if err != nil { + return storage.ErrLabelValues(err) + } + if index.IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + } + default: // l="" + // If a matcher for a labelname selects an empty value, it selects all + // the series which don't have the label name set too. 
See: + // https://github.com/prometheus/prometheus/issues/3575 and + // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + it, err := inversePostingsForMatcher(ctx, r, m) + if err != nil { + return storage.ErrLabelValues(err) + } + notIts = append(notIts, it) + } + } + + if len(its) == 0 && len(notIts) > 0 { + pit := index.Merge(ctx, notIts...) + return r.LabelValuesExcluding(pit, name) + } + + pit := index.Intersect(its...) + for _, n := range notIts { + pit = index.Without(pit, n) + } + pit = expandPostings(pit) + + return r.LabelValuesFor(pit, name) +} + +// expandPostings expands postings up to a certain limit, to reduce runtime complexity when filtering label values. +// If the limit is reached, the rest is unexpanded. +func expandPostings(postings index.Postings) index.Postings { + const expandPostingsLimit = 10_000_000 + var expanded []storage.SeriesRef + // Go one beyond the limit, so we can tell if the iterator is exhausted + for len(expanded) <= expandPostingsLimit && postings.Next() { + expanded = append(expanded, postings.At()) + } + if postings.Err() != nil { + return index.ErrPostings(fmt.Errorf("expanding postings for matchers: %w", postings.Err())) + } + if len(expanded) > expandPostingsLimit { + // Couldn't exhaust the iterator + postings = index.NewPrependPostings(expanded, postings) + } else { + postings = index.NewListPostings(expanded) + } + + return postings +} diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 0e7769f03f..e1e34e3d13 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -164,6 +164,10 @@ func (oh *OOOHeadIndexReader) PostingsForMatchers(ctx context.Context, concurren return oh.head.pfmc.PostingsForMatchers(ctx, oh, concurrent, ms...) } +func (oh *OOOHeadIndexReader) PostingsForRegexp(ctx context.Context, m *labels.Matcher) index.Postings { + return oh.head.postings.PostingsForRegexp(ctx, m) +} + // LabelValues needs to be overridden from the headIndexReader implementation due // to the check that happens at the beginning where we make sure that the query // interval overlaps with the head minooot and maxooot. @@ -179,6 +183,28 @@ func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matc return labelValuesWithMatchers(ctx, oh, name, matchers...) } +// LabelValuesStream needs to be overridden from the headIndexReader implementation due +// to the check that happens at the beginning where we make sure that the query +// interval overlaps with the head minooot and maxooot. +func (oh *OOOHeadIndexReader) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues { + if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() { + return storage.EmptyLabelValues() + } + + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return oh.head.postings.LabelValuesStream(ctx, name, matchers...) 
+	}
+
+	// There are matchers on other label names than the requested one, so we will need to intersect matching series.
+	return labelValuesForMatchersStream(ctx, oh, name, matchers)
+}
+
 type chunkMetaAndChunkDiskMapperRef struct {
 	meta chunks.Meta
 	ref  chunks.ChunkDiskMapperRef
@@ -441,6 +467,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string,
 	return index.NewListPostings(ir.ch.postings), nil
 }
 
+func (ir *OOOCompactionHeadIndexReader) PostingsForRegexp(context.Context, *labels.Matcher) index.Postings {
+	return index.ErrPostings(errors.New("not supported"))
+}
+
 func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
 	// This will already be sorted from the Postings() call above.
 	return p
@@ -462,6 +492,10 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
 	return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef)
 }
 
+func (ir *OOOCompactionHeadIndexReader) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error {
+	return ir.ch.oooIR.series(ref, builder, nil, 0, ir.ch.lastMmapRef)
+}
+
 func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
 	return nil, errors.New("not implemented")
 }
@@ -470,6 +504,10 @@ func (ir *OOOCompactionHeadIndexReader) LabelValues(_ context.Context, name stri
 	return nil, errors.New("not implemented")
 }
 
+func (ir *OOOCompactionHeadIndexReader) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
+	return storage.ErrLabelValues(errors.New("not implemented"))
+}
+
 func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
 	return nil, errors.New("not implemented")
 }
diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go
index 059f192b3f..c90c2921e1 100644
--- a/tsdb/postings_for_matchers_cache.go
+++ b/tsdb/postings_for_matchers_cache.go
@@ -37,6 +37,9 @@ type IndexPostingsReader interface {
 	// Found IDs are not strictly required to point to a valid Series, e.g.
 	// during background garbage collections. Input values must be sorted.
 	Postings(ctx context.Context, name string, values ...string) (index.Postings, error)
+
+	// PostingsForRegexp returns an iterator over postings matching the provided regexp label matcher.
+	PostingsForRegexp(ctx context.Context, m *labels.Matcher) index.Postings
 }
 
 // NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
diff --git a/tsdb/postings_for_matchers_cache_test.go b/tsdb/postings_for_matchers_cache_test.go
index 60312c6f47..c77e7f05a8 100644
--- a/tsdb/postings_for_matchers_cache_test.go
+++ b/tsdb/postings_for_matchers_cache_test.go
@@ -429,6 +429,10 @@ func (idx indexForPostingsMock) Postings(context.Context, string, ...string) (in
 	panic("implement me")
 }
 
+func (idx indexForPostingsMock) PostingsForRegexp(context.Context, *labels.Matcher) index.Postings {
+	panic("implement me")
+}
+
 // timeNowMock offers a mockable time.Now() implementation
 // empty value is ready to be used, and it should not be copied (use a reference).
 type timeNowMock struct {
diff --git a/tsdb/querier.go b/tsdb/querier.go
index 6fd34d41a9..d5b8bd493c 100644
--- a/tsdb/querier.go
+++ b/tsdb/querier.go
@@ -79,6 +79,10 @@ func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, matcher
 	return res, nil, err
 }
 
+func (q *blockBaseQuerier) LabelValuesStream(ctx context.Context, name string, matchers ...*labels.Matcher) storage.LabelValues {
+	return q.index.LabelValuesStream(ctx, name, matchers...)
+}
+
 func (q *blockBaseQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	res, err := q.index.LabelNames(ctx, matchers...)
 	return res, nil, err
@@ -324,23 +328,8 @@ func postingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *labels.M
 		}
 	}
 
-	vals, err := ix.LabelValues(ctx, m.Name)
-	if err != nil {
-		return nil, err
-	}
-
-	var res []string
-	for _, val := range vals {
-		if m.Matches(val) {
-			res = append(res, val)
-		}
-	}
-
-	if len(res) == 0 {
-		return index.EmptyPostings(), nil
-	}
-
-	return ix.Postings(ctx, m.Name, res...)
+	it := ix.PostingsForRegexp(ctx, m)
+	return it, it.Err()
 }
 
 // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
@@ -445,11 +434,11 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
 			// We have expanded all the postings -- all returned label values will be from these series only.
 			// (We supply allValues as a buffer for storing results. It should be big enough already, since it holds all possible label values.)
-			return labelValuesFromSeries(r, name, expanded, allValues)
+			return index.LabelValuesFromSeries(r, name, expanded, allValues)
 		}
 
 		// If we haven't reached end of postings, we prepend our expanded postings to "p", and continue.
-		p = newPrependPostings(expanded, p)
+		p = index.NewPrependPostings(expanded, p)
 	}
 
 	valuesPostings := make([]index.Postings, len(allValues))
@@ -472,87 +461,6 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
 	return values, nil
 }
 
-// labelValuesFromSeries returns all unique label values from for given label name from supplied series. Values are not sorted.
-// buf is space for holding result (if it isn't big enough, it will be ignored), may be nil.
-func labelValuesFromSeries(r IndexReader, labelName string, refs []storage.SeriesRef, buf []string) ([]string, error) {
-	values := map[string]struct{}{}
-
-	var builder labels.ScratchBuilder
-	for _, ref := range refs {
-		err := r.Series(ref, &builder, nil)
-		// Postings may be stale. Skip if no underlying series exists.
-		if errors.Is(err, storage.ErrNotFound) {
-			continue
-		}
-		if err != nil {
-			return nil, fmt.Errorf("label values for label %s: %w", labelName, err)
-		}
-
-		v := builder.Labels().Get(labelName)
-		if v != "" {
-			values[v] = struct{}{}
-		}
-	}
-
-	if cap(buf) >= len(values) {
-		buf = buf[:0]
-	} else {
-		buf = make([]string, 0, len(values))
-	}
-	for v := range values {
-		buf = append(buf, v)
-	}
-	return buf, nil
-}
-
-func newPrependPostings(a []storage.SeriesRef, b index.Postings) index.Postings {
-	return &prependPostings{
-		ix:     -1,
-		prefix: a,
-		rest:   b,
-	}
-}
-
-// prependPostings returns series references from "prefix" before using "rest" postings.
-type prependPostings struct {
-	ix     int
-	prefix []storage.SeriesRef
-	rest   index.Postings
-}
-
-func (p *prependPostings) Next() bool {
-	p.ix++
-	if p.ix < len(p.prefix) {
-		return true
-	}
-	return p.rest.Next()
-}
-
-func (p *prependPostings) Seek(v storage.SeriesRef) bool {
-	for p.ix < len(p.prefix) {
-		if p.ix >= 0 && p.prefix[p.ix] >= v {
-			return true
-		}
-		p.ix++
-	}
-
-	return p.rest.Seek(v)
-}
-
-func (p *prependPostings) At() storage.SeriesRef {
-	if p.ix >= 0 && p.ix < len(p.prefix) {
-		return p.prefix[p.ix]
-	}
-	return p.rest.At()
-}
-
-func (p *prependPostings) Err() error {
-	if p.ix >= 0 && p.ix < len(p.prefix) {
-		return nil
-	}
-	return p.rest.Err()
-}
-
 func labelNamesWithMatchers(ctx context.Context, r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
 	p, err := r.PostingsForMatchers(ctx, false, matchers...)
 	if err != nil {
diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go
index 1edc37c23d..eae5ebcadf 100644
--- a/tsdb/querier_bench_test.go
+++ b/tsdb/querier_bench_test.go
@@ -97,6 +97,60 @@ func BenchmarkQuerier(b *testing.B) {
 	})
 }
 
+func BenchmarkIndexReader_LabelValuesStream(b *testing.B) {
+	opts := DefaultHeadOptions()
+	opts.ChunkRange = 1000
+	opts.ChunkDirRoot = b.TempDir()
+	h, err := NewHead(nil, nil, nil, nil, opts, nil)
+	require.NoError(b, err)
+	b.Cleanup(func() {
+		require.NoError(b, h.Close())
+	})
+
+	app := h.Appender(context.Background())
+	addSeries := func(l labels.Labels) {
+		app.Append(0, l, 0, 0)
+	}
+
+	for n := 0; n < 10; n++ {
+		for i := 0; i < 100000; i++ {
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo", "i_times_n", strconv.Itoa(i*n)))
+			// Have some series that won't be matched, to properly test inverted matches.
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo"))
+		}
+	}
+	require.NoError(b, app.Commit())
+
+	b.Run("Head", func(b *testing.B) {
+		ir, err := h.Index()
+		require.NoError(b, err)
+		b.Cleanup(func() {
+			require.NoError(b, ir.Close())
+		})
+
+		benchmarkLabelValuesStream(b, ir)
+	})
+
+	b.Run("Block", func(b *testing.B) {
+		blockdir := createBlockFromHead(b, b.TempDir(), h)
+		block, err := OpenBlock(nil, blockdir, nil)
+		require.NoError(b, err)
+		b.Cleanup(func() {
+			require.NoError(b, block.Close())
+		})
+		ir, err := block.Index()
+		require.NoError(b, err)
+		b.Cleanup(func() {
+			require.NoError(b, ir.Close())
+		})
+
+		benchmarkLabelValuesStream(b, ir)
+	})
+}
+
 func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
 	ctx := context.Background()
 
@@ -242,6 +296,55 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) {
 	}
 }
 
+func benchmarkLabelValuesStream(b *testing.B, ir IndexReader) {
+	i1 := labels.MustNewMatcher(labels.MatchEqual, "i", "1")
+	iStar := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.*$")
+	jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")
+	jXXXYYY := labels.MustNewMatcher(labels.MatchRegexp, "j", "XXX|YYY")
+	jXplus := labels.MustNewMatcher(labels.MatchRegexp, "j", "X.+")
+	n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
+	nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix)
+	// XXX: This is badly defined, i.e. it advertises as matching on "n", but matches on "i".
+	nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$")
+	primesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "533701") // = 76243*7, i.e. multiplication of primes. It will match a single i*n combination.
+	nonPrimesTimes := labels.MustNewMatcher(labels.MatchEqual, "i_times_n", "20")  // 1*20, 2*10, 4*5, 5*4
+	times12 := labels.MustNewMatcher(labels.MatchRegexp, "i_times_n", "12.*")
+
+	ctx := context.Background()
+
+	cases := []struct {
+		name      string
+		labelName string
+		matchers  []*labels.Matcher
+	}{
+		{`i with i="1"`, "i", []*labels.Matcher{i1}},
+		{`i with n="1"`, "i", []*labels.Matcher{n1}},
+		{`i with n="^.+$"`, "i", []*labels.Matcher{nPlus}},
+		{`i with n="1",j!="foo"`, "i", []*labels.Matcher{n1, jNotFoo}},
+		{`i with n="1",j=~"X.+"`, "i", []*labels.Matcher{n1, jXplus}},
+		{`i with n="1",j=~"XXX|YYY"`, "i", []*labels.Matcher{n1, jXXXYYY}},
+		{`i with n="X",j!="foo"`, "i", []*labels.Matcher{nX, jNotFoo}},
+		{`i with n="1",i=~"^.*$",j!="foo"`, "i", []*labels.Matcher{n1, iStar, jNotFoo}},
+		{`i with i_times_n=533701`, "i", []*labels.Matcher{primesTimes}},
+		{`i with i_times_n=20`, "i", []*labels.Matcher{nonPrimesTimes}},
+		{`i with i_times_n=~"12.*"`, "i", []*labels.Matcher{times12}},
+		{`n with j!="foo"`, "n", []*labels.Matcher{jNotFoo}},
+		{`n with i="1"`, "n", []*labels.Matcher{i1}},
+	}
+
+	for _, c := range cases {
+		b.Run(c.name, func(b *testing.B) {
+			for i := 0; i < b.N; i++ {
+				it := ir.LabelValuesStream(ctx, c.labelName, c.matchers...)
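+				// Drain the iterator, then verify that it surfaced no errors or warnings.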
+				for it.Next() {
+				}
+				require.NoError(b, it.Err())
+				require.Empty(b, it.Warnings())
+			}
+		})
+	}
+}
+
 func BenchmarkMergedStringIter(b *testing.B) {
 	numSymbols := 100000
 	s := make([]string, numSymbols)
diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go
index 49b3be36d0..43652c56a0 100644
--- a/tsdb/querier_test.go
+++ b/tsdb/querier_test.go
@@ -2327,6 +2327,11 @@ func (m mockIndex) LabelValues(_ context.Context, name string, matchers ...*labe
 	return values, nil
 }
 
+func (m mockIndex) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
+	// TODO
+	return storage.EmptyLabelValues()
+}
+
 func (m mockIndex) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error) {
 	return m.series[id].l.Get(label), nil
 }
@@ -2366,6 +2371,16 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
 	return index.NewListPostings(ep)
 }
 
+func (m mockIndex) PostingsForRegexp(ctx context.Context, matcher *labels.Matcher) index.Postings {
+	var res []index.Postings
+	for l, srs := range m.postings {
+		if l.Name == matcher.Name && matcher.Matches(l.Value) {
+			res = append(res, index.NewListPostings(srs))
+		}
+	}
+	return index.Merge(ctx, res...)
+}
+
 func (m mockIndex) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
 	var ps []storage.SeriesRef
 	for p, s := range m.series {
@@ -2427,6 +2442,15 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder,
 	return nil
 }
 
+func (m mockIndex) Labels(ref storage.SeriesRef, builder *labels.ScratchBuilder) error {
+	s, ok := m.series[ref]
+	if !ok {
+		return storage.ErrNotFound
+	}
+	builder.Assign(s.l)
+	return nil
+}
+
 func (m mockIndex) LabelNames(_ context.Context, matchers ...*labels.Matcher) ([]string, error) {
 	names := map[string]struct{}{}
 	if len(matchers) == 0 {
@@ -3283,6 +3307,11 @@ func (m mockMatcherIndex) LabelValues(context.Context, string, ...*labels.Matche
 	return []string{}, errors.New("label values called")
 }
 
+// LabelValuesStream will return a failing label values iterator.
+func (m mockMatcherIndex) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
+	return storage.ErrLabelValues(errors.New("label values stream called"))
+}
+
 func (m mockMatcherIndex) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error) {
 	return "", errors.New("label value for called")
 }
@@ -3295,6 +3324,10 @@ func (m mockMatcherIndex) Postings(context.Context, string, ...string) (index.Po
 	return index.EmptyPostings(), nil
 }
 
+func (m mockMatcherIndex) PostingsForRegexp(context.Context, *labels.Matcher) index.Postings {
+	return index.EmptyPostings()
+}
+
 func (m mockMatcherIndex) PostingsForMatchers(bool, ...*labels.Matcher) (index.Postings, error) {
 	return index.EmptyPostings(), nil
 }
@@ -3328,9 +3361,8 @@ func TestPostingsForMatcher(t *testing.T) {
 			hasError: false,
 		},
 		{
-			// Regex matcher which doesn't have '|' will call Labelvalues()
 			matcher:  labels.MustNewMatcher(labels.MatchRegexp, "test", ".*"),
-			hasError: true,
+			hasError: false,
 		},
 		{
 			matcher:  labels.MustNewMatcher(labels.MatchRegexp, "test", "a|b"),
@@ -3625,62 +3657,7 @@ func TestQueryWithDeletedHistograms(t *testing.T) {
 	}
 }
 
-func TestPrependPostings(t *testing.T) {
-	t.Run("empty", func(t *testing.T) {
-		p := newPrependPostings(nil, index.NewListPostings(nil))
-		require.False(t, p.Next())
-	})
-
-	t.Run("next+At", func(t *testing.T) {
-		p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500}))
-
-		for _, s := range []storage.SeriesRef{10, 20, 30, 200, 300, 500} {
-			require.True(t, p.Next())
-			require.Equal(t, s, p.At())
-			require.Equal(t, s, p.At()) // Multiple calls return same value.
-		}
-		require.False(t, p.Next())
-	})
-
-	t.Run("seek+At", func(t *testing.T) {
-		p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.NewListPostings([]storage.SeriesRef{200, 300, 500}))
-
-		require.True(t, p.Seek(5))
-		require.Equal(t, storage.SeriesRef(10), p.At())
-		require.Equal(t, storage.SeriesRef(10), p.At())
-
-		require.True(t, p.Seek(15))
-		require.Equal(t, storage.SeriesRef(20), p.At())
-		require.Equal(t, storage.SeriesRef(20), p.At())
-
-		require.True(t, p.Seek(20)) // Seeking to "current" value doesn't move postings iterator.
-		require.Equal(t, storage.SeriesRef(20), p.At())
-		require.Equal(t, storage.SeriesRef(20), p.At())
-
-		require.True(t, p.Seek(50))
-		require.Equal(t, storage.SeriesRef(200), p.At())
-		require.Equal(t, storage.SeriesRef(200), p.At())
-
-		require.False(t, p.Seek(1000))
-		require.False(t, p.Next())
-	})
-
-	t.Run("err", func(t *testing.T) {
-		err := fmt.Errorf("error")
-		p := newPrependPostings([]storage.SeriesRef{10, 20, 30}, index.ErrPostings(err))
-
-		for _, s := range []storage.SeriesRef{10, 20, 30} {
-			require.True(t, p.Next())
-			require.Equal(t, s, p.At())
-			require.NoError(t, p.Err())
-		}
-		// Advancing after prepended values returns false, and gives us access to error.
-		require.False(t, p.Next())
-		require.Equal(t, err, p.Err())
-	})
-}
-
-func TestLabelsValuesWithMatchersOptimization(t *testing.T) {
+func TestLabelValuesWithMatchersOptimization(t *testing.T) {
 	dir := t.TempDir()
 	opts := DefaultHeadOptions()
 	opts.ChunkRange = 1000
diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go
index b6ec7d4e1f..d46b0efbca 100644
--- a/web/api/v1/errors_test.go
+++ b/web/api/v1/errors_test.go
@@ -174,6 +174,10 @@ func (t errorTestQuerier) LabelValues(context.Context, string, ...*labels.Matche
 	return nil, nil, t.err
 }
 
+func (t errorTestQuerier) LabelValuesStream(context.Context, string, ...*labels.Matcher) storage.LabelValues {
+	return storage.ErrLabelValues(t.err)
+}
+
 func (t errorTestQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	return nil, nil, t.err
 }
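Reviewer note: below is a minimal, non-normative sketch of how a caller could consume the new streaming API once this change lands. The function name printJobValues, the "job" label, and the instance matcher are illustrative assumptions, not part of this diff; only the LabelValuesStream method and the iterator's Next/At/Err/Warnings/Close contract come from the change itself.

package example

import (
	"context"
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// printJobValues streams every value of the "job" label for series matching the
// supplied matcher, instead of materializing the full []string up front.
func printJobValues(ctx context.Context, q storage.Querier) error {
	// Illustrative matcher; any set of *labels.Matcher works here.
	m := labels.MustNewMatcher(labels.MatchEqual, "instance", "localhost:9090")

	it := q.LabelValuesStream(ctx, "job", m)
	defer it.Close()

	for it.Next() {
		// Values are only valid for the lifetime of the querier; copy them if
		// they must outlive it.
		fmt.Println(it.At())
	}
	if err := it.Err(); err != nil {
		return err
	}
	if ws := it.Warnings(); len(ws) > 0 {
		fmt.Printf("query produced %d warnings\n", len(ws))
	}
	return nil
}

Compared to LabelValues, no intermediate result slice is returned to the caller; errors and warnings are reported through the iterator's Err and Warnings methods rather than as additional return values.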