Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

skip blocks with out-of-order chunk during compaction #4469

Merged
merged 1 commit into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4444](https://github.com/thanos-io/thanos/pull/4444) UI: Add search block UI.
- [#4509](https://github.com/thanos-io/thanos/pull/4509) Logging: Adds duration_ms in int64 to the logs.
- [#4462](https://github.com/thanos-io/thanos/pull/4462) UI: Add find overlap block UI
- [#4469](https://github.com/thanos-io/thanos/pull/4469) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting

### Fixed

Expand Down
21 changes: 14 additions & 7 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,10 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com
m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)
}, []string{"marker", "reason"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
huyan0 marked this conversation as resolved.
Show resolved Hide resolved
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")

m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_garbage_collected_blocks_total",
Expand Down Expand Up @@ -281,7 +282,7 @@ func runCompact(
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
conf.blockSyncConcurrency)
if err != nil {
Expand Down Expand Up @@ -347,15 +348,17 @@ func runCompact(
conf.acceptMalformedIndex,
enableVerticalCompaction,
reg,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
conf.skipBlockWithOutOfOrderChunks,
)
planner := compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
bkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
Expand Down Expand Up @@ -448,7 +451,7 @@ func runCompact(
return errors.Wrap(err, "sync before first pass of downsampling")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention failed")
}

Expand Down Expand Up @@ -585,6 +588,7 @@ type compactConfig struct {
hashFunc string
enableVerticalCompaction bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -668,6 +672,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cmd.Flag("compact.skip-block-with-out-of-order-chunks", "When set to true, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction").
Hidden().Default("false").BoolVar(&cc.skipBlockWithOutOfOrderChunks)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&cc.hashFunc, "SHA256", "")

Expand Down
19 changes: 13 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error {
return nil
}

// CriticalErr returns an error if stats indicate a critical block issue that might be solved only by a manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

if i.OutOfOrderSeries > 0 {
errMsg = append(errMsg, fmt.Sprintf(
func (i HealthStats) OutOfOrderChunksErr() error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering, do you think we can just skip compaction for all the CriticalErr; instead of just OutOfOrderChunksErr?

Copy link
Contributor Author

@huyan0 huyan0 Jul 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay on the compact side given we cannot generate more blocks than not compacting...I don't think there's much implication on the retrieval side either but would like to hear what @yeya24 thinks : )
@yeya24 do you have some context on why the project decided to halt compaction in the first place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bwplotka do you have some context on the discussion? :)

if i.OutOfOrderChunks > 0 {
return errors.New(fmt.Sprintf(
"%d/%d series have an average of %.3f out-of-order chunks: "+
"%.3f of these are exact duplicates (in terms of data and time range)",
i.OutOfOrderSeries,
Expand All @@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error {
float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks),
))
}
return nil
}

// CriticalErr returns an error if stats indicate a critical block issue that might be solved only by a manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks)
if n > 0 {
Expand Down Expand Up @@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.OutOfOrderChunksErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/block/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package block
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) {
testutil.Ok(t, ir2.Series(p.At(), &lset, &chks))
testutil.Equals(t, 1, len(chks))
}
}

// TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr checks that, for the index
// written by testutil.PutOutOfOrderIndex, GatherIndexHealthStats succeeds, counts
// exactly one out-of-order chunk, and that the resulting stats report a non-nil
// OutOfOrderChunksErr.
func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
	blockDir, err := ioutil.TempDir("", "test-ooo-index")
	testutil.Ok(t, err)
	// Remove the temporary block directory so repeated test runs do not leak files.
	defer func() { testutil.Ok(t, os.RemoveAll(blockDir)) }()

	err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64)
	testutil.Ok(t, err)

	// Build the index path with filepath.Join instead of manual "/" concatenation.
	indexPath := filepath.Join(blockDir, IndexFilename)
	stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), indexPath, 0, math.MaxInt64)

	testutil.Ok(t, err)
	testutil.Equals(t, 1, stats.OutOfOrderChunks)
	testutil.NotOk(t, stats.OutOfOrderChunksErr())
}
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
// OutOfOrderChunksNoCompactReason is the reason for marking a block as no-compact when its index contains out-of-order chunks, so that compaction is not blocked.
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
Expand Down
147 changes: 100 additions & 47 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,20 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
// DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample
// resolution and block's labels.
type DefaultGrouper struct {
bkt objstore.Bucket
logger log.Logger
acceptMalformedIndex bool
enableVerticalCompaction bool
compactions *prometheus.CounterVec
compactionRunsStarted *prometheus.CounterVec
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
hashFunc metadata.HashFunc
bkt objstore.Bucket
logger log.Logger
acceptMalformedIndex bool
enableVerticalCompaction bool
compactions *prometheus.CounterVec
compactionRunsStarted *prometheus.CounterVec
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
skipChunksWithOutOfOrderBlocks bool
}

// NewDefaultGrouper makes a new DefaultGrouper.
Expand All @@ -252,7 +254,9 @@ func NewDefaultGrouper(
reg prometheus.Registerer,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
skipChunksWithOutOfOrderBlocks bool,
) *DefaultGrouper {
return &DefaultGrouper{
bkt: bkt,
Expand All @@ -279,9 +283,11 @@ func NewDefaultGrouper(
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"group"}),
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
}
}

Expand Down Expand Up @@ -309,7 +315,9 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.verticalCompactions.WithLabelValues(groupKey),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
g.skipChunksWithOutOfOrderBlocks,
)
if err != nil {
return nil, errors.Wrap(err, "create compaction group")
Expand All @@ -330,23 +338,25 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
// Group captures a set of blocks that have the same origin labels and downsampling resolution.
// Those blocks generally contain the same series and can thus efficiently be compacted.
type Group struct {
logger log.Logger
bkt objstore.Bucket
key string
labels labels.Labels
resolution int64
mtx sync.Mutex
metasByMinTime []*metadata.Meta
acceptMalformedIndex bool
enableVerticalCompaction bool
compactions prometheus.Counter
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionFailures prometheus.Counter
verticalCompactions prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
hashFunc metadata.HashFunc
logger log.Logger
bkt objstore.Bucket
key string
labels labels.Labels
resolution int64
mtx sync.Mutex
metasByMinTime []*metadata.Meta
acceptMalformedIndex bool
enableVerticalCompaction bool
compactions prometheus.Counter
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionFailures prometheus.Counter
verticalCompactions prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
skipChunksWithOutofOrderBlocks bool
}

// NewGroup returns a new compaction group.
Expand All @@ -365,27 +375,31 @@ func NewGroup(
verticalCompactions prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blockMakredForNoCopmact prometheus.Counter,
hashFunc metadata.HashFunc,
skipChunksWithOutOfOrderChunks bool,
) (*Group, error) {
if logger == nil {
logger = log.NewNopLogger()
}
g := &Group{
logger: logger,
bkt: bkt,
key: key,
labels: lset,
resolution: resolution,
acceptMalformedIndex: acceptMalformedIndex,
enableVerticalCompaction: enableVerticalCompaction,
compactions: compactions,
compactionRunsStarted: compactionRunsStarted,
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
logger: logger,
bkt: bkt,
key: key,
labels: lset,
resolution: resolution,
acceptMalformedIndex: acceptMalformedIndex,
enableVerticalCompaction: enableVerticalCompaction,
compactions: compactions,
compactionRunsStarted: compactionRunsStarted,
compactionRunsCompleted: compactionRunsCompleted,
compactionFailures: compactionFailures,
verticalCompactions: verticalCompactions,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blockMakredForNoCopmact,
hashFunc: hashFunc,
skipChunksWithOutofOrderBlocks: skipChunksWithOutOfOrderChunks,
}
return g, nil
}
Expand Down Expand Up @@ -541,6 +555,26 @@ func IsIssue347Error(err error) bool {
return ok
}

// OutOfOrderChunksError is a type wrapper for the out-of-order chunks error
// produced while validating a block index. It carries the ID of the block the
// error was raised for alongside the underlying error.
type OutOfOrderChunksError struct {
	err error
	id  ulid.ULID
}

// Error implements the error interface by returning the wrapped error's message.
func (oooErr OutOfOrderChunksError) Error() string {
	return oooErr.err.Error()
}

// outOfOrderChunkError wraps err into an OutOfOrderChunksError tagged with the
// ID of the broken block.
func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
	var wrapped OutOfOrderChunksError
	wrapped.id = brokenBlock
	wrapped.err = err
	return wrapped
}

// IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError.
func IsOutOfOrderChunkError(err error) bool {
	switch errors.Cause(err).(type) {
	case OutOfOrderChunksError:
		return true
	default:
		return false
	}
}

// HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct {
err error
Expand Down Expand Up @@ -749,6 +783,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutofOrderBlocks && err != nil {
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
Expand Down Expand Up @@ -939,6 +977,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
continue
}
}
// If the block has out-of-order chunks, mark it for no compaction and continue.
if IsOutOfOrderChunkError(err) {
if err := block.MarkForNoCompact(
ctx,
c.logger,
c.bkt,
err.(OutOfOrderChunksError).id,
metadata.OutOfOrderChunksNoCompactReason,
"OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
Expand Down
Loading