diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 2da673d03b..97f90d6f96 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -361,7 +361,7 @@ func processDownsampling( downsampleDuration := time.Since(begin) level.Info(logger).Log("msg", "downsampled block", "from", m.ULID, "to", id, "duration", downsampleDuration, "duration_ms", downsampleDuration.Milliseconds()) - metrics.downsampleDuration.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Observe(downsampleDuration.Seconds()) + metrics.downsampleDuration.WithLabelValues(m.Thanos.GroupKey()).Observe(downsampleDuration.Seconds()) if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil { return errors.Wrap(err, "output block index not valid") diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index ab7a172c6a..02ee7d69e6 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -623,7 +623,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC []block.MetadataFilter{ block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), - block.NewDeduplicateFilter(), + block.NewDeduplicateFilter(block.FetcherConcurrency), }, nil) if err != nil { return err diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 80c38e071d..38ce4e88c0 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -23,7 +23,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" - "github.com/prometheus/prometheus/tsdb" "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" @@ -673,37 +672,6 @@ func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID { return f.duplicateIDs } -<<<<<<< HEAD -func addNodeBySources(root, add *Node) bool { - var rootNode *Node - childSources := add.Compaction.Sources - for _, node := range root.Children { - parentSources := node.Compaction.Sources - - // Block exists with same sources, add as child. - if contains(parentSources, childSources) && contains(childSources, parentSources) { - node.Children = append(node.Children, add) - return true - } - - // Block's sources are present in other block's sources, add as child. - if contains(parentSources, childSources) { - rootNode = node - break - } - } - - // Block cannot be attached to any child nodes, add it as child of root. - if rootNode == nil { - root.Children = append(root.Children, add) - return true - } - - return addNodeBySources(rootNode, add) -} - -======= ->>>>>>> Move group key function to the metadata package. Deduplicate within compaction groups and in parallel. func contains(s1, s2 []ulid.ULID) bool { for _, a := range s2 { found := false diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 9eb0a32be3..1b9ce35d76 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -228,7 +228,7 @@ func TestRetentionProgressCalculate(t *testing.T) { m[2].Thanos.Labels = map[string]string{"a": "1", "b": "2"} m[2].Thanos.Downsample.Resolution = downsample.ResLevel2 for ind, meta := range m { - keys[ind] = DefaultGroupKey(meta.Thanos) + keys[ind] = meta.Thanos.GroupKey() } ps := NewRetentionProgressCalculator(reg, nil) @@ -369,7 +369,7 @@ func TestCompactProgressCalculate(t *testing.T) { m[2].Thanos.Labels = map[string]string{"a": "1", "b": "2"} m[2].Thanos.Downsample.Resolution = 1 for ind, meta := range m { - keys[ind] = DefaultGroupKey(meta.Thanos) + keys[ind] = meta.Thanos.GroupKey() } ps := NewCompactionProgressCalculator(reg, planner) @@ -491,7 +491,7 @@ func TestDownsampleProgressCalculate(t *testing.T) { m[2].Thanos.Labels = map[string]string{"a": "1", "b": "2"} m[2].Thanos.Downsample.Resolution = downsample.ResLevel2 for ind, meta := range m { - keys[ind] = DefaultGroupKey(meta.Thanos) + keys[ind] = meta.Thanos.GroupKey() } ds := NewDownsampleProgressCalculator(reg)