Skip to content

Commit

Permalink
Merge branch 'main' of github.com:thanos-io/thanos
Browse files Browse the repository at this point in the history
  • Loading branch information
douglascamata committed Mar 11, 2024
2 parents 13e1558 + 7acce0c commit 48783e1
Show file tree
Hide file tree
Showing 103 changed files with 2,530 additions and 850 deletions.
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
make install-tool-deps
- go/save-cache
- setup_remote_docker:
version: 20.10.12
version: docker24
- run:
name: Create Secret if PR is not forked
# GCS integration tests are run only for author's PR that have write access, because these tests
Expand Down Expand Up @@ -82,7 +82,7 @@ jobs:
- git-shallow-clone/checkout
- go/mod-download-cached
- setup_remote_docker:
version: 20.10.12
version: docker24
- attach_workspace:
at: .
# Register qemu to support multi-arch.
Expand All @@ -104,7 +104,7 @@ jobs:
- git-shallow-clone/checkout
- go/mod-download-cached
- setup_remote_docker:
version: 20.10.12
version: docker24
- attach_workspace:
at: .
- run: make tarballs-release
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,26 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#7083](https://github.com/thanos-io/thanos/pull/7083) Store Gateway: Fix lazy expanded postings with 0 length failed to be cached.
- [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early
- [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction
- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.
- [#7122](https://github.com/thanos-io/thanos/pull/7122) Store Gateway: Fix lazy expanded postings estimate base cardinality using posting group with remove keys.

### Added
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
- [#7175](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--query.mode=distributed` which enables the new distributed mode of the Thanos query engine.

### Changed

- [#7123](https://github.com/thanos-io/thanos/pull/7123) Rule: Change default Alertmanager API version to v2.

### Removed

## [v0.34.1](https://github.com/thanos-io/thanos/tree/release-0.34) - 11.02.24

### Fixed

- [#7078](https://github.com/thanos-io/thanos/pull/7078) *: Bump gRPC to 1.57.2

### Added

Expand All @@ -29,6 +49,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6874](https://github.com/thanos-io/thanos/pull/6874) Sidecar: fix labels returned by 'api/v1/series' in presence of conflicting external and inner labels.
- [#7009](https://github.com/thanos-io/thanos/pull/7009) Rule: Fix spacing error in URL.
- [#7082](https://github.com/thanos-io/thanos/pull/7082) Stores: fix label values edge case when requesting external label values with matchers
- [#7114](https://github.com/thanos-io/thanos/pull/7114) Stores: fix file path bug for minio v7.0.61

### Added

Expand All @@ -44,6 +65,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.
- [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions.
- [#6884](https://github.com/thanos-io/thanos/pull/6884) Tools: Add upload-block command to upload blocks to object storage.
- [#7010](https://github.com/thanos-io/thanos/pull/7010) Cache: Added `set_async_circuit_breaker_*` to utilize the circuit breaker pattern for dynamically thresholding asynchronous set operations.

### Changed

Expand Down
16 changes: 14 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,16 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -693,6 +701,7 @@ type compactConfig struct {
wait bool
waitInterval time.Duration
disableDownsampling bool
blockListStrategy string
blockMetaFetchConcurrency int
blockFilesConcurrency int
blockViewerSyncBlockInterval time.Duration
Expand Down Expand Up @@ -754,6 +763,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
Default("false").BoolVar(&cc.disableDownsampling)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func RunDownsample(
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down Expand Up @@ -197,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down
8 changes: 5 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -104,7 +105,6 @@ func registerQuery(app *extkingpin.App) {
Enum(string(apiv1.PromqlEnginePrometheus), string(apiv1.PromqlEngineThanos))
extendedFunctionsEnabled := cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").Bool()
promqlQueryMode := cmd.Flag("query.mode", "PromQL query mode. One of: local, distributed.").
Hidden().
Default(string(queryModeLocal)).
Enum(string(queryModeLocal), string(queryModeDistributed))

Expand Down Expand Up @@ -261,7 +261,7 @@ func registerQuery(app *extkingpin.App) {
RefreshInterval: *fileSDInterval,
}
var err error
if fileSD, err = file.NewDiscovery(conf, logger, reg); err != nil {
if fileSD, err = file.NewDiscovery(conf, logger, conf.NewDiscovererMetrics(reg, discovery.NewRefreshMetrics(reg))); err != nil {
return err
}
}
Expand Down Expand Up @@ -646,6 +646,8 @@ func runQuery(

var remoteEngineEndpoints api.RemoteEndpoints
if queryMode != queryModeLocal {
level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.")
defaultEngine = string(apiv1.PromqlEngineThanos)
remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
Expand Down Expand Up @@ -685,7 +687,7 @@ func runQuery(

ins := extpromhttp.NewTenantInstrumentationMiddleware(tenantHeader, defaultTenant, reg, nil)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL).Register(router, ins)
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins)

api := apiv1.NewQueryAPI(
logger,
Expand Down
21 changes: 17 additions & 4 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"github.com/thanos-io/promql-engine/execution/parse"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/alert"
Expand Down Expand Up @@ -105,6 +107,8 @@ type ruleConfig struct {
lset labels.Labels
ignoredLabelNames []string
storeRateLimits store.SeriesSelectLimits

extendedFunctionsEnabled bool
}

type Expression struct {
Expand Down Expand Up @@ -155,6 +159,8 @@ func registerRule(app *extkingpin.App) {
cmd.Flag("grpc-query-endpoint", "Addresses of Thanos gRPC query API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").
PlaceHolder("<endpoint>").StringsVar(&conf.grpcQueryEndpoints)

cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").BoolVar(&conf.extendedFunctionsEnabled)

conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write configurations, that specify servers where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution())

conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
Expand Down Expand Up @@ -385,7 +391,7 @@ func runRule(
queryClients = append(queryClients, queryClient)
promClients = append(promClients, promclient.NewClient(queryClient, logger, "thanos-rule"))
// Discover and resolve query addresses.
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval)
addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval, logger)
}

if cfg.GRPCConfig != nil {
Expand Down Expand Up @@ -572,7 +578,7 @@ func runRule(
return err
}
// Discover and resolve Alertmanager addresses.
addDiscoveryGroups(g, amClient, conf.alertmgr.alertmgrsDNSSDInterval)
addDiscoveryGroups(g, amClient, conf.alertmgr.alertmgrsDNSSDInterval, logger)

alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}
Expand All @@ -582,6 +588,12 @@ func runRule(
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(conf.lset), conf.alertmgr.alertExcludeLabels, alertRelabelConfigs)
)
{
if conf.extendedFunctionsEnabled {
for k, fn := range parse.XFunctions {
parser.Functions[k] = fn
}
}

// Run rule evaluation and alert notifications.
notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*notifier.Alert, 0, len(alerts))
Expand Down Expand Up @@ -969,7 +981,7 @@ func queryFuncCreator(
}
}

func addDiscoveryGroups(g *run.Group, c *clientconfig.HTTPClient, interval time.Duration) {
func addDiscoveryGroups(g *run.Group, c *clientconfig.HTTPClient, interval time.Duration, logger log.Logger) {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
c.Discover(ctx)
Expand All @@ -979,9 +991,10 @@ func addDiscoveryGroups(g *run.Group, c *clientconfig.HTTPClient, interval time.
})

g.Add(func() error {
return runutil.Repeat(interval, ctx.Done(), func() error {
runutil.RepeatInfinitely(logger, interval, ctx.Done(), func() error {
return c.Resolve(ctx)
})
return nil
}, func(error) {
cancel()
})
Expand Down
27 changes: 24 additions & 3 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -56,6 +57,13 @@ const (
retryIntervalDuration = 10
)

type syncStrategy string

const (
concurrentDiscovery syncStrategy = "concurrent"
recursiveDiscovery syncStrategy = "recursive"
)

type storeConfig struct {
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
Expand All @@ -74,6 +82,7 @@ type storeConfig struct {
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
Expand Down Expand Up @@ -137,6 +146,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("15m").DurationVar(&sc.syncInterval)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy)

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
Default("20").IntVar(&sc.blockSyncConcurrency)

Expand Down Expand Up @@ -309,7 +322,7 @@ func runStore(
r := route.New()

if len(cachingBucketConfigYaml) > 0 {
insBkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, insBkt, logger, reg, r)
insBkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, insBkt, logger, reg, r, conf.cachingBucketConfig.Path())
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
Expand Down Expand Up @@ -345,9 +358,17 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down
Loading

0 comments on commit 48783e1

Please sign in to comment.