Compact: skip compaction for blocks with no samples (#904)
* skip compaction for blocks with no samples

* update to actually delete the empty input blocks, and to handle this correctly from the bucket compactor

* warn on error deleting empty block

* use ULID instead of error for emptyBlockSentinel

* don't use a global variable

* full stop at end of comment

* use boolean to indicate whether there is more compaction work

* rename variables

* fix test
mjd95 authored and bwplotka committed Mar 12, 2019
1 parent b74717a commit bc3aaab
Showing 2 changed files with 78 additions and 54 deletions.
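The key interface change is that Group.Compact now returns a boolean saying whether the group should be compacted again, rather than callers inferring this from a non-zero result ULID. Below is a minimal sketch of the new caller contract, not code from this commit — it assumes placement inside pkg/compact so that Group and its existing imports (context, errors, tsdb) are in scope, and the helper name compactUntilDone is made up for illustration:

// compactUntilDone drives the new (shouldRerun, compID, err) return values of
// Group.Compact, looping over all groups until none reports pending work.
func compactUntilDone(ctx context.Context, groups []*Group, dir string, comp tsdb.Compactor) error {
	for {
		finishedAllGroups := true
		for _, g := range groups {
			shouldRerun, _, err := g.Compact(ctx, dir, comp)
			if err != nil {
				return errors.Wrap(err, "compaction")
			}
			if shouldRerun {
				// The group compacted something or deleted empty input
				// blocks, so another pass may find further work.
				finishedAllGroups = false
			}
		}
		if finishedAllGroups {
			return nil
		}
	}
}

This mirrors the updated BucketCompactor.Compact loop in the diff below, where finishedAllGroups replaces the old check against a zero ulid.ULID.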
124 changes: 74 additions & 50 deletions pkg/compact/compact.go
@@ -513,23 +513,23 @@ func (cg *Group) Resolution() int64 {

// Compact plans and runs a single compaction against the group. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (ulid.ULID, error) {
func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) {
subDir := filepath.Join(dir, cg.Key())

if err := os.RemoveAll(subDir); err != nil {
return ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
}
if err := os.MkdirAll(subDir, 0777); err != nil {
return ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}

compID, err := cg.compact(ctx, subDir, comp)
shouldRerun, compID, err := cg.compact(ctx, subDir, comp)
if err != nil {
cg.compactionFailures.Inc()
}
cg.compactions.Inc()

return compID, err
return shouldRerun, compID, err
}

// Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -688,35 +688,35 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return nil
}

func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (compID ulid.ULID, err error) {
func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) {
cg.mtx.Lock()
defer cg.mtx.Unlock()

// Check for overlapped blocks.
if err := cg.areBlocksOverlapping(nil); err != nil {
return compID, halt(errors.Wrap(err, "pre compaction overlap check"))
return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
}

// Planning a compaction works purely based on the meta.json files in our future group's dir.
// So we first dump all our memory block metas into the directory.
for _, meta := range cg.blocks {
bdir := filepath.Join(dir, meta.ULID.String())
if err := os.MkdirAll(bdir, 0777); err != nil {
return compID, errors.Wrap(err, "create planning block dir")
return false, ulid.ULID{}, errors.Wrap(err, "create planning block dir")
}
if err := metadata.Write(cg.logger, bdir, meta); err != nil {
return compID, errors.Wrap(err, "write planning meta file")
return false, ulid.ULID{}, errors.Wrap(err, "write planning meta file")
}
}

// Plan against the written meta.json files.
plan, err := comp.Plan(dir)
if err != nil {
return compID, errors.Wrap(err, "plan compaction")
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
}
if len(plan) == 0 {
// Nothing to do.
return compID, nil
return false, ulid.ULID{}, nil
}

// Due to #183 we verify that none of the blocks in the plan have overlapping sources.
@@ -729,45 +729,45 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
for _, pdir := range plan {
meta, err := metadata.Read(pdir)
if err != nil {
return compID, errors.Wrapf(err, "read meta from %s", pdir)
return false, ulid.ULID{}, errors.Wrapf(err, "read meta from %s", pdir)
}

if cg.Key() != GroupKey(*meta) {
return compID, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
}

for _, s := range meta.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return compID, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
}
uniqueSources[s] = struct{}{}
}

id, err := ulid.Parse(filepath.Base(pdir))
if err != nil {
return compID, errors.Wrapf(err, "plan dir %s", pdir)
return false, ulid.ULID{}, errors.Wrapf(err, "plan dir %s", pdir)
}

if meta.ULID.Compare(id) != 0 {
return compID, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
return false, ulid.ULID{}, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id)
}

if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil {
return compID, retry(errors.Wrapf(err, "download block %s", id))
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id))
}

// Ensure all input blocks are valid.
stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return compID, errors.Wrapf(err, "gather index issues for block %s", pdir)
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", pdir)
}

if err := stats.CriticalErr(); err != nil {
return compID, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return compID, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID)
}
}
level.Debug(cg.logger).Log("msg", "downloaded and verified blocks",
@@ -777,7 +777,25 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (

compID, err = comp.Compact(dir, plan, nil)
if err != nil {
return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", plan))
}
if compID == (ulid.ULID{}) {
// Prometheus compactor found that the compacted block would have no samples.
level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", plan))
for _, block := range plan {
meta, err := metadata.Read(block)
if err != nil {
level.Warn(cg.logger).Log("msg", "failed to read meta for block", "block", block)
continue
}
if meta.Stats.NumSamples == 0 {
if err := cg.deleteBlock(block); err != nil {
level.Warn(cg.logger).Log("msg", "failed to delete empty block found during compaction", "block", block)
}
}
}
// Even though this block was empty, there may be more work to do
return true, ulid.ULID{}, nil
}
level.Debug(cg.logger).Log("msg", "compacted blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
Expand All @@ -790,55 +808,61 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
Source: metadata.CompactorSource,
}, nil)
if err != nil {
return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir)
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return compID, errors.Wrap(err, "remove tombstones")
return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil {
return compID, halt(errors.Wrapf(err, "invalid result block %s", bdir))
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

// Ensure the output block is not overlapping with anything else.
if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil {
return compID, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}

begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
return compID, retry(errors.Wrapf(err, "upload of %s failed", compID))
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
}
level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))

// Delete the blocks we just compacted from the group and bucket so they do not get included
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, b := range plan {
id, err := ulid.Parse(filepath.Base(b))
if err != nil {
return compID, errors.Wrapf(err, "plan dir %s", b)
if err := cg.deleteBlock(b); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "delete old block from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}

if err := os.RemoveAll(b); err != nil {
return compID, errors.Wrapf(err, "remove old block dir %s", id)
}
return true, compID, nil
}

// Spawn a new context so we always delete a block in full on shutdown.
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id, "result_block", compID)
err = block.Delete(delCtx, cg.bkt, id)
cancel()
if err != nil {
return compID, retry(errors.Wrapf(err, "delete old block %s from bucket ", id))
}
cg.groupGarbageCollectedBlocks.Inc()
func (cg *Group) deleteBlock(b string) error {
id, err := ulid.Parse(filepath.Base(b))
if err != nil {
return errors.Wrapf(err, "plan dir %s", b)
}

if err := os.RemoveAll(b); err != nil {
return errors.Wrapf(err, "remove old block dir %s", id)
}

return compID, nil
// Spawn a new context so we always delete a block in full on shutdown.
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id)
if err := block.Delete(delCtx, cg.bkt, id); err != nil {
return errors.Wrapf(err, "delete block %s from bucket", id)
}
return nil
}

// BucketCompactor compacts blocks in a bucket.
@@ -882,31 +906,31 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
return errors.Wrap(err, "garbage")
}

level.Info(c.logger).Log("msg", "start of compaction")

groups, err := c.sy.Groups()
if err != nil {
return errors.Wrap(err, "build compaction groups")
}
done := true
finishedAllGroups := true
for _, g := range groups {
id, err := g.Compact(ctx, c.compactDir, c.comp)
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
// If the returned ID has a zero value, the group had no blocks to be compacted.
// We keep going through the outer loop until no group has any work left.
if id != (ulid.ULID{}) {
done = false
if shouldRerunGroup {
finishedAllGroups = false
}
continue
}

if IsIssue347Error(err) {
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
done = false
finishedAllGroups = false
continue
}
}
return errors.Wrap(err, "compaction")
}
if done {
if finishedAllGroups {
break
}
}
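Before the test changes below, one detail of the new deleteBlock helper above is worth spelling out: as the comment in the diff notes, a fresh context is spawned so that a block is always deleted in full even on shutdown. Here is a standalone sketch of that detached-context pattern, not code from this commit — the function name deleteDetached is made up, and it assumes the same ulid, log/level, objstore, and block packages already imported by compact.go:

// deleteDetached deletes a block from the bucket on a context that is
// independent of the caller's, so cancelling the caller (e.g. on shutdown)
// cannot abort the deletion halfway and leave a partially deleted block.
func deleteDetached(logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error {
	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	level.Info(logger).Log("msg", "deleting block", "block", id)
	return block.Delete(delCtx, bkt, id)
}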
8 changes: 4 additions & 4 deletions pkg/compact/compact_e2e_test.go
@@ -253,18 +253,18 @@ func TestGroup_Compact_e2e(t *testing.T) {
comp, err := tsdb.NewLeveledCompactor(nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil)
testutil.Ok(t, err)

id, err := g.Compact(ctx, dir, comp)
shouldRerun, id, err := g.Compact(ctx, dir, comp)
testutil.Ok(t, err)
testutil.Assert(t, id == ulid.ULID{}, "group should be empty, but somehow compaction took place")
testutil.Assert(t, !shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun")

// Add all metas that would be gathered by syncMetas.
for _, m := range metas {
testutil.Ok(t, g.Add(m))
}

id, err = g.Compact(ctx, dir, comp)
shouldRerun, id, err = g.Compact(ctx, dir, comp)
testutil.Ok(t, err)
testutil.Assert(t, id != ulid.ULID{}, "no compaction took place")
testutil.Assert(t, shouldRerun, "there should be compactible data, but the compactor reported there was not")

resDir := filepath.Join(dir, id.String())
testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))
