Skip to content

Commit

Permalink
unwrap: add max-time and source block filters
Browse files Browse the repository at this point in the history
  • Loading branch information
sepich committed Apr 27, 2024
1 parent 419c6b9 commit 287eb21
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Important note that `dump` command downloads specified blocks to cache dir, but

### Unwrap

This could be useful for incorporating Mimir to Thanos world by replacing thanos-receive component. Currently Mimir could accept remote-write, and do instant queries via [sidecar](https://grafana.com/docs/mimir/latest/set-up/migrate/migrate-from-thanos-to-mimir-with-thanos-sidecar/) scheme. But long-term queries via thanos-store would not work with Mimir blocks, as they have no Thanos metadata set.
This could be useful for incorporating Mimir to Thanos world by replacing thanos-receive component. Currently Mimir could accept remote-write, and do instant queries via [sidecar](https://grafana.com/docs/mimir/latest/set-up/migrate/migrate-from-thanos-to-mimir-with-thanos-sidecar/) scheme or via [thanos-promql-connector](https://github.com/thanos-community/thanos-promql-connector). But long-term queries via thanos-store would not work with Mimir blocks, as they have no Thanos metadata set.

Consider this example, we have 2 prometheuses in different locations configured like so:
```yml
Expand Down
57 changes: 35 additions & 22 deletions inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
mtd "github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/runutil"
"golang.org/x/sync/errgroup"
"golang.org/x/text/language"
Expand All @@ -36,14 +37,14 @@ type Meta struct {
Prefix string
}

func inspect(bkt objstore.Bucket, recursive *bool, selector *[]string, sortBy *[]string, logger log.Logger) error {
func inspect(bkt objstore.Bucket, recursive *bool, selector *[]string, sortBy *[]string, maxTime *mtd.TimeOrDurationValue, logger log.Logger) error {
selectorLabels, err := parseFlagLabels(*selector)
if err != nil {
return errors.Wrap(err, "error parsing selector flag")
}

ctx := context.Background() // Ctrl-C instead of 5min limit
metas, err := getMeta(ctx, bkt, *recursive, logger)
metas, err := getAllMetas(ctx, bkt, *recursive, maxTime, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -76,8 +77,8 @@ func parseFlagLabels(s []string) (labels.Labels, error) {
}

// read mata.json from all blocks in bucket
func getMeta(ctx context.Context, bkt objstore.Bucket, recursive bool, logger log.Logger) (map[ulid.ULID]*Meta, error) {
blocks, err := getBlocks(ctx, bkt, recursive)
func getAllMetas(ctx context.Context, bkt objstore.Bucket, recursive bool, maxTime *mtd.TimeOrDurationValue, logger log.Logger) (map[ulid.ULID]*Meta, error) {
blocks, err := getBlocks(ctx, bkt, recursive, maxTime)
if err != nil {
return nil, err
}
Expand All @@ -89,28 +90,12 @@ func getMeta(ctx context.Context, bkt objstore.Bucket, recursive bool, logger lo
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for b := range ch {
metaFile := b.Prefix + b.Id.String() + "/" + metadata.MetaFilename
r, err := bkt.Get(ctx, metaFile)
m, err := getMeta(ctx, b, bkt, logger)
if bkt.IsObjNotFoundErr(err) {
continue // Meta.json was deleted between bkt.Exists and here.
}
if err != nil {
return errors.Wrapf(err, "get meta file: %v", metaFile)
}

defer runutil.CloseWithLogOnErr(logger, r, "close bkt meta get")

metaContent, err := io.ReadAll(r)
if err != nil {
return errors.Wrapf(err, "read meta file: %v", metaFile)
}

m := &metadata.Meta{}
if err := json.Unmarshal(metaContent, m); err != nil {
return errors.Wrapf(err, "%s unmarshal", metaFile)
}
if m.Version != metadata.ThanosVersion1 {
return errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version)
return err
}
mu.Lock()
res[b.Id] = &Meta{Meta: *m, Prefix: b.Prefix}
Expand Down Expand Up @@ -293,3 +278,31 @@ func compare(s1, s2 string) bool {
}
return s1Time.Before(s2Time)
}

// get block Metadata from bucket
func getMeta(ctx context.Context, b Block, bkt objstore.Bucket, logger log.Logger) (*metadata.Meta, error) {
metaFile := b.Prefix + b.Id.String() + "/" + metadata.MetaFilename
r, err := bkt.Get(ctx, metaFile)
if bkt.IsObjNotFoundErr(err) {
return nil, err // continue
}
if err != nil {
return nil, errors.Wrapf(err, "get meta file: %v", metaFile)
}

defer runutil.CloseWithLogOnErr(logger, r, "close bkt meta get")

metaContent, err := io.ReadAll(r)
if err != nil {
return nil, errors.Wrapf(err, "read meta file: %v", metaFile)
}

m := &metadata.Meta{}
if err := json.Unmarshal(metaContent, m); err != nil {
return nil, errors.Wrapf(err, "%s unmarshal", metaFile)
}
if m.Version != metadata.ThanosVersion1 {
return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version)
}
return m, nil
}
13 changes: 8 additions & 5 deletions ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"github.com/oklog/ulid"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/model"
"strings"
)

func ls(bkt objstore.Bucket, recursive *bool) error {
blocks, err := getBlocks(context.Background(), bkt, *recursive)
func ls(bkt objstore.Bucket, recursive *bool, maxTime *model.TimeOrDurationValue) error {
blocks, err := getBlocks(context.Background(), bkt, *recursive, maxTime)
if err == nil {
for _, b := range blocks {
fmt.Println(b.Prefix+b.Id.String(), ulid.Time(b.Id.Time()).UTC().Format("06-01-02T15:04:05Z"))
Expand All @@ -24,7 +25,7 @@ type Block struct {
Id ulid.ULID
}

func getBlocks(ctx context.Context, bkt objstore.Bucket, recursive bool) (found []Block, err error) {
func getBlocks(ctx context.Context, bkt objstore.Bucket, recursive bool, maxTime *model.TimeOrDurationValue) (found []Block, err error) {
if recursive {
err = bkt.Iter(ctx, "", func(name string) error {
parts := strings.Split(name, "/")
Expand All @@ -40,13 +41,15 @@ func getBlocks(ctx context.Context, bkt objstore.Bucket, recursive bool) (found
if len(parts) > 2 {
prefix = strings.Join(parts[0:len(parts)-2], "/") + "/"
}
found = append(found, Block{Prefix: prefix, Id: id})
if id.Time() < uint64(maxTime.PrometheusTimestamp()) {
found = append(found, Block{Prefix: prefix, Id: id})
}
}
return nil
}, objstore.WithRecursiveIter)
} else {
err = bkt.Iter(ctx, "", func(name string) error {
if id, ok := block.IsBlockDir(name); ok {
if id, ok := block.IsBlockDir(name); ok && id.Time() < uint64(maxTime.PrometheusTimestamp()) {
found = append(found, Block{Prefix: "", Id: id})
}
return nil
Expand Down
14 changes: 11 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/common/version"
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/thanos/pkg/model"
"gopkg.in/alecthomas/kingpin.v2"
"math"
"os"
Expand All @@ -22,12 +23,16 @@ func main() {

lsCmd := app.Command("ls", "List all blocks in the bucket.")
lsRecursive := lsCmd.Flag("recursive", "Recurive search for blocks in the bucket (Mimir has blocks nested to tenants folders)").Short('r').Default("false").Bool()
lsMaxTime := model.TimeOrDuration(lsCmd.Flag("max-time", "End of time range limit to get blocks. List only those, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

inspectCmd := app.Command("inspect", "Inspect all blocks in the bucket in detailed, table-like way")
inspectRecursive := inspectCmd.Flag("recursive", "Recursive search for blocks in the bucket (Mimir has blocks nested to tenants folders)").Short('r').Default("false").Bool()
inspectSelector := inspectCmd.Flag("label", `Filter by Thanos block label, e.g. '-l key1="value1" -l key2="value2"'. All key value pairs must match. To select all blocks for some key use "*" as value.`).Short('l').PlaceHolder(`<name>="<value>"`).Strings()
inspectSortBy := inspectCmd.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by LABELS'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'LABELS' value").
Default("FROM", "LABELS").Enums(inspectColumns...)
inspectMaxTime := model.TimeOrDuration(inspectCmd.Flag("max-time", "End of time range limit to get blocks. Inspect only those, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

analyzeCmd := app.Command("analyze", "Analyze churn, label pair cardinality and find labels to split on")
analyzeULID := analyzeCmd.Arg("ULID", "Block id to analyze (ULID)").Required().String()
Expand Down Expand Up @@ -58,6 +63,9 @@ func main() {
unwrapWait := unwrapCmd.Flag("wait-interval", "Wait interval between consecutive runs and bucket refreshes. Run once if 0.").Default("5m").Short('w').Duration()
unwrapDry := unwrapCmd.Flag("dry-run", "Don't do any changes to bucket. Only print what would be done.").Default("false").Bool()
unwrapDst := extkingpin.RegisterPathOrContent(unwrapCmd, "dst.config", "YAML file that contains destination object store configuration for generated blocks.", extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())
unwrapMaxTime := model.TimeOrDuration(unwrapCmd.Flag("max-time", "End of time range limit to get blocks. Unwrap only those, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
unwrapSrc := unwrapCmd.Flag("source", "Only process blocks produced by this source (e.g `compactor`). Empty means process all blocks").Default("").String()

cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
var logger log.Logger
Expand All @@ -82,17 +90,17 @@ func main() {

switch cmd {
case lsCmd.FullCommand():
exitCode(ls(bkt, lsRecursive))
exitCode(ls(bkt, lsRecursive, lsMaxTime))
case inspectCmd.FullCommand():
exitCode(inspect(bkt, inspectRecursive, inspectSelector, inspectSortBy, logger))
exitCode(inspect(bkt, inspectRecursive, inspectSelector, inspectSortBy, inspectMaxTime, logger))
case analyzeCmd.FullCommand():
exitCode(analyze(bkt, analyzeULID, analyzeDir, analyzeLimit, analyzeMatchers, logger))
case dumpCmd.FullCommand():
exitCode(dump(bkt, os.Stdout, dumpULIDs, dumpDir, dumpMinTime, dumpMaxTime, dumpMatch, logger))
case importCmd.FullCommand():
exitCode(importMetrics(bkt, importFromFile, importBlockSize, importDir, importLabels, *importUpload, logger))
case unwrapCmd.FullCommand():
exitCode(unwrap(bkt, *unwrapRelabel, *unwrapRecursive, unwrapDir, unwrapWait, *unwrapDry, unwrapDst, logger))
exitCode(unwrap(bkt, *unwrapRelabel, *unwrapRecursive, unwrapDir, unwrapWait, *unwrapDry, unwrapDst, unwrapMaxTime, unwrapSrc, logger))
}
}

Expand Down
45 changes: 31 additions & 14 deletions unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/runutil"
"gopkg.in/yaml.v2"
"math"
Expand All @@ -31,7 +32,7 @@ import (

const metaExtLabels = "__meta_ext_labels"

func unwrap(bkt objstore.Bucket, unwrapRelabel extkingpin.PathOrContent, recursive bool, dir *string, wait *time.Duration, unwrapDry bool, outConfig *extkingpin.PathOrContent, logger log.Logger) (err error) {
func unwrap(bkt objstore.Bucket, unwrapRelabel extkingpin.PathOrContent, recursive bool, dir *string, wait *time.Duration, unwrapDry bool, outConfig *extkingpin.PathOrContent, maxTime *model.TimeOrDurationValue, unwrapSrc *string, logger log.Logger) (err error) {
relabelContentYaml, err := unwrapRelabel.Content()
if err != nil {
return fmt.Errorf("get content of relabel configuration: %w", err)
Expand All @@ -52,16 +53,26 @@ func unwrap(bkt objstore.Bucket, unwrapRelabel extkingpin.PathOrContent, recursi

processBucket := func() error {
begin := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
blocks, err := getBlocks(ctx, bkt, recursive)
blocks, err := getBlocks(ctx, bkt, recursive, maxTime)
if err != nil {
return err
}
for _, b := range blocks {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
if err := unwrapBlock(ctx, bkt, b, relabelConfig, *dir, unwrapDry, dst, logger); err != nil {
if *unwrapSrc != "" {
m, err := getMeta(ctx, b, bkt, logger)
if bkt.IsObjNotFoundErr(err) {
continue // Meta.json was deleted between bkt.Exists and here.
}
if err != nil {
return err
}
if string(m.Thanos.Source) != *unwrapSrc {
continue
}
}
if err := unwrapBlock(bkt, b, relabelConfig, *dir, unwrapDry, dst, logger); err != nil {
return err
}
}
Expand All @@ -79,7 +90,7 @@ func unwrap(bkt objstore.Bucket, unwrapRelabel extkingpin.PathOrContent, recursi
})
}

func unwrapBlock(ctx context.Context, bkt objstore.Bucket, b Block, relabelConfig []*relabel.Config, dir string, unwrapDry bool, dst objstore.Bucket, logger log.Logger) error {
func unwrapBlock(bkt objstore.Bucket, b Block, relabelConfig []*relabel.Config, dir string, unwrapDry bool, dst objstore.Bucket, logger log.Logger) error {
if err := runutil.DeleteAll(dir); err != nil {
return fmt.Errorf("unable to cleanup cache folder %s: %w", dir, err)
}
Expand All @@ -92,8 +103,10 @@ func unwrapBlock(ctx context.Context, bkt objstore.Bucket, b Block, relabelConfi
}

// prepare input
ctxd, canceld := context.WithTimeout(context.Background(), 10*time.Minute)
defer canceld()
pb := objstore.NewPrefixedBucket(bkt, b.Prefix)
if err := downloadBlock(ctx, inDir, b.Id.String(), pb, logger); err != nil {
if err := downloadBlock(ctxd, inDir, b.Id.String(), pb, logger); err != nil {
return err
}
os.Mkdir(path.Join(inDir, "wal"), 0777)
Expand All @@ -108,7 +121,7 @@ func unwrapBlock(ctx context.Context, bkt objstore.Bucket, b Block, relabelConfi
defer func() {
err = tsdb_errors.NewMulti(err, db.Close()).Err()
}()
q, err := db.Querier(ctx, 0, math.MaxInt64)
q, err := db.Querier(context.Background(), 0, math.MaxInt64)
if err != nil {
return err
}
Expand All @@ -132,7 +145,7 @@ func unwrapBlock(ctx context.Context, bkt objstore.Bucket, b Block, relabelConfi
}

lbs, extl := extractLabels(rl, strings.Split(rl.Get(metaExtLabels), ";"))
tdb, err := mw.getTenant(ctx, extl)
tdb, err := mw.getTenant(context.Background(), extl)
if err != nil {
return err
}
Expand Down Expand Up @@ -170,21 +183,25 @@ func unwrapBlock(ctx context.Context, bkt objstore.Bucket, b Block, relabelConfi
return tsdb_errors.NewMulti(ws...).Err()
}

blocks, err := mw.flush(ctx)
blocks, err := mw.flush(context.Background())
if unwrapDry {
level.Info(logger).Log("msg", "dry-run: skipping upload of created blocks and delete of original block", "ulids", fmt.Sprint(blocks), "orig", b.Id)
} else {
for _, id := range blocks {
begin := time.Now()
err = block.Upload(ctx, logger, dst, filepath.Join(outDir, id.String()), metadata.SHA256Func)
ctxu, cancelu := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancelu()
err = block.Upload(ctxu, logger, dst, filepath.Join(outDir, id.String()), metadata.SHA256Func)
if err != nil {
return fmt.Errorf("upload block %s: %v", id, err)
}
level.Info(logger).Log("msg", "uploaded block", "ulid", id, "duration", time.Since(begin))
}
level.Info(logger).Log("msg", "deleting original block", "ulid", b.Id)
if err := pb.Delete(ctx, b.Id.String()); err != nil {
return fmt.Errorf("delete block %s/%s: %v", b.Prefix, b.Id, err)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
if err := block.Delete(ctx, logger, pb, b.Id); err != nil {
return fmt.Errorf("delete block %s%s: %v", b.Prefix, b.Id, err)
}
}

Expand Down

0 comments on commit 287eb21

Please sign in to comment.