Skip to content

Commit

Permalink
Add vertical compaction check
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Dec 4, 2022
1 parent d754370 commit 24fd336
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,11 +763,6 @@ type GroupCompactionTask struct {
// Compact plans and runs a single compaction against the group. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (task *GroupCompactionTask) Compact(ctx context.Context, comp Compactor) (shouldRerun bool, cerr error) {
overlappingBlocks := false
if err := task.Group.areBlocksOverlapping(nil); err != nil {
overlappingBlocks = true
}

task.Group.compactionRunsStarted.Inc()
defer func() {
// Leave the compact directory for inspection if it is a halt error
Expand All @@ -782,12 +777,23 @@ func (task *GroupCompactionTask) Compact(ctx context.Context, comp Compactor) (s
}
}()

hasOverlappingBlocks := false
if err := task.Group.areBlocksOverlapping(task.Blocks, task.Group.metasByMinTime...); err != nil {
// TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked
// in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check.
if !task.Group.enableVerticalCompaction {
return false, halt(errors.Wrap(err, "pre compaction overlap check"))
}

hasOverlappingBlocks = true
}

if err := os.MkdirAll(task.Dir, 0750); err != nil {
return false, errors.Wrap(err, "create compaction group dir")
}

terr := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (cerr error) {
shouldRerun, cerr = task.Group.compactBlocks(ctx, task.Dir, task.Blocks, comp, overlappingBlocks)
shouldRerun, cerr = task.Group.compactBlocks(ctx, task.Dir, task.Blocks, comp, hasOverlappingBlocks)
return cerr
}, opentracing.Tags{"group.key": task.Group.Key()})
if terr != nil {
Expand Down Expand Up @@ -900,7 +906,7 @@ func IsRetryError(err error) bool {
return ok
}

func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metadata.Meta) error {
func (cg *Group) areBlocksOverlapping(include []*metadata.Meta, exclude ...*metadata.Meta) error {
var (
metas []tsdb.BlockMeta
excludeMap = map[ulid.ULID]struct{}{}
Expand All @@ -917,8 +923,8 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada
metas = append(metas, m.BlockMeta)
}

if include != nil {
metas = append(metas, include.BlockMeta)
for _, meta := range include {
metas = append(metas, meta.BlockMeta)
}

sort.Slice(metas, func(i, j int) bool {
Expand Down Expand Up @@ -1006,7 +1012,7 @@ func (cg *Group) GetCompactionTasks(ctx context.Context, dir string, planner Pla
return tasks, nil
}

func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks CompactionTask, comp Compactor, overlappingBlocks bool) (bool, error) {
func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks CompactionTask, comp Compactor, hasOverlappingBlocks bool) (bool, error) {
// Once we have a plan we need to download the actual data.
compactionBegin := time.Now()
begin := compactionBegin
Expand Down Expand Up @@ -1087,11 +1093,11 @@ func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks Compactio
return true, nil
}
cg.compactions.Inc()
if overlappingBlocks {
if hasOverlappingBlocks {
cg.verticalCompactions.Inc()
}
level.Info(cg.logger).Log("msg", "compacted blocks", "new", compID,
"blocks", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "overlapping_blocks", overlappingBlocks)
"blocks", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "overlapping_blocks", hasOverlappingBlocks)

bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)
Expand Down Expand Up @@ -1121,7 +1127,7 @@ func (cg *Group) compactBlocks(ctx context.Context, dir string, blocks Compactio
// Ensure the output block is not overlapping with anything else,
// unless vertical compaction is enabled.
if !cg.enableVerticalCompaction {
if err := cg.areBlocksOverlapping(newMeta, blocks...); err != nil {
if err := cg.areBlocksOverlapping([]*metadata.Meta{newMeta}, blocks...); err != nil {
return false, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}
}
Expand Down

0 comments on commit 24fd336

Please sign in to comment.