Skip to content

Commit

Permalink
add tests for downsampled block
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed May 21, 2021
1 parent ace4e70 commit 666c8c0
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#4176](https://github.com/thanos-io/thanos/pull/4176) Query API: Adds optional `Stats param` to return stats for query APIs
- [#4125](https://github.com/thanos-io/thanos/pull/4125) Rule: Add `--alert.relabel-config` / `--alert.relabel-config-file` allowing to specify alert relabel configurations like [Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config)
- [#4211](https://github.com/thanos-io/thanos/pull/4211) Add TLS and basic authentication to Thanos APIs
- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor.

### Fixed
-
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("48h").SetValue(&cc.deleteDelay)

cmd.Flag("compact.enable-vertical-compaction", "Experimental. When set to true, compactor will allow overlaps and perform **irreversible** vertical compaction. See https://thanos.io/tip/components/compact.md/#vertical-compactions to read more."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"Please note that this uses a NAIVE algorithm for merging. If you need a smarter deduplication algorithm, please set it via -- compact.dedup-func."+
"NOTE: This flag is ignored and (enabled) when --deduplication.replica-label flag is set.").
Hidden().Default("false").BoolVar(&cc.enableVerticalCompaction)

Expand Down
184 changes: 184 additions & 0 deletions pkg/dedup/chunk_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

func TestDedupChunkSeriesMerger(t *testing.T) {
Expand Down Expand Up @@ -139,3 +142,184 @@ func TestDedupChunkSeriesMerger(t *testing.T) {
})
}
}

func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) {
m := NewDedupChunkSeriesMerger()

defaultLabels := labels.FromStrings("bar", "baz")
emptySamples := downsample.SamplesFromTSDBSamples([]tsdbutil.Sample{})
// Samples are created with step 1m. So the 5m downsampled chunk has 2 samples.
samples1 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(0, 10, 60*1000))
// Non overlapping samples with samples1. 5m downsampled chunk has 2 samples.
samples2 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(600000, 10, 60*1000))

samples3 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(120000, 10, 60*1000))

for _, tc := range []struct {
name string
input []storage.ChunkSeries
expected storage.ChunkSeries
}{
{
name: "single empty series",
input: []storage.ChunkSeries{
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...)
},
},
},
expected: &storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator()
},
},
},
{
name: "single series",
input: []storage.ChunkSeries{
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...)
},
},
},
expected: &storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...)
},
},
},
{
name: "two empty series",
input: []storage.ChunkSeries{
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...)
},
},
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(emptySamples, downsample.ResLevel1)...)
},
},
},
expected: &storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator()
},
},
},
{
name: "two non overlapping series",
input: []storage.ChunkSeries{
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...)
},
},
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples2, downsample.ResLevel1)...)
},
},
},
expected: &storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(
append(downsample.DownsampleRaw(samples1, downsample.ResLevel1),
downsample.DownsampleRaw(samples2, downsample.ResLevel1)...)...)
},
},
},
{
// 1:1 duplicated chunks are deduplicated.
name: "two same series",
input: []storage.ChunkSeries{
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...)
},
},
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...)
},
},
},
expected: &storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(
downsample.DownsampleRaw(samples1, downsample.ResLevel1)...)
},
},
},
{
name: "two overlapping series",
input: []storage.ChunkSeries{
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples1, downsample.ResLevel1)...)
},
},
&storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(downsample.DownsampleRaw(samples3, downsample.ResLevel1)...)
},
},
},
expected: &storage.ChunkSeriesEntry{
Lset: defaultLabels,
ChunkIteratorFn: func() chunks.Iterator {
return storage.NewListChunkSeriesIterator(chunks.Meta{
MinTime: 299999,
MaxTime: 540000,
Chunk: downsample.EncodeAggrChunk([5]chunkenc.Chunk{
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 3}, sample{540000, 5}}).Chunk,
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 540000}, sample{540000, 2100000}}).Chunk,
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 120000}, sample{540000, 300000}}).Chunk,
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 240000}, sample{540000, 540000}}).Chunk,
tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{299999, 240000}, sample{299999, 240000}}).Chunk,
}),
})
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
merged := m(tc.input...)
require.Equal(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := storage.ExpandChunks(merged.Iterator())
expChks, expErr := storage.ExpandChunks(tc.expected.Iterator())

require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
})
}
}

func createSamplesWithStep(start, numOfSamples, step int) []tsdbutil.Sample {
res := make([]tsdbutil.Sample, numOfSamples)
cur := start
for i := 0; i < numOfSamples; i++ {
res[i] = sample{t: int64(cur), v: float64(cur)}
cur += step
}

return res
}

0 comments on commit 666c8c0

Please sign in to comment.