diff --git a/CHANGELOG.md b/CHANGELOG.md index 972e5322e1..7cae76be55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4392](https://github.com/thanos-io/thanos/pull/4392) Tools: Added `--delete-blocks` to bucket rewrite tool to mark the original blocks for deletion after rewriting is done. - [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline and reader specific configuration. Implements HTTP transport configuration options. These options allows for more fine-grained control on timeouts and retries. Implements MSI authentication as second method of authentication via a service principal token. - [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying retention policy on the bucket. +- [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `--downsample.concurrency` to specify the concurrency of downsampling blocks. 
### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index c374297ee4..af3fe78129 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -427,7 +427,7 @@ func runCompact( downsampleMetrics.downsamples.WithLabelValues(groupKey) downsampleMetrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } @@ -435,7 +435,7 @@ func runCompact( if err := sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } level.Info(logger).Log("msg", "downsampling iterations done") @@ -575,6 +575,7 @@ type compactConfig struct { blockViewerSyncBlockInterval time.Duration cleanupBlocksInterval time.Duration compactionConcurrency int + downsampleConcurrency int deleteDelay model.Duration dedupReplicaLabels []string selectorRelabelConf extflag.PathOrContent @@ -634,6 +635,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). Default("1").IntVar(&cc.compactionConcurrency) + cmd.Flag("downsample.concurrency", "Number of goroutines to use when downsampling blocks."). 
+ Default("1").IntVar(&cc.downsampleConcurrency) cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket. "+ "If delete-delay is non zero, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+ diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 15bbe23fcb..dd68826192 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -20,6 +20,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" + "golang.org/x/sync/errgroup" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -61,6 +63,7 @@ func RunDownsample( httpTLSConfig string, httpGracePeriod time.Duration, dataDir string, + downsampleConcurrency int, objStoreConfig *extflag.PathOrContent, comp component.Component, hashFunc metadata.HashFunc, @@ -115,7 +118,7 @@ func RunDownsample( metrics.downsamples.WithLabelValues(groupKey) metrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { return errors.Wrap(err, "downsampling failed") } @@ -124,7 +127,7 @@ func RunDownsample( if err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { return errors.Wrap(err, "downsampling failed") } @@ -162,6 +165,7 @@ func downsampleBucket( bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string, + downsampleConcurrency int, hashFunc metadata.HashFunc, ) (rerr error) { 
if err := os.MkdirAll(dir, 0750); err != nil { @@ -218,57 +222,89 @@ func downsampleBucket( return metasULIDS[i].Compare(metasULIDS[j]) < 0 }) - for _, mk := range metasULIDS { - m := metas[mk] + var ( + eg errgroup.Group + ch = make(chan *metadata.Meta, downsampleConcurrency) + ) - switch m.Thanos.Downsample.Resolution { - case downsample.ResLevel0: - missing := false - for _, id := range m.Compaction.Sources { - if _, ok := sources5m[id]; !ok { - missing = true - break + level.Debug(logger).Log("msg", "downsampling bucket", "concurrency", downsampleConcurrency) + for i := 0; i < downsampleConcurrency; i++ { + eg.Go(func() error { + for m := range ch { + resolution := downsample.ResLevel1 + errMsg := "downsampling to 5 min" + if m.Thanos.Downsample.Resolution == downsample.ResLevel1 { + resolution = downsample.ResLevel2 + errMsg = "downsampling to 60 min" } + if err := processDownsampling(ctx, logger, bkt, m, dir, resolution, hashFunc); err != nil { + metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() + return errors.Wrap(err, errMsg) + } + metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() } - if !missing { - continue - } - // Only downsample blocks once we are sure to get roughly 2 chunks out of it. - // NOTE(fabxc): this must match with at which block size the compactor creates downsampled - // blocks. Otherwise we may never downsample some data. - if m.MaxTime-m.MinTime < downsample.DownsampleRange0 { + return nil + }) + } + + // Workers scheduled, distribute blocks. 
+ eg.Go(func() error { + defer close(ch) + for _, mk := range metasULIDS { + m := metas[mk] + + switch m.Thanos.Downsample.Resolution { + case downsample.ResLevel2: continue - } - if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1, hashFunc); err != nil { - metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() - return errors.Wrap(err, "downsampling to 5 min") - } - metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() + case downsample.ResLevel0: + missing := false + for _, id := range m.Compaction.Sources { + if _, ok := sources5m[id]; !ok { + missing = true + break + } + } + if !missing { + continue + } + // Only downsample blocks once we are sure to get roughly 2 chunks out of it. + // NOTE(fabxc): this must match with at which block size the compactor creates downsampled + // blocks. Otherwise we may never downsample some data. + if m.MaxTime-m.MinTime < downsample.DownsampleRange0 { + continue + } - case downsample.ResLevel1: - missing := false - for _, id := range m.Compaction.Sources { - if _, ok := sources1h[id]; !ok { - missing = true - break + case downsample.ResLevel1: + missing := false + for _, id := range m.Compaction.Sources { + if _, ok := sources1h[id]; !ok { + missing = true + break + } + } + if !missing { + continue + } + // Only downsample blocks once we are sure to get roughly 2 chunks out of it. + // NOTE(fabxc): this must match with at which block size the compactor creates downsampled + // blocks. Otherwise we may never downsample some data. + if m.MaxTime-m.MinTime < downsample.DownsampleRange1 { + continue } } - if !missing { - continue - } - // Only downsample blocks once we are sure to get roughly 2 chunks out of it. - // NOTE(fabxc): this must match with at which block size the compactor creates downsampled - // blocks. Otherwise we may never downsample some data. 
- if m.MaxTime-m.MinTime < downsample.DownsampleRange1 { - continue - } - if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2, hashFunc); err != nil { - metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() - return errors.Wrap(err, "downsampling to 60 min") + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- m: } - metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc() } + return nil + }) + + if err := eg.Wait(); err != nil { + return errors.Wrap(err, "downsample bucket") } return nil } diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index 02928bb434..59041d76cd 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -58,7 +58,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metas, _, err := metaFetcher.Fetch(ctx) testutil.Ok(t, err) - testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, metadata.NoneFunc)) + testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)) testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos)))) _, err = os.Stat(dir) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index bc34a6a80a..24b50e068e 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -522,13 +522,15 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) { cmd := app.Command(component.Downsample.String(), "Continuously downsamples blocks in an object store bucket.") httpAddr, httpGracePeriod, httpTLSConfig := extkingpin.RegisterHTTPFlags(cmd) + downsampleConcurrency := cmd.Flag("downsample.concurrency", "Number of goroutines to use when downsampling blocks."). 
+ Default("1").Int() dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings."). Default("./data").String() hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). Default("").Enum("SHA256", "") cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { - return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc)) + return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), *dataDir, *downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc)) }) } diff --git a/docs/components/compact.md b/docs/components/compact.md index ed0a4b76a6..176ea4950c 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -342,6 +342,9 @@ Flags: loaded, or compactor is ignoring the deletion because it's compacting the block at the same time. + --downsample.concurrency=1 + Number of goroutines to use when downsampling + blocks. --downsampling.disable Disables downsampling. This is not recommended as querying long time ranges without non-downsampled data is not efficient and useful diff --git a/docs/components/tools.md b/docs/components/tools.md index a6dc7925ee..934501eba1 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -582,6 +582,9 @@ Continuously downsamples blocks in an object store bucket. Flags: --data-dir="./data" Data directory in which to cache blocks and process downsamplings. + --downsample.concurrency=1 + Number of goroutines to use when downsampling + blocks. 
--hash-func= Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen.