diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 8425df42386..1df9e001c01 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -369,12 +369,14 @@ func runCompact( compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason), ) blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) - compactor, err := compact.NewBucketCompactor( + compactor, err := compact.NewBucketCompactorWithCheckerAndCallback( logger, sy, grouper, planner, comp, + compact.DefaultBlockDeletableChecker{}, + compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval), compactDir, insBkt, conf.compactionConcurrency, @@ -710,6 +712,7 @@ type compactConfig struct { maxBlockIndexSize units.Base2Bytes hashFunc string enableVerticalCompaction bool + enableOverlappingRemoval bool dedupFunc string skipBlockWithOutOfOrderChunks bool progressCalculateInterval time.Duration @@ -786,10 +789,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set."). Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction) - cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+ - "Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+ - "When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag."). - Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "") + cmd.Flag("compact.remove-overlapping", "In house flag to remove overlapping blocks. Turn this on to fix PLAT-104290."). + Default("false").BoolVar(&cc.enableOverlappingRemoval) cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+ "Experimental. When one or more labels are set, compactor will ignore the given labels so that vertical compaction can merge the blocks."+ diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 9ebf6280ce5..52133ec2eac 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -236,7 +236,6 @@ type DefaultGrouper struct { compactionRunsCompleted *prometheus.CounterVec compactionFailures *prometheus.CounterVec verticalCompactions *prometheus.CounterVec - overlappingBlocks *prometheus.CounterVec garbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter @@ -284,10 +283,6 @@ func NewDefaultGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"resolution"}), - overlappingBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_overlapping_blocks_total", - Help: "Total number of blocks that are overlapping and to be deleted.", - }, []string{"resolution", "tenant"}), blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, @@ -308,7 +303,6 @@ func NewDefaultGrouperWithMetrics( compactionRunsCompleted *prometheus.CounterVec, compactionFailures *prometheus.CounterVec, verticalCompactions *prometheus.CounterVec, - overrlappingBlocks *prometheus.CounterVec, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, @@ -326,7 +320,6 @@ func NewDefaultGrouperWithMetrics( compactionRunsCompleted: compactionRunsCompleted, compactionFailures: compactionFailures, verticalCompactions: verticalCompactions, - overlappingBlocks: overrlappingBlocks, blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, @@ -359,7 +352,6 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.compactionRunsCompleted.WithLabelValues(resolutionLabel), g.compactionFailures.WithLabelValues(resolutionLabel), g.verticalCompactions.WithLabelValues(resolutionLabel), - g.overlappingBlocks.WithLabelValues(resolutionLabel, m.Thanos.GetLabels()), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, g.blocksMarkedForNoCompact, @@ -400,7 +392,6 @@ type Group struct { compactionRunsCompleted prometheus.Counter compactionFailures prometheus.Counter verticalCompactions prometheus.Counter - overlappingBlocks prometheus.Counter groupGarbageCollectedBlocks prometheus.Counter blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter @@ -424,7 +415,6 @@ func NewGroup( compactionRunsCompleted prometheus.Counter, compactionFailures prometheus.Counter, verticalCompactions prometheus.Counter, - overlappingBlocks prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, @@ -453,7 +443,6 @@ func NewGroup( compactionRunsCompleted: compactionRunsCompleted, compactionFailures: compactionFailures, verticalCompactions: verticalCompactions, - overlappingBlocks: overlappingBlocks, groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index 987510f2cdb..adc363ba63e 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -11,6 +11,8 @@ import ( "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" @@ -18,15 +20,24 @@ import ( ) type OverlappingCompactionLifecycleCallback struct { + overlappingBlocks prometheus.Counter } -func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback { - return &OverlappingCompactionLifecycleCallback{} +func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback { + if enabled { + return OverlappingCompactionLifecycleCallback{ + overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_compact_group_overlapping_blocks_total", + Help: "Total number of blocks that are overlapping and to be deleted.", + }), + } + } + return DefaultCompactionLifecycleCallback{} } // PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner // (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others. -func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { +func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error { if len(toCompact) == 0 { return nil } @@ -34,11 +45,11 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte for curr, currB := range toCompact { prevB := toCompact[prev] if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime { - // no overlapping with previous blocks, skip it + // no overlapping with previous blocks, skip it prev = curr continue } else if currB.MinTime < prevB.MinTime { - // halt when the assumption is broken, need manual investigation + // halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String())) } else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime { err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String()) @@ -50,7 +61,14 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte return halt(err) } } else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime { - continue + if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples { + level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks", + "prev", prevB.String(), "prevSeries", prevB.Stats.NumSeries, "prevSamples", prevB.Stats.NumSamples, + "curr", currB.String(), "currSeries", currB.Stats.NumSeries, "currSamples", currB.Stats.NumSamples, + ) + prev = curr + continue + } } // prev min <= curr min < prev max toDelete := -1 @@ -64,7 +82,7 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block", "toKeep", currB.String(), "toDelete", prevB.String()) } - cg.overlappingBlocks.Inc() + o.overlappingBlocks.Inc() if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil { return retry(err) } @@ -73,11 +91,11 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte return nil } -func (c *OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { +func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { return nil } -func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { +func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { return tsdb.DefaultBlockPopulator{}, nil } diff --git a/pkg/compact/overlapping_test.go b/pkg/compact/overlapping_test.go index 320fc490f73..4da40a05fd4 100644 --- a/pkg/compact/overlapping_test.go +++ b/pkg/compact/overlapping_test.go @@ -6,13 +6,11 @@ package compact import ( "context" "testing" - "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" @@ -24,10 +22,10 @@ func TestFilterNilCompact(t *testing.T) { testutil.Equals(t, 0, len(filtered)) meta := []*metadata.Meta{ - createBlockMeta(6, 1, int64(time.Now().Add(-6*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1"}, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1), nil, - createBlockMeta(7, 1, int64(time.Now().Add(-4*30*24*time.Hour).Unix()*1000), map[string]string{"b": "2"}, downsample.ResLevel1, []uint64{}), - createBlockMeta(8, 1, int64(time.Now().Add(-7*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1", "b": "2"}, downsample.ResLevel2, []uint64{}), + createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 2), + createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 3), nil, } testutil.Equals(t, 3, len(FilterRemovedBlocks(meta))) @@ -37,14 +35,11 @@ func TestPreCompactionCallback(t *testing.T) { reg := prometheus.NewRegistry() logger := log.NewNopLogger() bkt := objstore.NewInMemBucket() - temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for overlapping blocks"}) group := &Group{ - logger: log.NewNopLogger(), - bkt: bkt, - overlappingBlocks: temp, + logger: log.NewNopLogger(), + bkt: bkt, } - labels := map[string]string{"a": "1"} - callback := NewOverlappingCompactionLifecycleCallback() + callback := NewOverlappingCompactionLifecycleCallback(reg, true) for _, tcase := range []struct { testName string input []*metadata.Meta @@ -59,92 +54,108 @@ func TestPreCompactionCallback(t *testing.T) { { testName: "no overlapping blocks", input: []*metadata.Meta{ - createBlockMeta(6, 1, 3, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 10, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 1), }, expectedSize: 3, }, { testName: "duplicated blocks", input: []*metadata.Meta{ - createBlockMeta(6, 1, 7, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 1, 7, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 1, 7, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 1), + }, + expectedSize: 1, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + }, + }, + { + testName: "overlap non dup blocks", + input: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), + createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 2), + }, + expectedSize: 2, + expectedBlocks: []*metadata.Meta{ + createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2), }, - expectedSize: 3, }, { testName: "receive blocks", input: []*metadata.Meta{ - createReceiveBlockMeta(6, 1, 7, labels), - createReceiveBlockMeta(7, 1, 7, labels), - createReceiveBlockMeta(8, 1, 7, labels), + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(7, 1, 7, metadata.ReceiveSource, 2), + createCustomBlockMeta(8, 1, 7, metadata.ReceiveSource, 3), }, expectedSize: 3, }, { testName: "receive + compactor blocks", input: []*metadata.Meta{ - createReceiveBlockMeta(6, 1, 7, labels), - createBlockMeta(7, 2, 7, labels, downsample.ResLevel0, []uint64{}), - createReceiveBlockMeta(8, 2, 8, labels), + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(7, 2, 7, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), }, expectedSize: 2, expectedBlocks: []*metadata.Meta{ - createReceiveBlockMeta(6, 1, 7, labels), - createReceiveBlockMeta(8, 2, 8, labels), + createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1), + createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1), }, }, { testName: "full overlapping blocks", input: []*metadata.Meta{ - createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, expectedSize: 1, expectedBlocks: []*metadata.Meta{ - createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1), }, }, { testName: "part overlapping blocks", input: []*metadata.Meta{ - createBlockMeta(1, 1, 2, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(1, 1, 2, metadata.CompactorSource, 1), + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), }, expectedSize: 2, expectedBlocks: []*metadata.Meta{ - createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1), }, }, { testName: "out of order blocks", input: []*metadata.Meta{ - createBlockMeta(6, 2, 3, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 0, 5, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 2, 3, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, err: halt(errors.Errorf("expect halt error")), }, { testName: "partially overlapping blocks with vertical compaction off", input: []*metadata.Meta{ - createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, err: halt(errors.Errorf("expect halt error")), }, { testName: "partially overlapping blocks with vertical compaction on", input: []*metadata.Meta{ - createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}), - createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}), + createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1), + createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1), + createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1), }, enableVerticalCompaction: true, expectedSize: 3, @@ -172,8 +183,10 @@ func TestPreCompactionCallback(t *testing.T) { } } -func createReceiveBlockMeta(id uint64, minTime, maxTime int64, labels map[string]string) *metadata.Meta { +func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.SourceType, numSeries uint64) *metadata.Meta { + labels := map[string]string{"a": "1"} m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{}) - m.Thanos.Source = metadata.ReceiveSource + m.Thanos.Source = source + m.Stats.NumSeries = numSeries return m }