
Parallelize vertical compactions inside a single group #5936

Closed · wants to merge 22 commits
Changes from 6 commits
203 changes: 125 additions & 78 deletions pkg/compact/compact.go
@@ -542,26 +542,28 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g
if len(plan) == 0 {
continue
}
groupCompactions[g.key]++

toRemove := make(map[ulid.ULID]struct{}, len(plan))
metas := make([]*tsdb.BlockMeta, 0, len(plan))
for _, p := range plan {
metas = append(metas, &p.BlockMeta)
toRemove[p.BlockMeta.ULID] = struct{}{}
}
g.deleteFromGroup(toRemove)
for _, groupTask := range plan {
groupCompactions[g.key]++
toRemove := make(map[ulid.ULID]struct{}, len(groupTask))
metas := make([]*tsdb.BlockMeta, 0, len(groupTask))
for _, meta := range groupTask {
metas = append(metas, &meta.BlockMeta)
toRemove[meta.BlockMeta.ULID] = struct{}{}
}
g.deleteFromGroup(toRemove)
groupBlocks[g.key] += len(groupTask)

groupBlocks[g.key] += len(plan)
newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
return errors.Wrapf(err, "append meta")
}
}

if len(g.metasByMinTime) == 0 {
continue
}

newMeta := tsdb.CompactBlockMetas(ulid.MustNew(uint64(time.Now().Unix()), nil), metas...)
if err := g.AppendMeta(&metadata.Meta{BlockMeta: *newMeta, Thanos: metadata.Thanos{Downsample: metadata.ThanosDownsample{Resolution: g.Resolution()}, Labels: g.Labels().Map()}}); err != nil {
return errors.Wrapf(err, "append meta")
}
tmpGroups = append(tmpGroups, g)
}

@@ -727,7 +729,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
type Planner interface {
// Plan returns a list of blocks that should be compacted into single one.
// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error)
}
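
The CompactionTask type itself is not part of this hunk; judging from how tasks are used below (ranged over, measured with len, and expanded into areBlocksOverlapping), it is presumably a named slice of block metas along these lines:

```go
// Hypothetical reconstruction (the actual definition lives elsewhere in this PR):
// a single planned compaction, i.e. the set of blocks to be merged into one block.
type CompactionTask []*metadata.Meta
```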

// Compactor provides compaction against an underlying storage of time series data.
@@ -751,7 +753,7 @@ type Compactor interface {

// Compact plans and runs a single compaction against the group. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) {
func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, rerr error) {
cg.compactionRunsStarted.Inc()

subDir := filepath.Join(dir, cg.Key())
@@ -768,19 +770,19 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
}()

if err := os.MkdirAll(subDir, 0750); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
Contributor Author:

The ULID return value was not used anywhere, which is why I've removed it here.

Member:

This is to fit the Prometheus interface. In Prometheus specifically, an empty ULID meant special behaviour when compacting blocks; we rely on it to check whether a compaction really compacted anything.

I think it's fine to change the interface if you want; it's not super important to allow Prometheus to use Thanos compaction or anything like that. Just be careful with Cortex (@yeya24) and Mimir (@pracucci), who might use this interface/our structs here.
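
For reference, the behaviour being described is the zero-value ULID that the compactor returns when the resulting block would contain no samples; the check is kept later in this diff, roughly:

```go
// Illustrative fragment, mirroring the check retained further down in this file:
// a zero ULID from Compact means the compacted block would have no samples.
compID, err := comp.Compact(dir, toCompactDirs, nil)
if err != nil {
	return false, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
}
if compID == (ulid.ULID{}) {
	// Nothing was produced; empty source blocks get deleted and the caller
	// is asked to rerun, since there may still be more work to do.
	return true, nil
}
```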

Contributor:

I checked on the Cortex side and we are not using it either. I guess the same goes for Mimir, as I heard they are moving away from importing the Thanos main repo directly.
I don't have a strong preference here; OK to clean it up or keep it.

Contributor:

> I guess same for Mimir as I heard they are moving away from importing Thanos main repo directly.

Thanks for asking! I confirm we're not running Thanos compactor or store-gateway anymore in Mimir.

return false, errors.Wrap(err, "create compaction group dir")
}

err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp)
shouldRerun, err = cg.compact(ctx, subDir, planner, comp)
return err
}, opentracing.Tags{"group.key": cg.Key()})
if err != nil {
cg.compactionFailures.Inc()
return false, ulid.ULID{}, err
return false, err
}
cg.compactionRunsCompleted.Inc()
return shouldRerun, compID, nil
return shouldRerun, nil
}

// Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -974,7 +976,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return nil
}

func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) {
func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, _ error) {
cg.mtx.Lock()
defer cg.mtx.Unlock()

@@ -984,45 +986,74 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked
// in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check.
if !cg.enableVerticalCompaction {
return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
return false, halt(errors.Wrap(err, "pre compaction overlap check"))
}

overlappingBlocks = true
}

var toCompact []*metadata.Meta
var tasks []CompactionTask
if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
toCompact, e = planner.Plan(ctx, cg.metasByMinTime)
tasks, e = planner.Plan(ctx, cg.metasByMinTime)
return e
}); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
return false, errors.Wrap(err, "plan compaction")
}
if len(toCompact) == 0 {
if len(tasks) == 0 {
// Nothing to do.
return false, ulid.ULID{}, nil
return false, nil
}

level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact))

// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
// This is one potential source of how we could end up with duplicated chunks.
uniqueSources := map[ulid.ULID]struct{}{}
for _, task := range tasks {
for _, m := range task {
for _, s := range m.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return false, halt(errors.Errorf("overlapping sources detected for plan %v", task))
}
uniqueSources[s] = struct{}{}
}
}
}

level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))

var (
wg sync.WaitGroup
mu sync.Mutex
groupErr errutil.MultiError
rerunGroup bool
)
for _, task := range tasks {
Contributor Author:

I am not sure how best to handle this, since we will have unbounded concurrency. We can have per-group concurrency in the short term, but that can still lead to one group slowing down everything else. Long term, maybe we want a single queue of tasks so that we can have global concurrency across all tasks.

Member:

I think we might need that sooner rather than later; it's common for users to crash their compactor with even TWO compactions running at the same time. Why not the usual workers approach (e.g. at most 5 compactions at a time)?

Member:

Also, we have compaction concurrency at a bigger layer (the caller of this method, I believe). Can we have one concurrency loop, so we don't get too complex in terms of unpredictable concurrency?

Contributor Author:

Do you mean at most 5 parallel compactions inside a single group, or 5 parallel vertical compactions across all groups? The former is easier to implement; the latter is better but will make this PR bigger :)

Contributor Author:

We should have concurrency on a task level now, so users can also run one task at a time if they want to.
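
One possible shape for the bounded-workers idea discussed in this thread, sketched against the loop in this diff: the only change is a semaphore channel capping how many tasks compact at once. `maxTaskConcurrency` is a hypothetical limit (e.g. 5), not something this PR adds as shown.

```go
// Hedged sketch only: bound in-flight compaction tasks within a group.
sem := make(chan struct{}, maxTaskConcurrency)
for _, task := range tasks {
	wg.Add(1)
	go func(task CompactionTask) {
		defer wg.Done()
		sem <- struct{}{}        // acquire a worker slot
		defer func() { <-sem }() // release the slot when the task finishes

		rerunTask, err := cg.compactBlocks(ctx, dir, task, comp, overlappingBlocks)

		mu.Lock()
		defer mu.Unlock()
		rerunGroup = rerunGroup || rerunTask
		groupErr.Add(err)
	}(task)
}
wg.Wait()
```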

wg.Add(1)
go func(task CompactionTask) {
defer wg.Done()
rerunTask, err := cg.compactBlocks(ctx, dir, task, comp, overlappingBlocks)

mu.Lock()
defer mu.Unlock()
rerunGroup = rerunGroup || rerunTask
groupErr.Add(err)
}(task)
}
wg.Wait()

return rerunGroup, groupErr.Err()
}

func (cg *Group) compactBlocks(ctx context.Context, dir string, task CompactionTask, comp Compactor, overlappingBlocks bool) (bool, error) {
// Once we have a plan we need to download the actual data.
groupCompactionBegin := time.Now()
begin := groupCompactionBegin
compactionBegin := time.Now()
begin := compactionBegin

g, errCtx := errgroup.WithContext(ctx)
g.SetLimit(cg.compactBlocksFetchConcurrency)

toCompactDirs := make([]string, 0, len(toCompact))
for _, m := range toCompact {
toCompactDirs := make([]string, 0, len(task))
for _, m := range task {
bdir := filepath.Join(dir, m.ULID.String())
for _, s := range m.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
}
uniqueSources[s] = struct{}{}
}
func(ctx context.Context, meta *metadata.Meta) {
g.Go(func() error {
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
@@ -1062,33 +1093,35 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp

toCompactDirs = append(toCompactDirs, bdir)
}

sourceBlockStr := fmt.Sprintf("%v", toCompactDirs)

if err := g.Wait(); err != nil {
return false, ulid.ULID{}, err
return false, err
}

level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

var compID ulid.ULID
begin = time.Now()
if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) {
compID, e = comp.Compact(dir, toCompactDirs, nil)
return e
}); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
return false, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
}
if compID == (ulid.ULID{}) {
// Prometheus compactor found that the compacted block would have no samples.
level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
for _, meta := range toCompact {
for _, meta := range task {
if meta.Stats.NumSamples == 0 {
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
}
}
}
// Even though this block was empty, there may be more work to do.
return true, ulid.ULID{}, nil
return true, nil
}
cg.compactions.Inc()
if overlappingBlocks {
@@ -1107,26 +1140,26 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
SegmentFiles: block.GetSegmentFiles(bdir),
}, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
return false, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
return false, errors.Wrap(err, "remove tombstones")
}

// Ensure the output block is valid.
err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error {
return block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
})
if !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
return false, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

// Ensure the output block is not overlapping with anything else,
// unless vertical compaction is enabled.
if !cg.enableVerticalCompaction {
if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
return false, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}
}

@@ -1136,26 +1169,26 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency))
})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
return false, retry(errors.Wrapf(err, "upload of %s failed", compID))
}
level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

// Mark for deletion the blocks we just compacted from the group and bucket so they do not get included
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, meta := range toCompact {
for _, meta := range task {
err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
return false, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}

level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr,
"duration", time.Since(groupCompactionBegin), "duration_ms", time.Since(groupCompactionBegin).Milliseconds())
return true, compID, nil
"duration", time.Since(compactionBegin), "duration_ms", time.Since(compactionBegin).Milliseconds())
return true, nil
}

func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error {
@@ -1247,43 +1280,57 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
go func() {
defer wg.Done()
for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp)
if err == nil {
shouldRerunGroup, compactErrs := g.Compact(workCtx, c.compactDir, c.planner, c.comp)
if compactErrs == nil {
if shouldRerunGroup {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
}
continue
}
errs, ok := compactErrs.(errutil.NonNilMultiError)
if !ok {
errs = []error{compactErrs}
}

if IsIssue347Error(err) {
if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, err); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
var nonRecoverableErrs errutil.MultiError
for _, err := range errs {
if IsIssue347Error(err) {
if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, err); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
}
// If block has out of order chunk and it has been configured to skip it,
// then we can mark the block for no compaction so that the next compaction run
// will skip it.
if IsOutOfOrderChunkError(err) && c.skipBlocksWithOutOfOrderChunks {
if err := block.MarkForNoCompact(
ctx,
c.logger,
c.bkt,
err.(OutOfOrderChunksError).id,
metadata.OutOfOrderChunksNoCompactReason,
"OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue

// If block has out of order chunk and it has been configured to skip it,
// then we can mark the block for no compaction so that the next compaction run
// will skip it.
if IsOutOfOrderChunkError(err) && c.skipBlocksWithOutOfOrderChunks {
if err := block.MarkForNoCompact(
ctx,
c.logger,
c.bkt,
err.(OutOfOrderChunksError).id,
metadata.OutOfOrderChunksNoCompactReason,
"OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact,
); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}

nonRecoverableErrs.Add(err)
}

if nonRecoverableErrs.Err() != nil {
errChan <- errors.Wrapf(nonRecoverableErrs.Err(), "group %s", g.Key())
return
}
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
}()
}
15 changes: 15 additions & 0 deletions pkg/compact/compact_test.go
@@ -451,6 +451,21 @@ func TestCompactProgressCalculate(t *testing.T) {
},
},
},
{
testName: "multiple_vertical_compactions",
input: []*metadata.Meta{
createBlockMeta(1, 0, 10, map[string]string{"a": "1"}, 0, []uint64{}),
createBlockMeta(2, 5, 15, map[string]string{"a": "1"}, 0, []uint64{}),
createBlockMeta(3, 20, 30, map[string]string{"a": "1"}, 0, []uint64{}),
createBlockMeta(4, 25, 40, map[string]string{"a": "1"}, 0, []uint64{}),
},
expected: map[string]planResult{
keys[0]: {
compactionRuns: 2.0,
compactionBlocks: 4.0,
},
},
},
} {
if ok := t.Run(tcase.testName, func(t *testing.T) {
blocks := make(map[ulid.ULID]*metadata.Meta, len(tcase.input))