diff --git a/CHANGELOG.md b/CHANGELOG.md index c8dfefc1a9..4423537567 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6098](https://github.com/thanos-io/thanos/pull/6098) Cache/Redis: upgrade `rueidis` to v0.0.93 to fix potential panic when the client-side caching is disabled. - [#6103](https://github.com/thanos-io/thanos/pull/6103) Mixins(Rule): Fix query for long rule evaluations. - [#6121](https://github.com/thanos-io/thanos/pull/6121) Receive: Deduplicate metamonitoring queries. +- [#6137](https://github.com/thanos-io/thanos/pull/6137) Downsample: Repair of non-empty XOR chunks during 1h downsampling. ### Changed diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 07cf294f06..d55c95f449 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -167,10 +167,22 @@ func Downsample( // https://github.com/thanos-io/thanos/issues/5272 level.Warn(logger).Log("msg", fmt.Sprintf("expected downsampled chunk (*downsample.AggrChunk) got an empty %T instead for series: %d", c.Chunk, postings.At())) continue + } else { + if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil { + return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At()) + } + aggrDataChunks := DownsampleRaw(all, ResLevel1) + for _, cn := range aggrDataChunks { + ac, ok = cn.Chunk.(*AggrChunk) + if !ok { + return id, errors.New("Not able to convert non-empty XOR chunks to 5m downsampled aggregated chunks.") + } + } } - return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got a non-empty %T instead for series: %d", c.Chunk, postings.At()) + } aggrChunks = append(aggrChunks, ac) + } downsampledChunks, err := downsampleAggr( aggrChunks, diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index d86f610007..14c4d4dea8 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -538,21 +538,23 @@ func TestDownsampleAggrAndEmptyXORChunks(t *testing.T) { } func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) dir := t.TempDir() - ser := &series{lset: labels.FromStrings("__name__", "a")} aggr := map[AggrType][]sample{ - AggrCount: {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}, {t: 1587691199999, v: 20}, {t: 1587691499999, v: 20}, {t: 1587691799999, v: 20}, {t: 1587692099999, v: 20}, {t: 1587692399999, v: 20}, {t: 1587692699999, v: 16}, {t: 1587692999999, v: 20}, {t: 1587693299999, v: 20}, {t: 1587693590791, v: 20}}, - AggrSum: {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587690899999, v: 9.447457e+06}, {t: 1587691199999, v: 9.542732e+06}, {t: 1587691499999, v: 9.630379e+06}, {t: 1587691799999, v: 9.715631e+06}, {t: 1587692099999, v: 9.799808e+06}, {t: 1587692399999, v: 9.888117e+06}, {t: 1587692699999, v: 2.98928e+06}, {t: 1587692999999, v: 81592}, {t: 1587693299999, v: 163711}, {t: 1587693590791, v: 255746}}, - AggrMin: {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}, {t: 1587691499999, v: 479625}, {t: 1587691799999, v: 483709}, {t: 1587692099999, v: 488036}, {t: 1587692399999, v: 492223}, {t: 1587692699999, v: 75}, {t: 1587692999999, v: 2261}, {t: 1587693299999, v: 6210}, {t: 1587693590791, v: 10464}}, - AggrMax: {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 496544}, {t: 1587692999999, v: 6010}, {t: 1587693299999, v: 10242}, {t: 1587693590791, v: 14956}}, - AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}, {t: 1587691199999, v: 479368}, {t: 1587691499999, v: 483566}, {t: 1587691799999, v: 487787}, {t: 1587692099999, v: 492065}, {t: 1587692399999, v: 496245}, {t: 1587692699999, v: 498647}, {t: 1587692999999, v: 502554}, {t: 1587693299999, v: 506786}, {t: 1587693590791, v: 511500}, {t: 1587693590791, v: 14956}}, + AggrCount: {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}}, + AggrSum: {{t: 1587690299999, v: 9.276972e+06}, {t: 1587690599999, v: 9.359861e+06}, {t: 1587693590791, v: 255746}}, + AggrMin: {{t: 1587690299999, v: 461968}, {t: 1587690599999, v: 466070}, {t: 1587690899999, v: 470131}, {t: 1587691199999, v: 474913}}, + AggrMax: {{t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}, {t: 1587690899999, v: 474726}}, + AggrCounter: {{t: 1587690005791, v: 461968}, {t: 1587690299999, v: 465870}, {t: 1587690599999, v: 469951}}, } raw := chunkenc.NewXORChunk() app, err := raw.Appender() testutil.Ok(t, err) + app.Append(1587690005794, 42.5) + ser.chunks = append(ser.chunks, encodeTestAggrSeries(aggr), chunks.Meta{ MinTime: math.MaxInt64, MaxTime: math.MinInt64, @@ -566,7 +568,64 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { fakeMeta.Thanos.Downsample.Resolution = 300_000 id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000) _ = id - testutil.NotOk(t, err) + testutil.Ok(t, err) + + expected := []map[AggrType][]sample{ + { + AggrCount: {{1587690005794, 20}, {1587690005794, 20}, {1587690005794, 21}}, + AggrSum: {{1587690005794, 9.276972e+06}, {1587690005794, 9.359861e+06}, {1587690005794, 255788.5}}, + AggrMin: {{1587690005794, 461968}, {1587690005794, 466070}, {1587690005794, 470131}, {1587690005794, 42.5}}, + AggrMax: {{1587690005794, 465870}, {1587690005794, 469951}, {1587690005794, 474726}}, + AggrCounter: {{1587690005791, 461968}, {1587690599999, 469951}, {1587690599999, 469951}}, + }, + } + + _, err = metadata.ReadFromDir(filepath.Join(dir, id.String())) + testutil.Ok(t, err) + + indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename)) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, indexr.Close()) }() + + chunkr, err := chunks.NewDirReader(filepath.Join(dir, id.String(), block.ChunksDirname), NewPool()) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, chunkr.Close()) }() + + pall, err := indexr.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + + var series []storage.SeriesRef + for pall.Next() { + series = append(series, pall.At()) + } + testutil.Ok(t, pall.Err()) + testutil.Equals(t, 1, len(series)) + + var builder labels.ScratchBuilder + var chks []chunks.Meta + testutil.Ok(t, indexr.Series(series[0], &builder, &chks)) + + var got []map[AggrType][]sample + for _, c := range chks { + chk, err := chunkr.Chunk(c) + testutil.Ok(t, err) + + m := map[AggrType][]sample{} + for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} { + c, err := chk.(*AggrChunk).Get(at) + if err == ErrAggrNotExist { + continue + } + testutil.Ok(t, err) + + buf := m[at] + testutil.Ok(t, expandChunkIterator(c.Iterator(nil), &buf)) + m[at] = buf + } + got = append(got, m) + } + testutil.Equals(t, expected, got) + } func chunksToSeriesIteratable(t *testing.T, inRaw [][]sample, inAggr []map[AggrType][]sample) *series {