Run all tasks independently
Signed-off-by: Filip Petkovski <[email protected]>
fpetkovski committed Dec 2, 2022
1 parent 912172b commit d6fbe29
Showing 3 changed files with 143 additions and 117 deletions.
178 changes: 101 additions & 77 deletions pkg/compact/compact.go
@@ -25,6 +25,7 @@ import (
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/internal/cortex/util/multierror"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -729,7 +730,7 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
type Planner interface {
// Plan returns a list of blocks that should be compacted into single one.
// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error)
Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error)
}

// Compactor provides compaction against an underlying storage of time series data.
@@ -753,7 +754,7 @@ type Compactor interface {

// Compact plans and runs a single compaction against the group. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) {
func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, rerr error) {
cg.compactionRunsStarted.Inc()

subDir := filepath.Join(dir, cg.Key())
@@ -770,19 +771,19 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
}()

if err := os.MkdirAll(subDir, 0750); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
return false, errors.Wrap(err, "create compaction group dir")
}

err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp)
shouldRerun, err = cg.compact(ctx, subDir, planner, comp)
return err
}, opentracing.Tags{"group.key": cg.Key()})
if err != nil {
cg.compactionFailures.Inc()
return false, ulid.ULID{}, err
return false, err
}
cg.compactionRunsCompleted.Inc()
return shouldRerun, compID, nil
return shouldRerun, nil
}

// Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -976,7 +977,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return nil
}

func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) {
func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, _ error) {
cg.mtx.Lock()
defer cg.mtx.Unlock()

@@ -986,86 +987,114 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked
// in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check.
if !cg.enableVerticalCompaction {
return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
return false, halt(errors.Wrap(err, "pre compaction overlap check"))
}

overlappingBlocks = true
}

var tasks []compactionTask
var tasks []CompactionTask
if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
tasks, e = planner.Plan(ctx, cg.metasByMinTime)
return e
}); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
return false, errors.Wrap(err, "plan compaction")
}
if len(tasks) == 0 {
// Nothing to do.
return false, ulid.ULID{}, nil
return false, nil
}

level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))

// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
// This is one potential source of how we could end up with duplicated chunks.
uniqueSources := map[ulid.ULID]struct{}{}

// Once we have a plan we need to download the actual data.
groupCompactionBegin := time.Now()
begin := groupCompactionBegin
g, errCtx := errgroup.WithContext(ctx)
g.SetLimit(cg.compactBlocksFetchConcurrency)

toCompactDirs := make([]string, 0, len(tasks))
for _, task := range tasks {
for _, m := range task {
bdir := filepath.Join(dir, m.ULID.String())
for _, s := range m.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", tasks))
return false, halt(errors.Errorf("overlapping sources detected for plan %v", task))
}
uniqueSources[s] = struct{}{}
}
func(ctx context.Context, meta *metadata.Meta) {
g.Go(func() error {
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return retry(errors.Wrapf(err, "download block %s", meta.ULID))
}
}
}

// Ensure all input blocks are valid.
var stats block.HealthStats
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return e
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}
level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", tasks))

if err := stats.CriticalErr(); err != nil {
return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}
var (
wg sync.WaitGroup
mu sync.Mutex
groupErr multierror.MultiError
rerunGroup bool
)
for _, task := range tasks {
wg.Add(1)
go func(task CompactionTask) {
defer wg.Done()
rerunTask, _, err := cg.compactBlocks(ctx, dir, task, comp, overlappingBlocks)

if err := stats.OutOfOrderChunksErr(); err != nil {
return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}
mu.Lock()
defer mu.Unlock()
rerunGroup = rerunGroup || rerunTask
groupErr.Add(err)
}(task)
}
wg.Wait()

if err := stats.Issue347OutsideChunksErr(); err != nil {
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
return rerunGroup, groupErr.Err()
}

if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
return nil
})
}(errCtx, m)
func (cg *Group) compactBlocks(ctx context.Context, dir string, task CompactionTask, comp Compactor, overlappingBlocks bool) (bool, ulid.ULID, error) {
// Once we have a plan we need to download the actual data.
compactionBegin := time.Now()
begin := compactionBegin

toCompactDirs = append(toCompactDirs, bdir)
}
g, errCtx := errgroup.WithContext(ctx)
g.SetLimit(cg.compactBlocksFetchConcurrency)

toCompactDirs := make([]string, 0, len(task))
for _, m := range task {
bdir := filepath.Join(dir, m.ULID.String())
func(ctx context.Context, meta *metadata.Meta) {
g.Go(func() error {
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return retry(errors.Wrapf(err, "download block %s", meta.ULID))
}

// Ensure all input blocks are valid.
var stats block.HealthStats
if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) {
stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return e
}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}

if err := stats.CriticalErr(); err != nil {
return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.OutOfOrderChunksErr(); err != nil {
return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}

if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
return nil
})
}(errCtx, m)

toCompactDirs = append(toCompactDirs, bdir)
}

sourceBlockStr := fmt.Sprintf("%v", toCompactDirs)

if err := g.Wait(); err != nil {
@@ -1074,6 +1103,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp

level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", sourceBlockStr, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

var compID ulid.ULID
begin = time.Now()
if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) {
compID, e = comp.Compact(dir, toCompactDirs, nil)
@@ -1084,12 +1114,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
if compID == (ulid.ULID{}) {
// Prometheus compactor found that the compacted block would have no samples.
level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
for _, task := range tasks {
for _, meta := range task {
if meta.Stats.NumSamples == 0 {
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
}
for _, meta := range task {
if meta.Stats.NumSamples == 0 {
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID)
}
}
}
@@ -1131,10 +1159,8 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// Ensure the output block is not overlapping with anything else,
// unless vertical compaction is enabled.
if !cg.enableVerticalCompaction {
for _, task := range tasks {
if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}
if err := cg.areBlocksOverlapping(newMeta, task...); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}
}

@@ -1151,20 +1177,18 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// Mark for deletion the blocks we just compacted from the group and bucket so they do not get included
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, task := range tasks {
for _, meta := range task {
err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
for _, meta := range task {
err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}

level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr,
"duration", time.Since(groupCompactionBegin), "duration_ms", time.Since(groupCompactionBegin).Milliseconds())
"duration", time.Since(compactionBegin), "duration_ms", time.Since(compactionBegin).Milliseconds())
return true, compID, nil
}
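
For context, here is a minimal, self-contained sketch of the bounded-concurrency download step visible in compactBlocks above: errgroup.WithContext plus SetLimit caps how many block downloads run at once, and the first failure cancels the shared context. The downloadBlock helper and the fetchConcurrency value are placeholders for illustration, not Thanos APIs.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// downloadBlock is a placeholder for block.Download; it only simulates work
// and respects cancellation of the shared errgroup context.
func downloadBlock(ctx context.Context, id string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(50 * time.Millisecond):
		return nil
	}
}

func downloadAll(ctx context.Context, ids []string, fetchConcurrency int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(fetchConcurrency) // at most fetchConcurrency downloads in flight

	for _, id := range ids {
		id := id // capture loop variable (pre-Go 1.22 semantics)
		g.Go(func() error {
			return downloadBlock(ctx, id)
		})
	}
	// Wait returns the first non-nil error and cancels the remaining downloads.
	return g.Wait()
}

func main() {
	ids := []string{"01A", "01B", "01C", "01D"}
	fmt.Println(downloadAll(context.Background(), ids, 2))
}
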

@@ -1257,7 +1281,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
go func() {
defer wg.Done()
for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp)
shouldRerunGroup, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp)
if err == nil {
if shouldRerunGroup {
mtx.Lock()
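The core of this change in compact.go is that Group.compact no longer runs one plan at a time: each CompactionTask is executed in its own goroutine, the per-task rerun flags are OR-ed together, and errors are aggregated into a combined error. The following is a rough, self-contained sketch of that fan-out pattern, with a local multiError type standing in for the cortex multierror.MultiError import and a stubbed runTask in place of cg.compactBlocks — an illustration, not the actual Thanos implementation.

package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// multiError is a tiny stand-in for the multierror.MultiError used in the diff.
type multiError []error

func (m *multiError) Add(err error) {
	if err != nil {
		*m = append(*m, err)
	}
}

func (m multiError) Err() error {
	if len(m) == 0 {
		return nil
	}
	msgs := make([]string, 0, len(m))
	for _, err := range m {
		msgs = append(msgs, err.Error())
	}
	return errors.New(strings.Join(msgs, "; "))
}

type task struct{ name string }

// runTask is a stub for cg.compactBlocks: it reports whether the group
// should be planned again and whether this particular task failed.
func runTask(t task) (rerun bool, err error) {
	if t.name == "bad" {
		return false, fmt.Errorf("compact %s: boom", t.name)
	}
	return true, nil
}

// runAll mirrors the shape of the new Group.compact loop: all tasks run
// independently, rerun flags are OR-ed and errors aggregated under a mutex.
func runAll(tasks []task) (bool, error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		groupErr multiError
		rerun    bool
	)
	for _, t := range tasks {
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			r, err := runTask(t)

			mu.Lock()
			defer mu.Unlock()
			rerun = rerun || r
			groupErr.Add(err)
		}(t)
	}
	wg.Wait()
	return rerun, groupErr.Err()
}

func main() {
	fmt.Println(runAll([]task{{"a"}, {"bad"}, {"c"}}))
}

Aggregating errors instead of returning on the first one means a single failing task does not block the other tasks in the same group from finishing, which is what lets all tasks run independently.
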
18 changes: 10 additions & 8 deletions pkg/compact/planner.go
@@ -50,13 +50,15 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact
}

// TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) {
return p.plan(p.noCompBlocksFunc(), metasByMinTime)
}

type compactionTask []*metadata.Meta
// CompactionTask is a set of blocks that should be compacted together in a single compaction.
// Multiple compaction tasks can be run in parallel even inside a single compaction group.
type CompactionTask []*metadata.Meta

func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) {
notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
for _, meta := range metasByMinTime {
if _, excluded := noCompactMarked[meta.ULID]; excluded {
@@ -79,7 +81,7 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac
metasByMinTime = metasByMinTime[:len(metasByMinTime)-1]
res := selectMetas(p.ranges, noCompactMarked, metasByMinTime)
if len(res) > 0 {
return []compactionTask{res}, nil
return []CompactionTask{res}, nil
}

// Compact any blocks with big enough time range that have >5% tombstones.
Expand All @@ -90,7 +92,7 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac
}
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
task := []*metadata.Meta{notExcludedMetasByMinTime[i]}
return []compactionTask{task}, nil
return []CompactionTask{task}, nil
}
}

@@ -159,7 +161,7 @@ func selectMetas(ranges []int64, noCompactMarked map[ulid.ULID]*metadata.NoCompa
// selectOverlappingMetas returns all dirs with overlapping time ranges.
// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
// Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L268.
func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []compactionTask {
func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []CompactionTask {
if len(metasByMinTime) < 2 {
return nil
}
@@ -203,7 +205,7 @@ loopMetas:
}
}

overlappingGroups := make([]compactionTask, 0, len(groups))
overlappingGroups := make([]CompactionTask, 0, len(groups))
for _, group := range groups {
if len(group) < 2 {
continue
@@ -281,7 +283,7 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket,
return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
}

func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]compactionTask, error) {
func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]CompactionTask, error) {
noCompactMarked := t.noCompBlocksFunc()
copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
for k, v := range noCompactMarked {
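The planner.go side of the diff is mostly a rename: the unexported compactionTask becomes the exported CompactionTask, and every Planner implementation now returns []CompactionTask so a group can schedule several independent compactions from one plan. Below is a rough sketch of what a trivial planner satisfying that interface shape could look like — the meta struct and the pairing rule are simplified assumptions for illustration, not the real tsdbBasedPlanner logic.

package main

import (
	"context"
	"fmt"
)

// meta is a simplified stand-in for metadata.Meta.
type meta struct {
	ULID    string
	MinTime int64
	MaxTime int64
}

// CompactionTask is a set of blocks compacted together in a single compaction.
// Multiple tasks from one plan can be run in parallel within a group.
type CompactionTask []*meta

// Planner mirrors the updated interface: it returns independent tasks.
type Planner interface {
	Plan(ctx context.Context, metasByMinTime []*meta) ([]CompactionTask, error)
}

// pairPlanner is a toy planner: it simply pairs adjacent blocks into tasks.
// The real tsdbBasedPlanner selects blocks by time ranges, tombstone ratios
// and no-compact marks; this only illustrates the shape of the return value.
type pairPlanner struct{}

var _ Planner = pairPlanner{}

func (pairPlanner) Plan(_ context.Context, metas []*meta) ([]CompactionTask, error) {
	var tasks []CompactionTask
	for i := 0; i+1 < len(metas); i += 2 {
		tasks = append(tasks, CompactionTask{metas[i], metas[i+1]})
	}
	return tasks, nil
}

func main() {
	metas := []*meta{
		{ULID: "01A", MinTime: 0, MaxTime: 100},
		{ULID: "01B", MinTime: 100, MaxTime: 200},
		{ULID: "01C", MinTime: 200, MaxTime: 300},
		{ULID: "01D", MinTime: 300, MaxTime: 400},
	}
	tasks, _ := pairPlanner{}.Plan(context.Background(), metas)
	for i, t := range tasks {
		fmt.Printf("task %d: %s %s\n", i, t[0].ULID, t[1].ULID)
	}
}
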
