diff --git a/storage/interface.go b/storage/interface.go index 71131b217..a4493946e 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -465,6 +465,6 @@ type LabelValues interface { // When an error occurs, the iterator cannot continue. Err() error // Warnings is a collection of warnings that have occurred during iteration. - // Warnings could be non-empty even if iteration has not failed with error. + // Warnings could be non-empty even if iteration has not failed with an error. Warnings() Warnings } diff --git a/storage/merge_test.go b/storage/merge_test.go index f92f15301..e8fc5af39 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1032,30 +1032,30 @@ func (m *mockGenericQuerier) LabelValuesStream(name string, matchers ...*labels. } type labelValuesList struct { - i int + cur string values []string warnings Warnings } func newLabelValuesList(values []string, warnings Warnings) *labelValuesList { return &labelValuesList{ - i: -1, values: values, warnings: warnings, } } func (l *labelValuesList) Next() bool { - if l.i >= len(l.values)-1 { + if len(l.values) == 0 { return false } - l.i++ + l.cur = l.values[0] + l.values = l.values[1:] return true } func (l *labelValuesList) At() string { - return l.values[l.i] + return l.cur } func (*labelValuesList) Err() error { diff --git a/storage/secondary.go b/storage/secondary.go index 0138d07c2..dd9de14c7 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -56,9 +56,8 @@ func (s *secondaryQuerier) LabelValues(name string, matchers ...*labels.Matcher) } func (s *secondaryQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues { - it := s.genericQuerier.LabelValuesStream(name, matchers...) return &secondaryLabelValues{ - it: it, + it: s.genericQuerier.LabelValuesStream(name, matchers...), } } diff --git a/tsdb/block.go b/tsdb/block.go index bbb5e9b93..1d24a7b9c 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -94,6 +94,10 @@ type IndexReader interface { // the series labels mod shardCount, using a hash function which is consistent over time. ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings + // PostingsWithLabel returns a PostingsWithLabelValues iterator over postings (series refs) with the specified + // label name, that also tracks the associated label values. + PostingsWithLabel(name string) index.PostingsWithLabelValues + // Series populates the given builder and chunk metas for the series identified // by the reference. // Returns storage.ErrNotFound if the ref does not resolve to a known series. @@ -505,7 +509,18 @@ func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher) } func (r blockIndexReader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { - return r.ir.LabelValuesStream(name, matchers...) + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return r.ir.LabelValuesStream(name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(r.ir, name, matchers) } func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) { @@ -536,6 +551,10 @@ func (r blockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou return r.ir.ShardedPostings(p, shardIndex, shardCount) } +func (r blockIndexReader) PostingsWithLabel(name string) index.PostingsWithLabelValues { + return r.ir.PostingsWithLabel(name) +} + func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { if err := r.ir.Series(ref, builder, chks); err != nil { return errors.Wrapf(err, "block: %s", r.b.Meta().ULID) diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 7cacebe3f..806377cef 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -279,8 +279,6 @@ func TestLabelValuesWithMatchers(t *testing.T) { } func TestLabelValuesStream_WithMatchers(t *testing.T) { - tmpdir := t.TempDir() - var seriesEntries []storage.Series for i := 0; i < 100; i++ { seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( @@ -289,19 +287,18 @@ func TestLabelValuesStream_WithMatchers(t *testing.T) { ), []tsdbutil.Sample{sample{100, 0, nil, nil}})) } - blockDir := createBlock(t, tmpdir, seriesEntries) + blockDir := createBlock(t, t.TempDir(), seriesEntries) files, err := sequenceFiles(chunkDir(blockDir)) require.NoError(t, err) - require.Greater(t, len(files), 0, "No chunk created.") + require.NotEmpty(t, files, "No chunk created.") - // Check open err. block, err := OpenBlock(nil, blockDir, nil) require.NoError(t, err) - defer func() { require.NoError(t, block.Close()) }() + t.Cleanup(func() { require.NoError(t, block.Close()) }) indexReader, err := block.Index() require.NoError(t, err) - defer func() { require.NoError(t, indexReader.Close()) }() + t.Cleanup(func() { require.NoError(t, indexReader.Close()) }) var uniqueWithout30s []string for i := 0; i < 100; i++ { @@ -317,17 +314,17 @@ func TestLabelValuesStream_WithMatchers(t *testing.T) { expectedValues []string }{ { - name: "get tens based on unique id", + name: "get tens based on unique ID", labelName: "tens", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, expectedValues: []string{"value3"}, }, { - name: "get unique ids based on a ten", + name: "get unique IDs based on a ten", labelName: "unique", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, }, { - name: "get tens by pattern matching on unique id", + name: "get tens by pattern matching on unique ID", labelName: "tens", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, expectedValues: []string{"value5", "value6", "value7"}, @@ -337,7 +334,7 @@ func TestLabelValuesStream_WithMatchers(t *testing.T) { matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, }, { - name: "get unique IDs based on tens not being equal to a certain value, while not emptyy", + name: "get unique IDs based on tens not being equal to a certain value, while not empty", labelName: "unique", matchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 9db70235e..0929de516 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -96,8 +96,18 @@ func (h *headIndexReader) LabelValuesStream(name string, matchers ...*labels.Mat return nil } - // TODO: Implement matchers - return h.head.postings.LabelValuesStream(name) + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return h.head.postings.LabelValuesStream(name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(h, name, matchers) } // LabelNames returns all the unique label names present in the head @@ -187,6 +197,10 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou return index.NewListPostings(out) } +func (h *headIndexReader) PostingsWithLabel(name string) index.PostingsWithLabelValues { + return h.head.postings.PostingsWithLabel(name) +} + // Series returns the series for the given reference. func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s := h.head.series.getByID(chunks.HeadSeriesRef(ref)) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 25873053a..ba89ffe56 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2437,7 +2437,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, expectedValues: []string{"value5", "value6", "value7"}, }, { - name: "get tens by matching for absence of unique label", + name: "get tens by matching for presence of unique label", labelName: "tens", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, @@ -2474,6 +2474,18 @@ func TestHeadLabelValuesStream_WithMatchers(t *testing.T) { } require.NoError(t, app.Commit()) + indexReader := head.indexRange(0, 200) + t.Cleanup(func() { + require.NoError(t, indexReader.Close()) + }) + + var uniqueWithout30s []string + for i := 0; i < 100; i++ { + if i/10 != 3 { + uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i)) + } + } + sort.Strings(uniqueWithout30s) testCases := []struct { name string labelName string @@ -2481,40 +2493,44 @@ func TestHeadLabelValuesStream_WithMatchers(t *testing.T) { expectedValues []string }{ { - name: "get tens based on unique id", + name: "get tens based on unique ID", labelName: "tens", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, expectedValues: []string{"value3"}, }, { - name: "get unique ids based on a ten", + name: "get unique IDs based on a ten", labelName: "unique", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, }, { - name: "get tens by pattern matching on unique id", + name: "get tens by pattern matching on unique ID", labelName: "tens", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, expectedValues: []string{"value5", "value6", "value7"}, }, { - name: "get tens by matching for absence of unique label", + name: "get tens by matching for presence of unique label", labelName: "tens", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, { + name: "get unique IDs based on tens not being equal to a certain value, while not empty", + labelName: "unique", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"), + labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""), + }, + expectedValues: uniqueWithout30s, }, } - for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - headIdxReader := head.indexRange(0, 200) - - it := headIdxReader.LabelValuesStream(tt.labelName, tt.matchers...) + it := indexReader.LabelValuesStream(tt.labelName, tt.matchers...) var values []string for it.Next() { values = append(values, it.At()) } require.NoError(t, it.Err()) require.Empty(t, it.Warnings()) - require.Equal(t, tt.expectedValues, values) }) } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index dd2114e73..aa49712f2 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1543,16 +1543,11 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string } func (r *Reader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { - ownMatchers := 0 - for _, m := range matchers { - if m.Name == name { - ownMatchers++ - } - } - if r.version == FormatV1 { p := r.postingsV1[name] - // TODO: Handle matchers on other label names + if len(p) == 0 { + return storage.EmptyLabelValues() + } return &labelValuesV1{ matchers: matchers, it: reflect.ValueOf(p).MapRange(), @@ -1561,16 +1556,10 @@ func (r *Reader) LabelValuesStream(name string, matchers ...*labels.Matcher) sto p := r.postings[name] if len(p) == 0 { - return nil - } - - if ownMatchers == len(matchers) { - // All matchers are for the requested label name - return r.newLabelValuesV2(name, matchers) + return storage.EmptyLabelValues() } - // There are matchers on other label names than the requested one, so will need to intersect matching series - return r.labelValuesForMatchersStream(name, matchers) + return r.newLabelValuesV2(name, matchers) } // LabelNamesFor returns all the label names for the series referred to by IDs. diff --git a/tsdb/index/labelvalues.go b/tsdb/index/labelvalues.go index 5c2ad97a6..21dd9548b 100644 --- a/tsdb/index/labelvalues.go +++ b/tsdb/index/labelvalues.go @@ -4,16 +4,22 @@ import ( "container/heap" "fmt" "reflect" - "sort" "github.com/pkg/errors" - "golang.org/x/exp/maps" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/encoding" ) +// PostingsWithLabelValues is a Postings iterator also tracking label values. +type PostingsWithLabelValues interface { + Postings + + // ValueAt returns the current label value. + ValueAt() string +} + type labelValuesV2 struct { name string cur string @@ -88,7 +94,7 @@ func (l *labelValuesV2) Next() bool { } } if l.dec.Err() != nil { - // An error occurred skipping to this entry + // An error occurred decoding l.err = errors.Wrap(l.dec.Err(), "get postings offset entry") return false } @@ -108,307 +114,25 @@ func (l *labelValuesV2) Warnings() storage.Warnings { return nil } -func (r *Reader) labelValuesForMatchersStream(name string, matchers []*labels.Matcher) storage.LabelValues { - // See which labels must be non-empty. - // Optimization for case like {l=~".", l!="1"}. - labelMustBeSet := make(map[string]bool, len(matchers)) - for _, m := range matchers { - if !m.Matches("") { - labelMustBeSet[m.Name] = true - } - } - - fmt.Printf("labelValuesForMatchersStream getting label values iterator for %q\n", name) - - // Make sure to intersect with series containing the label name - pit := r.postingsWithLabel(name) - if pit.Err() != nil { - return storage.ErrLabelValues(pit.Err()) - } - if IsEmptyPostingsType(pit) { - return storage.EmptyLabelValues() - } - its := []Postings{pit} - var notIts []Postings - for _, m := range matchers { - switch { - case labelMustBeSet[m.Name]: - // If this matcher must be non-empty, we can be smarter. - matchesEmpty := m.Matches("") - isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp - switch { - case isNot && matchesEmpty: // l!="foo" - // If the label can't be empty and is a Not and the inner matcher - // doesn't match empty, then subtract it out at the end. - inverse, err := m.Inverse() - if err != nil { - return storage.ErrLabelValues(err) - } - - it, err := r.postingsForMatcher(inverse) - if err != nil { - return storage.ErrLabelValues(err) - } - notIts = append(notIts, it) - case isNot && !matchesEmpty: // l!="" - // If the label can't be empty and is a Not, but the inner matcher can - // be empty we need to use inversePostingsForMatcher. - inverse, err := m.Inverse() - if err != nil { - return storage.ErrLabelValues(err) - } - - it, err := r.inversePostingsForMatcher(inverse) - if err != nil { - return storage.ErrLabelValues(err) - } - if IsEmptyPostingsType(it) { - return storage.EmptyLabelValues() - } - its = append(its, it) - default: // l="a" - // Non-Not matcher, use normal postingsForMatcher. - it, err := r.postingsForMatcher(m) - if err != nil { - return storage.ErrLabelValues(err) - } - if IsEmptyPostingsType(it) { - return storage.EmptyLabelValues() - } - its = append(its, it) - } - default: // l="" - // If a matcher for a labelname selects an empty value, it selects all - // the series which don't have the label name set too. See: - // https://github.com/prometheus/prometheus/issues/3575 and - // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 - it, err := r.inversePostingsForMatcher(m) - if err != nil { - return storage.ErrLabelValues(err) - } - notIts = append(notIts, it) - } - } - - it := newIntersectLabelValues(its, notIts) - - // Fill in a map for de-duplication - m := map[string]struct{}{} - for it.Next() { - m[it.At()] = struct{}{} - } - if it.Err() != nil { - return storage.ErrLabelValues(it.Err()) - } - - vals := make([]string, 0, len(m)) - for v := range m { - vals = append(vals, v) - } - maps.Clear(m) - sort.Strings(vals) - fmt.Printf("labelValuesForMatchersStream returning merging label values iterator for %q\n", name) - return newListLabelValues(vals) -} - -// newIntersectLabelValues returns an iterator over label values stemming from the intersection of the Postings -// iterators in its, minus the ones in notIts. -func newIntersectLabelValues(its, notIts []Postings) storage.LabelValues { - if len(its) == 0 { - return storage.EmptyLabelValues() - } - for _, p := range its { - if p == EmptyPostings() { - return storage.EmptyLabelValues() - } - } - - if len(its) == 1 { - it, ok := its[0].(postingsWithLabelValues) - if !ok { - return storage.ErrLabelValues(fmt.Errorf("input iterator should be a storage.LabelValues")) - } - if !it.Next() { - return storage.EmptyLabelValues() - } - if it.Err() != nil { - storage.ErrLabelValues(it.Err()) - } - return newListLabelValues([]string{it.ValueAt()}) - } - - return &intersectPostingsWithLabel{ - arr: its, - remove: Merge(notIts...), - } -} - -type intersectPostingsWithLabel struct { - arr []Postings - remove Postings - curSeries storage.SeriesRef - cur string -} - -func (it *intersectPostingsWithLabel) At() string { - return it.cur -} - -func (it *intersectPostingsWithLabel) Next() bool { - // Advance all iterators in it.arr and record the maximum of their current series IDs as a starting point - for _, p := range it.arr { - if !p.Next() { - return false - } - - cur := p.At() - if cur > it.curSeries { - // This is a candidate, but make sure it's not in the remove set - if ok := it.remove.Seek(cur); ok && cur == it.remove.At() { - continue - } - it.curSeries = p.At() - } - } - - return it.doNext() +type labelValuesV1 struct { + it *reflect.MapIter + matchers []*labels.Matcher } -func (it *intersectPostingsWithLabel) doNext() bool { - // Find next series ref greater than the current one, which all sub-iterators have in common +func (l *labelValuesV1) Next() bool { loop: - for { - for _, p := range it.arr { - if !p.Seek(it.curSeries) { - // Unable to find a common series ref >= the current one - return false - } - cur := p.At() - - if cur > it.curSeries { - // This is a candidate, but make sure it's not in the remove set - if ok := it.remove.Seek(cur); ok && cur == it.remove.At() { - continue - } - - it.curSeries = cur + for l.it.Next() { + for _, m := range l.matchers { + if !m.Matches(l.At()) { continue loop } - - if lvIt, ok := p.(postingsWithLabelValues); ok { - it.cur = lvIt.ValueAt() - } } - // All sub-iterators are currently aligned on the same series ref + // This entry satisfies all matchers return true } -} -func (it *intersectPostingsWithLabel) Seek(id storage.SeriesRef) bool { - if it.curSeries >= id { - return true - } - - it.curSeries = id - return it.doNext() -} - -func (it *intersectPostingsWithLabel) Err() error { - for _, p := range it.arr { - if p.Err() != nil { - return p.Err() - } - } - return nil -} - -func (*intersectPostingsWithLabel) Warnings() storage.Warnings { - return nil -} - -// postingsForMatcher returns a Postings iterator matching m. -func (r *Reader) postingsForMatcher(m *labels.Matcher) (Postings, error) { - // This method will not return postings for missing labels. - - // Fast-path for equal matching. - if m.Type == labels.MatchEqual { - return r.Postings(m.Name, m.Value) - } - - // Fast-path for set matching. - if m.Type == labels.MatchRegexp { - setMatches := m.SetMatches() - if len(setMatches) > 0 { - return r.Postings(m.Name, setMatches...) - } - } - - vals, err := r.LabelValues(m.Name) - if err != nil { - return nil, err - } - - var res []string - for _, val := range vals { - if m.Matches(val) { - res = append(res, val) - } - } - - if len(res) == 0 { - return EmptyPostings(), nil - } - - return r.Postings(m.Name, res...) -} - -// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. -func (r *Reader) inversePostingsForMatcher(m *labels.Matcher) (Postings, error) { - // Fast-path for MatchNotRegexp matching. - // Inverse of a MatchNotRegexp is MatchRegexp (double negation). - // Fast-path for set matching. - if m.Type == labels.MatchNotRegexp { - setMatches := m.SetMatches() - if len(setMatches) > 0 { - return r.Postings(m.Name, setMatches...) - } - } - - // Fast-path for MatchNotEqual matching. - // Inverse of a MatchNotEqual is MatchEqual (double negation). - if m.Type == labels.MatchNotEqual { - return r.Postings(m.Name, m.Value) - } - - vals, err := r.LabelValues(m.Name) - if err != nil { - return nil, err - } - - var res []string - // If the inverse match is ="", we just want all the values. - if m.Type == labels.MatchEqual && m.Value == "" { - res = vals - } else { - for _, val := range vals { - if !m.Matches(val) { - res = append(res, val) - } - } - } - - return r.Postings(m.Name, res...) -} - -type labelValuesV1 struct { - it *reflect.MapIter - matchers []*labels.Matcher -} - -func (l *labelValuesV1) Next() bool { - // TODO: Implement matchers - return l.it.Next() + return false } func (l *labelValuesV1) At() string { @@ -423,42 +147,30 @@ func (*labelValuesV1) Warnings() storage.Warnings { return nil } -// postingsWithLabelValues is a Postings iterator also tracking label values. -type postingsWithLabelValues interface { - Postings - - // ValueAt returns the current label value. - ValueAt() string -} - -// postingsWithLabel returns a postingsWithLabelValues iterator over postings (series refs) with the specified -// label name, that also tracks the associated label values. -func (r *Reader) postingsWithLabel(name string) postingsWithLabelValues { +// PostingsWithLabel implements tsdb.IndexReader. +func (r *Reader) PostingsWithLabel(name string) PostingsWithLabelValues { if r.version == FormatV1 { e := r.postingsV1[name] if len(e) == 0 { - return &wrapPostingsWithLabelValue{p: EmptyPostings()} + return NewEmptyWrapPostingsWithLabelValue() } - var res []postingsWithLabelValues + var res []PostingsWithLabelValues for val, off := range e { // Read from the postings table. d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) _, p, err := r.dec.Postings(d.Get()) if err != nil { - return &wrapPostingsWithLabelValue{err: errors.Wrap(err, "decode postings")} + return NewErrWrapPostingsWithLabelValue(errors.Wrap(err, "decode postings")) } - res = append(res, &wrapPostingsWithLabelValue{ - value: val, - p: p, - }) + res = append(res, NewWrapPostingsWithLabelValue(p, val)) } return newMergedPostingsWithLabelValues(res) } e := r.postings[name] if len(e) == 0 { - return &wrapPostingsWithLabelValue{p: EmptyPostings()} + return NewEmptyWrapPostingsWithLabelValue() } d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) @@ -467,7 +179,7 @@ func (r *Reader) postingsWithLabel(name string) postingsWithLabelValues { lastVal := e[len(e)-1].value skip := 0 - var res []postingsWithLabelValues + var res []PostingsWithLabelValues for d.Err() == nil { if skip == 0 { // These are always the same number of bytes, @@ -479,79 +191,98 @@ func (r *Reader) postingsWithLabel(name string) postingsWithLabelValues { } else { d.Skip(skip) } - v := yoloString(d.UvarintBytes()) // Label value. + // Label value + v := yoloString(d.UvarintBytes()) postingsOff := d.Uvarint64() // Read from the postings table d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) _, p, err := r.dec.Postings(d2.Get()) if err != nil { - return &wrapPostingsWithLabelValue{err: errors.Wrap(err, "decode postings")} + return NewErrWrapPostingsWithLabelValue(errors.Wrap(err, "decode postings")) } - res = append(res, &wrapPostingsWithLabelValue{ - value: v, - p: p, - }) + res = append(res, NewWrapPostingsWithLabelValue(p, v)) if v == lastVal { break } } if d.Err() != nil { - return &wrapPostingsWithLabelValue{err: errors.Wrap(d.Err(), "get postings offset entry")} + return NewErrWrapPostingsWithLabelValue(errors.Wrap(d.Err(), "get postings offset entry")) } return newMergedPostingsWithLabelValues(res) } -// wrapPostingsWithLabelValue is a Postings iterator, that also wraps the label value they have in common. -type wrapPostingsWithLabelValue struct { +// NewWrapPostingsWithLabelValue returns a WrapPostingsWithLabelValue iterator over p, associated with label value v. +func NewWrapPostingsWithLabelValue(p Postings, v string) *WrapPostingsWithLabelValue { + return &WrapPostingsWithLabelValue{ + p: p, + value: v, + } +} + +// NewErrWrapPostingsWithLabelValue returns a WrapPostingsWithLabelValue iterator failing with err. +func NewErrWrapPostingsWithLabelValue(err error) *WrapPostingsWithLabelValue { + return &WrapPostingsWithLabelValue{ + p: ErrPostings(err), + } +} + +// NewEmptyWrapPostingsWithLabelValue returns an empty WrapPostingsWithLabelValue iterator. +func NewEmptyWrapPostingsWithLabelValue() *WrapPostingsWithLabelValue { + return &WrapPostingsWithLabelValue{ + p: EmptyPostings(), + } +} + +// WrapPostingsWithLabelValue is a Postings iterator, that also wraps the label value they have in common. +type WrapPostingsWithLabelValue struct { value string p Postings - err error } -func (it *wrapPostingsWithLabelValue) Next() bool { +func (it *WrapPostingsWithLabelValue) Next() bool { return it.p.Next() } // At returns the current series ref. -func (it *wrapPostingsWithLabelValue) At() storage.SeriesRef { +func (it *WrapPostingsWithLabelValue) At() storage.SeriesRef { return it.p.At() } // ValueAt returns the associated label value. -func (it *wrapPostingsWithLabelValue) ValueAt() string { +func (it *WrapPostingsWithLabelValue) ValueAt() string { return it.value } -func (it *wrapPostingsWithLabelValue) Seek(v storage.SeriesRef) bool { - return it.p.Seek(v) +// Seek tries to advance to the first series ref >= sr and returns true if it could, false otherwise. +func (it *WrapPostingsWithLabelValue) Seek(sr storage.SeriesRef) bool { + return it.p.Seek(sr) } -func (it *wrapPostingsWithLabelValue) Err() error { - if it.err != nil { - return it.err - } +// Err returns any error encountered. +func (it *WrapPostingsWithLabelValue) Err() error { return it.p.Err() } -func (*wrapPostingsWithLabelValue) Warnings() storage.Warnings { +func (*WrapPostingsWithLabelValue) Warnings() storage.Warnings { return nil } -type listLabelValues struct { +// ListLabelValues is an iterator over a slice of label values. +type ListLabelValues struct { cur string values []string } -func newListLabelValues(values []string) *listLabelValues { - return &listLabelValues{ +func NewListLabelValues(values []string) *ListLabelValues { + return &ListLabelValues{ values: values, } } -func (l *listLabelValues) Next() bool { +func (l *ListLabelValues) Next() bool { if len(l.values) == 0 { return false } @@ -561,19 +292,19 @@ func (l *listLabelValues) Next() bool { return true } -func (l *listLabelValues) At() string { +func (l *ListLabelValues) At() string { return l.cur } -func (*listLabelValues) Err() error { +func (*ListLabelValues) Err() error { return nil } -func (*listLabelValues) Warnings() storage.Warnings { +func (*ListLabelValues) Warnings() storage.Warnings { return nil } -func newMergedPostingsWithLabelValues(postings []postingsWithLabelValues) *mergedPostingsWithLabelValues { +func newMergedPostingsWithLabelValues(postings []PostingsWithLabelValues) *mergedPostingsWithLabelValues { if len(postings) == 0 { return &mergedPostingsWithLabelValues{} } @@ -591,7 +322,7 @@ func newMergedPostingsWithLabelValues(postings []postingsWithLabelValues) *merge return &mergedPostingsWithLabelValues{h: h} } -// mergedPostingsWithLabelValues is an iterator that merges postingsWithLabelValues iterators, based on their +// mergedPostingsWithLabelValues is an iterator that merges PostingsWithLabelValues iterators, based on their // respective series references. type mergedPostingsWithLabelValues struct { h postingsWithLabelValuesHeap @@ -695,14 +426,14 @@ func (mergedPostingsWithLabelValues) Warnings() storage.Warnings { return nil } -type postingsWithLabelValuesHeap []postingsWithLabelValues +type postingsWithLabelValuesHeap []PostingsWithLabelValues func (h postingsWithLabelValuesHeap) Len() int { return len(h) } func (h postingsWithLabelValuesHeap) Less(i, j int) bool { return h[i].At() < h[j].At() } func (h *postingsWithLabelValuesHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } func (h *postingsWithLabelValuesHeap) Push(x interface{}) { - *h = append(*h, x.(postingsWithLabelValues)) + *h = append(*h, x.(PostingsWithLabelValues)) } func (h *postingsWithLabelValuesHeap) Pop() interface{} { diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 8b7c869c1..4623a8333 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -152,10 +152,33 @@ func (p *MemPostings) LabelValuesStream(name string, matchers ...*labels.Matcher defer p.mtx.RUnlock() values := make([]string, 0, len(p.m[name])) +loop: for v := range p.m[name] { + for _, m := range matchers { + if !m.Matches(v) { + continue loop + } + } values = append(values, v) } - return newListLabelValues(values) + return NewListLabelValues(values) +} + +// PostingsWithLabel implements tsdb.IndexReader. +func (p *MemPostings) PostingsWithLabel(name string) PostingsWithLabelValues { + p.mtx.RLock() + defer p.mtx.RUnlock() + + e := p.m[name] + if len(e) == 0 { + return NewEmptyWrapPostingsWithLabelValue() + } + + var res []PostingsWithLabelValues + for val, srs := range e { + res = append(res, NewWrapPostingsWithLabelValue(newListPostings(srs...), val)) + } + return newMergedPostingsWithLabelValues(res) } // PostingsStats contains cardinality based statistics for postings. diff --git a/tsdb/labelvalues.go b/tsdb/labelvalues.go new file mode 100644 index 000000000..7ff3a80fc --- /dev/null +++ b/tsdb/labelvalues.go @@ -0,0 +1,219 @@ +package tsdb + +import ( + "fmt" + "sort" + + "golang.org/x/exp/maps" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/index" +) + +func labelValuesForMatchersStream(r IndexReader, name string, matchers []*labels.Matcher) storage.LabelValues { + // See which labels must be non-empty. + // Optimization for case like {l=~".", l!="1"}. + labelMustBeSet := make(map[string]bool, len(matchers)) + for _, m := range matchers { + if !m.Matches("") { + labelMustBeSet[m.Name] = true + } + } + + fmt.Printf("labelValuesForMatchersStream getting label values iterator for %q\n", name) + + // Make sure to intersect with series containing the label name + pit := r.PostingsWithLabel(name) + if pit.Err() != nil { + return storage.ErrLabelValues(pit.Err()) + } + if index.IsEmptyPostingsType(pit) { + return storage.EmptyLabelValues() + } + its := []index.Postings{pit} + var notIts []index.Postings + for _, m := range matchers { + switch { + case labelMustBeSet[m.Name]: + // If this matcher must be non-empty, we can be smarter. + matchesEmpty := m.Matches("") + isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp + switch { + case isNot && matchesEmpty: // l!="foo" + // If the label can't be empty and is a Not and the inner matcher + // doesn't match empty, then subtract it out at the end. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it, err := postingsForMatcher(r, inverse) + if err != nil { + return storage.ErrLabelValues(err) + } + notIts = append(notIts, it) + case isNot && !matchesEmpty: // l!="" + // If the label can't be empty and is a Not, but the inner matcher can + // be empty we need to use inversePostingsForMatcher. + inverse, err := m.Inverse() + if err != nil { + return storage.ErrLabelValues(err) + } + + it, err := inversePostingsForMatcher(r, inverse) + if err != nil { + return storage.ErrLabelValues(err) + } + if index.IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + default: // l="a" + // Non-Not matcher, use normal postingsForMatcher. + it, err := postingsForMatcher(r, m) + if err != nil { + return storage.ErrLabelValues(err) + } + if index.IsEmptyPostingsType(it) { + return storage.EmptyLabelValues() + } + its = append(its, it) + } + default: // l="" + // If a matcher for a labelname selects an empty value, it selects all + // the series which don't have the label name set too. See: + // https://github.com/prometheus/prometheus/issues/3575 and + // https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 + it, err := inversePostingsForMatcher(r, m) + if err != nil { + return storage.ErrLabelValues(err) + } + notIts = append(notIts, it) + } + } + + it := newIntersectLabelValues(its, notIts) + + // Fill in a map for de-duplication + m := map[string]struct{}{} + for it.Next() { + m[it.At()] = struct{}{} + } + if it.Err() != nil { + return storage.ErrLabelValues(it.Err()) + } + + vals := make([]string, 0, len(m)) + for v := range m { + vals = append(vals, v) + } + maps.Clear(m) + // The returned label values have to be sorted, for merging of results from different blocks to work + sort.Strings(vals) + fmt.Printf("labelValuesForMatchersStream returning label values iterator for %q\n", name) + return index.NewListLabelValues(vals) +} + +// newIntersectLabelValues returns an iterator over label values stemming from the intersection of the Postings +// iterators in its, minus the ones in notIts. +func newIntersectLabelValues(its, notIts []index.Postings) storage.LabelValues { + if len(its) == 0 { + return storage.EmptyLabelValues() + } + for _, p := range its { + if p == index.EmptyPostings() { + return storage.EmptyLabelValues() + } + } + + return &intersectPostingsWithLabel{ + arr: its, + remove: index.Merge(notIts...), + } +} + +type intersectPostingsWithLabel struct { + arr []index.Postings + remove index.Postings + curSeries storage.SeriesRef + cur string +} + +func (it *intersectPostingsWithLabel) At() string { + return it.cur +} + +func (it *intersectPostingsWithLabel) Next() bool { + // Advance all iterators in it.arr and record the maximum of their current series IDs as a starting point + for _, p := range it.arr { + if !p.Next() { + return false + } + + cur := p.At() + if cur > it.curSeries { + // This is a candidate, but make sure it's not in the remove set + if ok := it.remove.Seek(cur); ok && cur == it.remove.At() { + continue + } + it.curSeries = p.At() + } + } + + return it.doNext() +} + +func (it *intersectPostingsWithLabel) doNext() bool { + // Find next series ref greater than the current one, which all sub-iterators have in common +loop: + for { + for _, p := range it.arr { + if !p.Seek(it.curSeries) { + // Unable to find a common series ref >= the current one + return false + } + cur := p.At() + + if cur > it.curSeries { + // This is a candidate, but make sure it's not in the remove set + if ok := it.remove.Seek(cur); ok && cur == it.remove.At() { + continue + } + + it.curSeries = cur + continue loop + } + + if lvIt, ok := p.(index.PostingsWithLabelValues); ok { + it.cur = lvIt.ValueAt() + } + } + + // All sub-iterators are currently aligned on the same series ref + return true + } +} + +func (it *intersectPostingsWithLabel) Seek(id storage.SeriesRef) bool { + if it.curSeries >= id { + return true + } + + it.curSeries = id + return it.doNext() +} + +func (it *intersectPostingsWithLabel) Err() error { + for _, p := range it.arr { + if p.Err() != nil { + return p.Err() + } + } + + return it.remove.Err() +} + +func (*intersectPostingsWithLabel) Warnings() storage.Warnings { + return nil +} diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index e28853d0c..1c88fd7da 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -183,8 +183,18 @@ func (oh *OOOHeadIndexReader) LabelValuesStream(name string, matchers ...*labels return nil } - // TODO: Implement matchers - return oh.head.postings.LabelValuesStream(name) + ownMatchers := 0 + for _, m := range matchers { + if m.Name == name { + ownMatchers++ + } + } + if ownMatchers == len(matchers) { + return oh.head.postings.LabelValuesStream(name, matchers...) + } + + // There are matchers on other label names than the requested one, so will need to intersect matching series + return labelValuesForMatchersStream(oh, name, matchers) } type chunkMetaAndChunkDiskMapperRef struct { @@ -431,6 +441,10 @@ func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardI return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount) } +func (ir *OOOCompactionHeadIndexReader) PostingsWithLabel(name string) index.PostingsWithLabelValues { + return ir.ch.oooIR.PostingsWithLabel(name) +} + func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { return ir.ch.oooIR.series(ref, builder, chks, ir.ch.lastMmapRef) } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index a4d0bc06f..2a8b173b2 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1523,7 +1523,7 @@ func (m mockIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]stri func (m mockIndex) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues { // TODO - return nil + return storage.EmptyLabelValues() } func (m mockIndex) LabelValueFor(id storage.SeriesRef, label string) (string, error) { @@ -1607,6 +1607,10 @@ func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint return index.NewListPostings(out) } +func (m mockIndex) PostingsWithLabel(name string) index.PostingsWithLabelValues { + panic("not implemented") +} + func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok {