Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact: skip compaction for blocks with no samples #904

Merged
58 changes: 42 additions & 16 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
)

// blockTooFreshSentinelError signals that a block is still within the sync
// delay window and must not be compacted yet.
var blockTooFreshSentinelError = errors.New("Block too fresh")
// emptyBlockSentinelULID is a fixed, reserved ULID returned by compaction when
// the planned output block would contain no samples, so nothing was uploaded.
// It identifies no real block — presumably callers compare against it to stop
// further compaction for the group; verify against the call sites.
var emptyBlockSentinelULID = ulid.MustNew(123, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sentinel is for noMoreCompactionForGroup I guess? can we name it like this or something similar?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK so I think there is misinformation here. I would prefer something opposite like noMoreCompactionForGroup instead of this? What do you think? Or even a bool flag return argument to make it more explicit (:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a separate bool flag as I think that's the clearest what's happening. The naming was a little awkward but hopefully it makes sense, let me know what you think :)


// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
Expand Down Expand Up @@ -779,6 +780,23 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err != nil {
return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
}
if compID == (ulid.ULID{}) {
// Prometheus compactor found that the compacted block would have no samples.
level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", plan))
for _, block := range plan {
meta, err := metadata.Read(block)
if err != nil {
level.Warn(cg.logger).Log("msg", "failed to read meta for block", "block", block)
continue
}
if meta.Stats.NumSamples == 0 {
if err := cg.deleteBlock(block); err != nil {
level.Warn(cg.logger).Log("msg", "failed to delete empty block found during compaction", "block", block)
}
}
}
return emptyBlockSentinelULID, nil
}
level.Debug(cg.logger).Log("msg", "compacted blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

Expand Down Expand Up @@ -818,29 +836,35 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, b := range plan {
id, err := ulid.Parse(filepath.Base(b))
if err != nil {
return compID, errors.Wrapf(err, "plan dir %s", b)
}

if err := os.RemoveAll(b); err != nil {
return compID, errors.Wrapf(err, "remove old block dir %s", id)
}

// Spawn a new context so we always delete a block in full on shutdown.
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id, "result_block", compID)
err = block.Delete(delCtx, cg.bkt, id)
cancel()
if err != nil {
return compID, retry(errors.Wrapf(err, "delete old block %s from bucket ", id))
if err := cg.deleteBlock(b); err != nil {
return compID, retry(errors.Wrapf(err, "delete old block from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
}

return compID, nil
}

// deleteBlock removes the block's local directory b and then deletes the block
// from the group's object storage bucket. The base name of b must be the
// block's ULID. The bucket deletion runs under its own 5-minute background
// context so that an in-flight shutdown cannot leave a partially deleted block
// behind.
func (cg *Group) deleteBlock(b string) error {
	id, err := ulid.Parse(filepath.Base(b))
	if err != nil {
		return errors.Wrapf(err, "parse block dir %s", b)
	}

	// Remove the local copy first; the bucket copy is deleted below.
	if err := os.RemoveAll(b); err != nil {
		return errors.Wrapf(err, "remove old block dir %s", id)
	}

	// Spawn a new context so we always delete a block in full on shutdown.
	delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id)
	if err := block.Delete(delCtx, cg.bkt, id); err != nil {
		return errors.Wrapf(err, "delete block %s from bucket", id)
	}
	return nil
}

// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
logger log.Logger
Expand Down Expand Up @@ -882,6 +906,8 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
return errors.Wrap(err, "garbage")
}

level.Info(c.logger).Log("msg", "start of compaction")

groups, err := c.sy.Groups()
if err != nil {
return errors.Wrap(err, "build compaction groups")
Expand Down