Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`.
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125
* [ENHANCEMENT] Alertmanager: Upgrade alertmanager to 0.29.0 and add a new incidentIO integration. #7092
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098
Expand Down
12 changes: 12 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,18 @@ querier:
# queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] If true, parquet queryable will honor projection hints and
# only materialize requested labels. Projection is only applied when all
# queried blocks are parquet blocks and the query does not hit ingesters.
# CLI flag: -querier.parquet-queryable-honor-projection-hints
[parquet_queryable_honor_projection_hints: <boolean> | default = false]

# [Experimental] Time buffer to use when checking if query overlaps with
# ingester data. Projection hints are disabled if query time range overlaps
# with (now - query-ingesters-within - buffer).
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
```

### `blocks_storage_config`
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4827,6 +4827,18 @@ thanos_engine:
# need to make sure Parquet files are created before it is queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] If true, parquet queryable will honor projection hints and only
# materialize requested labels. Projection is only applied when all queried
# blocks are parquet blocks and the query does not hit ingesters.
# CLI flag: -querier.parquet-queryable-honor-projection-hints
[parquet_queryable_honor_projection_hints: <boolean> | default = false]

# [Experimental] Time buffer to use when checking if query overlaps with
# ingester data. Projection hints are disabled if query time range overlaps with
# (now - query-ingesters-within - buffer).
# CLI flag: -querier.parquet-queryable-projection-hints-ingester-buffer
[parquet_queryable_projection_hints_ingester_buffer: <duration> | default = 1h]
```

### `query_frontend_config`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/oklog/ulid/v2 v2.1.1
github.com/parquet-go/parquet-go v0.25.1
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94
github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a
github.com/prometheus/procfs v0.16.1
github.com/sercand/kuberesolver/v5 v5.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk=
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc=
Expand Down
214 changes: 214 additions & 0 deletions integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"fmt"
"math/rand"
"path/filepath"
"slices"
"strconv"
"testing"
"time"

"github.com/cortexproject/promqlsmith"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand All @@ -23,7 +25,9 @@ import (
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)
Expand Down Expand Up @@ -176,3 +180,213 @@ func TestParquetFuzz(t *testing.T) {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}

// TestParquetProjectionPushdownFuzz starts a single-binary Cortex with the
// parquet converter, the Thanos engine "projection" optimizer and
// -querier.parquet-queryable-honor-projection-hints enabled, uploads one TSDB
// block, waits until the block is converted to parquet and listed in the
// bucket index, and then checks that instant queries return series that carry
// exactly the expected set of label names.
func TestParquetProjectionPushdownFuzz(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	consul := e2edb.NewConsulWithName("consul")
	memcached := e2ecache.NewMemcached()
	require.NoError(t, s.StartAndWaitReady(consul, memcached))

	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
	flags := mergeFlags(
		baseFlags,
		map[string]string{
			"-target": "all,parquet-converter",
			"-blocks-storage.tsdb.block-ranges-period":                             "1m,24h",
			"-blocks-storage.tsdb.ship-interval":                                   "1s",
			"-blocks-storage.bucket-store.sync-interval":                           "1s",
			"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
			"-blocks-storage.bucket-store.bucket-index.idle-timeout":               "1s",
			"-blocks-storage.bucket-store.bucket-index.enabled":                    "true",
			"-blocks-storage.bucket-store.index-cache.backend":                     tsdb.IndexCacheBackendInMemory,
			"-querier.query-store-for-labels-enabled":                              "true",
			// compactor
			"-compactor.cleanup-interval": "1s",
			// Ingester.
			"-ring.store":      "consul",
			"-consul.hostname": consul.NetworkHTTPEndpoint(),
			// Distributor.
			"-distributor.replication-factor": "1",
			// Store-gateway.
			"-store-gateway.sharding-enabled":   "false",
			"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
			// alert manager
			"-alertmanager.web.external-url": "http://localhost/alertmanager",
			// parquet-converter
			"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
			"-parquet-converter.conversion-interval":  "1s",
			"-parquet-converter.enabled":              "true",
			// Querier - Enable Thanos engine with projection optimizer
			"-querier.thanos-engine":                            "true",
			"-querier.optimizers":                               "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection", // Enable all optimizers including projection
			"-querier.enable-parquet-queryable":                 "true",
			"-querier.parquet-queryable-honor-projection-hints": "true", // Honor projection hints
			// Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters.
			// The test block covers [now-72h, now-48h] and is queried at its end (now-48h),
			// so ingesters are not queried and projection stays enabled.
			"-querier.query-ingesters-within": "2h",
			// Enable cache for parquet labels and chunks
			"-blocks-storage.bucket-store.parquet-labels-cache.backend":             "inmemory,memcached",
			"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
			"-blocks-storage.bucket-store.chunks-cache.backend":                     "inmemory,memcached",
			"-blocks-storage.bucket-store.chunks-cache.memcached.addresses":         "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
		},
	)

	// make alert manager config dir
	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

	ctx := context.Background()
	rnd := rand.New(rand.NewSource(time.Now().Unix()))
	dir := filepath.Join(s.SharedDir(), "data")
	numSeries := 20
	numSamples := 100
	lbls := make([]labels.Labels, 0, numSeries)
	scrapeInterval := time.Minute
	statusCodes := []string{"200", "400", "404", "500", "502"}
	methods := []string{"GET", "POST", "PUT", "DELETE"}
	now := time.Now()
	// Make sure the block time range is old enough to not overlap with ingesters.
	start := now.Add(-time.Hour * 72)
	end := now.Add(-time.Hour * 48)

	// Create series with multiple labels
	for i := range numSeries {
		lbls = append(lbls, labels.FromStrings(
			labels.MetricName, "http_requests_total",
			"job", "api-server",
			"instance", fmt.Sprintf("instance-%d", i%5),
			"status_code", statusCodes[i%len(statusCodes)],
			"method", methods[i%len(methods)],
			"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
			"cluster", "test-cluster",
		))
	}

	id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
	require.NoError(t, err)
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
	require.NoError(t, s.StartAndWaitReady(cortex))

	storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, err)
	bkt := storage.GetBucket()
	userBucket := bucket.NewUserBucketClient("user-1", bkt, nil)

	err = block.Upload(ctx, log.Logger, userBucket, filepath.Join(dir, id.String()), metadata.NoneFunc)
	require.NoError(t, err)

	// Object name of the converter marker for the uploaded block. Built once
	// here instead of on every object visited by the bucket iteration below.
	markerPath := fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String())

	// Wait until we convert the blocks to parquet AND bucket index is updated
	cortex_testutil.Poll(t, 300*time.Second, true, func() any {
		// Check if parquet marker exists
		markerFound := false
		err := userBucket.Iter(ctx, "", func(name string) error {
			if name == markerPath {
				markerFound = true
			}
			return nil
		}, objstore.WithRecursiveIter())
		if err != nil || !markerFound {
			return false
		}

		// Check if bucket index exists AND contains the parquet block metadata
		idx, err := bucketindex.ReadIndex(ctx, bkt, "user-1", nil, log.Logger)
		if err != nil {
			return false
		}

		// Verify the block is in the bucket index with parquet metadata
		for _, b := range idx.Blocks {
			if b.ID == id && b.Parquet != nil {
				// require.Equal reports both versions on mismatch.
				require.Equal(t, cortex_parquet.CurrentVersion, b.Parquet.Version)
				return true
			}
		}
		return false
	})

	c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	testCases := []struct {
		name           string
		query          string
		expectedLabels []string // Exact set of label names every returned series must carry
	}{
		{
			name:           "vector selector query should not use projection",
			query:          `http_requests_total`,
			expectedLabels: []string{"__name__", "job", "instance", "status_code", "method", "path", "cluster"},
		},
		{
			name:           "simple_sum_by_job",
			query:          `sum by (job) (http_requests_total)`,
			expectedLabels: []string{"job"},
		},
		{
			name:           "rate_with_aggregation",
			query:          `sum by (method) (rate(http_requests_total[5m]))`,
			expectedLabels: []string{"method"},
		},
		{
			name:           "multiple_grouping_labels",
			query:          `sum by (job, status_code) (http_requests_total)`,
			expectedLabels: []string{"job", "status_code"},
		},
		{
			name:           "aggregation without query",
			query:          `sum without (instance, method) (http_requests_total)`,
			expectedLabels: []string{"job", "status_code", "path", "cluster"},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			t.Logf("Testing: %s", tc.query)

			// Execute instant query
			result, err := c.Query(tc.query, end)
			require.NoError(t, err)
			require.NotNil(t, result)

			// Verify we got results
			vector, ok := result.(model.Vector)
			require.True(t, ok, "result should be a vector")
			require.NotEmpty(t, vector, "query should return results")

			t.Logf("Query returned %d series", len(vector))

			// Verify projection worked: series should only have the expected labels
			for _, sample := range vector {
				// Check that all expected labels are present
				for _, expectedLabel := range tc.expectedLabels {
					_, ok := sample.Metric[model.LabelName(expectedLabel)]
					require.True(t, ok,
						"series should have %s label", expectedLabel)
				}

				// Check that no unexpected labels are present
				for name := range sample.Metric {
					if !slices.Contains(tc.expectedLabels, string(name)) {
						// Failf (not Fail) so the label name is formatted into the message.
						require.Failf(t, "unexpected label in result",
							"series should not have unexpected label: %s", name)
					}
				}
			}
		})
	}

	// Verify that parquet blocks were queried
	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}
Loading
Loading