diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index a56562b8639..9d8f20a243a 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -25,6 +25,7 @@ import (
 	"github.com/thanos-io/objstore"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/thanos-io/thanos/internal/cortex/util/multierror"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -729,7 +730,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
 type Planner interface {
 	// Plan returns a list of blocks that should be compacted into single one.
 	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
-	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error)
+	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error)
 }
 
 // Compactor provides compaction against an underlying storage of time series data.
@@ -753,7 +754,7 @@ type Compactor interface {
 
 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
-func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) {
+func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, rerr error) {
 	cg.compactionRunsStarted.Inc()
 
 	subDir := filepath.Join(dir, cg.Key())
@@ -770,19 +771,19 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
 	}()
 
 	if err := os.MkdirAll(subDir, 0750); err != nil {
-		return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
+		return false, errors.Wrap(err, "create compaction group dir")
 	}
 
 	err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
-		shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp)
+		shouldRerun, err = cg.compact(ctx, subDir, planner, comp)
 		return err
 	}, opentracing.Tags{"group.key": cg.Key()})
 	if err != nil {
 		cg.compactionFailures.Inc()
-		return false, ulid.ULID{}, err
+		return false, err
 	}
 	cg.compactionRunsCompleted.Inc()
-	return shouldRerun, compID, nil
+	return shouldRerun, nil
 }
 
 // Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -976,7 +977,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
 	return nil
 }
 
-func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) {
+func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, _ error) {
 	cg.mtx.Lock()
 	defer cg.mtx.Unlock()
 
@@ -986,86 +987,114 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 		// TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked
 		// in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check.
 		if !cg.enableVerticalCompaction {
-			return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
+			return false, halt(errors.Wrap(err, "pre compaction overlap check"))
 		}
 
 		overlappingBlocks = true
 	}
 
-	var tasks []compactionTask
+	var tasks []CompactionTask
 	if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
 		tasks, e = planner.Plan(ctx, cg.metasByMinTime)
 		return e
 	}); err != nil {
-		return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
+		return false, errors.Wrap(err, "plan compaction")
 	}
 	if len(tasks) == 0 {
 		// Nothing to do.
-		return false, ulid.ULID{}, nil
+		return false, nil
 	}
 
-	level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))
-
 	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
 	// This is one potential source of how we could end up with duplicated chunks.
 	uniqueSources := map[ulid.ULID]struct{}{}
-
-	// Once we have a plan we need to download the actual data.
-	groupCompactionBegin := time.Now()
-	begin := groupCompactionBegin
-
-	g, errCtx := errgroup.WithContext(ctx)
-	g.SetLimit(cg.compactBlocksFetchConcurrency)
-
-	toCompactDirs := make([]string, 0, len(tasks))
 	for _, task := range tasks {
 		for _, m := range task {
-			bdir := filepath.Join(dir, m.ULID.String())
 			for _, s := range m.Compaction.Sources {
 				if _, ok := uniqueSources[s]; ok {
-					return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", tasks))
+					return false, halt(errors.Errorf("overlapping sources detected for plan %v", task))
 				}
 				uniqueSources[s] = struct{}{}
 			}
-			func(ctx context.Context, meta *metadata.Meta) {
-				g.Go(func() error {
-					if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
-						return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
-					}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
-						return retry(errors.Wrapf(err, "download block %s", meta.ULID))
-					}
+		}
+	}
 
-					// Ensure all input blocks are valid.
-					var stats block.HealthStats
-					if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
-						stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
-						return e
-					}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
-						return errors.Wrapf(err, "gather index issues for block %s", bdir)
-					}
+	level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))
 
-					if err := stats.CriticalErr(); err != nil {
-						return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
-					}
+	var (
+		wg         sync.WaitGroup
+		mu         sync.Mutex
+		groupErr   multierror.MultiError
+		rerunGroup bool
+	)
+	for _, task := range tasks {
+		wg.Add(1)
+		go func(task CompactionTask) {
+			defer wg.Done()
+			rerunTask, _, err := cg.compactBlocks(ctx, dir, task, comp, overlappingBlocks)
 
-					if err := stats.OutOfOrderChunksErr(); err != nil {
-						return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
-					}
+			mu.Lock()
+			defer mu.Unlock()
+			rerunGroup = rerunGroup || rerunTask
+			groupErr.Add(err)
+		}(task)
+	}
+	wg.Wait()
 
-					if err := stats.Issue347OutsideChunksErr(); err != nil {
-						return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
-					}
+	return rerunGroup, groupErr.Err()
+}
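The reworked Group.compact above fans each planned task out to its own goroutine and folds the per-task results back together under a mutex: rerun flags are OR-ed, errors are collected rather than short-circuited. A minimal, runnable sketch of that fan-out-and-aggregate pattern follows; runTasks and multiError are illustrative names for this sketch only, standing in for cg.compactBlocks and the Cortex multierror.MultiError type.

// Sketch only: illustrates the per-task fan-out used by the new Group.compact.
// runTasks and multiError are hypothetical names, not Thanos APIs.
package main

import (
	"errors"
	"fmt"
	"sync"
)

// multiError stands in for multierror.MultiError: it keeps every task
// error instead of stopping at the first one.
type multiError struct {
	errs []error
}

func (m *multiError) Add(err error) {
	if err != nil {
		m.errs = append(m.errs, err)
	}
}

func (m *multiError) Err() error {
	if len(m.errs) == 0 {
		return nil
	}
	return errors.Join(m.errs...)
}

// runTasks runs every task concurrently and merges rerun flags and errors
// under a mutex, mirroring the loop in Group.compact.
func runTasks(tasks []string, run func(string) (bool, error)) (bool, error) {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		errs  multiError
		rerun bool
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task string) {
			defer wg.Done()
			rerunTask, err := run(task)

			mu.Lock()
			defer mu.Unlock()
			rerun = rerun || rerunTask
			errs.Add(err)
		}(task)
	}
	wg.Wait()
	return rerun, errs.Err()
}

func main() {
	rerun, err := runTasks([]string{"task-1", "task-2"}, func(task string) (bool, error) {
		if task == "task-2" {
			return false, fmt.Errorf("%s failed", task)
		}
		return true, nil
	})
	fmt.Println(rerun, err) // true task-2 failed
}

A plain mutex is enough here because each goroutine touches the shared state exactly once, after its (comparatively long) compaction finishes, so contention is negligible.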
 
-					if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
-						return errors.Wrapf(err,
-							"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
-					}
-					return nil
-				})
-			}(errCtx, m)
+func (cg *Group) compactBlocks(ctx context.Context, dir string, task CompactionTask, comp Compactor, overlappingBlocks bool) (bool, ulid.ULID, error) {
+	// Once we have a plan we need to download the actual data.
+	compactionBegin := time.Now()
+	begin := compactionBegin
 
-			toCompactDirs = append(toCompactDirs, bdir)
-		}
+	g, errCtx := errgroup.WithContext(ctx)
+	g.SetLimit(cg.compactBlocksFetchConcurrency)
+
+	toCompactDirs := make([]string, 0, len(task))
+	for _, m := range task {
+		bdir := filepath.Join(dir, m.ULID.String())
+		func(ctx context.Context, meta *metadata.Meta) {
+			g.Go(func() error {
+				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
+					return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
+				}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
+					return retry(errors.Wrapf(err, "download block %s", meta.ULID))
+				}
+
+				// Ensure all input blocks are valid.
+				var stats block.HealthStats
+				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
+					stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+					return e
+				}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
+					return errors.Wrapf(err, "gather index issues for block %s", bdir)
+				}
+
+				if err := stats.CriticalErr(); err != nil {
+					return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
+				}
+
+				if err := stats.OutOfOrderChunksErr(); err != nil {
+					return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
+				}
+
+				if err := stats.Issue347OutsideChunksErr(); err != nil {
+					return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
+				}
+
+				if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
+					return errors.Wrapf(err,
+						"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
+				}
+				return nil
+			})
+		}(errCtx, m)
+
+		toCompactDirs = append(toCompactDirs, bdir)
 	}
+
 	sourceBlockStr := fmt.Sprintf("%v", toCompactDirs)
 
 	if err := g.Wait(); err != nil {
@@ -1074,6 +1103,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
 
+	var compID ulid.ULID
 	begin = time.Now()
 	if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) {
 		compID, e = comp.Compact(dir, toCompactDirs, nil)
@@ -1084,12 +1114,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	if compID == (ulid.ULID{}) {
 		// Prometheus compactor found that the compacted block would have no samples.
 		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
-		for _, task := range tasks {
-			for _, meta := range task {
-				if meta.Stats.NumSamples == 0 {
-					if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
-						level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
-					}
+		for _, meta := range task {
+			if meta.Stats.NumSamples == 0 {
+				if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
+					level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
 				}
 			}
 		}
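compactBlocks keeps the bounded-concurrency download loop that previously lived in compact: an errgroup capped at cg.compactBlocksFetchConcurrency runs one goroutine per source block, and the first failure cancels the remaining downloads through the group context. A stripped-down sketch of the same pattern, with illustrative names (downloadAll, fetch) rather than the Thanos API:

// Sketch only: bounded-concurrency fetch with golang.org/x/sync/errgroup.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// downloadAll fetches each block with at most limit concurrent workers;
// the first failure cancels the remaining downloads via the group context.
func downloadAll(ctx context.Context, ids []string, limit int, fetch func(context.Context, string) error) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(limit)
	for _, id := range ids {
		id := id // capture the loop variable for the closure (pre-Go 1.22)
		g.Go(func() error {
			if err := fetch(ctx, id); err != nil {
				return fmt.Errorf("download block %s: %w", id, err)
			}
			return nil
		})
	}
	return g.Wait()
}

func main() {
	ids := []string{"01AA", "01AB", "01AC"}
	err := downloadAll(context.Background(), ids, 2, func(_ context.Context, id string) error {
		fmt.Println("fetching", id)
		return nil
	})
	fmt.Println("err:", err)
}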
@@ -1131,10 +1159,8 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	// Ensure the output block is not overlapping with anything else,
 	// unless vertical compaction is enabled.
 	if !cg.enableVerticalCompaction {
-		for _, task := range tasks {
-			if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
-				return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
-			}
+		if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
+			return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
 		}
 	}
 
@@ -1151,20 +1177,18 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	// Mark for deletion the blocks we just compacted from the group and bucket so they do not get included
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
-	for _, task := range tasks {
-		for _, meta := range task {
-			err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
-				return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
-			}, opentracing.Tags{"block.id": meta.ULID})
-			if err != nil {
-				return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
-			}
-			cg.groupGarbageCollectedBlocks.Inc()
+	for _, meta := range task {
+		err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
+			return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
+		}, opentracing.Tags{"block.id": meta.ULID})
+		if err != nil {
+			return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
 		}
+		cg.groupGarbageCollectedBlocks.Inc()
 	}
 
 	level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr,
-		"duration", time.Since(groupCompactionBegin), "duration_ms", time.Since(groupCompactionBegin).Milliseconds())
+		"duration", time.Since(compactionBegin), "duration_ms", time.Since(compactionBegin).Milliseconds())
 	return true, compID, nil
 }
 
@@ -1257,7 +1281,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 		go func() {
 			defer wg.Done()
 			for g := range groupChan {
-				shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp)
+				shouldRerunGroup, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp)
 				if err == nil {
 					if shouldRerunGroup {
 						mtx.Lock()
diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go
index 6d9c66a5af0..1188d75f96f 100644
--- a/pkg/compact/planner.go
+++ b/pkg/compact/planner.go
@@ -50,13 +50,15 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact
 }
 
 // TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
-func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
+func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) {
 	return p.plan(p.noCompBlocksFunc(), metasByMinTime)
 }
 
-type compactionTask []*metadata.Meta
+// CompactionTask is a set of blocks that should be compacted together in a single compaction.
+// Multiple compaction tasks can be run in parallel even inside a single compaction group.
+type CompactionTask []*metadata.Meta
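To make the newly exported type concrete: with this change a planner may return several independent tasks for one group, and Group.compact compacts each task in parallel. A small sketch under assumed inputs (newMeta is an example-only helper, the block time ranges are invented, and it assumes this change is merged so compact.CompactionTask is importable):

// Sketch only: constructing planner-style output by hand.
package main

import (
	"fmt"

	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/compact"
)

// newMeta is an example-only helper building the minimal block metadata
// the planner works with.
func newMeta(id uint64, mint, maxt int64) *metadata.Meta {
	return &metadata.Meta{BlockMeta: tsdb.BlockMeta{
		Version: 1, ULID: ulid.MustNew(id, nil), MinTime: mint, MaxTime: maxt,
	}}
}

func main() {
	// Two disjoint runs of overlapping blocks become two independent tasks;
	// each task can now be compacted concurrently within one group.
	tasks := []compact.CompactionTask{
		{newMeta(1, 0, 10), newMeta(2, 9, 20)},
		{newMeta(3, 40, 50), newMeta(4, 47, 60)},
	}
	for i, task := range tasks {
		fmt.Printf("task %d compacts %d blocks\n", i, len(task))
	}
}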
 
-func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
+func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) {
 	notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
 	for _, meta := range metasByMinTime {
 		if _, excluded := noCompactMarked[meta.ULID]; excluded {
@@ -79,7 +81,7 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac
 	metasByMinTime = metasByMinTime[:len(metasByMinTime)-1]
 	res := selectMetas(p.ranges, noCompactMarked, metasByMinTime)
 	if len(res) > 0 {
-		return []compactionTask{res}, nil
+		return []CompactionTask{res}, nil
 	}
 
 	// Compact any blocks with big enough time range that have >5% tombstones.
@@ -90,7 +92,7 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac
 		}
 		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
 			task := []*metadata.Meta{notExcludedMetasByMinTime[i]}
-			return []compactionTask{task}, nil
+			return []CompactionTask{task}, nil
 		}
 	}
 
@@ -159,7 +161,7 @@ func selectMetas(ranges []int64, noCompactMarked map[ulid.ULID]*metadata.NoCompa
 // selectOverlappingMetas returns all dirs with overlapping time ranges.
 // It expects sorted input by mint and returns the overlapping dirs in the same order as received.
 // Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L268.
-func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []compactionTask {
+func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []CompactionTask {
 	if len(metasByMinTime) < 2 {
 		return nil
 	}
@@ -203,7 +205,7 @@ loopMetas:
 		}
 	}
 
-	overlappingGroups := make([]compactionTask, 0, len(groups))
+	overlappingGroups := make([]CompactionTask, 0, len(groups))
 	for _, group := range groups {
 		if len(group) < 2 {
 			continue
@@ -281,7 +283,7 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket,
 	return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
 }
 
-func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
+func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) {
 	noCompactMarked := t.noCompBlocksFunc()
 	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
 	for k, v := range noCompactMarked {
diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go
index 0cdf48a5ec4..c760d692712 100644
--- a/pkg/compact/planner_test.go
+++ b/pkg/compact/planner_test.go
@@ -30,7 +30,7 @@ type tsdbPlannerAdapter struct {
 	comp tsdb.Compactor
 }
 
-func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
+func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) {
 	// TSDB planning works based on the meta.json files in the given dir. Mock it up.
 	for _, meta := range metasByMinTime {
 		bdir := filepath.Join(p.dir, meta.ULID.String())
@@ -58,7 +58,7 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.
 		}
 		res = append(res, meta)
 	}
-	return []compactionTask{res}, nil
+	return []CompactionTask{res}, nil
 }
 
 // Adapted from https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/tsdb/compact_test.go#L167
@@ -80,7 +80,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 	for _, c := range []struct {
 		name     string
 		metas    []*metadata.Meta
-		expected []compactionTask
+		expected []CompactionTask
 	}{
 		{
 			name: "Outside range",
@@ -112,7 +112,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
@@ -127,7 +127,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}},
 			},
-			expected: []compactionTask{
+			expected: []CompactionTask{
 				{
 					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}},
 					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 60, MaxTime: 120}},
@@ -153,7 +153,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 80, MaxTime: 100}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 			}},
@@ -167,7 +167,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 120, MaxTime: 180}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
@@ -181,7 +181,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}},
 			}},
@@ -195,7 +195,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}},
@@ -217,7 +217,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				}}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 540, MaxTime: 560}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{
 					Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{
 						NumSeries: 10,
@@ -257,7 +257,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}},
 			}},
@@ -272,7 +272,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}},
 			}},
@@ -287,7 +287,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
 			}},
@@ -302,7 +302,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
@@ -320,7 +320,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 0, MaxTime: 360}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 340, MaxTime: 560}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
@@ -330,25 +330,25 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 		// |--------------|
 		//         |--------------|
 		//                 |--------------|
-		//                           |--------------|
+		//                  |--------------|
-		//                                  |--------------|
+		//                         |--------------|
 		{
 			name: "Multiple independent groups of overlapping blocks",
 			metas: []*metadata.Meta{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 17, MaxTime: 35}},
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 40, MaxTime: 50}},
+				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 35, MaxTime: 50}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 47, MaxTime: 60}},
 			},
-			expected: []compactionTask{
+			expected: []CompactionTask{
 				{
 					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}},
 					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}},
 					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 17, MaxTime: 35}},
 				},
 				{
-					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 40, MaxTime: 50}},
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 35, MaxTime: 50}},
 					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 47, MaxTime: 60}},
 				},
 			},
@@ -463,12 +463,12 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 			tsdbPlanner.dir = dir
 			plan, err := tsdbPlanner.Plan(context.Background(), c.metas)
 			testutil.Ok(t, err)
-			testutil.Equals(t, []compactionTask(nil), plan)
+			testutil.Equals(t, []CompactionTask(nil), plan)
 		})
 		t.Run("tsdbBasedPlanner", func(t *testing.T) {
 			plan, err := tsdbBasedPlanner.Plan(context.Background(), c.metas)
 			testutil.Ok(t, err)
-			testutil.Equals(t, []compactionTask(nil), plan)
+			testutil.Equals(t, []CompactionTask(nil), plan)
 		})
 	})
 }
@@ -491,7 +491,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 		metas          []*metadata.Meta
 		noCompactMarks map[ulid.ULID]*metadata.NoCompactMark
 
-		expected []compactionTask
+		expected []CompactionTask
 	}{
 		{
 			name: "Outside range and excluded",
@@ -513,7 +513,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
 				ulid.MustNew(1, nil): {},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
 			}},
@@ -541,7 +541,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
 				ulid.MustNew(4, nil): {},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
@@ -559,7 +559,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 				ulid.MustNew(1, nil): {},
 				ulid.MustNew(4, nil): {},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
 			}},
@@ -604,7 +604,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
 				ulid.MustNew(6, nil): {},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
 			}},
@@ -685,7 +685,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 		name  string
 		metas []*metadata.Meta
 
-		expected      []compactionTask
+		expected      []CompactionTask
 		expectedMarks float64
 	}{
 		{
@@ -709,7 +709,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 			},
 			expectedMarks: 1,
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
 			}},
@@ -740,7 +740,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				{Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}},
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
@@ -760,7 +760,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				{Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}},
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 60, MaxTime: 80}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 50}},
 			}},
@@ -795,7 +795,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				{Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}},
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}},
 			},
-			expected: []compactionTask{{
+			expected: []CompactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
 			}},