Skip to content

Commit

Permalink
add a flag to not check external labels when uploading blocks using b…
Browse files Browse the repository at this point in the history
…ucket rewrite

Signed-off-by: yeya24 <[email protected]>

add changelog

Signed-off-by: yeya24 <[email protected]>

rename param to checkExternalLabels

Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed Mar 10, 2021
1 parent ebba2b8 commit b3d763b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction
- [#3031](https://github.com/thanos-io/thanos/pull/3031) Compact/Sidecar/other writers: added `--hash-func`. If some function has been specified, writers calculate hashes using that function of each file in a block before uploading them. If those hashes exist in the `meta.json` file then Compact does not download the files if they already exist on disk and with the same hash. This also means that the data directory passed to Thanos Compact is only *cleared once at boot* or *if everything succeeds*. So, if you, for example, use persistent volumes on k8s and your Thanos Compact crashes or fails to make an iteration properly then the last downloaded files are not wiped from the disk. The directories that were created the last time are only wiped again after a successful iteration or if the previously picked up blocks have disappeared.
- [#3686](https://github.com/thanos-io/thanos/pull/3686) Query: Added federated metric metadata support.
- [#3840](https://github.com/thanos-io/thanos/pull/3840) Tools: Added a flag to support rewrite Prometheus TSDB blocks.

### Fixed

Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -907,8 +908,14 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}

level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID)
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
if *promBlocks {
if err := block.UploadPromBlock(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
}
} else {
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
}
}
level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID)
}
Expand Down
3 changes: 3 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,9 @@ Flags:
--rewrite.add-change-log If specified, all modifications are written to
new block directory. Disable if latency is to
high.
--prom-blocks If specified, we assume the blocks to be
uploaded are only used with Prometheus so we
don't check external labels in this case.
```

Expand Down
26 changes: 20 additions & 6 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,23 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id
return nil
}

// Upload uploads block from given block dir that ends with block id.
// Upload uploads a TSDB block to the object storage. It verifies basic
// features of Thanos block.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, true)
}

// UploadPromBlock uploads a TSDB block to the object storage. It assumes
// the block is used in Prometheus so it doesn't check Thanos external labels.
func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, false)
}

// upload uploads block from given block dir that ends with block id.
// It makes sure cleanup is done on error to avoid partial block uploads.
// It also verifies basic features of Thanos block.
// TODO(bplotka): Ensure bucket operations have reasonable backoff retries.
// NOTE: Upload updates `meta.Thanos.File` section.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error {
df, err := os.Stat(bdir)
if err != nil {
return err
Expand All @@ -119,20 +130,23 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "read meta")
}

if meta.Thanos.Labels == nil || len(meta.Thanos.Labels) == 0 {
return errors.New("empty external labels are not allowed for Thanos block.")
if checkExternalLabels {
if meta.Thanos.Labels == nil || len(meta.Thanos.Labels) == 0 {
return errors.New("empty external labels are not allowed for Thanos block.")
}
}

metaEncoded := strings.Builder{}
meta.Thanos.Files, err = gatherFileStats(bdir, hf, logger)
if err != nil {
return errors.Wrap(err, "gather meta file stats")
}

metaEncoded := strings.Builder{}
if err := meta.Write(&metaEncoded); err != nil {
return errors.Wrap(err, "encode meta file")
}

// TODO(yeya24): Remove this step.
if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), strings.NewReader(metaEncoded.String())); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file"))
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error())
testutil.Equals(t, 4, len(bkt.Objects()))
}
{
// No external labels with UploadPromBlocks.
b2, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, nil, 124, metadata.NoneFunc)
testutil.Ok(t, err)
err = UploadPromBlock(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Equals(t, 8, len(bkt.Objects()))
testutil.Equals(t, 3736, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 525, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

func TestDelete(t *testing.T) {
Expand Down

0 comments on commit b3d763b

Please sign in to comment.