From b3d763be9c55ace0df0fe05922c116fb74f8ac28 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 25 Feb 2021 23:18:36 -0500 Subject: [PATCH] add a flag to not check external labels when uploading blocks using bucket rewrite Signed-off-by: yeya24 add changelog Signed-off-by: yeya24 rename param to checkExternalLabels Signed-off-by: yeya24 --- CHANGELOG.md | 1 + cmd/thanos/tools_bucket.go | 11 +++++++++-- docs/components/tools.md | 3 +++ pkg/block/block.go | 26 ++++++++++++++++++++------ pkg/block/block_test.go | 17 +++++++++++++++++ 5 files changed, 50 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11fa0e033ab..9e01a9c6adf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction - [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/other writers: added `--hash-func`. If some function has been specified, writers calculate hashes using that function of each file in a block before uploading them. If those hashes exist in the `meta.json` file then Compact does not download the files if they already exist on disk and with the same hash. This also means that the data directory passed to Thanos Compact is only *cleared once at boot* or *if everything succeeds*. So, if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to make an iteration properly then the last downloaded files are not wiped from the disk. The directories that were created the last time are only wiped again after a successful iteration or if the previously picked up blocks have disappeared. - [#3686](https://github.com/thanos-io/thanos/pull/3686) Query: Added federated metric metadata support. +- [#3840](https://github.com/thanos-io/thanos/pull/3840) Tools: Added a flag to support rewrite Prometheus TSDB blocks. ### Fixed diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 6d612af23a7..a779af46d99 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -793,6 +793,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool() toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true) provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool() + promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool() cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { confContentYaml, err := objStoreConfig.Content() if err != nil { @@ -907,8 +908,14 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat } level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID) - if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil { - return errors.Wrap(err, "upload") + if *promBlocks { + if err := block.UploadPromBlock(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil { + return errors.Wrap(err, "upload") + } + } else { + if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil { + return errors.Wrap(err, "upload") + } } level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID) } diff --git a/docs/components/tools.md b/docs/components/tools.md index 87b3a17b81c..59099faf6fe 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -759,6 +759,9 @@ Flags: --rewrite.add-change-log If specified, all modifications are written to new block directory. Disable if latency is to high. + --prom-blocks If specified, we assume the blocks to be + uploaded are only used with Prometheus so we + don't check external labels in this case. ``` diff --git a/pkg/block/block.go b/pkg/block/block.go index 88901682ac6..c77f8792f5a 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -93,12 +93,23 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id return nil } -// Upload uploads block from given block dir that ends with block id. +// Upload uploads a TSDB block to the object storage. It verifies basic +// features of Thanos block. +func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { + return upload(ctx, logger, bkt, bdir, hf, true) +} + +// UploadPromBlock uploads a TSDB block to the object storage. It assumes +// the block is used in Prometheus so it doesn't check Thanos external labels. +func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { + return upload(ctx, logger, bkt, bdir, hf, false) +} + +// upload uploads block from given block dir that ends with block id. // It makes sure cleanup is done on error to avoid partial block uploads. -// It also verifies basic features of Thanos block. // TODO(bplotka): Ensure bucket operations have reasonable backoff retries. // NOTE: Upload updates `meta.Thanos.File` section. -func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error { +func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error { df, err := os.Stat(bdir) if err != nil { return err @@ -119,20 +130,23 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st return errors.Wrap(err, "read meta") } - if meta.Thanos.Labels == nil || len(meta.Thanos.Labels) == 0 { - return errors.New("empty external labels are not allowed for Thanos block.") + if checkExternalLabels { + if meta.Thanos.Labels == nil || len(meta.Thanos.Labels) == 0 { + return errors.New("empty external labels are not allowed for Thanos block.") + } } + metaEncoded := strings.Builder{} meta.Thanos.Files, err = gatherFileStats(bdir, hf, logger) if err != nil { return errors.Wrap(err, "gather meta file stats") } - metaEncoded := strings.Builder{} if err := meta.Write(&metaEncoded); err != nil { return errors.Wrap(err, "encode meta file") } + // TODO(yeya24): Remove this step. if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), strings.NewReader(metaEncoded.String())); err != nil { return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file")) } diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index b18453ee0a5..05d7271a852 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -210,6 +210,23 @@ func TestUpload(t *testing.T) { testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error()) testutil.Equals(t, 4, len(bkt.Objects())) } + { + // No external labels with UploadPromBlocks. + b2, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, nil, 124, metadata.NoneFunc) + testutil.Ok(t, err) + err = UploadPromBlock(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc) + testutil.Ok(t, err) + testutil.Equals(t, 8, len(bkt.Objects())) + testutil.Equals(t, 3736, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")])) + testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)])) + testutil.Equals(t, 525, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)])) + } } func TestDelete(t *testing.T) {