Skip to content

Commit

Permalink
[PLAT-104290] remove overlapping blocks into its own file
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Mar 22, 2024
1 parent 4ed8a45 commit 5fc3435
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 53 deletions.
56 changes: 3 additions & 53 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,53 +1031,6 @@ func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metada
return nil
}

func (cg *Group) removeOverlappingBlocks(ctx context.Context, toCompact []*metadata.Meta, dir string) error {
if len(toCompact) == 0 {
return nil
}
kept := toCompact[0]
for _, m := range toCompact {
if m.MinTime < kept.MinTime && m.MaxTime > kept.MaxTime {
level.Warn(cg.logger).Log("msg", "found overlapping block in plan that are not the first",
"first", kept.String(), "block", m.String())
kept = m
} else if (m.MinTime < kept.MinTime && kept.MinTime < m.MaxTime) ||
(m.MinTime < kept.MaxTime && kept.MaxTime < m.MaxTime) {
err := errors.Errorf("found partially overlapping block: %s vs %s", m.String(), kept.String())
if cg.enableVerticalCompaction {
level.Error(cg.logger).Log("msg", "best effort to vertical compact", "err", err)
return nil
} else {
return halt(err)
}
}
}
for _, m := range toCompact {
if m.ULID.Compare(kept.ULID) == 0 || m.Thanos.Source == metadata.ReceiveSource {
level.Info(cg.logger).Log("msg", "keep this overlapping block", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
continue
}
cg.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, cg.logger, cg.bkt, m, dir); err != nil {
return retry(err)
}
}
return retry(errors.Errorf("found overlapping blocks in plan. Only kept %s", kept.String()))
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
return nil
}

// RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, issue347Err error) error {
ie, ok := errors.Cause(issue347Err).(Issue347Error)
Expand Down Expand Up @@ -1171,12 +1124,9 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin := groupCompactionBegin

if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
level.Error(cg.logger).Log("msg", fmt.Sprintf("failed to run pre compaction callback for plan: %v", toCompact), "err", err)
// instead of halting, we attempt to remove overlapped blocks and only keep the longest one.
if newErr := cg.removeOverlappingBlocks(ctx, toCompact, dir); newErr != nil {
return false, ulid.ULID{}, newErr
}
return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
}
toCompact = FilterNilBlocks(toCompact)
level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))

begin = time.Now()
Expand Down Expand Up @@ -1407,7 +1357,7 @@ func NewBucketCompactor(
planner,
comp,
DefaultBlockDeletableChecker{},
DefaultCompactionLifecycleCallback{},
NewOverlappingCompactionLifecycleCallback(),
compactDir,
bkt,
concurrency,
Expand Down
92 changes: 92 additions & 0 deletions pkg/compact/overlapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package compact

import (
"context"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"os"
"path/filepath"
)

type OverlappingCompactionLifecycleCallback struct {
overlappingBlocks prometheus.Counter
metaDir string
}

func NewOverlappingCompactionLifecycleCallback() *OverlappingCompactionLifecycleCallback {
return &OverlappingCompactionLifecycleCallback{}
}

// PreCompactionCallback given the assumption that toCompact is sorted by MinTime in ascending order from Planner
// (not guaranteed on MaxTime order), we will detect overlapping blocks and delete them while retaining all others.
func (c *OverlappingCompactionLifecycleCallback) PreCompactionCallback(ctx context.Context, logger log.Logger, cg *Group, toCompact []*metadata.Meta) error {
if len(toCompact) == 0 {
return nil
}
previous := 0
for i, m := range toCompact {
kept := toCompact[previous]
if previous == 0 || m.Thanos.Source == metadata.ReceiveSource || kept.MaxTime <= m.MinTime {
// no overlapping with previous blocks, skip it
previous = i
continue
} else if m.MinTime < kept.MinTime {
// halt when the assumption is broken, need manual investigation
return halt(errors.Errorf("later blocks has smaller minTime than previous block: %s -- %s", kept.String(), m.String()))
}
if kept.MaxTime >= m.MaxTime {
level.Warn(logger).Log("msg", "found overlapping block in plan",
"toKeep", kept.String(), "toDelete", m.String())
cg.overlappingBlocks.Inc()
if err := DeleteBlockNow(ctx, logger, cg.bkt, m, c.metaDir); err != nil {
return retry(err)
}
toCompact[i] = nil
} else {
err := errors.Errorf("found partially overlapping block: %s -- %s", kept.String(), m.String())
if cg.enableVerticalCompaction {
level.Error(logger).Log("msg", "best effort to vertical compact", "err", err)
previous = i // move to next block
} else {
return halt(err)
}
}
}
return nil
}

func (c *OverlappingCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error {
return nil
}

func (c *OverlappingCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) {
return tsdb.DefaultBlockPopulator{}, nil
}

func FilterNilBlocks(blocks []*metadata.Meta) (res []*metadata.Meta) {
for _, b := range blocks {
if b != nil {
res = append(res, b)
}
}
return res
}

func DeleteBlockNow(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string) error {
level.Warn(logger).Log("msg", "delete polluted block immediately", "block", m.String(),
"level", m.Compaction.Level, "source", m.Thanos.Source, "labels", m.Thanos.GetLabels())
if err := block.Delete(ctx, logger, bkt, m.ULID); err != nil {
return errors.Wrapf(err, "delete overlapping block %s", m.String())
}
if err := os.RemoveAll(filepath.Join(dir, m.ULID.String())); err != nil {
return errors.Wrapf(err, "remove old block dir %s", m.String())
}
return nil
}
13 changes: 13 additions & 0 deletions pkg/compact/overlapping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package compact

import (
"github.com/efficientgo/core/testutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"testing"
)

func TestFilterNilCompact(t *testing.T) {
blocks := []*metadata.Meta{nil, nil}
filtered := FilterNilBlocks(blocks)
testutil.Equals(t, 0, len(filtered))
}

0 comments on commit 5fc3435

Please sign in to comment.