Skip to content

Commit

Permalink
[PLAT-102919] add a flag to control overlapping block removal behavior
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Mar 22, 2024
1 parent f86995a commit cc0ecfd
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 71 deletions.
11 changes: 6 additions & 5 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,14 @@ func runCompact(
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, insBkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
compactor, err := compact.NewBucketCompactorWithCheckerAndCallback(
logger,
sy,
grouper,
planner,
comp,
compact.DefaultBlockDeletableChecker{},
compact.NewOverlappingCompactionLifecycleCallback(reg, conf.enableOverlappingRemoval),
compactDir,
insBkt,
conf.compactionConcurrency,
Expand Down Expand Up @@ -710,6 +712,7 @@ type compactConfig struct {
maxBlockIndexSize units.Base2Bytes
hashFunc string
enableVerticalCompaction bool
enableOverlappingRemoval bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
progressCalculateInterval time.Duration
Expand Down Expand Up @@ -786,10 +789,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

cmd.Flag("deduplication.func", "Experimental. Deduplication algorithm for merging overlapping blocks. "+
"Possible values are: \"\", \"penalty\". If no value is specified, the default compact deduplication merger is used, which performs 1:1 deduplication for samples. "+
"When set to penalty, penalty based deduplication algorithm will be used. At least one replica label has to be set via --deduplication.replica-label flag.").
Default("").EnumVar(&cc.dedupFunc, compact.DedupAlgorithmPenalty, "")
cmd.Flag("compact.remove-overlapping", "In house flag to remove overlapping blocks. Turn this on to fix PLAT-104290.").
Default("false").BoolVar(&cc.enableOverlappingRemoval)

cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When one or more labels are set, compactor will ignore the given labels so that vertical compaction can merge the blocks."+
Expand Down
11 changes: 0 additions & 11 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ type DefaultGrouper struct {
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
overlappingBlocks *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
Expand Down Expand Up @@ -284,10 +283,6 @@ func NewDefaultGrouper(
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"resolution"}),
overlappingBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
}, []string{"resolution", "tenant"}),
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
Expand All @@ -308,7 +303,6 @@ func NewDefaultGrouperWithMetrics(
compactionRunsCompleted *prometheus.CounterVec,
compactionFailures *prometheus.CounterVec,
verticalCompactions *prometheus.CounterVec,
overrlappingBlocks *prometheus.CounterVec,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
Expand All @@ -326,7 +320,6 @@ func NewDefaultGrouperWithMetrics(
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
overlappingBlocks: overrlappingBlocks,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
Expand Down Expand Up @@ -359,7 +352,6 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.compactionRunsCompleted.WithLabelValues(resolutionLabel),
g.compactionFailures.WithLabelValues(resolutionLabel),
g.verticalCompactions.WithLabelValues(resolutionLabel),
g.overlappingBlocks.WithLabelValues(resolutionLabel, m.Thanos.GetLabels()),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
Expand Down Expand Up @@ -400,7 +392,6 @@ type Group struct {
compactionRunsCompleted prometheus.Counter
compactionFailures prometheus.Counter
verticalCompactions prometheus.Counter
overlappingBlocks prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
Expand All @@ -424,7 +415,6 @@ func NewGroup(
compactionRunsCompleted prometheus.Counter,
compactionFailures prometheus.Counter,
verticalCompactions prometheus.Counter,
overlappingBlocks prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
Expand Down Expand Up @@ -453,7 +443,6 @@ func NewGroup(
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
overlappingBlocks: overlappingBlocks,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
Expand Down
36 changes: 27 additions & 9 deletions pkg/compact/overlapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,45 @@ import (
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)

type OverlappingCompactionLifecycleCallback struct {
overlappingBlocks prometheus.Counter
}

func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback {
return &OverlappingCompactionLifecycleCallback{}
func NewOverlappingCompactionLifecycleCallback(reg *prometheus.Registry, enabled bool) CompactionLifecycleCallback {
if enabled {
return OverlappingCompactionLifecycleCallback{
overlappingBlocks: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_group_overlapping_blocks_total",
Help: "Total number of blocks that are overlapping and to be deleted.",
}),
}
}
return DefaultCompactionLifecycleCallback{}
}

// PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner
// (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others.
func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error {
func (o OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error {
if len(toCompact) == 0 {
return nil
}
prev := 0
for curr, currB := range toCompact {
prevB := toCompact[prev]
if curr == 0 || currB.Thanos.Source == metadata.ReceiveSource || prevB.MaxTime <= currB.MinTime {
// no overlapping with previous blocks, skip it
// no overlapping with previous blocks, skip it
prev = curr
continue
} else if currB.MinTime < prevB.MinTime {
// halt when the assumption is broken, need manual investigation
// halt when the assumption is broken, the input toCompact isn't sorted by minTime, need manual investigation
return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", prevB.String(), currB.String()))
} else if prevB.MaxTime < currB.MaxTime && prevB.MinTime != currB.MinTime {
err := errors.Errorf("found partially overlapping block: %s -- %s", prevB.String(), currB.String())
Expand All @@ -50,7 +61,14 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte
return halt(err)
}
} else if prevB.MinTime == currB.MinTime && prevB.MaxTime == currB.MaxTime {
continue
if prevB.Stats.NumSeries != currB.Stats.NumSeries || prevB.Stats.NumSamples != currB.Stats.NumSamples {
level.Warn(logger).Log("msg", "found same time range but different stats, keep both blocks",
"prev", prevB.String(), "prevSeries", prevB.Stats.NumSeries, "prevSamples", prevB.Stats.NumSamples,
"curr", currB.String(), "currSeries", currB.Stats.NumSeries, "currSamples", currB.Stats.NumSamples,
)
prev = curr
continue
}
}
// prev min <= curr min < prev max
toDelete := -1
Expand All @@ -64,7 +82,7 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte
level.Warn(logger).Log("msg", "found overlapping block in plan, keep current block",
"toKeep", currB.String(), "toDelete", prevB.String())
}
cg.overlappingBlocks.Inc()
o.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, logger, cg.bkt, toCompact[toDelete]); err != nil {
return retry(err)
}
Expand All @@ -73,11 +91,11 @@ func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx conte
return nil
}

func (c *OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error {
func (o OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error {
return nil
}

func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
func (o OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
return tsdb.DefaultBlockPopulator{}, nil
}

Expand Down
105 changes: 59 additions & 46 deletions pkg/compact/overlapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ package compact
import (
"context"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand All @@ -24,10 +22,10 @@ func TestFilterNilCompact(t *testing.T) {
testutil.Equals(t, 0, len(filtered))

meta := []*metadata.Meta{
createBlockMeta(6, 1, int64(time.Now().Add(-6*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1"}, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1),
nil,
createBlockMeta(7, 1, int64(time.Now().Add(-4*30*24*time.Hour).Unix()*1000), map[string]string{"b": "2"}, downsample.ResLevel1, []uint64{}),
createBlockMeta(8, 1, int64(time.Now().Add(-7*30*24*time.Hour).Unix()*1000), map[string]string{"a": "1", "b": "2"}, downsample.ResLevel2, []uint64{}),
createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 2),
createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 3),
nil,
}
testutil.Equals(t, 3, len(FilterRemovedBlocks(meta)))
Expand All @@ -37,14 +35,11 @@ func TestPreCompactionCallback(t *testing.T) {
reg := prometheus.NewRegistry()
logger := log.NewNopLogger()
bkt := objstore.NewInMemBucket()
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for overlapping blocks"})
group := &Group{
logger: log.NewNopLogger(),
bkt: bkt,
overlappingBlocks: temp,
logger: log.NewNopLogger(),
bkt: bkt,
}
labels := map[string]string{"a": "1"}
callback := NewOverlappingCompactionLifecycleCallback()
callback := NewOverlappingCompactionLifecycleCallback(reg, true)
for _, tcase := range []struct {
testName string
input []*metadata.Meta
Expand All @@ -59,92 +54,108 @@ func TestPreCompactionCallback(t *testing.T) {
{
testName: "no overlapping blocks",
input: []*metadata.Meta{
createBlockMeta(6, 1, 3, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 10, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 1, 3, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 10, metadata.CompactorSource, 1),
},
expectedSize: 3,
},
{
testName: "duplicated blocks",
input: []*metadata.Meta{
createBlockMeta(6, 1, 7, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 1, 7, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 1, 7, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 1),
},
expectedSize: 1,
expectedBlocks: []*metadata.Meta{
createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1),
},
},
{
testName: "overlap non dup blocks",
input: []*metadata.Meta{
createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2),
createCustomBlockMeta(8, 1, 7, metadata.CompactorSource, 2),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createCustomBlockMeta(6, 1, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 1, 7, metadata.CompactorSource, 2),
},
expectedSize: 3,
},
{
testName: "receive blocks",
input: []*metadata.Meta{
createReceiveBlockMeta(6, 1, 7, labels),
createReceiveBlockMeta(7, 1, 7, labels),
createReceiveBlockMeta(8, 1, 7, labels),
createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1),
createCustomBlockMeta(7, 1, 7, metadata.ReceiveSource, 2),
createCustomBlockMeta(8, 1, 7, metadata.ReceiveSource, 3),
},
expectedSize: 3,
},
{
testName: "receive + compactor blocks",
input: []*metadata.Meta{
createReceiveBlockMeta(6, 1, 7, labels),
createBlockMeta(7, 2, 7, labels, downsample.ResLevel0, []uint64{}),
createReceiveBlockMeta(8, 2, 8, labels),
createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1),
createCustomBlockMeta(7, 2, 7, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createReceiveBlockMeta(6, 1, 7, labels),
createReceiveBlockMeta(8, 2, 8, labels),
createCustomBlockMeta(6, 1, 7, metadata.ReceiveSource, 1),
createCustomBlockMeta(8, 2, 8, metadata.ReceiveSource, 1),
},
},
{
testName: "full overlapping blocks",
input: []*metadata.Meta{
createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
expectedSize: 1,
expectedBlocks: []*metadata.Meta{
createBlockMeta(6, 1, 10, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 1, 10, metadata.CompactorSource, 1),
},
},
{
testName: "part overlapping blocks",
input: []*metadata.Meta{
createBlockMeta(1, 1, 2, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(1, 1, 2, metadata.CompactorSource, 1),
createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1),
},
expectedSize: 2,
expectedBlocks: []*metadata.Meta{
createBlockMeta(2, 1, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(3, 6, 8, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(2, 1, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(3, 6, 8, metadata.CompactorSource, 1),
},
},
{
testName: "out of order blocks",
input: []*metadata.Meta{
createBlockMeta(6, 2, 3, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 0, 5, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 2, 3, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 0, 5, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
err: halt(errors.Errorf("expect halt error")),
},
{
testName: "partially overlapping blocks with vertical compaction off",
input: []*metadata.Meta{
createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 5, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 3, 5, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
err: halt(errors.Errorf("expect halt error")),
},
{
testName: "partially overlapping blocks with vertical compaction on",
input: []*metadata.Meta{
createBlockMeta(6, 2, 4, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(7, 3, 6, labels, downsample.ResLevel0, []uint64{}),
createBlockMeta(8, 5, 8, labels, downsample.ResLevel0, []uint64{}),
createCustomBlockMeta(6, 2, 4, metadata.CompactorSource, 1),
createCustomBlockMeta(7, 3, 6, metadata.CompactorSource, 1),
createCustomBlockMeta(8, 5, 8, metadata.CompactorSource, 1),
},
enableVerticalCompaction: true,
expectedSize: 3,
Expand Down Expand Up @@ -172,8 +183,10 @@ func TestPreCompactionCallback(t *testing.T) {
}
}

func createReceiveBlockMeta(id uint64, minTime, maxTime int64, labels map[string]string) *metadata.Meta {
func createCustomBlockMeta(id uint64, minTime, maxTime int64, source metadata.SourceType, numSeries uint64) *metadata.Meta {
labels := map[string]string{"a": "1"}
m := createBlockMeta(id, minTime, maxTime, labels, downsample.ResLevel0, []uint64{})
m.Thanos.Source = metadata.ReceiveSource
m.Thanos.Source = source
m.Stats.NumSeries = numSeries
return m
}

0 comments on commit cc0ecfd

Please sign in to comment.