diff --git a/CHANGELOG.md b/CHANGELOG.md index ae9f5391d6..032fce9609 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed - [#5255](https://github.com/thanos-io/thanos/pull/5296) Query: Use k-way merging for the proxying logic. The proxying sub-system now uses much less resources (~25-80% less CPU usage, ~30-50% less RAM usage according to our benchmarks). Reduces query duration by a few percent on queries with lots of series. +- [#5690](https://github.com/thanos-io/thanos/pull/5690) Compact: update `--debug.accept-malformed-index` flag to apply to downsampling. Previously the flag only applied to compaction, and fatal errors would still occur when downsampling was attempted. ### Removed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index ca9fba5543..6d81b32975 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -438,7 +438,7 @@ func runCompact( downsampleMetrics.downsamples.WithLabelValues(groupKey) downsampleMetrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } @@ -446,7 +446,7 @@ func runCompact( if err := sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc)); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } level.Info(logger).Log("msg", "downsampling iterations done") @@ -657,7 +657,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected."). Hidden().Default("true").BoolVar(&cc.haltOnError) cmd.Flag("debug.accept-malformed-index", - "Compaction index verification will ignore out of order label names."). + "Compaction and downsampling index verification will ignore out of order label names."). Hidden().Default("false").BoolVar(&cc.acceptMalformedIndex) cmd.Flag("debug.max-compaction-level", fmt.Sprintf("Maximum compaction level, default is %d: %s", compactions.maxLevel(), compactions.String())). Hidden().Default(strconv.Itoa(compactions.maxLevel())).IntVar(&cc.maxCompactionLevel) diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 232d3ac49d..693823afc2 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -126,7 +126,7 @@ func RunDownsample( metrics.downsamples.WithLabelValues(groupKey) metrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil { return errors.Wrap(err, "downsampling failed") } @@ -135,7 +135,7 @@ func RunDownsample( if err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc, false); err != nil { return errors.Wrap(err, "downsampling failed") } return nil @@ -175,6 +175,7 @@ func downsampleBucket( dir string, downsampleConcurrency int, hashFunc metadata.HashFunc, + acceptMalformedIndex bool, ) (rerr error) { if err := os.MkdirAll(dir, 0750); err != nil { return errors.Wrap(err, "create dir") @@ -252,7 +253,7 @@ func downsampleBucket( resolution = downsample.ResLevel2 errMsg = "downsampling to 60 min" } - if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil { + if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics, acceptMalformedIndex); err != nil { metrics.downsampleFailures.WithLabelValues(m.Thanos.GroupKey()).Inc() errCh <- errors.Wrap(err, errMsg) @@ -341,6 +342,7 @@ func processDownsampling( resolution int64, hashFunc metadata.HashFunc, metrics *DownsampleMetrics, + acceptMalformedIndex bool, ) error { begin := time.Now() bdir := filepath.Join(dir, m.ULID.String()) @@ -351,7 +353,7 @@ func processDownsampling( } level.Info(logger).Log("msg", "downloaded block", "id", m.ULID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) - if err := block.VerifyIndex(logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil { + if err := block.VerifyIndex(logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex { return errors.Wrap(err, "input block index not valid") } @@ -381,7 +383,7 @@ func processDownsampling( "from", m.ULID, "to", id, "duration", downsampleDuration, "duration_ms", downsampleDuration.Milliseconds()) metrics.downsampleDuration.WithLabelValues(m.Thanos.GroupKey()).Observe(downsampleDuration.Seconds()) - if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil { + if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex { return errors.Wrap(err, "output block index not valid") } diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index 46179d9134..74daa223d8 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -157,7 +157,7 @@ func TestRegression4960_Deadlock(t *testing.T) { metas, _, err := metaFetcher.Fetch(ctx) testutil.Ok(t, err) - err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc) + err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc, false) testutil.NotOk(t, err) testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred")) @@ -196,7 +196,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metas, _, err := metaFetcher.Fetch(ctx) testutil.Ok(t, err) - testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)) + testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc, false)) testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey()))) _, err = os.Stat(dir)