Skip to content

Commit

Permalink
Implement for head index reader
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Sep 5, 2023
1 parent 5feed1d commit abe07c3
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 397 deletions.
2 changes: 1 addition & 1 deletion storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,6 @@ type LabelValues interface {
// When an error occurs, the iterator cannot continue.
Err() error
// Warnings is a collection of warnings that have occurred during iteration.
// Warnings could be non-empty even if iteration has not failed with error.
// Warnings could be non-empty even if iteration has not failed with an error.
Warnings() Warnings
}
10 changes: 5 additions & 5 deletions storage/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,30 +1032,30 @@ func (m *mockGenericQuerier) LabelValuesStream(name string, matchers ...*labels.
}

type labelValuesList struct {
i int
cur string
values []string
warnings Warnings
}

func newLabelValuesList(values []string, warnings Warnings) *labelValuesList {
return &labelValuesList{
i: -1,
values: values,
warnings: warnings,
}
}

func (l *labelValuesList) Next() bool {
if l.i >= len(l.values)-1 {
if len(l.values) == 0 {
return false
}

l.i++
l.cur = l.values[0]
l.values = l.values[1:]
return true
}

func (l *labelValuesList) At() string {
return l.values[l.i]
return l.cur
}

func (*labelValuesList) Err() error {
Expand Down
3 changes: 1 addition & 2 deletions storage/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ func (s *secondaryQuerier) LabelValues(name string, matchers ...*labels.Matcher)
}

func (s *secondaryQuerier) LabelValuesStream(name string, matchers ...*labels.Matcher) LabelValues {
it := s.genericQuerier.LabelValuesStream(name, matchers...)
return &secondaryLabelValues{
it: it,
it: s.genericQuerier.LabelValuesStream(name, matchers...),
}
}

Expand Down
21 changes: 20 additions & 1 deletion tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type IndexReader interface {
// the series labels mod shardCount, using a hash function which is consistent over time.
ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings

// PostingsWithLabel returns a PostingsWithLabelValues iterator over postings (series refs) with the specified
// label name, that also tracks the associated label values.
PostingsWithLabel(name string) index.PostingsWithLabelValues

// Series populates the given builder and chunk metas for the series identified
// by the reference.
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Expand Down Expand Up @@ -505,7 +509,18 @@ func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher)
}

func (r blockIndexReader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues {
return r.ir.LabelValuesStream(name, matchers...)
ownMatchers := 0
for _, m := range matchers {
if m.Name == name {
ownMatchers++
}
}
if ownMatchers == len(matchers) {
return r.ir.LabelValuesStream(name, matchers...)
}

// There are matchers on other label names than the requested one, so will need to intersect matching series
return labelValuesForMatchersStream(r.ir, name, matchers)
}

func (r blockIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
Expand Down Expand Up @@ -536,6 +551,10 @@ func (r blockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
return r.ir.ShardedPostings(p, shardIndex, shardCount)
}

func (r blockIndexReader) PostingsWithLabel(name string) index.PostingsWithLabelValues {
return r.ir.PostingsWithLabel(name)
}

func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
if err := r.ir.Series(ref, builder, chks); err != nil {
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
Expand Down
19 changes: 8 additions & 11 deletions tsdb/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,6 @@ func TestLabelValuesWithMatchers(t *testing.T) {
}

func TestLabelValuesStream_WithMatchers(t *testing.T) {
tmpdir := t.TempDir()

var seriesEntries []storage.Series
for i := 0; i < 100; i++ {
seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings(
Expand All @@ -289,19 +287,18 @@ func TestLabelValuesStream_WithMatchers(t *testing.T) {
), []tsdbutil.Sample{sample{100, 0, nil, nil}}))
}

blockDir := createBlock(t, tmpdir, seriesEntries)
blockDir := createBlock(t, t.TempDir(), seriesEntries)
files, err := sequenceFiles(chunkDir(blockDir))
require.NoError(t, err)
require.Greater(t, len(files), 0, "No chunk created.")
require.NotEmpty(t, files, "No chunk created.")

// Check open err.
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(t, err)
defer func() { require.NoError(t, block.Close()) }()
t.Cleanup(func() { require.NoError(t, block.Close()) })

indexReader, err := block.Index()
require.NoError(t, err)
defer func() { require.NoError(t, indexReader.Close()) }()
t.Cleanup(func() { require.NoError(t, indexReader.Close()) })

var uniqueWithout30s []string
for i := 0; i < 100; i++ {
Expand All @@ -317,17 +314,17 @@ func TestLabelValuesStream_WithMatchers(t *testing.T) {
expectedValues []string
}{
{
name: "get tens based on unique id",
name: "get tens based on unique ID",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")},
expectedValues: []string{"value3"},
}, {
name: "get unique ids based on a ten",
name: "get unique IDs based on a ten",
labelName: "unique",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")},
expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"},
}, {
name: "get tens by pattern matching on unique id",
name: "get tens by pattern matching on unique ID",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")},
expectedValues: []string{"value5", "value6", "value7"},
Expand All @@ -337,7 +334,7 @@ func TestLabelValuesStream_WithMatchers(t *testing.T) {
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")},
expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"},
}, {
name: "get unique IDs based on tens not being equal to a certain value, while not emptyy",
name: "get unique IDs based on tens not being equal to a certain value, while not empty",
labelName: "unique",
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"),
Expand Down
18 changes: 16 additions & 2 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,18 @@ func (h *headIndexReader) LabelValuesStream(name string, matchers ...*labels.Mat
return nil
}

// TODO: Implement matchers
return h.head.postings.LabelValuesStream(name)
ownMatchers := 0
for _, m := range matchers {
if m.Name == name {
ownMatchers++
}
}
if ownMatchers == len(matchers) {
return h.head.postings.LabelValuesStream(name, matchers...)
}

// There are matchers on other label names than the requested one, so will need to intersect matching series
return labelValuesForMatchersStream(h, name, matchers)
}

// LabelNames returns all the unique label names present in the head
Expand Down Expand Up @@ -187,6 +197,10 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
return index.NewListPostings(out)
}

func (h *headIndexReader) PostingsWithLabel(name string) index.PostingsWithLabelValues {
return h.head.postings.PostingsWithLabel(name)
}

// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
s := h.head.series.getByID(chunks.HeadSeriesRef(ref))
Expand Down
36 changes: 26 additions & 10 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2437,7 +2437,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")},
expectedValues: []string{"value5", "value6", "value7"},
}, {
name: "get tens by matching for absence of unique label",
name: "get tens by matching for presence of unique label",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")},
expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"},
Expand Down Expand Up @@ -2474,47 +2474,63 @@ func TestHeadLabelValuesStream_WithMatchers(t *testing.T) {
}
require.NoError(t, app.Commit())

indexReader := head.indexRange(0, 200)
t.Cleanup(func() {
require.NoError(t, indexReader.Close())
})

var uniqueWithout30s []string
for i := 0; i < 100; i++ {
if i/10 != 3 {
uniqueWithout30s = append(uniqueWithout30s, fmt.Sprintf("value%d", i))
}
}
sort.Strings(uniqueWithout30s)
testCases := []struct {
name string
labelName string
matchers []*labels.Matcher
expectedValues []string
}{
{
name: "get tens based on unique id",
name: "get tens based on unique ID",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")},
expectedValues: []string{"value3"},
}, {
name: "get unique ids based on a ten",
name: "get unique IDs based on a ten",
labelName: "unique",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")},
expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"},
}, {
name: "get tens by pattern matching on unique id",
name: "get tens by pattern matching on unique ID",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")},
expectedValues: []string{"value5", "value6", "value7"},
}, {
name: "get tens by matching for absence of unique label",
name: "get tens by matching for presence of unique label",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")},
expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"},
}, {
name: "get unique IDs based on tens not being equal to a certain value, while not empty",
labelName: "unique",
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "tens", "value3"),
labels.MustNewMatcher(labels.MatchNotEqual, "tens", ""),
},
expectedValues: uniqueWithout30s,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
headIdxReader := head.indexRange(0, 200)

it := headIdxReader.LabelValuesStream(tt.labelName, tt.matchers...)
it := indexReader.LabelValuesStream(tt.labelName, tt.matchers...)
var values []string
for it.Next() {
values = append(values, it.At())
}
require.NoError(t, it.Err())
require.Empty(t, it.Warnings())

require.Equal(t, tt.expectedValues, values)
})
}
Expand Down
21 changes: 5 additions & 16 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,16 +1543,11 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string
}

func (r *Reader) LabelValuesStream(name string, matchers ...*labels.Matcher) storage.LabelValues {
ownMatchers := 0
for _, m := range matchers {
if m.Name == name {
ownMatchers++
}
}

if r.version == FormatV1 {
p := r.postingsV1[name]
// TODO: Handle matchers on other label names
if len(p) == 0 {
return storage.EmptyLabelValues()
}
return &labelValuesV1{
matchers: matchers,
it: reflect.ValueOf(p).MapRange(),
Expand All @@ -1561,16 +1556,10 @@ func (r *Reader) LabelValuesStream(name string, matchers ...*labels.Matcher) sto

p := r.postings[name]
if len(p) == 0 {
return nil
}

if ownMatchers == len(matchers) {
// All matchers are for the requested label name
return r.newLabelValuesV2(name, matchers)
return storage.EmptyLabelValues()
}

// There are matchers on other label names than the requested one, so will need to intersect matching series
return r.labelValuesForMatchersStream(name, matchers)
return r.newLabelValuesV2(name, matchers)
}

// LabelNamesFor returns all the label names for the series referred to by IDs.
Expand Down
Loading

0 comments on commit abe07c3

Please sign in to comment.