From 24fd3361a05ddc670cee494710b5bb5c3ea53882 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sun, 4 Dec 2022 12:47:57 +0100 Subject: [PATCH] Add vertical compaction check Signed-off-by: Filip Petkovski --- pkg/compact/compact.go | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ab32335df7..bd13f4fd00 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -763,11 +763,6 @@ type GroupCompactionTask struct { // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. func (task *GroupCompactionTask) Compact(ctx context.Context, comp Compactor) (shouldRerun bool, cerr error) { - overlappingBlocks := false - if err := task.Group.areBlocksOverlapping(nil); err != nil { - overlappingBlocks = true - } - task.Group.compactionRunsStarted.Inc() defer func() { // Leave the compact directory for inspection if it is a halt error @@ -782,12 +777,23 @@ func (task *GroupCompactionTask) Compact(ctx context.Context, comp Compactor) (s } }() + hasOverlappingBlocks := false + if err := task.Group.areBlocksOverlapping(task.Blocks, task.Group.metasByMinTime...); err != nil { + // TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked + // in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check. + if !task.Group.enableVerticalCompaction { + return false, halt(errors.Wrap(err, "pre compaction overlap check")) + } + + hasOverlappingBlocks = true + } + if err := os.MkdirAll(task.Dir, 0750); err != nil { return false, errors.Wrap(err, "create compaction group dir") } terr := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (cerr error) { - shouldRerun, cerr = task.Group.compactBlocks(ctx, task.Dir, task.Blocks, comp, overlappingBlocks) + shouldRerun, cerr = task.Group.compactBlocks(ctx, task.Dir, task.Blocks, comp, hasOverlappingBlocks) return cerr }, opentracing.Tags{"group.key": task.Group.Key()}) if terr != nil { @@ -900,7 +906,7 @@ func IsRetryError(err error) bool { return ok } -func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metadata.Meta) error { +func (cg *Group) areBlocksOverlapping(include []*metadata.Meta, exclude ...*metadata.Meta) error { var ( metas []tsdb.BlockMeta excludeMap = map[ulid.ULID]struct{}{} @@ -917,8 +923,8 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada metas = append(metas, m.BlockMeta) } - if include != nil { - metas = append(metas, include.BlockMeta) + for _, meta := range include { + metas = append(metas, meta.BlockMeta) } sort.Slice(metas, func(i, j int) bool { @@ -1006,7 +1012,7 @@ func (cg *Group) GetCompactionTasks(ctx context.Context, dir string, planner Pla return tasks, nil } -func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks CompactionTask, comp Compactor, overlappingBlocks bool) (bool, error) { +func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks CompactionTask, comp Compactor, hasOverlappingBlocks bool) (bool, error) { // Once we have a plan we need to download the actual data. compactionBegin := time.Now() begin := compactionBegin @@ -1087,11 +1093,11 @@ func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks Compactio return true, nil } cg.compactions.Inc() - if overlappingBlocks { + if hasOverlappingBlocks { cg.verticalCompactions.Inc() } level.Info(cg.logger).Log("msg", "compacted blocks", "new", compID, - "blocks", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "overlapping_blocks", overlappingBlocks) + "blocks", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "overlapping_blocks", hasOverlappingBlocks) bdir := filepath.Join(dir, compID.String()) index := filepath.Join(bdir, block.IndexFilename) @@ -1121,7 +1127,7 @@ func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks Compactio // Ensure the output block is not overlapping with anything else, // unless vertical compaction is enabled. if !cg.enableVerticalCompaction { - if err := cg.areBlocksOverlapping(newMeta, blocks...); err != nil { + if err := cg.areBlocksOverlapping([]*metadata.Meta{newMeta}, blocks...); err != nil { return false, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir)) } }