Commit 93b3994: add Unwrap

sepich committed Dec 18, 2023
1 parent 3d8dfee commit 93b3994
Showing 10 changed files with 424 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .dockerignore
@@ -2,4 +2,4 @@
/.idea/
/.vscode/
/data/
/tmp/
/S3/
2 changes: 1 addition & 1 deletion .gitignore
@@ -2,4 +2,4 @@
/.vscode/
/thanos-kit
/data/
/tmp/
/S3/
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.20 as builder
FROM golang:1.21 as builder
WORKDIR /build
# try to cache deps
COPY go.mod go.sum ./
66 changes: 62 additions & 4 deletions README.md
@@ -6,22 +6,23 @@ Tooling to work with Thanos blocks in object storage.
- **analyze** - Analyze churn and label pair cardinality for a specific block (same as `promtool tsdb analyze`, but also shows labels suitable for block splitting)
- **dump** - Dump samples from a TSDB to text format (same as `promtool tsdb dump` but to promtext format)
- **import** - Import samples to TSDB blocks (same as `promtool tsdb create-blocks-from openmetrics` but from promtext format). Read more about [backfill](#backfill) below
- **unwrap** - Split one TSDB block into multiple blocks based on label values. Read more [below](#unwrap)

CLI arguments are mostly the same as for `thanos`; help is available for each sub-command:
```
$ docker run sepa/thanos-kit -h
usage: thanos-kit [<flags>] <command> [<args> ...]
Tooling to work with Thanos blocks in object storage
Tooling for Thanos blocks in object storage
Flags:
-h, --help Show context-sensitive help (also try --help-long and --help-man).
--version Show application version.
--log.level=info Log filtering level (info, debug)
--objstore.config-file=<file-path>
Path to YAML file that contains object store%s configuration. See format details: https://thanos.io/tip/thanos/storage.md/
Path to YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/
--objstore.config=<content>
Alternative to 'objstore.config-file' flag (mutually exclusive). Content of YAML file that contains object store%s configuration. See format details: https://thanos.io/tip/thanos/storage.md/
Alternative to 'objstore.config-file' flag (mutually exclusive). Content of YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/
Commands:
help [<command>...]
@@ -41,6 +42,9 @@ Commands:
import --input-file=INPUT-FILE --label=<name>="<value>" [<flags>]
Import samples from text to TSDB blocks
unwrap [<flags>]
Split a TSDB block into multiple blocks by label
```

### Get it
@@ -87,7 +91,61 @@ By default, `thanos-kit` will cache blocks from object storage to `./data` directory

Important note: the `dump` command downloads the specified blocks to the cache dir, but then dumps the TSDB as a whole (including any blocks already present there).

### Unwrap

This can be useful for bringing Mimir into the Thanos world as a replacement for the thanos-receive component. Currently Mimir can accept remote-write and serve instant queries via the [sidecar](https://grafana.com/docs/mimir/latest/set-up/migrate/migrate-from-thanos-to-mimir-with-thanos-sidecar/) scheme, but long-term queries via thanos-store do not work with Mimir blocks, as they have no Thanos metadata set.

Consider this example: we have two Prometheus instances in different locations, configured like so:
```yml
# first
global:
  external_labels:
    prometheus: A
    location: dc1

# second
global:
  external_labels:
    prometheus: B
    location: dc2
```
Both remote-write to Mimir; let's take a look at a block they produce:
```bash
# thanos-kit analyze
Block ID: 01GXGKXC3PA1DE6QNAH2BM2P0R
Thanos Labels:
Label names appearing in all Series: [instance, job, prometheus, location]
```
The block has no Thanos labels set, and each metric inside carries the labels `[prometheus, location]` coming from `external_labels`. We can split this block into two separate blocks, one per original Prometheus, like this:
```bash
# thanos-kit unwrap --relabel-config='[{target_label: __meta_ext_labels, replacement: prometheus}]'
uploaded block ulid=001
uploaded block ulid=002
# thanos-kit analyze 001
Block ID: 001
Thanos Labels: prometheus=A
Label names appearing in all Series: [instance, job, location]
# thanos-kit analyze 002
Block ID: 002
Thanos Labels: prometheus=B
Label names appearing in all Series: [instance, job, location]
```
The unwrap command takes a usual [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) and can modify block data. Additionally, the special label `__meta_ext_labels` is parsed if assigned: the label names listed in it are extracted to Thanos metadata, producing a separate block for each unique combination of their values. You can list multiple names separated by `;`; for the example above, to split the original Mimir block by both prometheus and location:
```yaml
- |
  --relabel-config=
  - target_label: __meta_ext_labels
    replacement: prometheus;location
```
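For illustration, here is a minimal Go sketch of the grouping idea (this is not the actual thanos-kit implementation; `groupKey` is a hypothetical helper): series are keyed by the values of the label names listed in `__meta_ext_labels`, and each unique key ends up in its own output block with those labels promoted to Thanos metadata.
```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
)

// groupKey is a hypothetical helper: it derives the key that decides which
// output block a series would belong to, from the label names listed in
// __meta_ext_labels (e.g. "prometheus;location").
func groupKey(lset labels.Labels, extNames string) string {
	parts := []string{}
	for _, name := range strings.Split(extNames, ";") {
		// the value of each listed label becomes part of the block identity
		parts = append(parts, name+"="+lset.Get(name))
	}
	return strings.Join(parts, ",")
}

func main() {
	a := labels.FromStrings("__name__", "up", "prometheus", "A", "location", "dc1")
	b := labels.FromStrings("__name__", "up", "prometheus", "B", "location", "dc2")
	// series with different keys end up in different output blocks
	fmt.Println(groupKey(a, "prometheus;location")) // prometheus=A,location=dc1
	fmt.Println(groupKey(b, "prometheus;location")) // prometheus=B,location=dc2
}
```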
The resulting blocks are the same as those that would have been shipped by `thanos-sidecar`, so the compactor can later compact them without duplicates or wasted space on S3.

This can also be used for blocks produced by thanos-receive: existing Thanos labels are merged with the extracted ones. Smaller blocks ease thanos-compactor/store sharding and make it possible to apply different retention policies.

### Alternatives
- [thanos tools bucket](https://thanos.io/tip/components/tools.md/#bucket)
- [promtool tsdb](https://prometheus.io/docs/prometheus/latest/command-line/promtool/#promtool-tsdb)
- [mimirtool](https://grafana.com/docs/mimir/latest/manage/tools/mimirtool/#backfill)
8 changes: 6 additions & 2 deletions dump.go
@@ -22,6 +22,9 @@ import (
func dump(bkt objstore.Bucket, out io.Writer, ids *[]string, dir *string, mint, maxt *int64, match *string, logger log.Logger) error {
ctx := context.Background()
for _, id := range *ids {
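// validate the block ID up front, before any download is attempted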
if _, err := ulid.Parse(id); err != nil {
return errors.Wrapf(err, `invalid ULID "%s"`, id)
}
if err := downloadBlock(ctx, *dir, id, bkt, logger); err != nil {
return err
}
@@ -55,6 +58,7 @@ func dumpSamples(out io.Writer, path string, mint, maxt int64, match string) (er
series := ss.At()
name := series.Labels().Get("__name__")
lbs := series.Labels().MatchLabels(false, "__name__")
// todo: add thanos labels?
it := series.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
ts, val := it.At()
@@ -85,9 +89,9 @@ func dumpSamples(out io.Writer, path string, mint, maxt int64, match string) (er

// download block id to dir
func downloadBlock(ctx context.Context, dir, id string, bkt objstore.Bucket, logger log.Logger) error {
bdir := filepath.Join(dir, id)
dest := filepath.Join(dir, id)
begin := time.Now()
err := block.Download(ctx, logger, bkt, ulid.MustParse(id), bdir)
err := block.Download(ctx, logger, bkt, ulid.MustParse(id), dest)
if err != nil {
return errors.Wrapf(err, "download block %s", id)
}
4 changes: 2 additions & 2 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/sepich/thanos-kit
go 1.20

require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd
github.com/go-kit/log v0.2.1
github.com/oklog/ulid v1.3.1
@@ -16,6 +17,7 @@ require (
golang.org/x/sync v0.4.0
golang.org/x/text v0.13.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
)

require (
@@ -30,7 +32,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible // indirect
github.com/aws/aws-sdk-go v1.45.25 // indirect
github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect
@@ -114,6 +115,5 @@ require (
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
24 changes: 14 additions & 10 deletions import.go
@@ -230,16 +230,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
}
for _, b := range blocks {
if b.Meta().ULID == blk {
m := metadata.Meta{
BlockMeta: b.Meta(),
Thanos: metadata.Thanos{
Version: metadata.ThanosVersion1,
Labels: lbls.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: "thanos-kit",
},
}
if err = m.WriteToDir(logger, outputDir+"/"+b.Meta().ULID.String()); err != nil {
if err = writeThanosMeta(b.Meta(), lbls.Map(), 0, outputDir, logger); err != nil {
return fmt.Errorf("write metadata: %w", err)
}
printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable)
@@ -299,3 +290,16 @@ func getFormatedBytes(bytes int64, humanReadable bool) string {
}
return strconv.FormatInt(bytes, 10)
}

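// writeThanosMeta writes a Thanos meta.json (external labels, downsample resolution, source) for the block into dir/<ULID>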
func writeThanosMeta(meta tsdb.BlockMeta, lbls map[string]string, res int64, dir string, logger log.Logger) error {
m := metadata.Meta{
BlockMeta: meta,
Thanos: metadata.Thanos{
Version: metadata.ThanosVersion1,
Labels: lbls,
Downsample: metadata.ThanosDownsample{Resolution: res},
Source: "thanos-kit",
},
}
return m.WriteToDir(logger, dir+"/"+meta.ULID.String())
}
3 changes: 3 additions & 0 deletions ls.go
@@ -28,6 +28,9 @@ func getBlocks(ctx context.Context, bkt objstore.Bucket, recursive bool) (found
if recursive {
err = bkt.Iter(ctx, "", func(name string) error {
parts := strings.Split(name, "/")
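// skip entries that do not have a <dir>/<file> structure (e.g. top-level objects)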
if len(parts) < 2 {
return nil
}
dir, file := parts[len(parts)-2], parts[len(parts)-1]
if !block.IsBlockMetaFile(file) {
return nil
32 changes: 21 additions & 11 deletions main.go
@@ -18,39 +18,47 @@ func main() {
app := kingpin.New(filepath.Base(os.Args[0]), "Tooling for Thanos blocks in object storage").Version(version.Print("thanos-split"))
app.HelpFlag.Short('h')
logLevel := app.Flag("log.level", "Log filtering level (info, debug)").Default("info").Enum("error", "warn", "info", "debug")
objStoreConfig := extkingpin.RegisterPathOrContent(app, "objstore.config", "YAML file that contains object store%s configuration. See format details: https://thanos.io/tip/thanos/storage.md/ ", extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())
objStoreConfig := extkingpin.RegisterPathOrContent(app, "objstore.config", "YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/ ", extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())

lsCmd := app.Command("ls", "List all blocks in the bucket.")
recursive := lsCmd.Flag("recursive", "Recurse one level into bucket to get blocks (Mimir has tenants on top level)").Short('r').Default("false").Bool()
lsRecursive := lsCmd.Flag("recursive", "Recursive search for blocks in the bucket (Mimir has blocks nested in tenant folders)").Short('r').Default("false").Bool()

inspectCmd := app.Command("inspect", "Inspect all blocks in the bucket in detailed, table-like way")
recursive = inspectCmd.Flag("recursive", "Recurse one level into bucket to get blocks (Mimir has tenants on top level)").Short('r').Default("false").Bool()
inspectRecursive := inspectCmd.Flag("recursive", "Recursive search for blocks in the bucket (Mimir has blocks nested in tenant folders)").Short('r').Default("false").Bool()
inspectSelector := inspectCmd.Flag("label", `Filter by Thanos block label, e.g. '-l key1="value1" -l key2="value2"'. All key value pairs must match. To select all blocks for some key use "*" as value.`).Short('l').PlaceHolder(`<name>="<value>"`).Strings()
inspectSortBy := inspectCmd.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by LABELS'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'LABELS' value").
Default("FROM", "LABELS").Enums(inspectColumns...)

analyzeCmd := app.Command("analyze", "Analyze churn, label pair cardinality and find labels to split on")
analyzeULID := analyzeCmd.Arg("ULID", "Block id to analyze (ULID)").Required().String()
analyzeLimit := analyzeCmd.Flag("limit", "How many items to show in each list").Default("20").Int()
analyzeDataDir := analyzeCmd.Flag("data-dir", "Data directory in which to cache blocks").
analyzeDir := analyzeCmd.Flag("data-dir", "Data directory in which to cache blocks").
Default("./data").String()
analyzeMatchers := analyzeCmd.Flag("match", "Series selector to analyze. Only 1 set of matchers is supported now.").String()

dumpCmd := app.Command("dump", "Dump samples from a TSDB to text")
dumpULIDs := dumpCmd.Arg("ULID", "Blocks id (ULID) to dump (repeated)").Required().Strings()
dumpDataDir := dumpCmd.Flag("data-dir", "Data directory in which to cache blocks").Default("./data").String()
dumpDir := dumpCmd.Flag("data-dir", "Data directory in which to cache blocks").Default("./data").String()
dumpMinTime := dumpCmd.Flag("min-time", "Minimum timestamp to dump").Default("0").Int64()
dumpMaxTime := dumpCmd.Flag("max-time", "Maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
dumpMatch := dumpCmd.Flag("match", "Series selector.").Default("{__name__=~'(?s:.*)'}").String()

importCmd := app.Command("import", "Import samples from text to TSDB blocks")
importFromFile := importCmd.Flag("input-file", "Promtext file to read samples from.").Short('f').Required().String()
importBlockSize := importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges").Default("2h").Duration()
importDataDir := importCmd.Flag("data-dir", "Data directory in which to cache blocks").
importDir := importCmd.Flag("data-dir", "Data directory in which to cache blocks").
Default("./data").String()
importLabels := importCmd.Flag("label", "Labels to add as Thanos block metadata (repeated)").Short('l').PlaceHolder(`<name>="<value>"`).Required().Strings()
importUpload := importCmd.Flag("upload", "Upload imported blocks to object storage").Default("false").Bool()

unwrapCmd := app.Command("unwrap", "Split a TSDB block into multiple blocks by label")
unwrapRelabel := extkingpin.RegisterPathOrContent(unwrapCmd, "relabel-config", fmt.Sprintf("YAML file that contains relabeling configuration. Set %s=name1;name2;... to split into separate blocks for each unique label combination.", metaExtLabels), extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())
unwrapRecursive := unwrapCmd.Flag("recursive", "Recursive search for blocks in the bucket (Mimir has blocks nested in tenant folders)").Short('r').Default("false").Bool()
unwrapDir := unwrapCmd.Flag("data-dir", "Data directory in which to cache blocks and process TSDB.").Default("./data").String()
unwrapWait := unwrapCmd.Flag("wait-interval", "Wait interval between consecutive runs and bucket refreshes. Run once if 0.").Default("5m").Short('w').Duration()
unwrapDry := unwrapCmd.Flag("dry-run", "Don't do any changes to bucket. Only print what would be done.").Default("false").Bool()
unwrapDst := extkingpin.RegisterPathOrContent(unwrapCmd, "dst.config", "YAML file that contains destination object store configuration for generated blocks.", extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())

cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
var logger log.Logger
{
@@ -74,15 +82,17 @@ func main()

switch cmd {
case lsCmd.FullCommand():
exitCode(ls(bkt, recursive))
exitCode(ls(bkt, lsRecursive))
case inspectCmd.FullCommand():
exitCode(inspect(bkt, recursive, inspectSelector, inspectSortBy, logger))
exitCode(inspect(bkt, inspectRecursive, inspectSelector, inspectSortBy, logger))
case analyzeCmd.FullCommand():
exitCode(analyze(bkt, analyzeULID, analyzeDataDir, analyzeLimit, analyzeMatchers, logger))
exitCode(analyze(bkt, analyzeULID, analyzeDir, analyzeLimit, analyzeMatchers, logger))
case dumpCmd.FullCommand():
exitCode(dump(bkt, os.Stdout, dumpULIDs, dumpDataDir, dumpMinTime, dumpMaxTime, dumpMatch, logger))
exitCode(dump(bkt, os.Stdout, dumpULIDs, dumpDir, dumpMinTime, dumpMaxTime, dumpMatch, logger))
case importCmd.FullCommand():
exitCode(importMetrics(bkt, importFromFile, importBlockSize, importDataDir, importLabels, *importUpload, logger))
exitCode(importMetrics(bkt, importFromFile, importBlockSize, importDir, importLabels, *importUpload, logger))
case unwrapCmd.FullCommand():
exitCode(unwrap(bkt, *unwrapRelabel, *unwrapRecursive, unwrapDir, unwrapWait, *unwrapDry, unwrapDst, logger))
}
}
