diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 33fb4732dc..1093a8afc9 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -35,6 +35,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/encoding"
 	"github.com/prometheus/prometheus/tsdb/index"
+	"github.com/prometheus/prometheus/util/zeropool"
 	"github.com/weaveworks/common/httpgrpc"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
@@ -125,8 +126,22 @@ const (
 var (
 	errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.")
 	hashPool                        = sync.Pool{New: func() interface{} { return xxhash.New() }}
+	postingsPool                    zeropool.Pool[[]storage.SeriesRef]
 )
 
+func getPostingsSlice() []storage.SeriesRef {
+	if p := postingsPool.Get(); p != nil {
+		return p
+	}
+
+	// Pre-allocate slice with initial capacity.
+	return make([]storage.SeriesRef, 0, 1024)
+}
+
+func putPostingsSlice(p []storage.SeriesRef) {
+	postingsPool.Put(p[:0])
+}
+
 type bucketStoreMetrics struct {
 	blocksLoaded prometheus.Gauge
 	blockLoads   prometheus.Counter
@@ -2549,6 +2564,10 @@ type bucketIndexReader struct {
 
 	indexVersion int
 	logger       log.Logger
+
+	// Posting slice to return to the postings pool on close.
+	// A single bucketIndexReader should have at most 1 postings slice to return.
+	postings []storage.SeriesRef
 }
 
 func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
@@ -2678,13 +2697,13 @@ func (r *bucketIndexReader) ExpandedPostings(
 
 // ExpandPostingsWithContext returns the postings expanded as a slice and considers context.
 func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage.SeriesRef, error) {
-	res := make([]storage.SeriesRef, 0, 1024) // Pre-allocate slice with initial capacity
+	res := getPostingsSlice()
 	i := 0
 	for p.Next() {
 		i++
 		if i%checkContextEveryNIterations == 0 {
 			if err := ctx.Err(); err != nil {
-				return nil, err
+				return res, err
 			}
 		}
 		res = append(res, p.At())
@@ -2978,6 +2997,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
 	}
 	ps, err := ExpandPostingsWithContext(ctx, p)
+	r.postings = ps
 	if err != nil {
 		level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
 		return false, nil, nil
 	}
@@ -3414,6 +3434,10 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym
 // Close released the underlying resources of the reader.
 func (r *bucketIndexReader) Close() error {
 	r.block.pendingReaders.Done()
+
+	if r.postings != nil {
+		putPostingsSlice(r.postings)
+	}
 	return nil
 }
 
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 714ce21ca3..85030c846a 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -2979,7 +2979,7 @@ func TestExpandPostingsWithContextCancel(t *testing.T) {
 	res, err := ExpandPostingsWithContext(ctx, p)
 	testutil.NotOk(t, err)
 	testutil.Equals(t, context.Canceled, err)
-	testutil.Equals(t, []storage.SeriesRef(nil), res)
+	testutil.Equals(t, true, cap(res) == 1024)
 }
 
 func samePostingGroup(a, b *postingGroup) bool {
diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go
index 783e04b31b..57b48cc342 100644
--- a/pkg/store/lazy_postings.go
+++ b/pkg/store/lazy_postings.go
@@ -310,6 +310,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post
 		return nil, nil, err
 	}
 	ps, err := ExpandPostingsWithContext(ctx, result)
+	r.postings = ps
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "expand")
 	}
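
A minimal, self-contained sketch of the borrow/return lifecycle this diff introduces, for reference while reviewing. Only postingsPool, getPostingsSlice and putPostingsSlice mirror the helpers added in pkg/store/bucket.go; demoReader and expandRefs are hypothetical stand-ins for bucketIndexReader and ExpandPostingsWithContext, and the sketch assumes the zero-value zeropool.Pool usage shown above.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/zeropool"
)

// Mirrors the pool and helpers added in pkg/store/bucket.go.
var postingsPool zeropool.Pool[[]storage.SeriesRef]

func getPostingsSlice() []storage.SeriesRef {
	if p := postingsPool.Get(); p != nil {
		return p
	}
	// Pool is empty: pre-allocate a slice with the same initial capacity as before.
	return make([]storage.SeriesRef, 0, 1024)
}

func putPostingsSlice(p []storage.SeriesRef) {
	// Reset length but keep the backing array for reuse.
	postingsPool.Put(p[:0])
}

// demoReader is a hypothetical stand-in for bucketIndexReader: it remembers the
// single slice it borrowed so Close can return it to the pool exactly once.
type demoReader struct {
	postings []storage.SeriesRef
}

// expandRefs plays the role of ExpandPostingsWithContext plus the `r.postings = ps`
// bookkeeping added in fetchExpandedPostingsFromCache and fetchAndExpandPostingGroups.
func (r *demoReader) expandRefs(refs []storage.SeriesRef) []storage.SeriesRef {
	res := getPostingsSlice()
	res = append(res, refs...)
	r.postings = res
	return res
}

// Close returns the borrowed slice to the pool, as bucketIndexReader.Close now does.
func (r *demoReader) Close() {
	if r.postings != nil {
		putPostingsSlice(r.postings)
	}
}

func main() {
	r := &demoReader{}
	out := r.expandRefs([]storage.SeriesRef{1, 2, 3})
	fmt.Println(len(out), cap(out)) // 3 1024 (capacity may be larger if a grown slice is reused)
	r.Close()                       // backing array becomes available to the next reader
}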