Add support for downsampling concurrency #4430

Merged 2 commits on Jul 9, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4392](https://github.com/thanos-io/thanos/pull/4392) Tools: Added `--delete-blocks` to bucket rewrite tool to mark the original blocks for deletion after rewriting is done.
- [#3970](https://github.com/thanos-io/thanos/pull/3970) Azure: Adds more configuration options for Azure blob storage. This allows for pipeline- and reader-specific configuration. Implements HTTP transport configuration options. These options allow for more fine-grained control over timeouts and retries. Implements MSI authentication as a second method of authentication via a service principal token.
- [#4406](https://github.com/thanos-io/thanos/pull/4406) Tools: Add retention command for applying a retention policy on the bucket.
- [#4430](https://github.com/thanos-io/thanos/pull/4430) Compact: Add flag `downsample.concurrency` to specify the concurrency of downsampling blocks.

### Fixed

7 changes: 5 additions & 2 deletions cmd/thanos/compact.go
@@ -427,15 +427,15 @@ func runCompact(
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
-if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
+if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
-if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, metadata.HashFunc(conf.hashFunc)); err != nil {
+if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
@@ -575,6 +575,7 @@ type compactConfig struct {
blockViewerSyncBlockInterval time.Duration
cleanupBlocksInterval time.Duration
compactionConcurrency int
downsampleConcurrency int
deleteDelay model.Duration
dedupReplicaLabels []string
selectorRelabelConf extflag.PathOrContent
@@ -634,6 +635,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").IntVar(&cc.compactionConcurrency)
cmd.Flag("downsample.concurrency", "Number of goroutines to use when downsampling blocks.").
Default("1").IntVar(&cc.downsampleConcurrency)

cmd.Flag("delete-delay", "Time before a block marked for deletion is deleted from bucket. "+
"If delete-delay is non zero, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+
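For readers unfamiliar with the flag plumbing above, here is a minimal, self-contained sketch of registering and parsing such an integer flag with plain kingpin. Thanos wraps this in its own extkingpin package, so the application name and setup below are illustrative assumptions, not the project's actual wiring:

```go
package main

import (
    "fmt"
    "os"

    "gopkg.in/alecthomas/kingpin.v2"
)

func main() {
    // Hypothetical standalone app; Thanos registers the flag on an
    // extkingpin.FlagClause instead.
    app := kingpin.New("compact-example", "sketch of flag registration")

    // Same shape as the PR's flag: an integer defaulting to 1.
    concurrency := app.Flag("downsample.concurrency",
        "Number of goroutines to use when downsampling blocks.").
        Default("1").Int()

    kingpin.MustParse(app.Parse(os.Args[1:]))
    fmt.Println("downsample concurrency:", *concurrency)
}
```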
122 changes: 79 additions & 43 deletions cmd/thanos/downsample.go
@@ -20,6 +20,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
@@ -61,6 +63,7 @@ func RunDownsample(
httpTLSConfig string,
httpGracePeriod time.Duration,
dataDir string,
downsampleConcurrency int,
objStoreConfig *extflag.PathOrContent,
comp component.Component,
hashFunc metadata.HashFunc,
@@ -115,7 +118,7 @@
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
-if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
+if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

@@ -124,7 +127,7 @@
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
-if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, hashFunc); err != nil {
+if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

@@ -162,6 +165,7 @@ func downsampleBucket(
bkt objstore.Bucket,
metas map[ulid.ULID]*metadata.Meta,
dir string,
downsampleConcurrency int,
hashFunc metadata.HashFunc,
) (rerr error) {
if err := os.MkdirAll(dir, 0750); err != nil {
@@ -218,57 +222,89 @@
return metasULIDS[i].Compare(metasULIDS[j]) < 0
})

-for _, mk := range metasULIDS {
-    m := metas[mk]
-
-    switch m.Thanos.Downsample.Resolution {
-    case downsample.ResLevel0:
-        missing := false
-        for _, id := range m.Compaction.Sources {
-            if _, ok := sources5m[id]; !ok {
-                missing = true
-                break
-            }
-        }
-        if !missing {
-            continue
-        }
-        // Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-        // NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-        // blocks. Otherwise we may never downsample some data.
-        if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
-            continue
-        }
-        if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1, hashFunc); err != nil {
-            metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
-            return errors.Wrap(err, "downsampling to 5 min")
-        }
-        metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
-
-    case downsample.ResLevel1:
-        missing := false
-        for _, id := range m.Compaction.Sources {
-            if _, ok := sources1h[id]; !ok {
-                missing = true
-                break
-            }
-        }
-        if !missing {
-            continue
-        }
-        // Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-        // NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-        // blocks. Otherwise we may never downsample some data.
-        if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
-            continue
-        }
-        if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2, hashFunc); err != nil {
-            metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
-            return errors.Wrap(err, "downsampling to 60 min")
-        }
-        metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
-    }
-}
+var (
+    eg errgroup.Group
+    ch = make(chan *metadata.Meta, downsampleConcurrency)
+)
+
+level.Debug(logger).Log("msg", "downsampling bucket", "concurrency", downsampleConcurrency)
+for i := 0; i < downsampleConcurrency; i++ {
+    eg.Go(func() error {
+        for m := range ch {
+            resolution := downsample.ResLevel1
+            errMsg := "downsampling to 5 min"
+            if m.Thanos.Downsample.Resolution == downsample.ResLevel1 {
+                resolution = downsample.ResLevel2
+                errMsg = "downsampling to 60 min"
+            }
+            if err := processDownsampling(ctx, logger, bkt, m, dir, resolution, hashFunc); err != nil {
+                metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
+                return errors.Wrap(err, errMsg)
+            }
+            metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
+        }
+        return nil
+    })
+}
+
+// Workers scheduled, distribute blocks.
+eg.Go(func() error {
+    defer close(ch)
+    for _, mk := range metasULIDS {
+        m := metas[mk]
+
+        switch m.Thanos.Downsample.Resolution {
+        case downsample.ResLevel2:
+            continue
+
+        case downsample.ResLevel0:
+            missing := false
+            for _, id := range m.Compaction.Sources {
+                if _, ok := sources5m[id]; !ok {
+                    missing = true
+                    break
+                }
+            }
+            if !missing {
+                continue
+            }
+            // Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+            // NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+            // blocks. Otherwise we may never downsample some data.
+            if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
+                continue
+            }
+
+        case downsample.ResLevel1:
+            missing := false
+            for _, id := range m.Compaction.Sources {
+                if _, ok := sources1h[id]; !ok {
+                    missing = true
+                    break
+                }
+            }
+            if !missing {
+                continue
+            }
+            // Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+            // NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+            // blocks. Otherwise we may never downsample some data.
+            if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
+                continue
+            }
+        }
+
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case ch <- m:
+        }
+    }
+    return nil
+})
+
+if err := eg.Wait(); err != nil {
+    return errors.Wrap(err, "downsample bucket")
+}
return nil
}
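The core of the diff above is a bounded worker pool built on golang.org/x/sync/errgroup: a fixed number of workers drain a channel while a single producer fills it and closes it when done. Below is a minimal, self-contained sketch of that pattern; `task` and `process` are placeholders standing in for `*metadata.Meta` and `processDownsampling`, not names from the codebase:

```go
package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

type task struct{ id int }

// process stands in for the per-block work (processDownsampling in the PR).
func process(t task) error {
    fmt.Println("processing", t.id)
    return nil
}

func run(ctx context.Context, tasks []task, concurrency int) error {
    var (
        eg errgroup.Group
        ch = make(chan task, concurrency)
    )

    // Workers: each drains the channel until it is closed; a returned
    // error is surfaced by eg.Wait() once all goroutines finish.
    for i := 0; i < concurrency; i++ {
        eg.Go(func() error {
            for t := range ch {
                if err := process(t); err != nil {
                    return err
                }
            }
            return nil
        })
    }

    // Producer: feeds the channel, honoring cancellation, and closes it
    // so the workers' range loops terminate.
    eg.Go(func() error {
        defer close(ch)
        for _, t := range tasks {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case ch <- t:
            }
        }
        return nil
    })

    return eg.Wait()
}

func main() {
    if err := run(context.Background(), []task{{1}, {2}, {3}}, 2); err != nil {
        fmt.Println("error:", err)
    }
}
```

One property worth noting: because this uses a bare `errgroup.Group` rather than `errgroup.WithContext`, a failing worker does not cancel its siblings or the producer; early shutdown only flows through the caller's `ctx`, which appears to be the same trade-off the PR makes.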
2 changes: 1 addition & 1 deletion cmd/thanos/main_test.go
@@ -58,7 +58,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
-testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, metadata.NoneFunc))
+testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))

_, err = os.Stat(dir)
4 changes: 3 additions & 1 deletion cmd/thanos/tools_bucket.go
@@ -522,13 +522,15 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Downsample.String(), "Continuously downsamples blocks in an object store bucket.")
httpAddr, httpGracePeriod, httpTLSConfig := extkingpin.RegisterHTTPFlags(cmd)
downsampleConcurrency := cmd.Flag("downsample.concurrency", "Number of goroutines to use when downsampling blocks.").
Default("1").Int()
dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
-return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc))
+return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), *dataDir, *downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(*hashFunc))
})
}

3 changes: 3 additions & 0 deletions docs/components/compact.md
@@ -342,6 +342,9 @@ Flags:
loaded, or compactor is ignoring the deletion
because it's compacting the block at the same
time.
--downsample.concurrency=1
Number of goroutines to use when downsampling
blocks.
--downsampling.disable Disables downsampling. This is not recommended
as querying long time ranges without
non-downsampled data is not efficient and useful
3 changes: 3 additions & 0 deletions docs/components/tools.md
@@ -582,6 +582,9 @@ Continuously downsamples blocks in an object store bucket.
Flags:
--data-dir="./data" Data directory in which to cache blocks and
process downsamplings.
--downsample.concurrency=1
Number of goroutines to use when downsampling
blocks.
--hash-func= Specify which hash function to use when
calculating the hashes of produced files. If no
function has been specified, it does not happen.