diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index c6c36bf3cc..a56562b863 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -542,26 +542,28 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g
 			if len(plan) == 0 {
 				continue
 			}
-			groupCompactions[g.key]++
-			toRemove := make(map[ulid.ULID]struct{}, len(plan))
-			metas := make([]*tsdb.BlockMeta, 0, len(plan))
-			for _, p := range plan {
-				metas = append(metas, &p.BlockMeta)
-				toRemove[p.BlockMeta.ULID] = struct{}{}
-			}
-			g.deleteFromGroup(toRemove)
+			for _, groupTask := range plan {
+				groupCompactions[g.key]++
+				toRemove := make(map[ulid.ULID]struct{}, len(groupTask))
+				metas := make([]*tsdb.BlockMeta, 0, len(groupTask))
+				for _, meta := range groupTask {
+					metas = append(metas, &meta.BlockMeta)
+					toRemove[meta.BlockMeta.ULID] = struct{}{}
+				}
+				g.deleteFromGroup(toRemove)
+				groupBlocks[g.key] += len(groupTask)
 
-			groupBlocks[g.key] += len(plan)
+				newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
+				if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
+					return errors.Wrapf(err, "append meta")
+				}
+			}
 
 			if len(g.metasByMinTime) == 0 {
 				continue
 			}
 
-			newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
-			if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
-				return errors.Wrapf(err, "append meta")
-			}
 			tmpGroups = append(tmpGroups, g)
 		}
 
@@ -727,7 +729,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
 type Planner interface {
-	// Plan returns a list of blocks that should be compacted into single one.
-	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
-	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
+	// Plan returns a list of compaction tasks; the blocks in each task are compacted into a single block.
+	// The blocks within a task can be overlapping. The provided metadata has to be ordered by minTime.
+	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error)
 }
 
 // Compactor provides compaction against an underlying storage of time series data.
@@ -990,19 +992,19 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 		overlappingBlocks = true
 	}
 
-	var toCompact []*metadata.Meta
+	var tasks []compactionTask
 	if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
-		toCompact, e = planner.Plan(ctx, cg.metasByMinTime)
+		tasks, e = planner.Plan(ctx, cg.metasByMinTime)
 		return e
 	}); err != nil {
 		return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
 	}
-	if len(toCompact) == 0 {
+	if len(tasks) == 0 {
 		// Nothing to do.
 		return false, ulid.ULID{}, nil
 	}
 
-	level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact))
+	level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))
 
 	// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
 	// This is one potential source of how we could end up with duplicated chunks.
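Note on the shape change above: `Plan` no longer returns one flat `[]*metadata.Meta` but a `[]compactionTask`, where each task is an independent group of blocks that compacts into its own output block. A minimal sketch of how a caller consumes that shape (toy `span`/`task` types standing in for `metadata.Meta` and `compactionTask`; not code from this patch):

```go
package main

import "fmt"

// span stands in for metadata.Meta: an ID plus a [min, max) time range.
type span struct {
	id       int
	min, max int64
}

// task mirrors compactionTask: one group of blocks merged into a single output block.
type task []span

func main() {
	// One planning pass may now yield several independent compactions,
	// e.g. two disjoint chains of overlapping blocks.
	plan := []task{
		{{1, 0, 10}, {2, 9, 20}},   // -> first output block
		{{4, 40, 50}, {5, 47, 60}}, // -> second output block
	}
	// Callers such as ProgressCalculate above now loop twice:
	// over tasks, then over the blocks inside each task.
	for i, t := range plan {
		lo, hi := t[0].min, t[0].max
		for _, s := range t[1:] {
			if s.min < lo {
				lo = s.min
			}
			if s.max > hi {
				hi = s.max
			}
		}
		fmt.Printf("task %d: %d blocks -> output range [%d, %d)\n", i, len(t), lo, hi)
	}
}
```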
@@ -1014,53 +1016,55 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	g, errCtx := errgroup.WithContext(ctx)
 	g.SetLimit(cg.compactBlocksFetchConcurrency)
 
-	toCompactDirs := make([]string, 0, len(toCompact))
-	for _, m := range toCompact {
-		bdir := filepath.Join(dir, m.ULID.String())
-		for _, s := range m.Compaction.Sources {
-			if _, ok := uniqueSources[s]; ok {
-				return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
-			}
-			uniqueSources[s] = struct{}{}
-		}
-		func(ctx context.Context, meta *metadata.Meta) {
-			g.Go(func() error {
-				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
-					return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
-				}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
-					return retry(errors.Wrapf(err, "download block %s", meta.ULID))
+	toCompactDirs := make([]string, 0, len(tasks))
+	for _, task := range tasks {
+		for _, m := range task {
+			bdir := filepath.Join(dir, m.ULID.String())
+			for _, s := range m.Compaction.Sources {
+				if _, ok := uniqueSources[s]; ok {
+					return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", tasks))
 				}
+				uniqueSources[s] = struct{}{}
+			}
+			func(ctx context.Context, meta *metadata.Meta) {
+				g.Go(func() error {
+					if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
+						return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
+					}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
+						return retry(errors.Wrapf(err, "download block %s", meta.ULID))
+					}
 
-				// Ensure all input blocks are valid.
-				var stats block.HealthStats
-				if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
-					stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
-					return e
-				}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
-					return errors.Wrapf(err, "gather index issues for block %s", bdir)
-				}
+					// Ensure all input blocks are valid.
+					var stats block.HealthStats
+					if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
+						stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+						return e
+					}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
+						return errors.Wrapf(err, "gather index issues for block %s", bdir)
+					}
 
-				if err := stats.CriticalErr(); err != nil {
-					return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
-				}
+					if err := stats.CriticalErr(); err != nil {
+						return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
+					}
 
-				if err := stats.OutOfOrderChunksErr(); err != nil {
-					return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
-				}
+					if err := stats.OutOfOrderChunksErr(); err != nil {
+						return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
+					}
 
-				if err := stats.Issue347OutsideChunksErr(); err != nil {
-					return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
-				}
+					if err := stats.Issue347OutsideChunksErr(); err != nil {
+						return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
+					}
 
-				if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
-					return errors.Wrapf(err,
-						"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
-				}
-				return nil
-			})
-		}(errCtx, m)
+					if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
+						return errors.Wrapf(err,
+							"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
+					}
+					return nil
+				})
+			}(errCtx, m)
 
-		toCompactDirs = append(toCompactDirs, bdir)
+			toCompactDirs = append(toCompactDirs, bdir)
+		}
 	}
 
 	sourceBlockStr := fmt.Sprintf("%v", toCompactDirs)
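The hunk above keeps the existing concurrency pattern and only adds the inner per-task loop: every block download runs in its own goroutine under an `errgroup`, capped by `SetLimit`, and one failure cancels the rest through the shared context. A self-contained sketch of that pattern (fake `download` function; the limit of 2 stands in for `cg.compactBlocksFetchConcurrency`):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func download(ctx context.Context, id string) error {
	select {
	case <-time.After(10 * time.Millisecond): // pretend to fetch one block
		fmt.Println("downloaded", id)
		return nil
	case <-ctx.Done(): // another download failed; stop early
		return ctx.Err()
	}
}

func main() {
	blocks := []string{"01A", "01B", "01C", "01D"}

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(2) // at most two blocks in flight at once

	for _, id := range blocks {
		id := id // capture the loop variable for the goroutine below
		g.Go(func() error {
			return download(ctx, id)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("download failed:", err)
	}
}
```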
@@ -1080,10 +1084,12 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	if compID == (ulid.ULID{}) {
 		// Prometheus compactor found that the compacted block would have no samples.
 		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
-		for _, meta := range toCompact {
-			if meta.Stats.NumSamples == 0 {
-				if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
-					level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
+		for _, task := range tasks {
+			for _, meta := range task {
+				if meta.Stats.NumSamples == 0 {
+					if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
+						level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
+					}
 				}
 			}
 		}
@@ -1125,8 +1131,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	// Ensure the output block is not overlapping with anything else,
 	// unless vertical compaction is enabled.
 	if !cg.enableVerticalCompaction {
-		if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil {
-			return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+		for _, task := range tasks {
+			if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
+				return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+			}
 		}
 	}
 
@@ -1143,14 +1151,16 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	// Mark for deletion the blocks we just compacted from the group and bucket so they do not get included
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
-	for _, meta := range toCompact {
-		err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
-			return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
-		}, opentracing.Tags{"block.id": meta.ULID})
-		if err != nil {
-			return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
+	for _, task := range tasks {
+		for _, meta := range task {
+			err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
+				return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
+			}, opentracing.Tags{"block.id": meta.ULID})
+			if err != nil {
+				return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
+			}
+			cg.groupGarbageCollectedBlocks.Inc()
 		}
-		cg.groupGarbageCollectedBlocks.Inc()
 	}
 
 	level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr,
diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go
index 5c2a93df8d..6d9c66a5af 100644
--- a/pkg/compact/planner.go
+++ b/pkg/compact/planner.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"math"
 	"path/filepath"
+	"sort"
 
 	"github.com/go-kit/log"
 	"github.com/oklog/ulid"
@@ -49,11 +50,13 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact
 }
 
 // TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
-func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
+func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
 	return p.plan(p.noCompBlocksFunc(), metasByMinTime)
 }
 
-func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
+type compactionTask []*metadata.Meta
+
+func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
 	notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
 	for _, meta := range metasByMinTime {
 		if _, excluded := noCompactMarked[meta.ULID]; excluded {
@@ -62,9 +65,9 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac
 		notExcludedMetasByMinTime = append(notExcludedMetasByMinTime, meta)
 	}
 
-	res := selectOverlappingMetas(notExcludedMetasByMinTime)
-	if len(res) > 0 {
-		return res, nil
+	verticalCompactions := selectOverlappingMetas(notExcludedMetasByMinTime)
+	if len(verticalCompactions) > 0 {
+		return verticalCompactions, nil
 	}
 
 	// No overlapping blocks, do compaction the usual way.
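With the vertical-compaction branch above in place, `plan` now has a fixed precedence, and only the first branch can produce more than one task per call. A toy rendering of that control flow (the three helpers are stand-ins for `selectOverlappingMetas`, `selectMetas`, and the >5% tombstone scan, with hard-coded behavior just to make it run):

```go
package main

import "fmt"

type span struct{ id, min, max int64 }
type task []span

// Toy stand-ins: pretend there are no overlaps, the first two blocks fill a
// range, and no block is tombstone-heavy.
func overlapping(blocks []span) []task          { return nil }
func horizontal(blocks []span) []span           { return blocks[:2] }
func tombstoneHeavy(blocks []span) (span, bool) { return span{}, false }

func planOnce(blocks []span) []task {
	// 1. Vertical compaction: each chain of overlapping blocks is its own task.
	if tasks := overlapping(blocks); len(tasks) > 0 {
		return tasks
	}
	// 2. Horizontal compaction: at most one task, built from the configured ranges.
	if sel := horizontal(blocks); len(sel) > 0 {
		return []task{sel}
	}
	// 3. Tombstone cleanup: rewrite a single block as its own task.
	if b, ok := tombstoneHeavy(blocks); ok {
		return []task{{b}}
	}
	return nil
}

func main() {
	fmt.Println(planOnce([]span{{1, 0, 20}, {2, 20, 40}, {3, 40, 60}}))
}
```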
@@ -74,9 +77,9 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac
 		notExcludedMetasByMinTime = notExcludedMetasByMinTime[:len(notExcludedMetasByMinTime)-1]
 	}
 	metasByMinTime = metasByMinTime[:len(metasByMinTime)-1]
-	res = append(res, selectMetas(p.ranges, noCompactMarked, metasByMinTime)...)
+	res := selectMetas(p.ranges, noCompactMarked, metasByMinTime)
 	if len(res) > 0 {
-		return res, nil
+		return []compactionTask{res}, nil
 	}
 
 	// Compact any blocks with big enough time range that have >5% tombstones.
@@ -86,7 +89,8 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac
 			break
 		}
 		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
-			return []*metadata.Meta{notExcludedMetasByMinTime[i]}, nil
+			task := []*metadata.Meta{notExcludedMetasByMinTime[i]}
+			return []compactionTask{task}, nil
 		}
 	}
 
@@ -155,28 +159,62 @@ func selectMetas(ranges []int64, noCompactMarked map[ulid.ULID]*metadata.NoCompa
 // selectOverlappingMetas returns all dirs with overlapping time ranges.
 // It expects sorted input by mint and returns the overlapping dirs in the same order as received.
 // Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L268.
-func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []*metadata.Meta {
+func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []compactionTask {
 	if len(metasByMinTime) < 2 {
 		return nil
 	}
-	var overlappingMetas []*metadata.Meta
-	globalMaxt := metasByMinTime[0].MaxTime
-	for i, m := range metasByMinTime[1:] {
-		if m.MinTime < globalMaxt {
-			if len(overlappingMetas) == 0 {
-				// When it is the first overlap, need to add the last one as well.
-				overlappingMetas = append(overlappingMetas, metasByMinTime[i])
+
+	groups := make([][]*metadata.Meta, len(metasByMinTime))
+	for i, m := range metasByMinTime {
+		groups[i] = []*metadata.Meta{m}
+	}
+
+	// Iterate through all metas and merge overlapping blocks into groups.
+	// We compare each block's max time with the next block's min time.
+	// If we detect an overlap, we merge all blocks from the current block's group
+	// into the group of the next block.
+	// We also adjust the next group's head (zero-th element) to be the block with the
+	// highest max time. This allows groups to be used as heaps, and helps us detect
+	// overlaps when the next block is contained in the current block.
+	// See test case https://github.com/thanos-io/thanos/blob/04106d7a7add7f47025c00422c80f746650c1b97/pkg/compact/planner_test.go#L310-L321.
+loopMetas:
+	for i := range metasByMinTime {
+		currentGroup := groups[i]
+		currentBlock := currentGroup[0]
+		for j := i + 1; j < len(groups); j++ {
+			if currentBlock.MaxTime <= groups[j][0].MinTime {
+				continue
 			}
-			overlappingMetas = append(overlappingMetas, m)
-		} else if len(overlappingMetas) > 0 {
-			break
+			// If the current block overlaps with the next block,
+			// merge the current block's group into the overlapping block's group.
+			for _, blockToMerge := range currentGroup {
+				groups[j] = append(groups[j], blockToMerge)
+				// Set the block with the highest max time as the head of the group.
+				if blockToMerge.MaxTime > groups[j][0].MaxTime {
+					n := len(groups[j]) - 1
+					groups[j][0], groups[j][n] = groups[j][n], groups[j][0]
+				}
+			}
+			// Empty the current block's group.
+			groups[i] = nil
+
+			// Move on to the next block.
+			continue loopMetas
 		}
+	}
 
-		if m.MaxTime > globalMaxt {
-			globalMaxt = m.MaxTime
+	overlappingGroups := make([]compactionTask, 0, len(groups))
+	for _, group := range groups {
+		if len(group) < 2 {
+			continue
 		}
+		sort.Slice(group, func(i, j int) bool {
+			return group[i].MinTime < group[j].MinTime
+		})
+		overlappingGroups = append(overlappingGroups, group)
 	}
-	return overlappingMetas
+
+	return overlappingGroups
 }
 
 // splitByRange splits the directories by the time range. The range sequence starts at 0.
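The grouping loop above is easiest to see on concrete numbers. A simplified, self-contained sketch (a single sweep that chains overlapping ranges; the patch instead merges groups forward and keeps the highest max-time block at each group's head, but the resulting grouping on sorted input is the same):

```go
package main

import (
	"fmt"
	"sort"
)

// span stands in for metadata.Meta: an ID plus a [min, max) time range.
type span struct {
	id       int
	min, max int64
}

// groupOverlapping walks spans sorted by min time, extends the current group
// while ranges overlap the running max, and starts a new group at each gap.
// Groups of size one are dropped: a lone block needs no vertical compaction.
func groupOverlapping(spans []span) [][]span {
	sort.Slice(spans, func(i, j int) bool { return spans[i].min < spans[j].min })

	var groups [][]span
	var current []span
	var maxSeen int64
	for _, s := range spans {
		if len(current) > 0 && s.min < maxSeen {
			current = append(current, s)
			if s.max > maxSeen {
				maxSeen = s.max // tracking the running max handles contained blocks
			}
			continue
		}
		if len(current) > 1 {
			groups = append(groups, current)
		}
		current = []span{s}
		maxSeen = s.max
	}
	if len(current) > 1 {
		groups = append(groups, current)
	}
	return groups
}

func main() {
	// Same layout as the new "Multiple independent groups" test case below.
	spans := []span{{1, 0, 10}, {2, 9, 20}, {3, 17, 35}, {4, 40, 50}, {5, 47, 60}}
	for i, g := range groupOverlapping(spans) {
		fmt.Printf("task %d: %v\n", i, g)
	}
	// task 0: [{1 0 10} {2 9 20} {3 17 35}]
	// task 1: [{4 40 50} {5 47 60}]
}
```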
@@ -243,7 +281,7 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket,
 	return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
 }
 
-func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
+func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
 	noCompactMarked := t.noCompBlocksFunc()
 	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
 	for k, v := range noCompactMarked {
@@ -252,54 +290,56 @@ func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*
 PlanLoop:
 	for {
-		plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
+		tasks, err := t.plan(copiedNoCompactMarked, metasByMinTime)
 		if err != nil {
 			return nil, err
 		}
-		var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64
-		var biggestIndex int
-		for i, p := range plan {
-			indexSize := int64(-1)
-			for _, f := range p.Thanos.Files {
-				if f.RelPath == block.IndexFilename {
-					indexSize = f.SizeBytes
+		for _, task := range tasks {
+			var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64
+			var biggestIndex int
+			for i, meta := range task {
+				indexSize := int64(-1)
+				for _, f := range meta.Thanos.Files {
+					if f.RelPath == block.IndexFilename {
+						indexSize = f.SizeBytes
+					}
 				}
-			}
-			if indexSize <= 0 {
-				// Get size from bkt instead.
-				attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename))
-				if err != nil {
-					return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename))
+				if indexSize <= 0 {
+					// Get size from bkt instead.
+					attr, err := t.bkt.Attributes(ctx, filepath.Join(meta.ULID.String(), block.IndexFilename))
+					if err != nil {
+						return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(meta.ULID.String(), block.IndexFilename))
+					}
+					indexSize = attr.Size
 				}
-				indexSize = attr.Size
-			}
-			if maxIndexSize < indexSize {
-				maxIndexSize = indexSize
-				biggestIndex = i
-			}
-			totalIndexBytes += indexSize
-			// Leave 15% headroom for index compaction bloat.
-			if totalIndexBytes >= int64(float64(t.totalMaxIndexSizeBytes)*0.85) {
-				// Marking blocks for no compact to limit size.
-				// TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408
-				if err := block.MarkForNoCompact(
-					ctx,
-					t.logger,
-					t.bkt,
-					plan[biggestIndex].ULID,
-					metadata.IndexSizeExceedingNoCompactReason,
-					fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes),
-					t.markedForNoCompact,
-				); err != nil {
-					return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String())
+				if maxIndexSize < indexSize {
+					maxIndexSize = indexSize
+					biggestIndex = i
+				}
+				totalIndexBytes += indexSize
+				// Leave 15% headroom for index compaction bloat.
+				if totalIndexBytes >= int64(float64(t.totalMaxIndexSizeBytes)*0.85) {
+					// Marking blocks for no compact to limit size.
+					// TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408
+					if err := block.MarkForNoCompact(
+						ctx,
+						t.logger,
+						t.bkt,
+						task[biggestIndex].ULID,
+						metadata.IndexSizeExceedingNoCompactReason,
+						fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes),
+						t.markedForNoCompact,
+					); err != nil {
+						return nil, errors.Wrapf(err, "mark %v for no compaction", task[biggestIndex].ULID.String())
+					}
+					// Make sure wrapped planner exclude this block.
+					copiedNoCompactMarked[task[biggestIndex].ULID] = &metadata.NoCompactMark{ID: task[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1}
+					continue PlanLoop
 				}
-				// Make sure wrapped planner exclude this block.
-				copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1}
-				continue PlanLoop
 			}
 		}
 		// Planned blocks should not exceed limit.
-		return plan, nil
+		return tasks, nil
 	}
 }
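Before moving on to the tests, the arithmetic in `largeTotalIndexSizeFilter` deserves a worked example: index sizes are summed per task, and as soon as the running total reaches 85% of the configured limit (15% headroom, since compaction tends to bloat the merged index) the largest index seen so far is marked no-compact and planning restarts without it. A toy version of just that accounting (hypothetical `blockToMark` helper, not part of the patch):

```go
package main

import (
	"fmt"
	"math"
)

// blockToMark walks one task's index sizes in order, tracks the biggest index
// seen so far, and reports which block to mark no-compact once the running
// total crosses 85% of the byte limit.
func blockToMark(indexSizes []int64, totalMaxIndexSizeBytes int64) (biggest int, exceeded bool) {
	var total, maxSize int64 = 0, math.MinInt64
	for i, size := range indexSizes {
		if maxSize < size {
			maxSize, biggest = size, i
		}
		total += size
		if total >= int64(float64(totalMaxIndexSizeBytes)*0.85) {
			return biggest, true
		}
	}
	return 0, false
}

func main() {
	// Three blocks in one task against a 100-byte budget (cutoff at 85 bytes):
	// 30 passes, 30+90 crosses the line, so block 1 gets the no-compact mark.
	if i, ok := blockToMark([]int64{30, 90, 40}, 100); ok {
		fmt.Printf("mark block %d for no-compact, then re-plan\n", i)
	}
}
```

The real filter then records the mark in `copiedNoCompactMarked` and jumps back to `PlanLoop`, so the wrapped planner re-plans without that block; with multiple tasks, this check now runs once per task.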
diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go
index d5c253be72..0cdf48a5ec 100644
--- a/pkg/compact/planner_test.go
+++ b/pkg/compact/planner_test.go
@@ -30,7 +30,7 @@ type tsdbPlannerAdapter struct {
 	comp tsdb.Compactor
 }
 
-func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
+func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
 	// TSDB planning works based on the meta.json files in the given dir. Mock it up.
 	for _, meta := range metasByMinTime {
 		bdir := filepath.Join(p.dir, meta.ULID.String())
@@ -46,6 +46,10 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.
 		return nil, err
 	}
 
+	if len(plan) == 0 {
+		return nil, nil
+	}
+
 	var res []*metadata.Meta
 	for _, pdir := range plan {
 		meta, err := metadata.ReadFromDir(pdir)
@@ -54,7 +58,7 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.
 		}
 		res = append(res, meta)
 	}
-	return res, nil
+	return []compactionTask{res}, nil
 }
 
 // Adapted from https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/tsdb/compact_test.go#L167
@@ -76,7 +80,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 	for _, c := range []struct {
 		name     string
 		metas    []*metadata.Meta
-		expected []*metadata.Meta
+		expected []compactionTask
 	}{
 		{
 			name: "Outside range",
@@ -108,11 +112,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
-			},
+			}},
 		},
 		{
 			name: "There are blocks to fill the entire 2nd parent range.",
@@ -123,10 +127,12 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}},
 			},
-			expected: []*metadata.Meta{
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}},
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 60, MaxTime: 120}},
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}},
+			expected: []compactionTask{
+				{
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}},
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 60, MaxTime: 120}},
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}},
+				},
 			},
 		},
 		{
@@ -147,10 +153,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 80, MaxTime: 100}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
-			},
+			}},
 		},
 		{
 			name: "We have 20, 20, 20, 60, 60 range blocks. '5' is marked as fresh one",
@@ -161,11 +167,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 120, MaxTime: 180}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
-			},
+			}},
 		},
 		{
 			name: "There are blocks to fill the entire 2nd parent range, but there is a gap",
@@ -175,10 +181,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}},
-			},
+			}},
 		},
 		{
 			name: "We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60",
@@ -189,20 +195,18 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}},
-			},
+			}},
 		},
 		{
 			name: "Do not select large blocks that have many tombstones when there is no fresh block",
-			metas: []*metadata.Meta{
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{
-					NumSeries:     10,
-					NumTombstones: 3,
-				}}},
-			},
+			metas: []*metadata.Meta{{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{
+				NumSeries:     10,
+				NumTombstones: 3,
+			}}}},
 		},
 		{
 			name: "Select large blocks that have many tombstones when fresh appears",
@@ -213,10 +217,12 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				}}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 540, MaxTime: 560}},
 			},
-			expected: []*metadata.Meta{{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{
-				NumSeries:     10,
-				NumTombstones: 3,
-			}}}},
+			expected: []compactionTask{{
+				{BlockMeta: tsdb.BlockMeta{
+					Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{
+						NumSeries:     10,
+						NumTombstones: 3,
+					}}}}},
 		},
 		{
 			name: "For small blocks, do not compact tombstones, even when fresh appears.",
@@ -251,10 +257,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}},
-			},
+			}},
 		},
 		// |--------------|
 		// |----------------|
@@ -266,10 +272,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}},
-			},
+			}},
 		},
 		// |--------------|
 		// |--------------|
@@ -281,10 +287,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
-			},
+			}},
 		},
 		// |--------------|
 		// |---------------------|
@@ -296,11 +302,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}},
-			},
+			}},
 		},
 		// |--------------|
 		// |--------------------------------|
@@ -314,35 +320,46 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 0, MaxTime: 360}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 340, MaxTime: 560}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}},
-			},
+			}},
 		},
 		// |--------------|
 		// |--------------|
+		// |--------------|
 		// |--------------|
 		// |--------------|
 		{
-			name: "Overlapping blocks 5",
+			name: "Multiple independent groups of overlapping blocks",
 			metas: []*metadata.Meta{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}},
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 40}},
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 39, MaxTime: 50}},
-			},
-			expected: []*metadata.Meta{
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}},
-				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 17, MaxTime: 35}},
+				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 40, MaxTime: 50}},
+				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 47, MaxTime: 60}},
+			},
+			expected: []compactionTask{
+				{
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}},
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}},
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 17, MaxTime: 35}},
+				},
+				{
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 40, MaxTime: 50}},
+					{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 47, MaxTime: 60}},
+				},
 			},
 		},
 	} {
 		t.Run(c.name, func(t *testing.T) {
 			for _, e := range c.expected {
-				// Add here to avoid boilerplate.
-				e.Thanos.Labels = make(map[string]string)
+				for _, meta := range e {
+					// Add here to avoid boilerplate.
+					meta.Thanos.Labels = make(map[string]string)
+				}
 			}
 			for _, e := range c.metas {
 				// Add here to avoid boilerplate.
@@ -366,7 +383,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
 			tsdbPlanner.dir = dir
 			plan, err := tsdbPlanner.Plan(context.Background(), metasByMinTime)
 			testutil.Ok(t, err)
-			testutil.Equals(t, c.expected, plan)
+			if len(c.expected) == 0 {
+				testutil.Equals(t, len(plan), 0)
+			} else {
+				testutil.Equals(t, c.expected[0], plan[0])
+			}
 		})
 		t.Run("tsdbBasedPlanner", func(t *testing.T) {
 			metasByMinTime := make([]*metadata.Meta, len(c.metas))
@@ -442,12 +463,12 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 			tsdbPlanner.dir = dir
 			plan, err := tsdbPlanner.Plan(context.Background(), c.metas)
 			testutil.Ok(t, err)
-			testutil.Equals(t, []*metadata.Meta(nil), plan)
+			testutil.Equals(t, []compactionTask(nil), plan)
 		})
 		t.Run("tsdbBasedPlanner", func(t *testing.T) {
 			plan, err := tsdbBasedPlanner.Plan(context.Background(), c.metas)
 			testutil.Ok(t, err)
-			testutil.Equals(t, []*metadata.Meta(nil), plan)
+			testutil.Equals(t, []compactionTask(nil), plan)
 		})
 	})
 }
@@ -470,7 +491,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 		metas          []*metadata.Meta
 		noCompactMarks map[ulid.ULID]*metadata.NoCompactMark
 
-		expected []*metadata.Meta
+		expected []compactionTask
 	}{
 		{
 			name: "Outside range and excluded",
@@ -492,10 +513,10 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
 				ulid.MustNew(1, nil): {},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
-			},
+			}},
 		},
 		{
 			name: "Blocks to fill the entire parent, but with second one excluded.",
@@ -520,11 +541,11 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
 				ulid.MustNew(4, nil): {},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
-			},
+			}},
 		},
 		{
 			name: "Blocks to fill the entire parent, but with last one and first one excluded.",
@@ -538,10 +559,10 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 				ulid.MustNew(1, nil): {},
 				ulid.MustNew(4, nil): {},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
-			},
+			}},
 		},
 		{
 			name: "Blocks to fill the entire parent, but with all of them excluded.",
@@ -583,10 +604,10 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) {
 			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
 				ulid.MustNew(6, nil): {},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
-			},
+			}},
 		},
 		{
 			name: "We have 20, 60, 20, 60, 240 range blocks. We could compact 20 + 60 + 60, but 4th is excluded",
@@ -664,7 +685,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 		name  string
 		metas []*metadata.Meta
 
-		expected      []*metadata.Meta
+		expected      []compactionTask
 		expectedMarks float64
 	}{
 		{
@@ -688,10 +709,10 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 			},
 			expectedMarks: 1,
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
-			},
+			}},
 		},
 		{
 			name: "Blocks to fill the entire parent, but with second one too large.",
@@ -719,11 +740,11 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				{Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}},
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
-			},
+			}},
 		},
 		{
 			name: "Blocks to fill the entire parent, but with pre-last one and first too large.",
@@ -739,10 +760,10 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				{Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}},
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 60, MaxTime: 80}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 50}},
-			},
+			}},
 			expectedMarks: 2,
 		},
 		{
@@ -774,10 +795,10 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 				{Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}},
 					BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}},
 			},
-			expected: []*metadata.Meta{
+			expected: []compactionTask{{
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
 				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
-			},
+			}},
 			expectedMarks: 1,
 		},
 		// |--------------|
@@ -817,9 +838,11 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) {
 			plan, err := planner.Plan(context.Background(), metasByMinTime)
 			testutil.Ok(t, err)
 
-			for _, m := range plan {
-				// For less boilerplate.
-				m.Thanos = metadata.Thanos{}
+			for _, task := range plan {
+				for _, m := range task {
+					// For less boilerplate.
+					m.Thanos = metadata.Thanos{}
+				}
 			}
 			testutil.Equals(t, c.expected, plan)
 			testutil.Equals(t, c.expectedMarks, promtest.ToFloat64(marked)-lastMarkValue)