diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index c6c36bf3cc..a0f0dd0047 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -8,8 +8,10 @@ import ( "fmt" "math" "os" + "path" "path/filepath" "sort" + "strconv" "sync" "time" @@ -542,26 +544,28 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g if len(plan) == 0 { continue } - groupCompactions[g.key]++ - toRemove := make(map[ulid.ULID]struct{}, len(plan)) - metas := make([]*tsdb.BlockMeta, 0, len(plan)) - for _, p := range plan { - metas = append(metas, &p.BlockMeta) - toRemove[p.BlockMeta.ULID] = struct{}{} - } - g.deleteFromGroup(toRemove) + for _, groupTask := range plan { + groupCompactions[g.key]++ + toRemove := make(map[ulid.ULID]struct{}, len(groupTask)) + metas := make([]*tsdb.BlockMeta, 0, len(groupTask)) + for _, meta := range groupTask { + metas = append(metas, &meta.BlockMeta) + toRemove[meta.BlockMeta.ULID] = struct{}{} + } + g.deleteFromGroup(toRemove) + groupBlocks[g.key] += len(groupTask) - groupBlocks[g.key] += len(plan) + newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...) + if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil { + return errors.Wrapf(err, "append meta") + } + } if len(g.metasByMinTime) == 0 { continue } - newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...) - if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil { - return errors.Wrapf(err, "append meta") - } tmpGroups = append(tmpGroups, g) } @@ -727,7 +731,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr type Planner interface { // Plan returns a list of blocks that should be compacted into single one. // The blocks can be overlapping. The provided metadata has to be ordered by minTime. - Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) + Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) } // Compactor provides compaction against an underlying storage of time series data. @@ -749,38 +753,54 @@ type Compactor interface { Compact(dest string, dirs []string, open []*tsdb.Block) (ulid.ULID, error) } +// GroupCompactionTask is an independent compaction task for a given compaction group. +type GroupCompactionTask struct { + Group *Group + Blocks CompactionTask + Dir string +} + // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. -func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) { - cg.compactionRunsStarted.Inc() - - subDir := filepath.Join(dir, cg.Key()) - +func (task *GroupCompactionTask) Compact(ctx context.Context, comp Compactor) (shouldRerun bool, cerr error) { + task.Group.compactionRunsStarted.Inc() defer func() { // Leave the compact directory for inspection if it is a halt error // or if it is not then so that possibly we would not have to download everything again. 
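Note: the reworked `Planner` contract returns a slice of `CompactionTask` values instead of a single block list, and the `ProgressCalculate` hunk above now counts one simulated compaction run per planned task. A minimal sketch of that consumption pattern, using local stand-in types (`blockMeta`, `compactionTask`, `pairPlanner` are illustrative, not the actual Thanos types):

```go
package main

import "fmt"

// blockMeta is a stand-in for metadata.Meta, reduced to an ID and time range.
type blockMeta struct {
	ULID             int
	MinTime, MaxTime int64
}

// compactionTask mirrors the new planner output type: one set of blocks
// that is compacted together in a single run.
type compactionTask []*blockMeta

// planner mirrors the reworked Planner interface, which may now return
// several independent tasks for one group.
type planner interface {
	Plan(metasByMinTime []*blockMeta) ([]compactionTask, error)
}

// progressForGroup does what the updated ProgressCalculate loop does for one
// group: every planned task counts as one compaction run and contributes
// len(task) blocks.
func progressForGroup(p planner, metas []*blockMeta) (runs, blocks int, err error) {
	tasks, err := p.Plan(metas)
	if err != nil {
		return 0, 0, err
	}
	for _, task := range tasks {
		runs++
		blocks += len(task)
	}
	return runs, blocks, nil
}

// pairPlanner is a toy planner that pairs up adjacent blocks.
type pairPlanner struct{}

func (pairPlanner) Plan(metas []*blockMeta) ([]compactionTask, error) {
	var tasks []compactionTask
	for i := 0; i+1 < len(metas); i += 2 {
		tasks = append(tasks, compactionTask{metas[i], metas[i+1]})
	}
	return tasks, nil
}

func main() {
	metas := []*blockMeta{
		{ULID: 1, MinTime: 0, MaxTime: 10},
		{ULID: 2, MinTime: 10, MaxTime: 20},
		{ULID: 3, MinTime: 20, MaxTime: 30},
		{ULID: 4, MinTime: 30, MaxTime: 40},
	}
	runs, blocks, err := progressForGroup(pairPlanner{}, metas)
	fmt.Println(runs, blocks, err) // 2 4 <nil>
}
```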
- if rerr != nil { + if cerr != nil { + task.Group.compactionFailures.Inc() return } - if err := os.RemoveAll(subDir); err != nil { - level.Error(cg.logger).Log("msg", "failed to remove compaction group work directory", "path", subDir, "err", err) + task.Group.compactionRunsCompleted.Inc() + if err := os.RemoveAll(task.Dir); err != nil { + level.Error(task.Group.logger).Log("msg", "failed to remove compaction group work directory", "path", task.Dir, "err", err) } }() - if err := os.MkdirAll(subDir, 0750); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") + hasOverlappingBlocks := false + if err := task.Group.areBlocksOverlapping(task.Blocks, task.Group.metasByMinTime...); err != nil { + // TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked + // in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check. + if !task.Group.enableVerticalCompaction { + return false, halt(errors.Wrap(err, "pre compaction overlap check")) + } + + hasOverlappingBlocks = true } - err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) { - shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp) - return err - }, opentracing.Tags{"group.key": cg.Key()}) - if err != nil { - cg.compactionFailures.Inc() - return false, ulid.ULID{}, err + if err := os.MkdirAll(task.Dir, 0750); err != nil { + return false, errors.Wrap(err, "create compaction group dir") + } + + terr := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (cerr error) { + shouldRerun, cerr = task.Group.compactBlocks(ctx, task.Dir, task.Blocks, comp, hasOverlappingBlocks) + return cerr + }, opentracing.Tags{"group.key": task.Group.Key()}) + if terr != nil { + return false, terr } - cg.compactionRunsCompleted.Inc() - return shouldRerun, compID, nil + + return shouldRerun, cerr } // Issue347Error is a type wrapper for errors that should invoke repair process for broken block. @@ -886,7 +906,7 @@ func IsRetryError(err error) bool { return ok } -func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metadata.Meta) error { +func (cg *Group) areBlocksOverlapping(include []*metadata.Meta, exclude ...*metadata.Meta) error { var ( metas []tsdb.BlockMeta excludeMap = map[ulid.ULID]struct{}{} @@ -903,8 +923,8 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada metas = append(metas, m.BlockMeta) } - if include != nil { - metas = append(metas, include.BlockMeta) + for _, meta := range include { + metas = append(metas, meta.BlockMeta) } sort.Slice(metas, func(i, j int) bool { @@ -974,55 +994,35 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return nil } -func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) { - cg.mtx.Lock() - defer cg.mtx.Unlock() - - // Check for overlapped blocks. - overlappingBlocks := false - if err := cg.areBlocksOverlapping(nil); err != nil { - // TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked - // in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check. 
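The pre-compaction overlap check above now receives a slice of blocks to include, and `areBlocksOverlapping` below takes `[]*metadata.Meta` accordingly. The detection itself is not shown in this hunk; conceptually, after sorting by `MinTime`, any block that starts before its predecessor ends is an overlap. A hedged, self-contained sketch of that idea (`blockMeta` and `overlapError` are illustrative stand-ins, and the real method also folds in the group's own metas and an exclude set):

```go
package main

import (
	"fmt"
	"sort"
)

// blockMeta is a stand-in for tsdb.BlockMeta limited to a name and time range.
type blockMeta struct {
	Name             string
	MinTime, MaxTime int64
}

// overlapError reports the first pair of blocks whose time ranges overlap
// after sorting by MinTime -- conceptually what the pre- and post-compaction
// overlap checks guard against.
func overlapError(metas []blockMeta) error {
	sort.Slice(metas, func(i, j int) bool { return metas[i].MinTime < metas[j].MinTime })
	for i := 1; i < len(metas); i++ {
		prev, cur := metas[i-1], metas[i]
		if cur.MinTime < prev.MaxTime {
			return fmt.Errorf("blocks %s [%d, %d) and %s [%d, %d) overlap",
				prev.Name, prev.MinTime, prev.MaxTime, cur.Name, cur.MinTime, cur.MaxTime)
		}
	}
	return nil
}

func main() {
	// "c" starts before "b" ends, so an overlap is reported.
	fmt.Println(overlapError([]blockMeta{
		{"a", 0, 20}, {"b", 20, 40}, {"c", 35, 60},
	}))
}
```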
- if !cg.enableVerticalCompaction { - return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check")) - } - - overlappingBlocks = true +func (cg *Group) PlanCompactionTasks(ctx context.Context, dir string, planner Planner) ([]GroupCompactionTask, error) { + plannedCompactions, err := planner.Plan(ctx, cg.metasByMinTime) + if err != nil { + return nil, err } - var toCompact []*metadata.Meta - if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) { - toCompact, e = planner.Plan(ctx, cg.metasByMinTime) - return e - }); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "plan compaction") - } - if len(toCompact) == 0 { - // Nothing to do. - return false, ulid.ULID{}, nil + tasks := make([]GroupCompactionTask, 0, len(plannedCompactions)) + for i, blocks := range plannedCompactions { + tasks = append(tasks, GroupCompactionTask{ + Group: cg, + Blocks: blocks, + Dir: path.Join(dir, cg.Key(), strconv.Itoa(i)), + }) } - level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact)) - - // Due to #183 we verify that none of the blocks in the plan have overlapping sources. - // This is one potential source of how we could end up with duplicated chunks. - uniqueSources := map[ulid.ULID]struct{}{} + return tasks, nil +} +func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks CompactionTask, comp Compactor, hasOverlappingBlocks bool) (bool, error) { // Once we have a plan we need to download the actual data. - groupCompactionBegin := time.Now() - begin := groupCompactionBegin + compactionBegin := time.Now() + begin := compactionBegin + g, errCtx := errgroup.WithContext(ctx) g.SetLimit(cg.compactBlocksFetchConcurrency) - toCompactDirs := make([]string, 0, len(toCompact)) - for _, m := range toCompact { + toCompactDirs := make([]string, 0, len(blocks)) + for _, m := range blocks { bdir := filepath.Join(dir, m.ULID.String()) - for _, s := range m.Compaction.Sources { - if _, ok := uniqueSources[s]; ok { - return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact)) - } - uniqueSources[s] = struct{}{} - } func(ctx context.Context, meta *metadata.Meta) { g.Go(func() error { if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { @@ -1062,25 +1062,27 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp toCompactDirs = append(toCompactDirs, bdir) } + sourceBlockStr := fmt.Sprintf("%v", toCompactDirs) if err := g.Wait(); err != nil { - return false, ulid.ULID{}, err + return false, err } level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + var compID ulid.ULID begin = time.Now() if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) { compID, e = comp.Compact(dir, toCompactDirs, nil) return e }); err != nil { - return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) + return false, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) } if compID == (ulid.ULID{}) { // Prometheus compactor found that the compacted block would have no samples. 
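`PlanCompactionTasks` above gives every planned task its own working directory, `<compactDir>/<group key>/<task index>`, so tasks from the same group never share on-disk state and can be cleaned up independently. A tiny illustration of the resulting layout (the concrete path and group key below are made up):

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

func main() {
	compactDir := "/tmp/compact-data" // illustrative compaction work dir
	groupKey := "0@12345"             // illustrative group key
	for i := 0; i < 3; i++ {
		// Matches the Dir field assembled for each GroupCompactionTask.
		fmt.Println(path.Join(compactDir, groupKey, strconv.Itoa(i)))
	}
	// Output:
	// /tmp/compact-data/0@12345/0
	// /tmp/compact-data/0@12345/1
	// /tmp/compact-data/0@12345/2
}
```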
level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr) - for _, meta := range toCompact { + for _, meta := range blocks { if meta.Stats.NumSamples == 0 { if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil { level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID) @@ -1088,14 +1090,14 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } } // Even though this block was empty, there may be more work to do. - return true, ulid.ULID{}, nil + return true, nil } cg.compactions.Inc() - if overlappingBlocks { + if hasOverlappingBlocks { cg.verticalCompactions.Inc() } level.Info(cg.logger).Log("msg", "compacted blocks", "new", compID, - "blocks", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "overlapping_blocks", overlappingBlocks) + "blocks", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "overlapping_blocks", hasOverlappingBlocks) bdir := filepath.Join(dir, compID.String()) index := filepath.Join(bdir, block.IndexFilename) @@ -1107,11 +1109,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp SegmentFiles: block.GetSegmentFiles(bdir), }, nil) if err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir) + return false, errors.Wrapf(err, "failed to finalize the block %s", bdir) } if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones") + return false, errors.Wrap(err, "remove tombstones") } // Ensure the output block is valid. @@ -1119,43 +1121,42 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) }) if !cg.acceptMalformedIndex && err != nil { - return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) + return false, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } // Ensure the output block is not overlapping with anything else, // unless vertical compaction is enabled. if !cg.enableVerticalCompaction { - if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil { - return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir)) + if err := cg.areBlocksOverlapping([]*metadata.Meta{newMeta}, blocks...); err != nil { + return false, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir)) } } begin = time.Now() - err = tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error { return block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency)) }) if err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID)) + return false, retry(errors.Wrapf(err, "upload of %s failed", compID)) } level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) // Mark for deletion the blocks we just compacted from the group and bucket so they do not get included // into the next planning cycle. // Eventually the block we just uploaded should get synced into the group again (including sync-delay). 
- for _, meta := range toCompact { + for _, meta := range blocks { err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error { return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())) }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) + return false, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) } cg.groupGarbageCollectedBlocks.Inc() } level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr, - "duration", time.Since(groupCompactionBegin), "duration_ms", time.Since(groupCompactionBegin).Milliseconds()) - return true, compID, nil + "duration", time.Since(compactionBegin), "duration_ms", time.Since(compactionBegin).Milliseconds()) + return true, nil } func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error { @@ -1233,9 +1234,9 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { var ( wg sync.WaitGroup workCtx, workCtxCancel = context.WithCancel(ctx) - groupChan = make(chan *Group) + taskChan = make(chan GroupCompactionTask) errChan = make(chan error, c.concurrency) - finishedAllGroups = true + finishedAllTasks = true mtx sync.Mutex ) defer workCtxCancel() @@ -1246,12 +1247,12 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { wg.Add(1) go func() { defer wg.Done() - for g := range groupChan { - shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp) + for task := range taskChan { + shouldRerunGroup, err := task.Compact(workCtx, c.comp) if err == nil { if shouldRerunGroup { mtx.Lock() - finishedAllGroups = false + finishedAllTasks = false mtx.Unlock() } continue @@ -1260,11 +1261,12 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { if IsIssue347Error(err) { if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, err); err == nil { mtx.Lock() - finishedAllGroups = false + finishedAllTasks = false mtx.Unlock() continue } } + // If block has out of order chunk and it has been configured to skip it, // then we can mark the block for no compaction so that the next compaction run // will skip it. @@ -1275,15 +1277,19 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { c.bkt, err.(OutOfOrderChunksError).id, metadata.OutOfOrderChunksNoCompactReason, - "OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil { + "OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", task.Group.blocksMarkedForNoCompact, + ); err == nil { mtx.Lock() - finishedAllGroups = false + finishedAllTasks = false mtx.Unlock() continue } } - errChan <- errors.Wrapf(err, "group %s", g.Key()) - return + + if err != nil { + errChan <- errors.Wrapf(err, "group %s", task.Group.Key()) + return + } } }() } @@ -1318,37 +1324,38 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { level.Info(c.logger).Log("msg", "start of compactions") - // Send all groups found during this pass to the compaction workers. - var groupErrs errutil.MultiError - groupLoop: - for _, g := range groups { - // Ignore groups with only one block because there is nothing to compact. 
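The `BucketCompactor` worker loop above now drains `GroupCompactionTask` values from `taskChan` instead of whole groups, flipping a mutex-guarded `finishedAllTasks` flag whenever a task asks for another pass. A stripped-down sketch of that layout (`task.run` stands in for `task.Compact`; error forwarding, Issue347 repair, and no-compact marking are omitted):

```go
package main

import (
	"fmt"
	"sync"
)

type task struct{ name string }

func (t task) run() (shouldRerun bool, err error) {
	fmt.Println("compacting", t.name)
	return false, nil
}

func main() {
	const concurrency = 2
	var (
		wg               sync.WaitGroup
		mtx              sync.Mutex
		finishedAllTasks = true
		taskChan         = make(chan task)
	)
	// Fixed number of workers draining the task channel.
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskChan {
				if rerun, err := t.run(); err == nil && rerun {
					mtx.Lock()
					finishedAllTasks = false
					mtx.Unlock()
				}
			}
		}()
	}
	for _, t := range []task{{"g1/0"}, {"g1/1"}, {"g2/0"}} {
		taskChan <- t
	}
	close(taskChan)
	wg.Wait()
	fmt.Println("finished all tasks:", finishedAllTasks)
}
```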
- if len(g.IDs()) == 1 { - continue - } + tasks, err := c.planTasks(ctx, groups) + if err != nil { + return err + } + + // Send all tasks planned in this pass to the compaction workers. + var taskErrs errutil.MultiError + tasksLoop: + for _, task := range tasks { select { - case groupErr := <-errChan: - groupErrs.Add(groupErr) - break groupLoop - case groupChan <- g: + case taskErr := <-errChan: + taskErrs.Add(taskErr) + break tasksLoop + case taskChan <- task: } } - close(groupChan) + close(taskChan) wg.Wait() // Collect any other error reported by the workers, or any error reported // while we were waiting for the last batch of groups to run the compaction. close(errChan) for groupErr := range errChan { - groupErrs.Add(groupErr) + taskErrs.Add(groupErr) } workCtxCancel() - if len(groupErrs) > 0 { - return groupErrs.Err() + if len(taskErrs) > 0 { + return taskErrs.Err() } - if finishedAllGroups { + if finishedAllTasks { break } } @@ -1356,6 +1363,52 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { return nil } +func (c *BucketCompactor) planTasks(ctx context.Context, groups []*Group) ([]GroupCompactionTask, error) { + // Plan tasks from all groups + allGroupTasks := make([][]GroupCompactionTask, 0, len(groups)) + numTasks := 0 + for _, g := range groups { + // Ignore groups with only one block because there is nothing to compact. + if len(g.IDs()) == 1 { + continue + } + + groupTasks, err := g.PlanCompactionTasks(ctx, c.compactDir, c.planner) + if err != nil { + return nil, errors.Wrapf(err, "get compaction group tasks: %s", g.Key()) + } + if len(groupTasks) > 0 { + allGroupTasks = append(allGroupTasks, groupTasks) + numTasks += len(groupTasks) + } + } + + tasksLimit := c.concurrency + // Make sure we plan at least one task from each group. + if tasksLimit < len(allGroupTasks) { + tasksLimit = len(allGroupTasks) + } + + // If there aren't enough tasks across all groups, plan all available tasks. + if numTasks < tasksLimit { + tasksLimit = numTasks + } + + // Distribute tasks from all groups in a round-robin manner until we + // reach the concurrency limit. + tasks := make([]GroupCompactionTask, 0) + for len(tasks) < tasksLimit { + for i, groupTasks := range allGroupTasks { + if len(groupTasks) == 0 { + continue + } + tasks = append(tasks, groupTasks[0]) + allGroupTasks[i] = allGroupTasks[i][1:] + } + } + return tasks, nil +} + var _ block.MetadataFilter = &GatherNoCompactionMarkFilter{} // GatherNoCompactionMarkFilter is a block.Fetcher filter that passes all metas. While doing it, it gathers all no-compact-mark.json markers. diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 485753432e..c3ea353f63 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -171,15 +171,23 @@ func MetricCount(c prometheus.Collector) int { } func TestGroupCompactE2E(t *testing.T) { - testGroupCompactE2e(t, nil) + testGroupCompactE2e(t, nil, 2) +} + +func TestGroupCompactE2EWithoutConcurrency(t *testing.T) { + testGroupCompactE2e(t, nil, 1) +} + +func TestGroupCompactE2EWithHighConcurrency(t *testing.T) { + testGroupCompactE2e(t, nil, 20) } // Penalty based merger should get the same result as the blocks don't have overlap. 
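`planTasks` above is the scheduling core of this change: tasks are taken from each group's plan in round-robin order until roughly `concurrency` tasks are selected, never fewer than one per group that has work and never more than were planned. A simplified, self-contained version of that selection (plain strings stand in for `GroupCompactionTask`):

```go
package main

import "fmt"

// pickTasks mirrors the selection in BucketCompactor.planTasks, simplified.
func pickTasks(groupTasks [][]string, concurrency int) []string {
	numTasks := 0
	for _, g := range groupTasks {
		numTasks += len(g)
	}
	limit := concurrency
	if limit < len(groupTasks) {
		limit = len(groupTasks) // at least one task per group
	}
	if numTasks < limit {
		limit = numTasks // cannot select more tasks than were planned
	}

	tasks := make([]string, 0, limit)
	for len(tasks) < limit {
		// One round: take the head task from every non-empty group.
		for i, g := range groupTasks {
			if len(g) == 0 {
				continue
			}
			tasks = append(tasks, g[0])
			groupTasks[i] = g[1:]
		}
	}
	return tasks
}

func main() {
	groups := [][]string{
		{"g1/0", "g1/1", "g1/2"},
		{"g2/0"},
		{"g3/0", "g3/1"},
	}
	// With concurrency 4, the first round picks g1/0, g2/0, g3/0 and the next
	// round starts with g1/1. Like the real code, a round is never cut short,
	// so the result can slightly exceed the limit.
	fmt.Println(pickTasks(groups, 4))
}
```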
func TestGroupCompactPenaltyDedupE2E(t *testing.T) { - testGroupCompactE2e(t, dedup.NewChunkSeriesMerger()) + testGroupCompactE2e(t, dedup.NewChunkSeriesMerger(), 2) } -func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) { +func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc, concurrency int) { objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -212,7 +220,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter) grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 10, 10) - bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true) + bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, concurrency, true) testutil.Ok(t, err) // Compaction on empty should not fail. @@ -330,13 +338,13 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[4].Thanos.GroupKey()))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[5].Thanos.GroupKey()))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsStarted)) - testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[0].Thanos.GroupKey()))) - testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[7].Thanos.GroupKey()))) + testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[0].Thanos.GroupKey()))) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[7].Thanos.GroupKey()))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[4].Thanos.GroupKey()))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[5].Thanos.GroupKey()))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsCompleted)) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[0].Thanos.GroupKey()))) - testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[7].Thanos.GroupKey()))) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[0].Thanos.GroupKey()))) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[7].Thanos.GroupKey()))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[4].Thanos.GroupKey()))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[5].Thanos.GroupKey()))) testutil.Equals(t, 4, MetricCount(grouper.compactionFailures)) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index d5485f02bb..6ca8f74239 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -451,6 +451,21 @@ func TestCompactProgressCalculate(t *testing.T) { }, }, }, + { + testName: "multiple_vertical_compactions", + input: []*metadata.Meta{ + createBlockMeta(1, 0, 10, map[string]string{"a": "1"}, 0, []uint64{}), + createBlockMeta(2, 5, 15, map[string]string{"a": "1"}, 0, 
[]uint64{}), + createBlockMeta(3, 20, 30, map[string]string{"a": "1"}, 0, []uint64{}), + createBlockMeta(4, 25, 40, map[string]string{"a": "1"}, 0, []uint64{}), + }, + expected: map[string]planResult{ + keys[0]: { + compactionRuns: 2.0, + compactionBlocks: 4.0, + }, + }, + }, } { if ok := t.Run(tcase.testName, func(t *testing.T) { blocks := make(map[ulid.ULID]*metadata.Meta, len(tcase.input)) diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go index 5c2a93df8d..6f767e6aba 100644 --- a/pkg/compact/planner.go +++ b/pkg/compact/planner.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "path/filepath" + "sort" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -45,15 +46,23 @@ func NewTSDBBasedPlanner(logger log.Logger, ranges []int64) *tsdbBasedPlanner { // NewPlanner is a default Thanos planner with the same functionality as Prometheus' TSDB plus special handling of excluded blocks. // It's the same functionality just without accessing filesystem, and special handling of excluded blocks. func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompactionMarkFilter) *tsdbBasedPlanner { - return &tsdbBasedPlanner{logger: logger, ranges: ranges, noCompBlocksFunc: noCompBlocks.NoCompactMarkedBlocks} + return &tsdbBasedPlanner{ + logger: logger, + ranges: ranges, + noCompBlocksFunc: noCompBlocks.NoCompactMarkedBlocks, + } } // TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405 -func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) { return p.plan(p.noCompBlocksFunc(), metasByMinTime) } -func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +// CompactionTask is a set of blocks that should be compacted together in a single compaction run. +// Multiple compaction tasks can be run in parallel even within a single compaction group. +type CompactionTask []*metadata.Meta + +func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) { notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime)) for _, meta := range metasByMinTime { if _, excluded := noCompactMarked[meta.ULID]; excluded { @@ -62,9 +71,9 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac notExcludedMetasByMinTime = append(notExcludedMetasByMinTime, meta) } - res := selectOverlappingMetas(notExcludedMetasByMinTime) - if len(res) > 0 { - return res, nil + verticalCompactions := selectOverlappingMetas(notExcludedMetasByMinTime) + if len(verticalCompactions) > 0 { + return verticalCompactions, nil } // No overlapping blocks, do compaction the usual way. @@ -74,9 +83,9 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac notExcludedMetasByMinTime = notExcludedMetasByMinTime[:len(notExcludedMetasByMinTime)-1] } metasByMinTime = metasByMinTime[:len(metasByMinTime)-1] - res = append(res, selectMetas(p.ranges, noCompactMarked, metasByMinTime)...) + res := selectMetas(p.ranges, noCompactMarked, metasByMinTime) if len(res) > 0 { - return res, nil + return []CompactionTask{res}, nil } // Compact any blocks with big enough time range that have >5% tombstones. 
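The tombstone rule referenced by the comment above keeps its existing threshold but now returns the selected block wrapped as a single-element `CompactionTask`. A small sketch of just the threshold computation (the surrounding range and freshness checks are omitted):

```go
package main

import "fmt"

// needsTombstoneCompaction mirrors the planner's ">5% tombstones" rule: a
// block whose tombstone count exceeds 5% of its series count gets planned as
// a single-block task so the deletes are applied.
func needsTombstoneCompaction(numTombstones, numSeries uint64) bool {
	return float64(numTombstones)/float64(numSeries+1) > 0.05
}

func main() {
	fmt.Println(needsTombstoneCompaction(3, 10))  // true  (3/11 ≈ 0.27)
	fmt.Println(needsTombstoneCompaction(1, 100)) // false (1/101 ≈ 0.0099)
}
```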
@@ -86,7 +95,8 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac break } if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { - return []*metadata.Meta{notExcludedMetasByMinTime[i]}, nil + task := []*metadata.Meta{notExcludedMetasByMinTime[i]} + return []CompactionTask{task}, nil } } @@ -155,28 +165,62 @@ func selectMetas(ranges []int64, noCompactMarked map[ulid.ULID]*metadata.NoCompa // selectOverlappingMetas returns all dirs with overlapping time ranges. // It expects sorted input by mint and returns the overlapping dirs in the same order as received. // Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L268. -func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []*metadata.Meta { +func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []CompactionTask { if len(metasByMinTime) < 2 { return nil } - var overlappingMetas []*metadata.Meta - globalMaxt := metasByMinTime[0].MaxTime - for i, m := range metasByMinTime[1:] { - if m.MinTime < globalMaxt { - if len(overlappingMetas) == 0 { - // When it is the first overlap, need to add the last one as well. - overlappingMetas = append(overlappingMetas, metasByMinTime[i]) + + groups := make([][]*metadata.Meta, len(metasByMinTime)) + for i, m := range metasByMinTime { + groups[i] = []*metadata.Meta{m} + } + + // Iterate through all metas and merge overlapping blocks into groups. + // We compare each block's max time with the next block's min time. + // If we detect an overlap, we merge all blocks from the current block's group + // into the group of the next block. + // We also adjust the head of the next group's head (zero-th element) to be the block with the + // highest max-time. This allows groups to be used as heaps, and helps us detect + // overlaps when the next block is contained in the current block. + // See test case https://github.com/thanos-io/thanos/blob/04106d7a7add7f47025c00422c80f746650c1b97/pkg/compact/planner_test.go#L310-L321. +loopMetas: + for i := range metasByMinTime { + currentGroup := groups[i] + currentBlock := currentGroup[0] + for j := i + 1; j < len(groups); j++ { + if currentBlock.MaxTime <= groups[j][0].MinTime { + continue } - overlappingMetas = append(overlappingMetas, m) - } else if len(overlappingMetas) > 0 { - break + // If the current block has an overlap with the next block, + // merge the current block's group of the overlapping block's group. + for _, blockToMerge := range currentGroup { + groups[j] = append(groups[j], blockToMerge) + // Set the block with the highest max time as the head of the group. + if blockToMerge.MaxTime > groups[j][0].MaxTime { + n := len(groups[j]) - 1 + groups[j][0], groups[j][n] = groups[j][n], groups[j][0] + } + } + // Empty the current block's group. + groups[i] = nil + + // Move on to the next block. + continue loopMetas } + } - if m.MaxTime > globalMaxt { - globalMaxt = m.MaxTime + overlappingGroups := make([]CompactionTask, 0, len(groups)) + for _, group := range groups { + if len(group) < 2 { + continue } + sort.Slice(group, func(i, j int) bool { + return group[i].MinTime < group[j].MinTime + }) + overlappingGroups = append(overlappingGroups, group) } - return overlappingMetas + + return overlappingGroups } // splitByRange splits the directories by the time range. The range sequence starts at 0. 
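`selectOverlappingMetas` above no longer returns one flat list: transitively overlapping blocks are clustered into independent groups, clusters with fewer than two blocks are dropped, and each remaining cluster is returned sorted by `MinTime` so it can be compacted as its own task. A simplified sweep that reproduces that result for already-sorted input (the real code instead merges groups forward through a head element tracking each cluster's max time):

```go
package main

import (
	"fmt"
	"sort"
)

type blockMeta struct {
	Name             string
	MinTime, MaxTime int64
}

// overlappingTasks clusters transitively-overlapping blocks and returns only
// clusters with at least two members, each ordered by MinTime.
func overlappingTasks(metasByMinTime []blockMeta) [][]blockMeta {
	var (
		tasks   [][]blockMeta
		current []blockMeta
		maxTime int64
	)
	flush := func() {
		if len(current) >= 2 {
			sort.Slice(current, func(i, j int) bool { return current[i].MinTime < current[j].MinTime })
			tasks = append(tasks, current)
		}
		current = nil
	}
	for _, m := range metasByMinTime {
		if len(current) > 0 && m.MinTime < maxTime {
			// Overlaps the running cluster: extend it.
			current = append(current, m)
		} else {
			// No overlap: close the running cluster and start a new one.
			flush()
			current = []blockMeta{m}
			maxTime = m.MaxTime
		}
		if m.MaxTime > maxTime {
			maxTime = m.MaxTime
		}
	}
	flush()
	return tasks
}

func main() {
	// Two independent clusters of overlapping blocks become two tasks that
	// can run in parallel; the lone block in between is dropped.
	fmt.Println(overlappingTasks([]blockMeta{
		{"1", 0, 10}, {"2", 9, 20}, {"3", 17, 35},
		{"4", 40, 45},
		{"5", 47, 60}, {"6", 50, 58},
	}))
}
```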
@@ -243,7 +287,7 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact} } -func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) { noCompactMarked := t.noCompBlocksFunc() copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)) for k, v := range noCompactMarked { @@ -252,54 +296,56 @@ func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []* PlanLoop: for { - plan, err := t.plan(copiedNoCompactMarked, metasByMinTime) + tasks, err := t.plan(copiedNoCompactMarked, metasByMinTime) if err != nil { return nil, err } - var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64 - var biggestIndex int - for i, p := range plan { - indexSize := int64(-1) - for _, f := range p.Thanos.Files { - if f.RelPath == block.IndexFilename { - indexSize = f.SizeBytes + for _, task := range tasks { + var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64 + var biggestIndex int + for i, meta := range task { + indexSize := int64(-1) + for _, f := range meta.Thanos.Files { + if f.RelPath == block.IndexFilename { + indexSize = f.SizeBytes + } } - } - if indexSize <= 0 { - // Get size from bkt instead. - attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename)) - if err != nil { - return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename)) + if indexSize <= 0 { + // Get size from bkt instead. + attr, err := t.bkt.Attributes(ctx, filepath.Join(meta.ULID.String(), block.IndexFilename)) + if err != nil { + return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(meta.ULID.String(), block.IndexFilename)) + } + indexSize = attr.Size } - indexSize = attr.Size - } - if maxIndexSize < indexSize { - maxIndexSize = indexSize - biggestIndex = i - } - totalIndexBytes += indexSize - // Leave 15% headroom for index compaction bloat. - if totalIndexBytes >= int64(float64(t.totalMaxIndexSizeBytes)*0.85) { - // Marking blocks for no compact to limit size. - // TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408 - if err := block.MarkForNoCompact( - ctx, - t.logger, - t.bkt, - plan[biggestIndex].ULID, - metadata.IndexSizeExceedingNoCompactReason, - fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes), - t.markedForNoCompact, - ); err != nil { - return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String()) + if maxIndexSize < indexSize { + maxIndexSize = indexSize + biggestIndex = i + } + totalIndexBytes += indexSize + // Leave 15% headroom for index compaction bloat. + if totalIndexBytes >= int64(float64(t.totalMaxIndexSizeBytes)*0.85) { + // Marking blocks for no compact to limit size. 
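The index-size filter now applies its budget per task: within each task it sums index sizes (taken from the block metadata or, failing that, bucket attributes), and once the running total reaches 85% of the configured limit it marks the block with the largest index for no-compaction and re-plans. A hedged sketch of that decision with sizes passed in directly (`exceedsIndexBudget` is an illustrative helper, not the Thanos function):

```go
package main

import "fmt"

// exceedsIndexBudget mirrors the per-task check in largeTotalIndexSizeFilter.Plan,
// simplified. It returns the position of the block with the largest index --
// the block that would be marked no-compact before re-planning -- and whether
// the running total crossed 85% of the limit.
func exceedsIndexBudget(indexSizes []int64, totalMaxIndexSizeBytes int64) (biggest int, exceeded bool) {
	var total, maxSize int64 = 0, -1
	for i, size := range indexSizes {
		if size > maxSize {
			maxSize, biggest = size, i
		}
		total += size
		// Leave 15% headroom for index compaction bloat.
		if total >= int64(float64(totalMaxIndexSizeBytes)*0.85) {
			return biggest, true
		}
	}
	return biggest, false
}

func main() {
	fmt.Println(exceedsIndexBudget([]int64{90, 30, 30}, 100)) // 0 true  (90 already >= 85)
	fmt.Println(exceedsIndexBudget([]int64{20, 30, 25}, 100)) // 1 false (total 75 < 85)
}
```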
+ // TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408 + if err := block.MarkForNoCompact( + ctx, + t.logger, + t.bkt, + task[biggestIndex].ULID, + metadata.IndexSizeExceedingNoCompactReason, + fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes), + t.markedForNoCompact, + ); err != nil { + return nil, errors.Wrapf(err, "mark %v for no compaction", task[biggestIndex].ULID.String()) + } + // Make sure wrapped planner exclude this block. + copiedNoCompactMarked[task[biggestIndex].ULID] = &metadata.NoCompactMark{ID: task[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1} + continue PlanLoop } - // Make sure wrapped planner exclude this block. - copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1} - continue PlanLoop } } // Planned blocks should not exceed limit. - return plan, nil + return tasks, nil } } diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go index d5c253be72..6474c59641 100644 --- a/pkg/compact/planner_test.go +++ b/pkg/compact/planner_test.go @@ -30,7 +30,7 @@ type tsdbPlannerAdapter struct { comp tsdb.Compactor } -func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) { // TSDB planning works based on the meta.json files in the given dir. Mock it up. for _, meta := range metasByMinTime { bdir := filepath.Join(p.dir, meta.ULID.String()) @@ -46,6 +46,10 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata. return nil, err } + if len(plan) == 0 { + return nil, nil + } + var res []*metadata.Meta for _, pdir := range plan { meta, err := metadata.ReadFromDir(pdir) @@ -54,7 +58,7 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata. 
} res = append(res, meta) } - return res, nil + return []CompactionTask{res}, nil } // Adapted from https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/tsdb/compact_test.go#L167 @@ -76,7 +80,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { for _, c := range []struct { name string metas []*metadata.Meta - expected []*metadata.Meta + expected []CompactionTask }{ { name: "Outside range", @@ -108,11 +112,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, - }, + }}, }, { name: "There are blocks to fill the entire 2nd parent range.", @@ -123,10 +127,12 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}}, }, - expected: []*metadata.Meta{ - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}}, - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 60, MaxTime: 120}}, - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}}, + expected: []CompactionTask{ + { + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 60, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}}, + }, }, }, { @@ -147,10 +153,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 80, MaxTime: 100}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, - }, + }}, }, { name: "We have 20, 20, 20, 60, 60 range blocks. 
'5' is marked as fresh one", @@ -161,11 +167,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 120, MaxTime: 180}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, - }, + }}, }, { name: "There are blocks to fill the entire 2nd parent range, but there is a gap", @@ -175,10 +181,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}}, - }, + }}, }, { name: "We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60", @@ -189,20 +195,18 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}}, - }, + }}, }, { name: "Do not select large blocks that have many tombstones when there is no fresh block", - metas: []*metadata.Meta{ - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ - NumSeries: 10, - NumTombstones: 3, - }}}, - }, + metas: []*metadata.Meta{{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }}}}, }, { name: "Select large blocks that have many tombstones when fresh appears", @@ -213,10 +217,12 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { }}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 540, MaxTime: 560}}, }, - expected: []*metadata.Meta{{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ - NumSeries: 10, - NumTombstones: 3, - }}}}, + expected: []CompactionTask{{ + {BlockMeta: tsdb.BlockMeta{ + Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }}}}}, }, { name: "For small blocks, do not compact tombstones, even when fresh appears.", @@ -251,10 +257,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: 
tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, - }, + }}, }, // |--------------| // |----------------| @@ -266,10 +272,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}}, - }, + }}, }, // |--------------| // |--------------| @@ -281,10 +287,10 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, - }, + }}, }, // |--------------| // |---------------------| @@ -296,11 +302,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, - }, + }}, }, // |--------------| // |--------------------------------| @@ -314,35 +320,46 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 0, MaxTime: 360}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 340, MaxTime: 560}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, - }, + }}, }, // |--------------| // |--------------| - // |--------------| - // |--------------| + // |--------------| + // |--------------| + // |--------------| { - name: "Overlapping blocks 5", + name: "Multiple independent overlapping blocks", metas: []*metadata.Meta{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}}, - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 40}}, - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 39, MaxTime: 50}}, - }, - expected: []*metadata.Meta{ - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), 
MinTime: 0, MaxTime: 10}}, - {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 17, MaxTime: 35}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 35, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 47, MaxTime: 60}}, + }, + expected: []CompactionTask{ + { + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 17, MaxTime: 35}}, + }, + { + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 35, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 47, MaxTime: 60}}, + }, }, }, } { t.Run(c.name, func(t *testing.T) { for _, e := range c.expected { - // Add here to avoid boilerplate. - e.Thanos.Labels = make(map[string]string) + for _, meta := range e { + // Add here to avoid boilerplate. + meta.Thanos.Labels = make(map[string]string) + } } for _, e := range c.metas { // Add here to avoid boilerplate. @@ -366,7 +383,11 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { tsdbPlanner.dir = dir plan, err := tsdbPlanner.Plan(context.Background(), metasByMinTime) testutil.Ok(t, err) - testutil.Equals(t, c.expected, plan) + if len(c.expected) == 0 { + testutil.Equals(t, len(plan), 0) + } else { + testutil.Equals(t, c.expected[0], plan[0]) + } }) t.Run("tsdbBasedPlanner", func(t *testing.T) { metasByMinTime := make([]*metadata.Meta, len(c.metas)) @@ -442,12 +463,12 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { tsdbPlanner.dir = dir plan, err := tsdbPlanner.Plan(context.Background(), c.metas) testutil.Ok(t, err) - testutil.Equals(t, []*metadata.Meta(nil), plan) + testutil.Equals(t, []CompactionTask(nil), plan) }) t.Run("tsdbBasedPlanner", func(t *testing.T) { plan, err := tsdbBasedPlanner.Plan(context.Background(), c.metas) testutil.Ok(t, err) - testutil.Equals(t, []*metadata.Meta(nil), plan) + testutil.Equals(t, []CompactionTask(nil), plan) }) }) } @@ -470,7 +491,7 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { metas []*metadata.Meta noCompactMarks map[ulid.ULID]*metadata.NoCompactMark - expected []*metadata.Meta + expected []CompactionTask }{ { name: "Outside range and excluded", @@ -492,10 +513,10 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{ ulid.MustNew(1, nil): {}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, - }, + }}, }, { name: "Blocks to fill the entire parent, but with second one excluded.", @@ -520,11 +541,11 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{ ulid.MustNew(4, nil): {}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, 
- }, + }}, }, { name: "Blocks to fill the entire parent, but with last one fist excluded.", @@ -538,10 +559,10 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { ulid.MustNew(1, nil): {}, ulid.MustNew(4, nil): {}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, - }, + }}, }, { name: "Blocks to fill the entire parent, but with all of them excluded.", @@ -583,10 +604,10 @@ func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{ ulid.MustNew(6, nil): {}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, - }, + }}, }, { name: "We have 20, 60, 20, 60, 240 range blocks. We could compact 20 + 60 + 60, but 4th is excluded", @@ -664,7 +685,7 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { name string metas []*metadata.Meta - expected []*metadata.Meta + expected []CompactionTask expectedMarks float64 }{ { @@ -688,10 +709,10 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, }, expectedMarks: 1, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, - }, + }}, }, { name: "Blocks to fill the entire parent, but with second one too large.", @@ -719,11 +740,11 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, - }, + }}, }, { name: "Blocks to fill the entire parent, but with pre-last one and first too large.", @@ -739,10 +760,10 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 60, MaxTime: 80}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 50}}, - }, + }}, expectedMarks: 2, }, { @@ -774,10 +795,10 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}}, }, - expected: []*metadata.Meta{ + expected: []CompactionTask{{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, 
{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, - }, + }}, expectedMarks: 1, }, // |--------------| @@ -795,6 +816,31 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { }, expectedMarks: 1, }, + // |--------------| + // |----------------| + // |--------------| + // |----------------| + // |--------------| + { + name: "Two groups of overlapping blocks, first group total is too large", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 70, MaxTime: 80}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 75, MaxTime: 90}}, + }, + expectedMarks: 1, + expected: []CompactionTask{{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 75, MaxTime: 90}}, + }}, + }, } { if !t.Run(c.name, func(t *testing.T) { t.Run("from meta", func(t *testing.T) { @@ -817,9 +863,11 @@ func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { plan, err := planner.Plan(context.Background(), metasByMinTime) testutil.Ok(t, err) - for _, m := range plan { - // For less boilerplate. - m.Thanos = metadata.Thanos{} + for _, task := range plan { + for _, m := range task { + // For less boilerplate. + m.Thanos = metadata.Thanos{} + } } testutil.Equals(t, c.expected, plan) testutil.Equals(t, c.expectedMarks, promtest.ToFloat64(marked)-lastMarkValue) diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 5686d6afaf..4f758b8b31 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -649,8 +649,8 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(1), "thanos_compact_group_compactions_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(2), "thanos_compact_group_compaction_runs_started_total")) - testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(1), "thanos_compact_group_compaction_runs_completed_total")) + testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(1), "thanos_compact_group_compaction_runs_started_total")) + testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_group_compaction_runs_completed_total")) // However, the blocks have been cleaned because that happens concurrently. 
testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(2), "thanos_compact_aborted_partial_uploads_deletion_attempts_total")) @@ -702,8 +702,8 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(6), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(3), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_group_compactions_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(14), "thanos_compact_group_compaction_runs_started_total")) - testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(14), "thanos_compact_group_compaction_runs_completed_total")) + testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(6), "thanos_compact_group_compaction_runs_started_total")) + testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(6), "thanos_compact_group_compaction_runs_completed_total")) testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(2), "thanos_compact_downsample_total")) testutil.Ok(t, c.WaitSumMetrics(e2emon.Equals(0), "thanos_compact_downsample_failures_total")) @@ -723,7 +723,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { operationMatcher, err := matchers.NewMatcher(matchers.MatchEqual, "operation", "get") testutil.Ok(t, err) testutil.Ok(t, c.WaitSumMetricsWithOptions( - e2emon.Equals(573), + e2emon.Equals(635), []string{"thanos_objstore_bucket_operations_total"}, e2emon.WithLabelMatchers( bucketMatcher, operationMatcher, @@ -773,8 +773,8 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_group_compactions_total"}, e2emon.WaitMissingMetrics())) testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_group_vertical_compactions_total"}, e2emon.WaitMissingMetrics())) testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_group_compactions_failures_total"}, e2emon.WaitMissingMetrics())) - testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(7), []string{"thanos_compact_group_compaction_runs_started_total"}, e2emon.WaitMissingMetrics())) - testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(7), []string{"thanos_compact_group_compaction_runs_completed_total"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_group_compaction_runs_started_total"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_group_compaction_runs_completed_total"}, e2emon.WaitMissingMetrics())) testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics())) testutil.Ok(t, c.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_compact_downsample_failures_total"}, e2emon.WaitMissingMetrics()))