Skip to content

Commit

Permalink
Repair non-empty XOR chunks during 1h downsampling
Browse files Browse the repository at this point in the history
Iterated through the non empty XOR chunks, stored the results in a buffer and then downsampled them to 5m aggregated chunks by using DownsampleRaw method and then passed them for 1h downsampling.

Signed-off-by: Kartik-Garg <[email protected]>
  • Loading branch information
Kartik-Garg committed Feb 17, 2023
1 parent 0edbf2b commit 35eab83
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
15 changes: 14 additions & 1 deletion pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,23 @@ func Downsample(
// https://github.com/thanos-io/thanos/issues/5272
level.Warn(logger).Log("msg", fmt.Sprintf("expected downsampled chunk (*downsample.AggrChunk) got an empty %T instead for series: %d", c.Chunk, postings.At()))
continue
} else {
if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
aggrDataChunks := DownsampleRaw(all, ResLevel1)
for _, cn := range aggrDataChunks {
ac, ok = cn.Chunk.(*AggrChunk)
if !ok {
level.Warn(logger).Log("Not able to convert non-empty XOR chunks into 5m downsampled Aggregated chunks")
continue
}
}

}
return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got a non-empty %T instead for series: %d", c.Chunk, postings.At())
}
aggrChunks = append(aggrChunks, ac)

}
downsampledChunks, err := downsampleAggr(
aggrChunks,
Expand Down
5 changes: 4 additions & 1 deletion pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) {
raw := chunkenc.NewXORChunk()
app, err := raw.Appender()
testutil.Ok(t, err)
// this comes in !ok and passes through our newly created funcionality
app.Append(1587690005794, 42.5)
ser.chunks = append(ser.chunks, encodeTestAggrSeries(aggr), chunks.Meta{
MinTime: math.MaxInt64,
Expand All @@ -563,10 +564,12 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) {
mb.addSeries(ser)

fakeMeta := &metadata.Meta{}
// target
fakeMeta.Thanos.Downsample.Resolution = 300_000
// already existing resolution
id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000)
_ = id
testutil.NotOk(t, err)
testutil.Ok(t, err)
}

func chunksToSeriesIteratable(t *testing.T, inRaw [][]sample, inAggr []map[AggrType][]sample) *series {
Expand Down

0 comments on commit 35eab83

Please sign in to comment.