From 336818d6fb2da304924a490a45e96e74394b530d Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Thu, 6 Feb 2025 15:42:13 -0500 Subject: [PATCH 01/11] Replace markblocks with rewritten mark-blocks --- tools/mark-blocks/.gitignore | 1 + tools/mark-blocks/main.go | 367 +++++++++++++++++++++++++++++++++ tools/mark-blocks/main_test.go | 194 +++++++++++++++++ tools/markblocks/.gitignore | 1 - tools/markblocks/main.go | 278 ------------------------- 5 files changed, 562 insertions(+), 279 deletions(-) create mode 100644 tools/mark-blocks/.gitignore create mode 100644 tools/mark-blocks/main.go create mode 100644 tools/mark-blocks/main_test.go delete mode 100644 tools/markblocks/.gitignore delete mode 100644 tools/markblocks/main.go diff --git a/tools/mark-blocks/.gitignore b/tools/mark-blocks/.gitignore new file mode 100644 index 00000000000..a636739a792 --- /dev/null +++ b/tools/mark-blocks/.gitignore @@ -0,0 +1 @@ +mark-blocks diff --git a/tools/mark-blocks/main.go b/tools/mark-blocks/main.go new file mode 100644 index 00000000000..9edaf937311 --- /dev/null +++ b/tools/mark-blocks/main.go @@ -0,0 +1,367 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "os" + "path" + "slices" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/tenant" + "github.com/oklog/ulid" + "github.com/thanos-io/objstore" + + "github.com/grafana/mimir/pkg/storage/bucket" + "github.com/grafana/mimir/pkg/storage/tsdb/block" +) + +type config struct { + bucket bucket.Config + tenantID string + markType string + details string + blocks flagext.StringSliceCSV + blocksFile string + resumeIndex int + remove bool + metaPresencePolicy string + dryRun bool + concurrency int +} + +func (cfg *config) registerFlags(f *flag.FlagSet) { + cfg.bucket.RegisterFlags(f) + f.StringVar(&cfg.tenantID, "tenant", "", "Tenant ID 
of the owner of the block. Required.") + f.StringVar(&cfg.markType, "mark-type", "", "Mark type to create, valid options: deletion, no-compact. Required.") + f.StringVar(&cfg.details, "details", "", "Details to include in a mark.") + f.Var(&cfg.blocks, "blocks", "Comma separated list of blocks. If non-empty, blocks-file is ignored.") + f.StringVar(&cfg.blocksFile, "blocks-file", "-", "File containing block IDs to mark. Defaults to standard input. Ignored if blocks is non-empty") + f.IntVar(&cfg.resumeIndex, "resume-index", 0, "The index of the block in blocks-file to resume from") + f.BoolVar(&cfg.remove, "remove", false, "If marks should be removed rather than uploaded.") + f.StringVar(&cfg.metaPresencePolicy, "meta-presence-policy", "require", "How to validate block meta.json files: \"none\", \"skip-block\", or \"require\".") + f.BoolVar(&cfg.dryRun, "dry-run", false, "Don't upload the markers generated, just print the intentions.") + f.IntVar(&cfg.concurrency, "concurrency", 16, "How many markers to upload or remove concurrently.") +} + +func (cfg *config) validate() error { + if cfg.tenantID == "" { + return errors.New("-tenant is required") + } + if err := tenant.ValidTenantID(cfg.tenantID); err != nil { + return fmt.Errorf("-tenant is invalid: %w", err) + } + if cfg.markType == "" { + return errors.New("-mark is required") + } + if cfg.blocksFile == "" && len(cfg.blocks) == 0 { + return errors.New("one of -blocks or -blocks-file must be specified") + } + if cfg.concurrency < 1 { + return errors.New("-concurrency must be positive") + } + if cfg.resumeIndex < 0 { + return errors.New("-resume-index must be non-negative") + } + return nil +} + +func main() { + var cfg config + cfg.registerFlags(flag.CommandLine) + + logger := log.NewLogfmtLogger(os.Stdout) + + // Parse CLI arguments. 
+ if err := flagext.ParseFlagsWithoutArguments(flag.CommandLine); err != nil { + level.Error(logger).Log("msg", "failed to parse flags", "err", err) + os.Exit(1) + } + + if err := cfg.validate(); err != nil { + level.Error(logger).Log("msg", "flags did not pass validation", "err", err) + os.Exit(1) + } + + blocks, err := getBlocks(cfg.blocks, cfg.blocksFile) + if err != nil { + level.Error(logger).Log("msg", "failed to read blocks to mark", "err", err) + os.Exit(1) + } + + ctx := context.Background() + bkt, err := bucket.NewClient(ctx, cfg.bucket, "bucket", logger, nil) + if err != nil { + level.Error(logger).Log("msg", "failed to create bucket", "err", err) + os.Exit(1) + } + + if cfg.resumeIndex >= len(blocks) { + level.Error(logger).Log("msg", "invalid resume index", "resumeIndex", cfg.resumeIndex, "numBlocks", len(blocks)) + os.Exit(1) + } else if cfg.resumeIndex > 0 { + level.Info(logger).Log("msg", "skipping blocks due to resume", "resumeIndex", cfg.resumeIndex, "numBlocks", len(blocks)) + } + + blocks = blocks[cfg.resumeIndex:] + if len(blocks) == 0 { + level.Warn(logger).Log("msg", "no blocks, nothing marked") + os.Exit(0) + } + + mpf, err := metaPresenceFunc(cfg.tenantID, bkt, cfg.metaPresencePolicy) + if err != nil { + level.Error(logger).Log("msg", "failed to handle meta validation policy", "err", err) + os.Exit(1) + } + + mbf, suffix, err := markerBytesFunc(cfg.markType, cfg.details) + if err != nil { + level.Error(logger).Log("msg", "failed to handle marker type", "err", err) + os.Exit(1) + } + + var f func(context.Context, int) error + if cfg.remove { + f = removeMarksFunc(cfg, bkt, blocks, mpf, suffix, logger) + } else { + f = addMarksFunc(cfg, bkt, blocks, mpf, mbf, suffix, logger) + } + + if successUntil, err := forEachJobSuccessUntil(ctx, len(blocks), cfg.concurrency, f); err != nil { + // since indices were possibly based on a subslice, account for that as a starting point + // successUntil is known to be the first index that did not succeed + 
resumeFrom := cfg.resumeIndex + successUntil + + level.Error(logger).Log("msg", "encountered a failure", "err", err, "resumeFrom", resumeFrom) + os.Exit(1) + } +} + +// forEachJobSuccessUntil executes a function concurrently until an error is encountered or all jobs have completed +// It also returns the maximum index i where all indexes [0, i) succeeded and 0 <= i <= numJobs +func forEachJobSuccessUntil(ctx context.Context, numJobs int, jobConcurrency int, f func(ctx context.Context, idx int) error) (int, error) { + succeeded := make([]bool, numJobs) + err := concurrency.ForEachJob(ctx, numJobs, jobConcurrency, func(ctx context.Context, idx int) error { + if err := f(ctx, idx); err != nil { + return err + } + succeeded[idx] = true + return nil + }) + if err == nil { + return numJobs, err + } + return slices.Index(succeeded, false), err +} + +// removeMarksFunc returns a function that removes block markers for a given block index +func removeMarksFunc(cfg config, bkt objstore.Bucket, blocks []ulid.ULID, mpf metaPresence, suffix string, logger log.Logger) func(context.Context, int) error { + return func(ctx context.Context, idx int) error { + block := blocks[idx].String() + + skip, err := mpf(ctx, block) + if err != nil { + return err + } + if skip { + level.Info(logger).Log("msg", fmt.Sprintf("skipping block because its meta.json was not found: %s", block)) + return nil + } + + blockMarkPath := localMarkPath(cfg.tenantID, block, suffix) + globalMarkPath := globalMarkPath(cfg.tenantID, block, suffix) + + if cfg.dryRun { + level.Info(logger).Log("msg", fmt.Sprintf("would delete global mark at %s", globalMarkPath)) + level.Info(logger).Log("msg", fmt.Sprintf("would delete mark at %s", blockMarkPath)) + return nil + } + + // Global mark deleted first, local mark deleted second to follow write ordering + for _, markPath := range []string{globalMarkPath, blockMarkPath} { + if err := bkt.Delete(ctx, markPath); err != nil { + if bkt.IsObjNotFoundErr(err) { + 
level.Info(logger).Log("msg", fmt.Sprintf("deleted mark, but it was not present at %s", markPath)) + continue + } + return err + } + level.Info(logger).Log("msg", fmt.Sprintf("deleted mark at %s", markPath)) + } + return nil + } +} + +// addMarksFunc returns a function that uploads block markers for a given block index +func addMarksFunc( + cfg config, + bkt objstore.Bucket, + blocks []ulid.ULID, + mpf metaPresence, + mbf markerBytes, + suffix string, + logger log.Logger) func(context.Context, int) error { + + return func(ctx context.Context, idx int) error { + block := blocks[idx] + blockStr := block.String() + + skip, err := mpf(ctx, blockStr) + if err != nil { + return err + } + if skip { + level.Info(logger).Log("msg", fmt.Sprintf("skipping block because its meta.json was not found: %s", block)) + return nil + } + + b, err := mbf(block) + if err != nil { + return err + } + + blockMarkPath := localMarkPath(cfg.tenantID, blockStr, suffix) + globalMarkPath := globalMarkPath(cfg.tenantID, blockStr, suffix) + + if cfg.dryRun { + level.Info(logger).Log("msg", fmt.Sprintf("would upload mark to %s", blockMarkPath)) + level.Info(logger).Log("msg", fmt.Sprintf("would upload global mark to %s", globalMarkPath)) + return nil + } + + // Local mark first, global mark second to follow write ordering + for _, markPath := range []string{globalMarkPath, blockMarkPath} { + if err := bkt.Upload(ctx, markPath, bytes.NewReader(b)); err != nil { + return err + } + level.Info(logger).Log("msg", fmt.Sprintf("uploaded mark to %s", markPath)) + } + return nil + } +} + +func localMarkPath(tenantID, blk, markSuffix string) string { + return path.Join(tenantID, blk, markSuffix) +} + +func globalMarkPath(tenantID, blk, markSuffix string) string { + return path.Join(tenantID, block.MarkersPathname, blk+"-"+markSuffix) +} + +func metaPath(tenantID, blk string) string { + return path.Join(tenantID, blk, block.MetaFilename) +} + +type markerBytes func(b ulid.ULID) ([]byte, error) + +// 
markerBytesFunc accepts a mark type and details and returns a function to create a marker (given a blockID) and the name suffix of said marker +// If the mark type is unrecognized a non-nil error is returned +func markerBytesFunc(markType, details string) (markerBytes, string, error) { + switch markType { + case "no-compact": + return func(b ulid.ULID) ([]byte, error) { + return json.Marshal(block.NoCompactMark{ + ID: b, + Version: block.NoCompactMarkVersion1, + NoCompactTime: time.Now().Unix(), + Reason: block.ManualNoCompactReason, + Details: details, + }) + }, block.NoCompactMarkFilename, nil + case "deletion": + return func(b ulid.ULID) ([]byte, error) { + return json.Marshal(block.DeletionMark{ + ID: b, + Version: block.DeletionMarkVersion1, + Details: details, + DeletionTime: time.Now().Unix(), + }) + }, block.DeletionMarkFilename, nil + default: + return nil, "", fmt.Errorf("invalid mark type (must be no-compact or deletion): %q", markType) + } +} + +type metaPresence func(ctx context.Context, blk string) (bool, error) + +func metaPresenceFunc(tenantID string, bkt objstore.Bucket, policy string) (metaPresence, error) { + switch policy { + case "none": + // The meta is not checked at all + return func(ctx context.Context, blk string) (bool, error) { + return false, nil + }, nil + case "skip-block": + // If the meta is not present, skip this block + return func(ctx context.Context, blk string) (bool, error) { + exists, err := bkt.Exists(ctx, metaPath(tenantID, blk)) + return !exists, err + }, nil + case "require": + // If the meta is not present an error is returned + return func(ctx context.Context, blk string) (bool, error) { + metaName := metaPath(tenantID, blk) + exists, err := bkt.Exists(ctx, metaName) + if err != nil { + return false, err + } + if !exists { + return false, fmt.Errorf("block's metadata did not exist: %q", metaName) + } + return false, nil + }, nil + default: + return nil, fmt.Errorf("unrecognized meta-validation-policy: %q", policy) + } + 
+} + +func getBlocks(blocks flagext.StringSliceCSV, filePath string) ([]ulid.ULID, error) { + if len(blocks) > 0 { + r := strings.NewReader(strings.Join(blocks, "\n")) + return readBlocks(r) + } + + input, err := getInputFile(filePath) + if err != nil { + return nil, err + } + defer input.Close() + + return readBlocks(input) + +} + +func getInputFile(filePath string) (*os.File, error) { + if filePath == "-" { + return os.Stdin, nil + } + return os.Open(filePath) +} + +// readBlocks reads lines of blockIDs +func readBlocks(r io.Reader) ([]ulid.ULID, error) { + var blocks []ulid.ULID + scanner := bufio.NewScanner(r) + for scanner.Scan() { + line := scanner.Text() + u, err := ulid.Parse(line) + if err != nil { + return nil, fmt.Errorf("failed to parse a string=%s as a ULID", line) + } + blocks = append(blocks, u) + } + return blocks, nil +} diff --git a/tools/mark-blocks/main_test.go b/tools/mark-blocks/main_test.go new file mode 100644 index 00000000000..692a71e559e --- /dev/null +++ b/tools/mark-blocks/main_test.go @@ -0,0 +1,194 @@ +package main + +import ( + "bytes" + "context" + "errors" + "os" + "path" + "strings" + "testing" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" +) + +func generateBlocks(t *testing.T) []ulid.ULID { + blocks := make([]ulid.ULID, 0, 5) + for i := 0; i < 5; i++ { + id, err := ulid.New(ulid.Now(), nil) + require.NoError(t, err) + blocks = append(blocks, id) + } + return blocks +} + +func TestAddAndRemoveMarks(t *testing.T) { + cfg := config{ + tenantID: "tenant", + } + bkt := objstore.NewInMemBucket() + blocks := generateBlocks(t) + logger := log.NewNopLogger() + ctx := context.Background() + mvf, err := metaPresenceFunc("", nil, "none") + require.NoError(t, err) + + for _, markType := range []string{"no-compact", "deletion"} { + mbf, suffix, err := markerBytesFunc(markType, "") + require.NoError(t, err) + addF := addMarksFunc(cfg, bkt, blocks, mvf, mbf, 
suffix, logger) + removeF := removeMarksFunc(cfg, bkt, blocks, mvf, suffix, logger) + for i := 0; i < len(blocks); i++ { + err := addF(ctx, i) + require.NoError(t, err) + + exists, err := bkt.Exists(ctx, localMarkPath("tenant", blocks[i].String(), suffix)) + require.NoError(t, err) + require.True(t, exists) + + exists, err = bkt.Exists(ctx, globalMarkPath("tenant", blocks[i].String(), suffix)) + require.NoError(t, err) + require.True(t, exists) + } + + for i := 0; i < len(blocks); i++ { + err := removeF(ctx, i) + require.NoError(t, err) + + exists, err := bkt.Exists(ctx, localMarkPath("tenant", blocks[i].String(), suffix)) + require.NoError(t, err) + require.False(t, exists) + + exists, err = bkt.Exists(ctx, globalMarkPath("tenant", blocks[i].String(), suffix)) + require.NoError(t, err) + require.False(t, exists) + } + } + + var names []string + err = bkt.Iter(ctx, "", func(name string) error { + names = append(names, name) + return nil + }) + require.NoError(t, err) + require.Empty(t, names) +} + +func TestMetaValidationPolicy(t *testing.T) { + tenantID := "tenant" + + id, err := ulid.New(ulid.Now(), nil) + require.NoError(t, err) + blockID := id.String() + + cases := map[string]struct { + missingSkip bool + missingError bool + presentSkip bool + }{ + "none": { + false, + false, + false, + }, + "skip-block": { + true, + false, + false, + }, + "require": { + false, + true, + false, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + bkt := objstore.NewInMemBucket() + + mvf, err := metaPresenceFunc(tenantID, bkt, name) + require.NoError(t, err) + + // Block meta.json is not present + skip, err := mvf(ctx, blockID) + if tc.missingError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.missingSkip, skip) + + err = bkt.Upload(ctx, metaPath(tenantID, blockID), bytes.NewReader(nil)) + require.NoError(t, err) + + // Block meta.json is present + skip, err = mvf(ctx, blockID) + 
require.NoError(t, err) + require.Equal(t, tc.presentSkip, skip) + }) + } +} + +func TestReadBlocks(t *testing.T) { + blocks := generateBlocks(t) + blockStrings := make([]string, 0, len(blocks)) + for _, block := range blocks { + blockStrings = append(blockStrings, block.String()) + } + r := strings.NewReader(strings.Join(blockStrings, "\n")) + readBlocks, err := readBlocks(r) + require.NoError(t, err) + require.Equal(t, blocks, readBlocks) +} + +func TestForEachJobSuccessUntil(t *testing.T) { + until, err := forEachJobSuccessUntil(context.Background(), 5, 1, func(_ context.Context, idx int) error { + if idx == 4 { + return errors.New("injected failure") + } + return nil + }) + require.Error(t, err) + require.Equal(t, 4, until) + + until, err = forEachJobSuccessUntil(context.Background(), 10, 3, func(_ context.Context, _ int) error { + return nil + }) + require.NoError(t, err) + require.Equal(t, 10, until) +} + +func TestGetBlocks(t *testing.T) { + blockIDs := generateBlocks(t) + blockStrings := make([]string, 0, len(blockIDs)) + for _, blockID := range blockIDs { + blockStrings = append(blockStrings, blockID.String()) + } + + // Comma separated blocks take precedence + commaBlocks, err := getBlocks(blockStrings, "-") + require.NoError(t, err) + require.Equal(t, blockIDs, commaBlocks) + + // Write a file with a block per line + tmp := t.TempDir() + filePath := path.Join(tmp, "blockfile") + + data := []byte(strings.Join(blockStrings, "\n")) + err = os.WriteFile(filePath, data, os.ModePerm) + require.NoError(t, err) + + fileBlocks, err := getBlocks(nil, filePath) + require.NoError(t, err) + require.Equal(t, blockIDs, fileBlocks) + + // Missing file + missingBlocks, err := getBlocks(nil, path.Join(tmp, "missing")) + require.Error(t, err) + require.Empty(t, missingBlocks) +} diff --git a/tools/markblocks/.gitignore b/tools/markblocks/.gitignore deleted file mode 100644 index f39ec484d6e..00000000000 --- a/tools/markblocks/.gitignore +++ /dev/null @@ -1 +0,0 @@ 
-markblocks \ No newline at end of file diff --git a/tools/markblocks/main.go b/tools/markblocks/main.go deleted file mode 100644 index 9fbe5b71b46..00000000000 --- a/tools/markblocks/main.go +++ /dev/null @@ -1,278 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package main - -import ( - "bytes" - "context" - "encoding/json" - "flag" - "fmt" - "os" - "strings" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - dskit_concurrency "github.com/grafana/dskit/concurrency" - "github.com/grafana/dskit/flagext" - "github.com/oklog/ulid" - "github.com/thanos-io/objstore" - - "github.com/grafana/mimir/pkg/storage/bucket" - "github.com/grafana/mimir/pkg/storage/tsdb/block" -) - -type config struct { - bucket bucket.Config - tenantID string - dryRun bool - allowPartialBlocks bool - concurrency int - skipExistenceCheck bool - - mark string - details string - blocks []string - - helpAll bool -} - -func main() { - ctx := context.Background() - logger := log.WithPrefix(log.NewLogfmtLogger(os.Stderr), "time", log.DefaultTimestampUTC) - - cfg := parseFlags() - marker, filename := createMarker(cfg.mark, logger, cfg.details) - ulids := validateTenantAndBlocks(logger, cfg.tenantID, cfg.blocks) - uploadMarks(ctx, logger, ulids, marker, filename, cfg.dryRun, cfg.bucket, cfg.tenantID, cfg.allowPartialBlocks, cfg.concurrency, cfg.skipExistenceCheck) -} - -func parseFlags() config { - var cfg config - - // We define two flag sets, one on basic straightforward flags of this cli, and the other one with all flags, - // which includes the bucket configuration flags, as there quite a lot of them and the help output with them - // might look a little bit overwhelming at first contact. - fullFlagSet := flag.NewFlagSet("markblocks", flag.ExitOnError) - fullFlagSet.SetOutput(os.Stdout) - basicFlagSet := flag.NewFlagSet("markblocks", flag.ExitOnError) - basicFlagSet.SetOutput(os.Stdout) - - // We register our basic flags on both basic and full flag set. 
- for _, f := range []*flag.FlagSet{basicFlagSet, fullFlagSet} { - f.StringVar(&cfg.tenantID, "tenant", "", "Tenant ID of the owner of the block. Required.") - f.StringVar(&cfg.mark, "mark", "", "Mark type to create, valid options: deletion, no-compact. Required.") - f.BoolVar(&cfg.dryRun, "dry-run", false, "Don't upload the markers generated, just print the intentions.") - f.StringVar(&cfg.details, "details", "", "Details field of the uploaded mark. Recommended. (default empty).") - f.BoolVar(&cfg.helpAll, "help-all", false, "Show help for all flags, including the bucket backend configuration.") - f.BoolVar(&cfg.allowPartialBlocks, "allow-partial", false, "Allow upload of marks into partial blocks (ie. blocks without meta.json). Only useful for deletion mark.") - f.BoolVar(&cfg.skipExistenceCheck, "skip-existence-check", false, "If true, do not check blocks exist before marking them.") - } - - commonUsageHeader := func() { - fmt.Println("This tool creates marks for TSDB blocks used by Mimir and uploads them to the specified backend.") - fmt.Println("") - fmt.Println("Usage:") - fmt.Println(" markblocks -tenant -mark [-details
] [-dry-run] blockID [blockID2 blockID3 ...]") - fmt.Println("") - } - - // We set the usage to fullFlagSet as that's the flag set we'll be always parsing, - // but by default we print only the basic flag set defaults. - fullFlagSet.Usage = func() { - commonUsageHeader() - if cfg.helpAll { - fullFlagSet.PrintDefaults() - } else { - basicFlagSet.PrintDefaults() - } - } - - // We set only the `-backend` flag on the basicFlagSet, to make sure that user sees that there are more backends supported. - // Then we register all bucket flags on the full flag set, which is the flag set we're parsing. - basicFlagSet.StringVar(&cfg.bucket.Backend, "backend", bucket.Filesystem, fmt.Sprintf("Backend storage to use. Supported backends are: %s. Use -help-all to see help on backends configuration.", strings.Join(bucket.SupportedBackends, ", "))) - cfg.bucket.RegisterFlags(fullFlagSet) - - fullFlagSet.IntVar(&cfg.concurrency, "concurrency", 16, "How many markers to upload concurrently.") - - if err := fullFlagSet.Parse(os.Args[1:]); err != nil { - fmt.Println(err) - os.Exit(1) - } - - // See if user did `markblocks -help-all`. - if cfg.helpAll { - commonUsageHeader() - fullFlagSet.PrintDefaults() - os.Exit(0) - } - cfg.blocks = fullFlagSet.Args() - - return cfg -} - -func validateTenantAndBlocks(logger log.Logger, tenantID string, blockIDs flagext.StringSlice) []ulid.ULID { - if tenantID == "" { - level.Error(logger).Log("msg", "Flag -tenant is required.") - os.Exit(1) - } - - if len(blockIDs) == 0 { - level.Warn(logger).Log("msg", "No blocks were provided. 
Nothing was done.") - os.Exit(0) - } - - var ulids []ulid.ULID - for _, b := range blockIDs { - blockID, err := ulid.Parse(b) - if err != nil { - level.Error(logger).Log("msg", "Can't parse block ID.", "block", b, "err", err) - os.Exit(1) - } - ulids = append(ulids, blockID) - } - return ulids -} - -func createMarker(markType string, logger log.Logger, details string) (func(b ulid.ULID) ([]byte, error), string) { - switch markType { - case "no-compact": - return func(b ulid.ULID) ([]byte, error) { - return json.Marshal(block.NoCompactMark{ - ID: b, - Version: block.NoCompactMarkVersion1, - NoCompactTime: time.Now().Unix(), - Reason: block.ManualNoCompactReason, - Details: details, - }) - }, block.NoCompactMarkFilename - case "deletion": - return func(b ulid.ULID) ([]byte, error) { - return json.Marshal(block.DeletionMark{ - ID: b, - Version: block.DeletionMarkVersion1, - Details: details, - DeletionTime: time.Now().Unix(), - }) - }, block.DeletionMarkFilename - default: - level.Error(logger).Log("msg", "Invalid -mark flag value. 
Should be no-compact or deletion.", "value", markType) - os.Exit(1) - panic("We never reach this.") - } -} - -func uploadMarks( - ctx context.Context, - logger log.Logger, - ulids []ulid.ULID, - mark func(b ulid.ULID) ([]byte, error), - markFilename string, - dryRun bool, - cfg bucket.Config, - tenantID string, - allowPartialBlocks bool, - concurrency int, - skipExistenceCheck bool, -) { - userBucketWithGlobalMarkers := createUserBucketWithGlobalMarkers(ctx, logger, cfg, tenantID) - - err := dskit_concurrency.ForEachJob(ctx, len(ulids), concurrency, func(ctx context.Context, idx int) error { - b := ulids[idx] - - blockFiles, exists, err := getBlockFiles(ctx, b, userBucketWithGlobalMarkers, allowPartialBlocks, skipExistenceCheck, logger) - if err != nil { - return err - } - if !exists { - return nil - } - - blockMarkPath := fmt.Sprintf("%s/%s", b, markFilename) - if blockFiles[markFilename] { - level.Warn(logger).Log("msg", "Mark already exists in block directory, skipping.", "block", b, "path", blockMarkPath) - return nil - } - - data, err := mark(b) - if err != nil { - level.Error(logger).Log("msg", "Can't create mark.", "block", b, "err", err) - return err - } - - if dryRun { - level.Info(logger).Log("msg", "Dry-run, not uploading marker.", "block", b, "marker", blockMarkPath, "data", string(data)) - return nil - } - - if err := userBucketWithGlobalMarkers.Upload(ctx, blockMarkPath, bytes.NewReader(data)); err != nil { - level.Error(logger).Log("msg", "Can't upload mark.", "block", b, "err", err) - return err - } - - level.Info(logger).Log("msg", "Successfully uploaded mark.", "block", b) - return nil - }) - - if err != nil { - os.Exit(1) - } -} - -func getBlockFiles(ctx context.Context, b ulid.ULID, bucket objstore.Bucket, allowPartialBlocks bool, skipExistenceCheck bool, logger log.Logger) (map[string]bool, bool, error) { - blockFiles := map[string]bool{} - // List all files in the blocks directory. 
We don't need recursive listing: if any segment - // files (chunks/0000xxx) are present, we will find "chunks" during iter. - err := bucket.Iter(ctx, b.String(), func(fn string) error { - if !strings.HasPrefix(fn, b.String()+"/") { - return nil - } - - fn = strings.TrimPrefix(fn, b.String()+"/") - fn = strings.TrimSuffix(fn, "/") - - blockFiles[fn] = true - return nil - }) - - if err != nil { - if bucket.IsObjNotFoundErr(err) { - if skipExistenceCheck { - return blockFiles, true, nil - } - - level.Warn(logger).Log("msg", "Block does not exist, skipping.", "block", b, "err", err) - return nil, false, nil - } - - level.Error(logger).Log("msg", "Failed to list files for block.", "block", b, "err", err) - return nil, false, err - } - - if !skipExistenceCheck { - if len(blockFiles) == 0 { - level.Warn(logger).Log("msg", "Block does not exist, skipping.", "block", b) - return nil, false, nil - } - - if !blockFiles[block.MetaFilename] && !allowPartialBlocks { - level.Warn(logger).Log("msg", "Block's meta.json file does not exist, skipping.", "block", b) - return nil, false, nil - } - } - - return blockFiles, true, nil -} - -func createUserBucketWithGlobalMarkers(ctx context.Context, logger log.Logger, cfg bucket.Config, tenantID string) objstore.Bucket { - bkt, err := bucket.NewClient(ctx, cfg, "bucket", logger, nil) - if err != nil { - level.Error(logger).Log("msg", "Can't instantiate bucket.", "err", err) - os.Exit(1) - } - userBucket := block.BucketWithGlobalMarkers( - bucket.NewUserBucketClient(tenantID, bkt, nil), - ) - return userBucket -} From e0e889c4e7efcbe7d0eccd165152f633ee875b88 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Thu, 6 Feb 2025 17:33:20 -0500 Subject: [PATCH 02/11] Lint --- tools/mark-blocks/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/mark-blocks/main.go b/tools/mark-blocks/main.go index 9edaf937311..e38634770f1 100644 --- a/tools/mark-blocks/main.go +++ b/tools/mark-blocks/main.go @@ -300,7 +300,7 @@ func 
metaPresenceFunc(tenantID string, bkt objstore.Bucket, policy string) (meta switch policy { case "none": // The meta is not checked at all - return func(ctx context.Context, blk string) (bool, error) { + return func(_ context.Context, _ string) (bool, error) { return false, nil }, nil case "skip-block": From 8edcd651f6eabe450be29f7d45f6699d74796c60 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Thu, 6 Feb 2025 17:41:18 -0500 Subject: [PATCH 03/11] Reorder error check for understandability --- tools/mark-blocks/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/mark-blocks/main.go b/tools/mark-blocks/main.go index e38634770f1..9ef55e90223 100644 --- a/tools/mark-blocks/main.go +++ b/tools/mark-blocks/main.go @@ -160,10 +160,10 @@ func forEachJobSuccessUntil(ctx context.Context, numJobs int, jobConcurrency int succeeded[idx] = true return nil }) - if err == nil { - return numJobs, err + if err != nil { + return slices.Index(succeeded, false), err } - return slices.Index(succeeded, false), err + return numJobs, nil } // removeMarksFunc returns a function that removes block markers for a given block index From 14641e5911e248d6153df6203315fbf4618a4a2c Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Thu, 6 Feb 2025 17:54:03 -0500 Subject: [PATCH 04/11] Rename, reorder --- tools/mark-blocks/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tools/mark-blocks/main.go b/tools/mark-blocks/main.go index 9ef55e90223..187cb3ab60f 100644 --- a/tools/mark-blocks/main.go +++ b/tools/mark-blocks/main.go @@ -180,17 +180,17 @@ func removeMarksFunc(cfg config, bkt objstore.Bucket, blocks []ulid.ULID, mpf me return nil } - blockMarkPath := localMarkPath(cfg.tenantID, block, suffix) + localMarkPath := localMarkPath(cfg.tenantID, block, suffix) globalMarkPath := globalMarkPath(cfg.tenantID, block, suffix) if cfg.dryRun { level.Info(logger).Log("msg", fmt.Sprintf("would delete global mark at %s", globalMarkPath)) - 
level.Info(logger).Log("msg", fmt.Sprintf("would delete mark at %s", blockMarkPath)) + level.Info(logger).Log("msg", fmt.Sprintf("would delete mark at %s", localMarkPath)) return nil } // Global mark deleted first, local mark deleted second to follow write ordering - for _, markPath := range []string{globalMarkPath, blockMarkPath} { + for _, markPath := range []string{globalMarkPath, localMarkPath} { if err := bkt.Delete(ctx, markPath); err != nil { if bkt.IsObjNotFoundErr(err) { level.Info(logger).Log("msg", fmt.Sprintf("deleted mark, but it was not present at %s", markPath)) @@ -232,17 +232,17 @@ func addMarksFunc( return err } - blockMarkPath := localMarkPath(cfg.tenantID, blockStr, suffix) + localMarkPath := localMarkPath(cfg.tenantID, blockStr, suffix) globalMarkPath := globalMarkPath(cfg.tenantID, blockStr, suffix) if cfg.dryRun { - level.Info(logger).Log("msg", fmt.Sprintf("would upload mark to %s", blockMarkPath)) + level.Info(logger).Log("msg", fmt.Sprintf("would upload mark to %s", localMarkPath)) level.Info(logger).Log("msg", fmt.Sprintf("would upload global mark to %s", globalMarkPath)) return nil } // Local mark first, global mark second to follow write ordering - for _, markPath := range []string{globalMarkPath, blockMarkPath} { + for _, markPath := range []string{localMarkPath, globalMarkPath} { if err := bkt.Upload(ctx, markPath, bytes.NewReader(b)); err != nil { return err } From 77769969b1a1d6905848850f46c34747fe73e948 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Thu, 6 Feb 2025 19:32:08 -0500 Subject: [PATCH 05/11] Add license header --- tools/mark-blocks/main.go | 2 ++ tools/mark-blocks/main_test.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tools/mark-blocks/main.go b/tools/mark-blocks/main.go index 187cb3ab60f..23fe74a8e40 100644 --- a/tools/mark-blocks/main.go +++ b/tools/mark-blocks/main.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package main import ( diff --git a/tools/mark-blocks/main_test.go 
b/tools/mark-blocks/main_test.go index 692a71e559e..cf0edce0b30 100644 --- a/tools/mark-blocks/main_test.go +++ b/tools/mark-blocks/main_test.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package main import ( From f766ad9792095c70fc4963dffd1dee3a76ad592e Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 10 Feb 2025 10:20:26 -0500 Subject: [PATCH 06/11] Modify and add documentation --- CHANGELOG.md | 1 + Makefile | 6 +- .../tools/upload-block/upload-block.sh | 12 +-- docs/internal/tools/mark-blocks.md | 80 +++++++++++++++++++ docs/internal/tools/markblocks.md | 80 ------------------- .../mimir/manage/mimir-runbooks/_index.md | 6 +- tools/mark-blocks/README.md | 62 ++++++++++++++ tools/mark-blocks/main.go | 27 ++++--- 8 files changed, 169 insertions(+), 105 deletions(-) create mode 100644 docs/internal/tools/mark-blocks.md delete mode 100644 docs/internal/tools/markblocks.md create mode 100644 tools/mark-blocks/README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 854b79de1c9..af1470bb410 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,7 @@ ### Tools * [CHANGE] `copyblocks`: Remove /pprof endpoint. #10329 +* [CHANGE] `mark-blocks`: Replace `markblocks` with added features including removing markers and reading block identifiers from a file. #10597 ## 2.15.0 diff --git a/Makefile b/Makefile index e4ac16b55d4..76bf0dbd386 100644 --- a/Makefile +++ b/Makefile @@ -539,9 +539,9 @@ dist: ## Generates binaries for a Mimir release. 
echo "Building mimir-continuous-test for $$os/$$arch"; \ GOOS=$$os GOARCH=$$arch CGO_ENABLED=0 go build $(GO_FLAGS) -o ./dist/mimir-continuous-test-$$os-$$arch$$suffix ./cmd/mimir-continuous-test; \ sha256sum ./dist/mimir-continuous-test-$$os-$$arch$$suffix | cut -d ' ' -f 1 > ./dist/mimir-continuous-test-$$os-$$arch$$suffix-sha-256; \ - echo "Building markblocks for $$os/$$arch"; \ - GOOS=$$os GOARCH=$$arch CGO_ENABLED=0 go build $(GO_FLAGS) -o ./dist/markblocks-$$os-$$arch$$suffix ./tools/markblocks; \ - sha256sum ./dist/markblocks-$$os-$$arch$$suffix | cut -d ' ' -f 1 > ./dist/markblocks-$$os-$$arch$$suffix-sha-256; \ + echo "Building mark-blocks for $$os/$$arch"; \ + GOOS=$$os GOARCH=$$arch CGO_ENABLED=0 go build $(GO_FLAGS) -o ./dist/mark-blocks-$$os-$$arch$$suffix ./tools/mark-blocks; \ + sha256sum ./dist/mark-blocks-$$os-$$arch$$suffix | cut -d ' ' -f 1 > ./dist/mark-blocks-$$os-$$arch$$suffix-sha-256; \ done; \ done; \ touch $@ diff --git a/development/tools/upload-block/upload-block.sh b/development/tools/upload-block/upload-block.sh index 570163db4b6..8d16912b9a8 100755 --- a/development/tools/upload-block/upload-block.sh +++ b/development/tools/upload-block/upload-block.sh @@ -31,12 +31,12 @@ function main() { echo "Block ULID is $BLOCK_ULID." echo "Uploading no-compact marker..." - markblocks \ + mark_blocks \ -tenant="$TENANT" \ - -mark=no-compact \ + -mark-type=no-compact \ -details="block uploaded for debugging purposes" \ - -skip-existence-check=true \ - "$BLOCK_ULID" + -meta-presence-policy=none \ + -blocks "$BLOCK_ULID" echo "Uploading block contents..." aws_with_creds s3 cp --recursive "$BLOCK_DIR" "s3://$S3_BUCKET_NAME/$TENANT/$BLOCK_ULID" @@ -44,8 +44,8 @@ function main() { echo "Done." 
} -function markblocks() { - go run "$SCRIPT_DIR/../../../tools/markblocks" \ +function mark_blocks() { + go run "$SCRIPT_DIR/../../../tools/mark-blocks" \ -backend="s3" \ -s3.access-key-id="$AWS_ACCESS_KEY_ID" \ -s3.secret-access-key="$AWS_SECRET_ACCESS_KEY" \ diff --git a/docs/internal/tools/mark-blocks.md b/docs/internal/tools/mark-blocks.md new file mode 100644 index 00000000000..67e9677f028 --- /dev/null +++ b/docs/internal/tools/mark-blocks.md @@ -0,0 +1,80 @@ +# Mark blocks tool + +`mark-blocks` is a tool that assists in uploading or removing block markers to a specified backend. +Marks are uploaded to both the block folder and the global marks folder for the provided tenant. + +Two types of marks can be created, depending on the `-mark-type` flag provided: + +## `deletion` mark + +When `-mark-type deletion` is provided, `mark-blocks` uploads a `DeletionMark` which tells the compactor that the block provided should be deleted. +This is a **destructive operation** (although it won't happen immediately, the default delation delay is 12h, see `-compactor.deletion-delay` value), proceed with caution. 
+ +### Example + +``` +$ mkdir -p tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS7Z + +$ touch tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json + +$ tree -f tenant-1 +tenant-1 +├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R +│   └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json +└── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS7Z + +3 directories, 1 file + +$ go run ./tools/mark-blocks -mark-type deletion -tenant tenant-1 -details "Corrupted blocks" -blocks "01FSCTA0A4M1YQHZQ4B2VTGS2R,01FSCTA0A4M1YQHZQ4B2VTGS7Z" +level=info time=2025-02-10T14:23:17.594517Z msg="skipping block because its meta.json was not found: 01FSCTA0A4M1YQHZQ4B2VTGS7Z" +level=info time=2025-02-10T14:23:17.594747Z msg="uploaded mark to tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/deletion-mark.json" +level=info time=2025-02-10T14:23:17.594945Z msg="uploaded mark to tenant-1/markers/01FSCTA0A4M1YQHZQ4B2VTGS2R-deletion-mark.json" + +$ tree -f tenant-1 +tenant-1 +├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R +│   ├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/deletion-mark.json +│   └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json +├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS7Z +└── tenant-1/markers + └── tenant-1/markers/01FSCTA0A4M1YQHZQ4B2VTGS2R-deletion-mark.json + +4 directories, 3 files +``` + +## `no-compact` mark + +When `-mark-type no-compact` is provided, this tool uploads a `NoCompactMark` which tells the compactor that the marked blocks should not be compacted. +`-details` flag can be used to provide an explanation of why the block is marked as such (who marked it? issue link?). 
+ +### Example + +``` +$ mkdir -p tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS7Z + +$ touch tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json + +$ tree -f tenant-1 +tenant-1 +├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R +│ └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json +└── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS7Z + +3 directories, 1 file + +$ go run ./tools/mark-blocks -mark-type no-compact -tenant tenant-1 -details "Blocks with out of order chunks" -blocks "01FSCTA0A4M1YQHZQ4B2VTGS2R,01FSCTA0A4M1YQHZQ4B2VTGS7Z" +level=info time=2025-02-10T14:27:46.232169Z msg="skipping block because its meta.json was not found: 01FSCTA0A4M1YQHZQ4B2VTGS7Z" +level=info time=2025-02-10T14:27:46.232419Z msg="uploaded mark to tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/no-compact-mark.json" +level=info time=2025-02-10T14:27:46.232621Z msg="uploaded mark to tenant-1/markers/01FSCTA0A4M1YQHZQ4B2VTGS2R-no-compact-mark.json" + +$ tree -f tenant-1 +tenant-1 +├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R +│   ├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json +│   └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/no-compact-mark.json +├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS7Z +└── tenant-1/markers + └── tenant-1/markers/01FSCTA0A4M1YQHZQ4B2VTGS2R-no-compact-mark.json + +4 directories, 3 files +``` diff --git a/docs/internal/tools/markblocks.md b/docs/internal/tools/markblocks.md deleted file mode 100644 index d772558e8f5..00000000000 --- a/docs/internal/tools/markblocks.md +++ /dev/null @@ -1,80 +0,0 @@ -# Mark blocks tool - -`markblocks` is a tool that creates and uploads block markers to a specified backend. -Marks are uploaded to both block folder and the global marks folder for the provided tenant. - -See `markblocks -help` for flags usage, and `markblocks -help-all` for full backend configuration flags list. 
- -This tool can create two types of marks, depending on the `-mark` flag provided: - -## `deletion` mark - -When `-mark deletion` is provided, this tool uploads a `DeletionMark` which tells the compactor that the block provided should be deleted. -This is a **destructive operation** (although it won't happen immediately, the default delation delay is 12h, see `-compactor.deletion-delay` value), proceed with caution. - -### Example - -``` -$ mkdir -p tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2U - -$ touch tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json - -$ tree -f tenant-1 -tenant-1 -├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R -│ └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json -└── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2U - -2 directories, 1 file - -$ go run ./tools/markblocks -mark deletion -tenant tenant-1 -details "Corrupted blocks" 01FSCTA0A4M1YQHZQ4B2VTGS2R 01FSCTA0A4M1YQHZQ4B2VTGS2U -level=info time=2022-03-30T08:50:41.277334365Z msg="Successfully uploaded mark." block=01FSCTA0A4M1YQHZQ4B2VTGS2R -level=info time=2022-03-30T08:50:41.277359767Z msg="Block does not exist, skipping." block=01FSCTA0A4M1YQHZQ4B2VTGS7Z - -$ tree -f tenant-1 -tenant-1 -├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R -│ ├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/deletion-mark.json -│ └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json -├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2U -└── tenant-1/markers - └── tenant-1/markers/01FSCTA0A4M1YQHZQ4B2VTGS2R-deletion-mark.json - -3 directories, 3 files -``` - -## `no-compact` mark - -When `-mark no-compact` is provided, this tool uploads a `NoCompactMark` which tells the compactor that the marked blocks should not be compacted. -`-details` flag can be used to provide an explanation of why the block is marked as such (who marked it? issue link?). 
- -### Example - -``` -$ mkdir -p tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2U - -$ touch tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json - -$ tree -f tenant-1 -tenant-1 -├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R -│ └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json -└── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2U - -2 directories, 1 file - -$ go run ./tools/markblocks -mark no-compact -tenant tenant-1 -details "Blocks with out of order chunks" 01FSCTA0A4M1YQHZQ4B2VTGS2R 01FSCTA0A4M1YQHZQ4B2VTGS2U -level=info time=2022-03-30T08:53:13.012462019Z msg="Successfully uploaded mark." block=01FSCTA0A4M1YQHZQ4B2VTGS2R -level=info time=2022-03-30T08:53:13.012492902Z msg="Block does not exist, skipping." block=01FSCTA0A4M1YQHZQ4B2VTGS7Z - -$ tree -f tenant-1 -tenant-1 -├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R -│ ├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/meta.json -│ └── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2R/no-compact-mark.json -├── tenant-1/01FSCTA0A4M1YQHZQ4B2VTGS2U -└── tenant-1/markers - └── tenant-1/markers/01FSCTA0A4M1YQHZQ4B2VTGS2R-no-compact-mark.json - -3 directories, 3 files -``` diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index fc166adc02e..62ef3378d4e 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -614,7 +614,7 @@ How to **investigate**: - Find source blocks for the compaction job: search for `msg="compact blocks"` and a mention of the result block ID. - Mark the source blocks for no compaction (in this example the object storage backend is GCS): ``` - ./tools/markblocks/markblocks -backend gcs -gcs.bucket-name -mark no-compact -tenant -details "Leading to out-of-order chunks when compacting with other blocks" ... + ./tools/mark-blocks/mark-blocks -backend gcs -gcs.bucket-name -mark-type no-compact -tenant -details "Leading to out-of-order chunks when compacting with other blocks" -blocks ",..." 
``` - Result block exceeds symbol table maximum size: - **How to detect**: Search compactor logs for `symbol table size exceeds`. @@ -629,7 +629,7 @@ How to **investigate**: Where the filenames are the block IDs: `01GZS91PMTAWAWAKRYQVNV1FPP` and `01GZSC5803FN1V1ZFY6Q8PWV1E` - Mark the source blocks for no compaction (in this example the object storage backend is GCS): ``` - ./tools/markblocks/markblocks -backend gcs -gcs.bucket-name -mark no-compact -tenant -details "Result block exceeds symbol table maximum size" ... + ./tools/mark-blocks/mark-blocks -backend gcs -gcs.bucket-name -mark-type no-compact -tenant -details "Result block exceeds symbol table maximum size" -blocks ",..." ``` - Further reading: [Compaction algorithm]({{< relref "../../references/architecture/components/compactor#compaction-algorithm" >}}). - Compactor network disk unresponsive: @@ -668,7 +668,7 @@ How to **fix** it: - The only long-term solution is to give the compactor more disk space, as it requires more space to fit the largest single job into its disk. - If the number of blocks that the compactor is failing to compact is not very significant and you want to skip compacting them and focus on more recent blocks instead, consider marking the affected blocks for no compaction: ``` - ./tools/markblocks/markblocks -backend gcs -gcs.bucket-name -mark no-compact -tenant -details "focus on newer blocks" + ./tools/mark-blocks/mark-blocks -backend gcs -gcs.bucket-name -mark-type no-compact -tenant -details "focus on newer blocks" -blocks ",..." ``` ### MimirCompactorSkippedUnhealthyBlocks diff --git a/tools/mark-blocks/README.md b/tools/mark-blocks/README.md new file mode 100644 index 00000000000..9a325c722e7 --- /dev/null +++ b/tools/mark-blocks/README.md @@ -0,0 +1,62 @@ +# Mark Blocks + +This program creates or removes markers for blocks. 
+ +## Flags +- `--tenant` (required) The tenant that owns the blocks +- `--mark-type` (required) Mark type to create or remove, valid options: `deletion`, `no-compact` +- `--blocks` (optional) Comma separated list of blocks IDs. If non-empty, `--blocks-file` is ignored +- `--blocks-file` (optional) File containing a block ID per-line. Defaults to standard input (`-`). Ignored if `--blocks` is non-empty +- `--meta-presence-policy` (optional) Policy on presence of block `meta.json` files: `none`, `skip-block`, or `require`. Defaults to `skip-block` +- `--remove` (optional) If marks should be removed rather than uploaded. Defaults to `false` +- `--resume-index` (optional) The index of the block to resume from. This index is logged to assist in recovering from partial failures +- `--concurrency` (optional) How many markers to upload or remove concurrently. Defaults to `16` +- `--details` (optional) Details to include in an added mark +- `--dry-run` (optional) Log changes that would be made instead of actually making them + +Each supported object storage service also has an additional set of flags (see examples in [Running](##Running)). + +## Running + +Running `go build .` in this directory builds the program. Then use an example below as a guide. 
+ +### Example for Google Cloud Storage + +```bash +./mark-blocks \ + --tenant + --blocks + --mark-type + --backend gcs \ + --gcs.bucket-name \ + --dry-run +``` + +### Example for Azure Blob Storage + +```bash +./mark-blocks \ + --tenant + --blocks + --mark-type + --backend azure \ + --azure.container-name \ + --azure.account-name \ + --azure.account-key \ + --dry-run +``` + +### Example for Amazon Simple Storage Service + +```bash +./mark-blocks\ + --tenant + --blocks + --mark-type + --backend s3 \ + --s3.bucket-name \ + --s3.access-key-id \ + --s3.secret-access-key \ + --s3.endpoint \ + --dry-run +``` diff --git a/tools/mark-blocks/main.go b/tools/mark-blocks/main.go index 23fe74a8e40..2be9e446abc 100644 --- a/tools/mark-blocks/main.go +++ b/tools/mark-blocks/main.go @@ -46,14 +46,14 @@ type config struct { func (cfg *config) registerFlags(f *flag.FlagSet) { cfg.bucket.RegisterFlags(f) f.StringVar(&cfg.tenantID, "tenant", "", "Tenant ID of the owner of the block. Required.") - f.StringVar(&cfg.markType, "mark-type", "", "Mark type to create, valid options: deletion, no-compact. Required.") - f.StringVar(&cfg.details, "details", "", "Details to include in a mark.") - f.Var(&cfg.blocks, "blocks", "Comma separated list of blocks. If non-empty, blocks-file is ignored.") - f.StringVar(&cfg.blocksFile, "blocks-file", "-", "File containing block IDs to mark. Defaults to standard input. Ignored if blocks is non-empty") - f.IntVar(&cfg.resumeIndex, "resume-index", 0, "The index of the block in blocks-file to resume from") + f.StringVar(&cfg.markType, "mark-type", "", "Mark type to create or remove, valid options: deletion, no-compact. Required.") + f.StringVar(&cfg.details, "details", "", "Details to include in an added mark.") + f.Var(&cfg.blocks, "blocks", "Comma separated list of block IDs. If non-empty, blocks-file is ignored.") + f.StringVar(&cfg.blocksFile, "blocks-file", "-", "File containing a block ID per-line. Defaults to standard input. 
Ignored if blocks is non-empty") + f.IntVar(&cfg.resumeIndex, "resume-index", 0, "The index of the block to resume from") f.BoolVar(&cfg.remove, "remove", false, "If marks should be removed rather than uploaded.") - f.StringVar(&cfg.metaPresencePolicy, "meta-presence-policy", "require", "How to validate block meta.json files: \"none\", \"skip-block\", or \"require\".") - f.BoolVar(&cfg.dryRun, "dry-run", false, "Don't upload the markers generated, just print the intentions.") + f.StringVar(&cfg.metaPresencePolicy, "meta-presence-policy", "skip-block", "Policy on presence of block meta.json files: \"none\", \"skip-block\", or \"require\".") + f.BoolVar(&cfg.dryRun, "dry-run", false, "Log changes that would be made instead of actually making them.") f.IntVar(&cfg.concurrency, "concurrency", 16, "How many markers to upload or remove concurrently.") } @@ -65,7 +65,7 @@ func (cfg *config) validate() error { return fmt.Errorf("-tenant is invalid: %w", err) } if cfg.markType == "" { - return errors.New("-mark is required") + return errors.New("-mark-type is required") } if cfg.blocksFile == "" && len(cfg.blocks) == 0 { return errors.New("one of -blocks or -blocks-file must be specified") @@ -84,6 +84,7 @@ func main() { cfg.registerFlags(flag.CommandLine) logger := log.NewLogfmtLogger(os.Stdout) + logger = log.With(logger, "time", log.DefaultTimestampUTC) // Parse CLI arguments. 
if err := flagext.ParseFlagsWithoutArguments(flag.CommandLine); err != nil { @@ -144,9 +145,9 @@ func main() { if successUntil, err := forEachJobSuccessUntil(ctx, len(blocks), cfg.concurrency, f); err != nil { // since indices were possibly based on a subslice, account for that as a starting point // successUntil is known to be the first index that did not succeed - resumeFrom := cfg.resumeIndex + successUntil + resumeIndex := cfg.resumeIndex + successUntil - level.Error(logger).Log("msg", "encountered a failure", "err", err, "resumeFrom", resumeFrom) + level.Error(logger).Log("msg", "encountered a failure", "err", err, "resumeIndex", resumeIndex) os.Exit(1) } } @@ -225,7 +226,7 @@ func addMarksFunc( return err } if skip { - level.Info(logger).Log("msg", fmt.Sprintf("skipping block because its meta.json was not found: %s", block)) + level.Info(logger).Log("msg", fmt.Sprintf("skipping block because its meta.json was not found: %s", blockStr)) return nil } @@ -320,7 +321,7 @@ func metaPresenceFunc(tenantID string, bkt objstore.Bucket, policy string) (meta return false, err } if !exists { - return false, fmt.Errorf("block's metadata did not exist: %q", metaName) + return false, fmt.Errorf("block's metadata did not exist: %s", metaName) } return false, nil }, nil @@ -361,7 +362,7 @@ func readBlocks(r io.Reader) ([]ulid.ULID, error) { line := scanner.Text() u, err := ulid.Parse(line) if err != nil { - return nil, fmt.Errorf("failed to parse a string=%s as a ULID", line) + return nil, fmt.Errorf("failed to parse a ULID from: %q", line) } blocks = append(blocks, u) } From 448b1159d324efd3822bc332f7ac0c9dc029a065 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 10 Feb 2025 11:29:01 -0500 Subject: [PATCH 07/11] make doc --- .../mimir/manage/mimir-runbooks/_index.md | 2 +- tools/mark-blocks/README.md | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md 
b/docs/sources/mimir/manage/mimir-runbooks/_index.md index 62ef3378d4e..96e887eab59 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -668,7 +668,7 @@ How to **fix** it: - The only long-term solution is to give the compactor more disk space, as it requires more space to fit the largest single job into its disk. - If the number of blocks that the compactor is failing to compact is not very significant and you want to skip compacting them and focus on more recent blocks instead, consider marking the affected blocks for no compaction: ``` - ./tools/mark-blocks/mark-blocks -backend gcs -gcs.bucket-name -mark-type no-compact -tenant -details "focus on newer blocks" -blocks ",..." + ./tools/mark-blocks/mark-blocks -backend gcs -gcs.bucket-name -mark-type no-compact -tenant -details "focus on newer blocks" -blocks ",..." ``` ### MimirCompactorSkippedUnhealthyBlocks diff --git a/tools/mark-blocks/README.md b/tools/mark-blocks/README.md index 9a325c722e7..d0b24246929 100644 --- a/tools/mark-blocks/README.md +++ b/tools/mark-blocks/README.md @@ -3,12 +3,13 @@ This program creates or removes markers for blocks. ## Flags + - `--tenant` (required) The tenant that owns the blocks - `--mark-type` (required) Mark type to create or remove, valid options: `deletion`, `no-compact` - `--blocks` (optional) Comma separated list of blocks IDs. If non-empty, `--blocks-file` is ignored -- `--blocks-file` (optional) File containing a block ID per-line. Defaults to standard input (`-`). Ignored if `--blocks` is non-empty +- `--blocks-file` (optional) File containing a block ID per-line. Defaults to standard input (`-`). Ignored if `--blocks` is non-empty - `--meta-presence-policy` (optional) Policy on presence of block `meta.json` files: `none`, `skip-block`, or `require`. Defaults to `skip-block` -- `--remove` (optional) If marks should be removed rather than uploaded. 
Defaults to `false` +- `--remove` (optional) If marks should be removed rather than uploaded. Defaults to `false` - `--resume-index` (optional) The index of the block to resume from. This index is logged to assist in recovering from partial failures - `--concurrency` (optional) How many markers to upload or remove concurrently. Defaults to `16` - `--details` (optional) Details to include in an added mark @@ -24,9 +25,9 @@ Running `go build .` in this directory builds the program. Then use an example b ```bash ./mark-blocks \ - --tenant - --blocks - --mark-type + --tenant + --blocks + --mark-type --backend gcs \ --gcs.bucket-name \ --dry-run @@ -36,9 +37,9 @@ Running `go build .` in this directory builds the program. Then use an example b ```bash ./mark-blocks \ - --tenant - --blocks - --mark-type + --tenant + --blocks + --mark-type --backend azure \ --azure.container-name \ --azure.account-name \ @@ -50,9 +51,9 @@ Running `go build .` in this directory builds the program. Then use an example b ```bash ./mark-blocks\ - --tenant - --blocks - --mark-type + --tenant + --blocks + --mark-type --backend s3 \ --s3.bucket-name \ --s3.access-key-id \ From d3b53f41c66c99a83c007f49580606ed8b516839 Mon Sep 17 00:00:00 2001 From: Andy Asp <90626759+andyasp@users.noreply.github.com> Date: Mon, 10 Feb 2025 16:02:21 -0500 Subject: [PATCH 08/11] Update docs/internal/tools/mark-blocks.md Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --- docs/internal/tools/mark-blocks.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/internal/tools/mark-blocks.md b/docs/internal/tools/mark-blocks.md index 67e9677f028..24c7d3a775b 100644 --- a/docs/internal/tools/mark-blocks.md +++ b/docs/internal/tools/mark-blocks.md @@ -8,7 +8,7 @@ Two types of marks can be created, depending on the `-mark-type` flag provided: ## `deletion` mark When `-mark-type deletion` is provided, `mark-blocks` uploads a `DeletionMark` which tells the compactor that the block 
provided should be deleted. -This is a **destructive operation** (although it won't happen immediately, the default delation delay is 12h, see `-compactor.deletion-delay` value), proceed with caution. +This is a **destructive operation** (although it doesn't happen immediately, the default delation delay is 12h, see `-compactor.deletion-delay` value), proceed with caution. ### Example From 04627b282ff0a5ecc4b6be10d841ee7d4ba71697 Mon Sep 17 00:00:00 2001 From: Andy Asp <90626759+andyasp@users.noreply.github.com> Date: Mon, 10 Feb 2025 16:02:38 -0500 Subject: [PATCH 09/11] Update tools/mark-blocks/README.md Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --- tools/mark-blocks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/mark-blocks/README.md b/tools/mark-blocks/README.md index d0b24246929..2c88a0e17ae 100644 --- a/tools/mark-blocks/README.md +++ b/tools/mark-blocks/README.md @@ -19,7 +19,7 @@ Each supported object storage service also has an additional set of flags (see e ## Running -Running `go build .` in this directory builds the program. Then use an example below as a guide. +Run `go build .` in this directory to build the program. Then, use an example below as a guide. ### Example for Google Cloud Storage From 042ee6bd278be6a01071e82067974d1830ac7058 Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 10 Feb 2025 16:10:02 -0500 Subject: [PATCH 10/11] Percolate suggestions to other similar READMEs --- tools/copyblocks/README.md | 4 ++-- tools/copyprefix/README.md | 4 ++-- tools/mark-blocks/README.md | 4 ++-- tools/undelete-blocks/README.md | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tools/copyblocks/README.md b/tools/copyblocks/README.md index 8f0fe49a936..afc4529507c 100644 --- a/tools/copyblocks/README.md +++ b/tools/copyblocks/README.md @@ -1,4 +1,4 @@ -# Copyblocks +# `copyblocks` This program can copy Mimir blocks between two buckets. 
@@ -18,7 +18,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co ## Running -Running `go build` in this directory builds the program. Then use an example below as a guide. +Run `go build` in this directory to build the program. Then, use an example below as a guide. ### Example for Google Cloud Storage diff --git a/tools/copyprefix/README.md b/tools/copyprefix/README.md index dd2b9c5e9ad..4d98a97e704 100644 --- a/tools/copyprefix/README.md +++ b/tools/copyprefix/README.md @@ -1,4 +1,4 @@ -# Copyprefix +# `copyprefix` This program copies objects by prefix between two buckets. If your infrastructure is largely on a single object storage service their respective CLI tool may be more convenient to use. @@ -15,7 +15,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co ## Running -Running `go build` in this directory builds the program. Then use an example below as a guide. +Run `go build` in this directory to build the program. Then, use an example below as a guide. ### Example for Google Cloud Storage diff --git a/tools/mark-blocks/README.md b/tools/mark-blocks/README.md index 2c88a0e17ae..b6461407c44 100644 --- a/tools/mark-blocks/README.md +++ b/tools/mark-blocks/README.md @@ -1,4 +1,4 @@ -# Mark Blocks +# `mark-blocks` This program creates or removes markers for blocks. @@ -19,7 +19,7 @@ Each supported object storage service also has an additional set of flags (see e ## Running -Run `go build .` in this directory to build the program. Then, use an example below as a guide. +Run `go build` in this directory to build the program. Then, use an example below as a guide. 
### Example for Google Cloud Storage diff --git a/tools/undelete-blocks/README.md b/tools/undelete-blocks/README.md index cbd68c68329..96af00cb3c2 100644 --- a/tools/undelete-blocks/README.md +++ b/tools/undelete-blocks/README.md @@ -1,4 +1,4 @@ -# Undelete Blocks +# `undelete-blocks` This program is a disaster recovery tool that can restore deleted Mimir blocks in object storage. @@ -68,7 +68,7 @@ Files not listed within the `meta.json` of a block are not restored if they were ## Running -Running `go build .` in this directory builds the program. Then use an example below as a guide. +Run `go build` in this directory to build the program. Then, use an example below as a guide. ### Example for Google Cloud Storage From 430d8bc5c491de64cddc12cb420f5a5f2849cf3b Mon Sep 17 00:00:00 2001 From: Andy Asp Date: Mon, 10 Feb 2025 16:19:04 -0500 Subject: [PATCH 11/11] Missed internal docs change --- docs/internal/tools/mark-blocks.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/internal/tools/mark-blocks.md b/docs/internal/tools/mark-blocks.md index 24c7d3a775b..bfbc38071d8 100644 --- a/docs/internal/tools/mark-blocks.md +++ b/docs/internal/tools/mark-blocks.md @@ -1,4 +1,4 @@ -# Mark blocks tool +# mark-blocks `mark-blocks` is a tool that assists in uploading or removing block markers to a specified backend. Marks are uploaded to both the block folder and the global marks folder for the provided tenant.