Skip to content

Commit

Permalink
skip blocks with out-of-order chunk during compaction
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Hu <[email protected]>
  • Loading branch information
huyan0 committed Jul 23, 2021
1 parent 0058003 commit 887afae
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 33 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func runCompact(
reg,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename),
metadata.HashFunc(conf.hashFunc),
)
planner := compact.WithLargeTotalIndexSizeFilter(
Expand Down
19 changes: 13 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error {
return nil
}

// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

if i.OutOfOrderSeries > 0 {
errMsg = append(errMsg, fmt.Sprintf(
func (i HealthStats) OutOfOrderChunksErr() error {
if i.OutOfOrderChunks > 0 {
return errors.New(fmt.Sprintf(
"%d/%d series have an average of %.3f out-of-order chunks: "+
"%.3f of these are exact duplicates (in terms of data and time range)",
i.OutOfOrderSeries,
Expand All @@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error {
float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks),
))
}
return nil
}

// CriticalErr returns an error if the stats indicate a critical block issue that might be solved only by a manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks)
if n > 0 {
Expand Down Expand Up @@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.OutOfOrderChunksErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/block/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package block
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) {
testutil.Ok(t, ir2.Series(p.At(), &lset, &chks))
testutil.Equals(t, 1, len(chks))
}
}

// TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr verifies that an index
// containing an out-of-order chunk is detected by GatherIndexHealthStats and
// surfaced through HealthStats.OutOfOrderChunksErr.
func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
	blockDir, err := ioutil.TempDir("", "test-ooo-index")
	testutil.Ok(t, err)
	// Clean up the temporary block directory so repeated runs do not leak files.
	defer func() { testutil.Ok(t, os.RemoveAll(blockDir)) }()

	err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64)
	testutil.Ok(t, err)

	// Use filepath.Join rather than manual "/" concatenation for portability.
	stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), filepath.Join(blockDir, IndexFilename), 0, math.MaxInt64)

	testutil.Ok(t, err)
	testutil.Equals(t, 1, stats.OutOfOrderChunks)
	testutil.NotOk(t, stats.OutOfOrderChunksErr())
}
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
// OutOfOrderChunksNoCompactReason is a reason for marking a block as non-compactable because its index contains an out-of-order chunk, so that compaction is not blocked.
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
Expand Down
53 changes: 50 additions & 3 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type DefaultGrouper struct {
verticalCompactions *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
}

Expand All @@ -252,6 +253,7 @@ func NewDefaultGrouper(
reg prometheus.Registerer,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
) *DefaultGrouper {
return &DefaultGrouper{
Expand Down Expand Up @@ -279,9 +281,10 @@ func NewDefaultGrouper(
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"group"}),
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
}
}

Expand Down Expand Up @@ -309,6 +312,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.verticalCompactions.WithLabelValues(groupKey),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
)
if err != nil {
Expand Down Expand Up @@ -346,6 +350,7 @@ type Group struct {
verticalCompactions prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
}

Expand All @@ -365,6 +370,7 @@ func NewGroup(
verticalCompactions prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blockMakredForNoCopmact prometheus.Counter,
hashFunc metadata.HashFunc,
) (*Group, error) {
if logger == nil {
Expand All @@ -385,6 +391,7 @@ func NewGroup(
verticalCompactions: verticalCompactions,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blockMakredForNoCopmact,
hashFunc: hashFunc,
}
return g, nil
Expand Down Expand Up @@ -541,6 +548,27 @@ func IsIssue347Error(err error) bool {
return ok
}

// OutOfOrderChunksError is an error type wrapper reported when block-index
// validation finds out-of-order (OOO) chunks. It carries the ULID of the
// offending block so the caller can mark that block for no-compaction.
type OutOfOrderChunksError struct {
	err error // underlying validation error describing the out-of-order chunks

	id ulid.ULID // ID of the block whose index contains the out-of-order chunks
}

// Error implements the error interface by delegating to the wrapped error.
func (e OutOfOrderChunksError) Error() string {
	return e.err.Error()
}

// outOfOrderChunkError wraps err together with the ULID of the broken block,
// producing an OutOfOrderChunksError that IsOutOfOrderChunkError can detect.
func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
	return OutOfOrderChunksError{err: err, id: brokenBlock}
}

// IsOutOfOrderChunkError returns true if the base (unwrapped) error is an
// OutOfOrderChunksError.
func IsOutOfOrderChunkError(err error) bool {
	_, ok := errors.Cause(err).(OutOfOrderChunksError)
	return ok
}

// HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct {
err error
Expand Down Expand Up @@ -749,6 +777,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.OutOfOrderChunksErr(); err != nil {
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out of order chunks should be drop block from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
Expand Down Expand Up @@ -939,6 +971,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
continue
}
}
// If the block has an out-of-order chunk, mark the block for no-compaction and continue.
if IsOutOfOrderChunkError(err) {
if err := block.MarkForNoCompact(
ctx,
c.logger,
c.bkt,
err.(OutOfOrderChunksError).id,
metadata.OutOfOrderChunksNoCompactReason,
"OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
Expand Down
77 changes: 53 additions & 24 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1)
testutil.Ok(t, err)
Expand Down Expand Up @@ -138,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
testutil.Ok(t, sy.GarbageCollect(ctx))

// Only the level 3 block, the last source block in both resolutions should be left.
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc)
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc)
groups, err := grouper.Groups(sy.Metas())
testutil.Ok(t, err)

Expand Down Expand Up @@ -195,23 +196,26 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter()
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
}, nil)
testutil.Ok(t, err)

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
testutil.Ok(t, err)

planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000})
planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)

grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2)
testutil.Ok(t, err)

Expand All @@ -220,6 +224,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact))
testutil.Equals(t, 0, MetricCount(grouper.compactions))
testutil.Equals(t, 0, MetricCount(grouper.compactionRunsStarted))
testutil.Equals(t, 0, MetricCount(grouper.compactionRunsCompleted))
Expand All @@ -233,7 +238,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
extLabels2 := labels.Labels{{Name: "e1", Value: "1"}}
metas := createAndUpload(t, bkt, []blockgenSpec{
{
numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels, res: 124,
numSamples: 100, mint: 500, maxt: 1000, extLset: extLabels, res: 124,
series: []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}},
Expand Down Expand Up @@ -303,31 +308,42 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
{{Name: "a", Value: "7"}},
},
},
}, []blockgenSpec{
{
numSamples: 100, mint: 0, maxt: 499, extLset: extLabels, res: 124,
series: []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
},
},
})

testutil.Ok(t, bComp.Compact(ctx))
testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
testutil.Equals(t, 4, MetricCount(grouper.compactions))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[0].Thanos))))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[7].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[4].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[5].Thanos))))
testutil.Equals(t, 4, MetricCount(grouper.compactionRunsStarted))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos))))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos))))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos))))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos))))
// TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate.
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos))))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos))))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos))))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos))))
testutil.Equals(t, 4, MetricCount(grouper.compactionRunsCompleted))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[0].Thanos))))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos))))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos))))
// TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate.
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos))))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos))))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos))))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos))))
testutil.Equals(t, 4, MetricCount(grouper.compactionFailures))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos))))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[7].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[4].Thanos))))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[5].Thanos))))
Expand All @@ -342,6 +358,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
metas[4].ULID: false,
metas[5].ULID: false,
metas[8].ULID: false,
metas[9].ULID: false,
}
others := map[string]metadata.Meta{}
testutil.Ok(t, bkt.Iter(ctx, "", func(n string) error {
Expand Down Expand Up @@ -374,7 +391,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
meta, ok := others[defaultGroupKey(124, extLabels)]
testutil.Assert(t, ok, "meta not found")

testutil.Equals(t, int64(0), meta.MinTime)
testutil.Equals(t, int64(500), meta.MinTime)
testutil.Equals(t, int64(3000), meta.MaxTime)
testutil.Equals(t, uint64(6), meta.Stats.NumSeries)
testutil.Equals(t, uint64(2*4*100), meta.Stats.NumSamples) // Only 2 times 4*100 because one block was empty.
Expand Down Expand Up @@ -413,7 +430,7 @@ type blockgenSpec struct {
res int64
}

func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*metadata.Meta) {
func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, blocksWithOutOfOrderChunks []blockgenSpec) (metas []*metadata.Meta) {
prepareDir, err := ioutil.TempDir("", "test-compact-prepare")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(prepareDir)) }()
Expand All @@ -422,23 +439,35 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (
defer cancel()

for _, b := range blocks {
var id ulid.ULID
var err error
if b.numSamples == 0 {
id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res)
} else {
id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc)
}
testutil.Ok(t, err)
id, meta := createBlock(t, ctx, prepareDir, b)
metas = append(metas, meta)
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc))
}
for _, b := range blocksWithOutOfOrderChunks {
id, meta := createBlock(t, ctx, prepareDir, b)

meta, err := metadata.ReadFromDir(filepath.Join(prepareDir, id.String()))
err := testutil.PutOutOfOrderIndex(filepath.Join(prepareDir, id.String()), b.mint, b.maxt)
testutil.Ok(t, err)
metas = append(metas, meta)

metas = append(metas, meta)
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc))
}

return metas
}
// createBlock builds one block described by spec b inside prepareDir and
// returns its ULID plus the metadata read back from the block directory.
// A spec with numSamples == 0 produces an empty block.
func createBlock(t testing.TB, ctx context.Context, prepareDir string, b blockgenSpec) (id ulid.ULID, meta *metadata.Meta) {
	var err error
	if b.numSamples == 0 {
		id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res)
	} else {
		id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc)
	}
	testutil.Ok(t, err)

	meta, err = metadata.ReadFromDir(filepath.Join(prepareDir, id.String()))
	testutil.Ok(t, err)
	return
}

// Regression test for #2459 issue.
func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T) {
Expand Down
Loading

0 comments on commit 887afae

Please sign in to comment.