diff --git a/.dockerignore b/.dockerignore
index 2e559ec..972de2c 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -2,4 +2,4 @@
 /.idea/
 /.vscode/
 /data/
-/tmp/
\ No newline at end of file
+/S3/
diff --git a/.gitignore b/.gitignore
index 4b4f2df..e4c3b83 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,4 @@
 /.vscode/
 /thanos-kit
 /data/
-/tmp/
+/S3/
diff --git a/Dockerfile b/Dockerfile
index 7194aac..c4f46d5 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.20 as builder
+FROM golang:1.21 as builder
 WORKDIR /build
 # try to cache deps
 COPY go.mod go.sum ./
diff --git a/README.md b/README.md
index cc7d2f8..ff3628b 100644
--- a/README.md
+++ b/README.md
@@ -6,22 +6,23 @@ Tooling to work with Thanos blocks in object storage.
 - **analyze** - Analyze churn, label pair cardinality for specific block. (same as `promtool tsdb analyze` but also show Labels suitable for block split)
 - **dump** - Dump samples from a TSDB to text format (same as `promtool tsdb dump` but to promtext format)
 - **import** - Import samples to TSDB blocks (same as `promtool tsdb create-blocks-from openmetrics` but from promtext format). Read more about [backfill](#backfill) below
+- **unwrap** - Split one TSDB block into multiple blocks based on Label values. Read more [below](#unwrap)
 
 Cli arguments are mostly the same as for `thanos`, help is available for each sub-command:
 ```
 $ docker run sepa/thanos-kit -h
 usage: thanos-kit [<flags>] <command> [<args> ...]
 
 Tooling to work with Thanos blocks in object storage
 
 Flags:
   -h, --help     Show context-sensitive help (also try --help-long and --help-man).
       --version  Show application version.
       --log.level=info  Log filtering level (info, debug)
       --objstore.config-file=<file-path>
-                 Path to YAML file that contains object store%s configuration. See format details: https://thanos.io/tip/thanos/storage.md/
+                 Path to YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/
       --objstore.config=<content>
-                 Alternative to 'objstore.config-file' flag (mutually exclusive). Content of YAML file that contains object store%s configuration. See format details: https://thanos.io/tip/thanos/storage.md/
+                 Alternative to 'objstore.config-file' flag (mutually exclusive). Content of YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/
 
 Commands:
   help [<command>...]
@@ -41,6 +42,9 @@ Commands:
 
   import --input-file=INPUT-FILE --label=<name>="<value>" [<flags>]
     Import samples from text to TSDB blocks
+
+  unwrap [<flags>]
+    Split a TSDB block into multiple blocks by Label
 ```
 
 ### Get it
@@ -87,7 +91,61 @@ By default, `thanos-kit` will cache blocks from object storage to `./data` direc
 
 Important note that `dump` command downloads specified blocks to cache dir, but then dump TSDB as a whole (including blocks already present there)
 
+### Unwrap
+
+This can be useful for incorporating Mimir into the Thanos world as a replacement for the thanos-receive component. Currently Mimir can accept remote-write and serve instant queries via the [sidecar](https://grafana.com/docs/mimir/latest/set-up/migrate/migrate-from-thanos-to-mimir-with-thanos-sidecar/) scheme, but long-term queries via thanos-store do not work with Mimir blocks, as they have no Thanos metadata set.
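+
+For reference, a Thanos-native block carries a `thanos` section in its `meta.json`, roughly like the sketch below (shape follows `writeThanosMeta` added in this patch; values are illustrative):
+```json
+"thanos": {
+  "labels": {"prometheus": "A"},
+  "downsample": {"resolution": 0},
+  "source": "sidecar",
+  "version": 1
+}
+```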
+
+Consider this example: two Prometheus instances in different locations, configured like so:
+```yml
+# first
+global:
+  external_labels:
+    prometheus: A
+    location: dc1
+
+# second
+global:
+  external_labels:
+    prometheus: B
+    location: dc2
+```
+
+Both remote-write to Mimir; let's take a look at a block it produced:
+```bash
+# thanos-kit analyze 01GXGKXC3PA1DE6QNAH2BM2P0R
+Block ID: 01GXGKXC3PA1DE6QNAH2BM2P0R
+Thanos Labels:
+Label names appearing in all Series: [instance, job, prometheus, location]
+```
+
+The block has no Thanos labels set, and every series inside carries the labels `[prometheus, location]` coming from `external_labels`. We can split this block into two separate blocks, one per original Prometheus, like this:
+```bash
+# thanos-kit unwrap --relabel-config='[{target_label: __meta_ext_labels, replacement: prometheus}]'
+uploaded block ulid=001
+uploaded block ulid=002
+
+# thanos-kit analyze 001
+Block ID: 001
+Thanos Labels: prometheus=A
+Label names appearing in all Series: [instance, job, location]
+
+# thanos-kit analyze 002
+Block ID: 002
+Thanos Labels: prometheus=B
+Label names appearing in all Series: [instance, job, location]
+```
+The `unwrap` command takes a usual [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) and can modify block data. In addition, the special label `__meta_ext_labels` is honored when set: every label name listed in it is extracted, and a separate block is produced per unique combination of their values. Multiple names are separated by `;`; for the example above, to split the original Mimir block by both `prometheus` and `location`:
+```yaml
+  - |
+    --relabel-config=
+    - target_label: __meta_ext_labels
+      replacement: prometheus;location
+```
+The resulting blocks are the same as `thanos-sidecar` would have shipped, so the compactor can later compact them without duplicates or wasted space on S3.
+
+This also works for blocks produced by thanos-receive; existing Thanos labels are merged with the extracted ones. Smaller blocks make thanos-compactor/store sharding easier and allow different retention policies.
+
 ### Alternatives
 - [thanos tools bucket](https://thanos.io/tip/components/tools.md/#bucket)
 - [promtool tsdb](https://prometheus.io/docs/prometheus/latest/command-line/promtool/#promtool-tsdb)
-- [mimirtool](https://grafana.com/docs/mimir/latest/manage/tools/mimirtool/#backfill)
\ No newline at end of file
+- [mimirtool](https://grafana.com/docs/mimir/latest/manage/tools/mimirtool/#backfill)
diff --git a/dump.go b/dump.go
index 033a8ea..cb68e21 100644
--- a/dump.go
+++ b/dump.go
@@ -22,6 +22,9 @@ import (
 func dump(bkt objstore.Bucket, out io.Writer, ids *[]string, dir *string, mint, maxt *int64, match *string, logger log.Logger) error {
 	ctx := context.Background()
 	for _, id := range *ids {
+		if _, err := ulid.Parse(id); err != nil {
+			return errors.Wrapf(err, `invalid ULID "%s"`, id)
+		}
 		if err := downloadBlock(ctx, *dir, id, bkt, logger); err != nil {
 			return err
 		}
@@ -55,6 +58,7 @@ func dumpSamples(out io.Writer, path string, mint, maxt int64, match string) (er
 		series := ss.At()
 		name := series.Labels().Get("__name__")
 		lbs := series.Labels().MatchLabels(false, "__name__")
+		// TODO: add Thanos labels?
 		it := series.Iterator(nil)
 		for it.Next() == chunkenc.ValFloat {
 			ts, val := it.At()
@@ -85,9 +89,9 @@ func dumpSamples(out io.Writer, path string, mint, maxt int64, match string) (er
 
 // download block id to dir
 func downloadBlock(ctx context.Context, dir, id string, bkt objstore.Bucket, logger log.Logger) error {
-	bdir := filepath.Join(dir, id)
+	dest := filepath.Join(dir, id)
 	begin := time.Now()
-	err := block.Download(ctx, logger, bkt, ulid.MustParse(id), bdir)
+	err := block.Download(ctx, logger, bkt, ulid.MustParse(id), dest)
 	if err != nil {
 		return errors.Wrapf(err, "download block %s", id)
 	}
diff --git a/go.mod b/go.mod
index cfcc4e0..d96fec6 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/sepich/thanos-kit
 go 1.20
 
 require (
+	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
 	github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd
 	github.com/go-kit/log v0.2.1
 	github.com/oklog/ulid v1.3.1
@@ -16,6 +17,7 @@ require (
 	golang.org/x/sync v0.4.0
 	golang.org/x/text v0.13.0
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
+	gopkg.in/yaml.v2 v2.4.0
 )
 
 require (
@@ -30,7 +32,6 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1 // indirect
 	github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 // indirect
 	github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
-	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
 	github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible // indirect
 	github.com/aws/aws-sdk-go v1.45.25 // indirect
 	github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect
@@ -114,6 +115,5 @@ require (
 	google.golang.org/grpc v1.58.3 // indirect
 	google.golang.org/protobuf v1.31.0 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
-	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
diff --git a/import.go b/import.go
index 3a4ebb2..4c171c1 100644
--- a/import.go
+++ b/import.go
@@ -230,16 +230,7 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
 		}
 		for _, b := range blocks {
 			if b.Meta().ULID == blk {
-				m := metadata.Meta{
-					BlockMeta: b.Meta(),
-					Thanos: metadata.Thanos{
-						Version:    metadata.ThanosVersion1,
-						Labels:     lbls.Map(),
-						Downsample: metadata.ThanosDownsample{Resolution: 0},
-						Source:     "thanos-kit",
-					},
-				}
-				if err = m.WriteToDir(logger, outputDir+"/"+b.Meta().ULID.String()); err != nil {
+				if err = writeThanosMeta(b.Meta(), lbls.Map(), 0, outputDir, logger); err != nil {
 					return fmt.Errorf("write metadata: %w", err)
 				}
 				printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable)
@@ -299,3 +290,16 @@ func getFormatedBytes(bytes int64, humanReadable bool) string {
 	}
 	return strconv.FormatInt(bytes, 10)
 }
+
+func writeThanosMeta(meta tsdb.BlockMeta, lbls map[string]string, res int64, dir string, logger log.Logger) error {
+	m := metadata.Meta{
+		BlockMeta: meta,
+		Thanos: metadata.Thanos{
+			Version:    metadata.ThanosVersion1,
+			Labels:     lbls,
+			Downsample: metadata.ThanosDownsample{Resolution: res},
+			Source:     "thanos-kit",
+		},
+	}
+	return m.WriteToDir(logger, dir+"/"+meta.ULID.String())
+}
diff --git a/ls.go b/ls.go
index 4682d0d..cca6281 100644
--- a/ls.go
+++ b/ls.go
@@ -28,6 +28,9 @@ func getBlocks(ctx context.Context, bkt objstore.Bucket, recursive bool) (found
 	if recursive {
 		err = bkt.Iter(ctx, "", func(name string) error {
 			parts := strings.Split(name, "/")
+			if len(parts) < 2 {
+				return nil
+			}
 			dir, file := parts[len(parts)-2], parts[len(parts)-1]
 			if !block.IsBlockMetaFile(file) {
 				return nil
diff --git a/main.go b/main.go
index 7d04401..47eec43 100644
--- a/main.go
+++ b/main.go
@@ -18,13 +18,13 @@ func main() {
 	app := kingpin.New(filepath.Base(os.Args[0]), "Tooling for Thanos blocks in object storage").Version(version.Print("thanos-split"))
 	app.HelpFlag.Short('h')
 	logLevel := app.Flag("log.level", "Log filtering level (info, debug)").Default("info").Enum("error", "warn", "info", "debug")
-	objStoreConfig := extkingpin.RegisterPathOrContent(app, "objstore.config", "YAML file that contains object store%s configuration. See format details: https://thanos.io/tip/thanos/storage.md/ ", extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())
+	objStoreConfig := extkingpin.RegisterPathOrContent(app, "objstore.config", "YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/ ", extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())
 
 	lsCmd := app.Command("ls", "List all blocks in the bucket.")
-	recursive := lsCmd.Flag("recursive", "Recurse one level into bucket to get blocks (Mimir has tenants on top level)").Short('r').Default("false").Bool()
+	lsRecursive := lsCmd.Flag("recursive", "Recursive search for blocks in the bucket (Mimir has blocks nested in tenant folders)").Short('r').Default("false").Bool()
 
 	inspectCmd := app.Command("inspect", "Inspect all blocks in the bucket in detailed, table-like way")
-	recursive = inspectCmd.Flag("recursive", "Recurse one level into bucket to get blocks (Mimir has tenants on top level)").Short('r').Default("false").Bool()
+	inspectRecursive := inspectCmd.Flag("recursive", "Recursive search for blocks in the bucket (Mimir has blocks nested in tenant folders)").Short('r').Default("false").Bool()
 	inspectSelector := inspectCmd.Flag("label", `Filter by Thanos block label, e.g. '-l key1="value1" -l key2="value2"'. All key value pairs must match. To select all blocks for some key use "*" as value.`).Short('l').PlaceHolder(`<name>="<value>"`).Strings()
 	inspectSortBy := inspectCmd.Flag("sort-by", "Sort by columns. It's also possible to sort by multiple columns, e.g. '--sort-by FROM --sort-by LABELS'. I.e., if the 'FROM' value is equal the rows are then further sorted by the 'LABELS' value").
 		Default("FROM", "LABELS").Enums(inspectColumns...)
@@ -32,13 +32,13 @@ func main() {
 	analyzeCmd := app.Command("analyze", "Analyze churn, label pair cardinality and find labels to split on")
 	analyzeULID := analyzeCmd.Arg("ULID", "Block id to analyze (ULID)").Required().String()
 	analyzeLimit := analyzeCmd.Flag("limit", "How many items to show in each list").Default("20").Int()
-	analyzeDataDir := analyzeCmd.Flag("data-dir", "Data directory in which to cache blocks").
+	analyzeDir := analyzeCmd.Flag("data-dir", "Data directory in which to cache blocks").
 		Default("./data").String()
 	analyzeMatchers := analyzeCmd.Flag("match", "Series selector to analyze. Only 1 set of matchers is supported now.").String()
 
 	dumpCmd := app.Command("dump", "Dump samples from a TSDB to text")
 	dumpULIDs := dumpCmd.Arg("ULID", "Blocks id (ULID) to dump (repeated)").Required().Strings()
-	dumpDataDir := dumpCmd.Flag("data-dir", "Data directory in which to cache blocks").Default("./data").String()
+	dumpDir := dumpCmd.Flag("data-dir", "Data directory in which to cache blocks").Default("./data").String()
 	dumpMinTime := dumpCmd.Flag("min-time", "Minimum timestamp to dump").Default("0").Int64()
 	dumpMaxTime := dumpCmd.Flag("max-time", "Maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
 	dumpMatch := dumpCmd.Flag("match", "Series selector.").Default("{__name__=~'(?s:.*)'}").String()
@@ -46,11 +46,19 @@ func main() {
 	importCmd := app.Command("import", "Import samples from text to TSDB blocks")
 	importFromFile := importCmd.Flag("input-file", "Promtext file to read samples from.").Short('f').Required().String()
 	importBlockSize := importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges").Default("2h").Duration()
-	importDataDir := importCmd.Flag("data-dir", "Data directory in which to cache blocks").
+	importDir := importCmd.Flag("data-dir", "Data directory in which to cache blocks").
 		Default("./data").String()
 	importLabels := importCmd.Flag("label", "Labels to add as Thanos block metadata (repeated)").Short('l').PlaceHolder(`<name>="<value>"`).Required().Strings()
 	importUpload := importCmd.Flag("upload", "Upload imported blocks to object storage").Default("false").Bool()
 
+	unwrapCmd := app.Command("unwrap", "Split a TSDB block into multiple blocks by Label")
+	unwrapRelabel := extkingpin.RegisterPathOrContent(unwrapCmd, "relabel-config", fmt.Sprintf("YAML file that contains relabeling configuration. Set %s=name1;name2;... to produce a separate block for each unique label combination.", metaExtLabels), extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())
+	unwrapRecursive := unwrapCmd.Flag("recursive", "Recursive search for blocks in the bucket (Mimir has blocks nested in tenant folders)").Short('r').Default("false").Bool()
+	unwrapDir := unwrapCmd.Flag("data-dir", "Data directory in which to cache blocks and process the TSDB.").Default("./data").String()
+	unwrapWait := unwrapCmd.Flag("wait-interval", "Wait interval between consecutive runs and bucket refreshes. Run once if 0.").Default("5m").Short('w').Duration()
+	unwrapDry := unwrapCmd.Flag("dry-run", "Don't make any changes to the bucket. Only print what would be done.").Default("false").Bool()
+	unwrapDst := extkingpin.RegisterPathOrContent(unwrapCmd, "dst.config", "YAML file that contains destination object store configuration for generated blocks.", extkingpin.WithEnvSubstitution(), extkingpin.WithRequired())
+
 	cmd := kingpin.MustParse(app.Parse(os.Args[1:]))
 	var logger log.Logger
 	{
@@ -74,15 +82,17 @@ func main() {
 
 	switch cmd {
 	case lsCmd.FullCommand():
-		exitCode(ls(bkt, recursive))
+		exitCode(ls(bkt, lsRecursive))
 	case inspectCmd.FullCommand():
-		exitCode(inspect(bkt, recursive, inspectSelector, inspectSortBy, logger))
+		exitCode(inspect(bkt, inspectRecursive, inspectSelector, inspectSortBy, logger))
 	case analyzeCmd.FullCommand():
-		exitCode(analyze(bkt, analyzeULID, analyzeDataDir, analyzeLimit, analyzeMatchers, logger))
+		exitCode(analyze(bkt, analyzeULID, analyzeDir, analyzeLimit, analyzeMatchers, logger))
 	case dumpCmd.FullCommand():
-		exitCode(dump(bkt, os.Stdout, dumpULIDs, dumpDataDir, dumpMinTime, dumpMaxTime, dumpMatch, logger))
+		exitCode(dump(bkt, os.Stdout, dumpULIDs, dumpDir, dumpMinTime, dumpMaxTime, dumpMatch, logger))
 	case importCmd.FullCommand():
-		exitCode(importMetrics(bkt, importFromFile, importBlockSize, importDataDir, importLabels, *importUpload, logger))
+		exitCode(importMetrics(bkt, importFromFile, importBlockSize, importDir, importLabels, *importUpload, logger))
+	case unwrapCmd.FullCommand():
+		exitCode(unwrap(bkt, *unwrapRelabel, *unwrapRecursive, unwrapDir, unwrapWait, *unwrapDry, unwrapDst, logger))
 	}
 }
diff --git a/unwrap.go b/unwrap.go
new file mode 100644
index 0000000..f4be1e7
--- /dev/null
+++ b/unwrap.go
@@ -0,0 +1,313 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"os"
+	"path"
+	"path/filepath"
+	"slices"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/efficientgo/tools/extkingpin"
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/oklog/ulid"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/relabel"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/objstore/client"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/runutil"
+	"gopkg.in/yaml.v2"
+)
+
+const metaExtLabels = "__meta_ext_labels"
+
+func unwrap(bkt objstore.Bucket, unwrapRelabel extkingpin.PathOrContent, recursive bool, dir *string, wait *time.Duration, unwrapDry bool, outConfig *extkingpin.PathOrContent, logger log.Logger) (err error) {
+	relabelContentYaml, err := unwrapRelabel.Content()
+	if err != nil {
+		return fmt.Errorf("get content of relabel configuration: %w", err)
+	}
+	var relabelConfig []*relabel.Config
+	if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil {
+		return fmt.Errorf("parse relabel configuration: %w", err)
+	}
+
+	objStoreYaml, err := outConfig.Content()
+	if err != nil {
+		return err
+	}
+	dst, err := client.NewBucket(logger, objStoreYaml, "thanos-kit")
+	if err != nil {
+		return err
+	}
+
+	processBucket := func() error {
+		begin := time.Now()
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+		defer cancel()
+		blocks, err := getBlocks(ctx, bkt, recursive)
+		if err != nil {
+			return err
+		}
+		for _, b := range blocks {
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+			err := unwrapBlock(ctx, bkt, b, relabelConfig, *dir, unwrapDry, dst, logger)
+			cancel() // release this block's timeout right away; a defer here would pile up until processBucket returns
+			if err != nil {
+				return err
+			}
+		}
+		level.Info(logger).Log("msg", "bucket iteration done", "blocks", len(blocks), "duration", time.Since(begin), "sleeping", wait)
+		return nil
+	}
+
+	if *wait == 0 {
+		return processBucket()
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	return runutil.Repeat(*wait, ctx.Done(), func() error {
+		return processBucket()
+	})
+}
+
+func unwrapBlock(ctx context.Context, bkt objstore.Bucket, b Block, relabelConfig []*relabel.Config, dir string, unwrapDry bool, dst objstore.Bucket, logger log.Logger) (err error) {
+	if err := runutil.DeleteAll(dir); err != nil {
+		return fmt.Errorf("unable to cleanup cache folder %s: %w", dir, err)
+	}
+	inDir := path.Join(dir, "in")
+	outDir := path.Join(dir, "out")
+	matchAll := &labels.Matcher{
+		Name:  "__name__",
+		Type:  labels.MatchNotEqual,
+		Value: "",
+	}
+
+	// prepare input
+	pb := objstore.NewPrefixedBucket(bkt, b.Prefix)
+	if err := downloadBlock(ctx, inDir, b.Id.String(), pb, logger); err != nil {
+		return err
+	}
+	if err := os.Mkdir(path.Join(inDir, "wal"), 0777); err != nil && !os.IsExist(err) {
+		return fmt.Errorf("create wal dir: %w", err)
+	}
+	origMeta, err := metadata.ReadFromDir(path.Join(inDir, b.Id.String()))
+	if err != nil {
+		return fmt.Errorf("read meta.json for %s: %w", b.Id.String(), err)
+	}
+	db, err := tsdb.OpenDBReadOnly(inDir, logger)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		err = tsdb_errors.NewMulti(err, db.Close()).Err()
+	}()
+	q, err := db.Querier(ctx, 0, math.MaxInt64)
+	if err != nil {
+		return err
+	}
+	defer q.Close()
+
+	// prepare output
+	if err := os.Mkdir(outDir, 0777); err != nil && !os.IsExist(err) {
+		return fmt.Errorf("create output dir: %w", err)
+	}
+	duration := getCompatibleBlockDuration(math.MaxInt64) //todo
+	mw, err := newMultiBlockWriter(outDir, logger, duration, *origMeta)
+	if err != nil {
+		return err
+	}
+
+	// unwrap
+	ss := q.Select(false, nil, matchAll)
+	for ss.Next() {
+		series := ss.At()
+		rl, keep := relabel.Process(series.Labels(), relabelConfig...)
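+		// series dropped by the relabel rules (keep == false) never reach any output block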
+		if !keep {
+			continue
+		}
+
+		lbs, extl := extractLabels(rl, strings.Split(rl.Get(metaExtLabels), ";"))
+		tdb, err := mw.getTenant(ctx, extl)
+		if err != nil {
+			return err
+		}
+		it := series.Iterator(nil)
+		for it.Next() == chunkenc.ValFloat {
+			ts, val := it.At()
+			if _, err := tdb.appender.Append(0, lbs, ts, val); err != nil {
+				return err
+			}
+			tdb.samples++
+		}
+		for it.Next() == chunkenc.ValFloatHistogram {
+			ts, fh := it.AtFloatHistogram()
+			if _, err := tdb.appender.AppendHistogram(0, lbs, ts, nil, fh); err != nil {
+				return err
+			}
+			tdb.samples++
+		}
+		for it.Next() == chunkenc.ValHistogram {
+			ts, h := it.AtHistogram()
+			if _, err := tdb.appender.AppendHistogram(0, lbs, ts, h, nil); err != nil {
+				return err
+			}
+			tdb.samples++
+		}
+		if it.Err() != nil {
+			return it.Err()
+		}
+	}
+
+	if ss.Err() != nil {
+		return ss.Err()
+	}
+	if ws := ss.Warnings(); len(ws) > 0 {
+		return tsdb_errors.NewMulti(ws...).Err()
+	}
+
+	blocks, err := mw.flush(ctx)
+	if err != nil {
+		return err
+	}
+	if unwrapDry {
+		level.Info(logger).Log("msg", "dry-run: skipping upload of created blocks and delete of original block", "ulids", fmt.Sprint(blocks), "orig", b.Id)
+	} else {
+		for _, id := range blocks {
+			begin := time.Now()
+			err = block.Upload(ctx, logger, dst, filepath.Join(outDir, id.String()), metadata.SHA256Func)
+			if err != nil {
+				return fmt.Errorf("upload block %s: %v", id, err)
+			}
+			level.Info(logger).Log("msg", "uploaded block", "ulid", id, "duration", time.Since(begin))
+		}
+		level.Info(logger).Log("msg", "deleting original block", "ulid", b.Id)
+		if err := pb.Delete(ctx, b.Id.String()); err != nil {
+			return fmt.Errorf("delete block %s/%s: %v", b.Prefix, b.Id, err)
+		}
+	}
+
+	return nil
+}
+
+// extractLabels splits the given sorted label set into two: labels whose names are listed in `names` (returned as el),
+// and everything else (returned as res). Meta labels (`__` prefix, except `__name__`) are dropped. Sort order is preserved.
+func extractLabels(ls labels.Labels, names []string) (res labels.Labels, el labels.Labels) {
+	slices.Sort(names)
+	i, j := 0, 0
+	for i < len(ls) && j < len(names) {
+		switch {
+		// https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
+		// Labels starting with __ will be removed from the label set after target relabeling is completed.
+		case strings.HasPrefix(ls[i].Name, "__") && ls[i].Name != "__name__":
+			i++
+		case names[j] < ls[i].Name:
+			j++
+		case ls[i].Name < names[j]:
+			res = append(res, ls[i])
+			i++
+		default:
+			el = append(el, ls[i])
+			i++
+			j++
+		}
+	}
+	res = append(res, ls[i:]...)
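+	// res keeps the series' own labels; el holds the extracted labels that route the series to a destination block and become its Thanos labels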
+	return res, el
+}
+
+type MultiBlockWriter struct {
+	db        *tsdb.DBReadOnly
+	origMeta  metadata.Meta
+	blockSize int64 // in ms
+	tenants   map[uint64]*tenant
+	logger    log.Logger
+	dir       string
+	mu        sync.Mutex
+}
+
+type tenant struct {
+	appender  storage.Appender
+	writer    *tsdb.BlockWriter
+	samples   int
+	extLabels labels.Labels
+}
+
+// newMultiBlockWriter creates a new multi-tenant tsdb BlockWriter
+func newMultiBlockWriter(dir string, logger log.Logger, blockSize int64, meta metadata.Meta) (*MultiBlockWriter, error) {
+	return &MultiBlockWriter{
+		blockSize: blockSize,
+		origMeta:  meta,
+		tenants:   make(map[uint64]*tenant),
+		logger:    logger,
+		dir:       dir,
+	}, nil
+}
+
+func (m *MultiBlockWriter) getTenant(ctx context.Context, lbls labels.Labels) (*tenant, error) {
+	id := lbls.Hash()
+	if _, ok := m.tenants[id]; !ok {
+		w, err := tsdb.NewBlockWriter(m.logger, m.dir, m.blockSize)
+		if err != nil {
+			return nil, err
+		}
+		m.mu.Lock()
+		m.tenants[id] = &tenant{
+			writer:    w,
+			appender:  w.Appender(ctx),
+			samples:   0,
+			extLabels: lbls,
+		}
+		m.mu.Unlock()
+	}
+	// commit the appender periodically to limit memory usage
+	if m.tenants[id].samples > 5000 {
+		if err := m.commit(ctx, id); err != nil {
+			return nil, err
+		}
+	}
+
+	return m.tenants[id], nil
+}
+
+func (m *MultiBlockWriter) commit(ctx context.Context, id uint64) error {
+	err := m.tenants[id].appender.Commit()
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.tenants[id].appender = m.tenants[id].writer.Appender(ctx)
+	m.tenants[id].samples = 0
+	return err
+}
+
+// flush writes all blocks and resets MultiBlockWriter tenants
+func (m *MultiBlockWriter) flush(ctx context.Context) (ids []ulid.ULID, err error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	for _, t := range m.tenants {
+		if err := t.appender.Commit(); err != nil {
+			return nil, err
+		}
+		id, err := t.writer.Flush(ctx)
+		if err != nil {
+			return nil, err
+		}
+		ids = append(ids, id)
+		if err := t.writer.Close(); err != nil {
+			return nil, err
+		}
+
+		meta, err := metadata.ReadFromDir(path.Join(m.dir, id.String()))
+		if err != nil {
+			return nil, fmt.Errorf("read %s metadata: %w", id, err)
+		}
+		// copy the original Thanos labels (the map may be nil for Mimir blocks) before merging,
+		// so tenants don't leak extracted labels into each other via a shared map
+		l := make(map[string]string, len(m.origMeta.Thanos.Labels)+len(t.extLabels))
+		for k, v := range m.origMeta.Thanos.Labels {
+			l[k] = v
+		}
+		for _, e := range t.extLabels {
+			l[e.Name] = e.Value
+		}
+		if err = writeThanosMeta(meta.BlockMeta, l, m.origMeta.Thanos.Downsample.Resolution, m.dir, m.logger); err != nil {
+			return nil, fmt.Errorf("write %s metadata: %w", id, err)
+		}
+	}
+	m.tenants = make(map[uint64]*tenant)
+	return ids, nil
+}
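As a sanity check of the `__meta_ext_labels` convention that `unwrap` relies on, here is a minimal standalone sketch (hypothetical, not part of the patch; it assumes the Prometheus modules pinned in `go.mod`):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
)

func main() {
	// A series as stored in a Mimir block: external_labels were flattened
	// into every series at remote-write time.
	series := labels.FromStrings(
		"__name__", "up",
		"instance", "host:9090",
		"job", "node",
		"location", "dc1",
		"prometheus", "A",
	)

	// Equivalent of --relabel-config='[{target_label: __meta_ext_labels, replacement: prometheus;location}]'.
	// Action and Regex are set explicitly because the YAML defaults do not
	// apply when the struct is built directly.
	cfg := []*relabel.Config{{
		Action:      relabel.Replace,
		Regex:       relabel.MustNewRegexp("(.*)"),
		TargetLabel: "__meta_ext_labels",
		Replacement: "prometheus;location",
	}}

	rl, keep := relabel.Process(series, cfg...)
	if !keep {
		return // a series dropped by the rules never reaches any output block
	}

	// unwrap splits the names listed in __meta_ext_labels out of each series;
	// their values select the destination block and become its Thanos labels.
	names := strings.Split(rl.Get("__meta_ext_labels"), ";")
	fmt.Println(names) // [prometheus location]
}
```

The relabel step only tags each series; the actual routing happens in `extractLabels` and `getTenant` above, which keep one `tsdb.BlockWriter` per unique combination of the extracted label values.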