From 5a71d03d6bbd0ecd07b93710546d7c989c78719c Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 18 May 2021 15:32:24 -0400 Subject: [PATCH] add merge unit test Signed-off-by: yeya24 --- go.mod | 1 + pkg/compact/compact_e2e_test.go | 15 +++- pkg/dedup/chunk_iter.go | 41 +++++----- pkg/dedup/chunk_iter_test.go | 141 ++++++++++++++++++++++++++++++++ pkg/dedup/iter_test.go | 8 ++ 5 files changed, 185 insertions(+), 21 deletions(-) create mode 100644 pkg/dedup/chunk_iter_test.go diff --git a/go.mod b/go.mod index f68a4422202..d3705bca0e3 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/prometheus/common v0.23.0 github.com/prometheus/exporter-toolkit v0.5.1 github.com/prometheus/prometheus v1.8.2-0.20210518163837-0a8912433a45 + github.com/stretchr/testify v1.7.0 github.com/uber/jaeger-client-go v2.28.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624 diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 663bb5767f8..b6071ff2abe 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -8,6 +8,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/dedup" "io/ioutil" "os" "path" @@ -167,7 +169,16 @@ func MetricCount(c prometheus.Collector) int { return mCount } -func TestGroup_Compact_e2e(t *testing.T) { +func TestGroupCompactE2E(t *testing.T) { + testGroupCompactE2e(t, nil) +} + +// Penalty based merger should get the same result as the blocks don't have overlap. 
+func TestGroupCompactPenaltyDedupE2E(t *testing.T) { + testGroupCompactE2e(t, dedup.NewDedupChunkSeriesMerger()) +} + +func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) { objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -194,7 +205,7 @@ func TestGroup_Compact_e2e(t *testing.T) { sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5) testutil.Ok(t, err) - comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil) + comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc) testutil.Ok(t, err) planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) diff --git a/pkg/dedup/chunk_iter.go b/pkg/dedup/chunk_iter.go index 12344626aa6..f2cb74bb4b7 100644 --- a/pkg/dedup/chunk_iter.go +++ b/pkg/dedup/chunk_iter.go @@ -4,6 +4,7 @@ package dedup import ( + "bytes" "container/heap" "math" @@ -23,13 +24,16 @@ func NewDedupChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc { if len(series) == 0 { return nil } - if len(series) == 0 { - return series[0] - } return &storage.ChunkSeriesEntry{ Lset: series[0].Labels(), ChunkIteratorFn: func() chunks.Iterator { - return newDedupChunksIterator(series...) 
+ iterators := make([]chunks.Iterator, 0, len(series)) + for _, s := range series { + iterators = append(iterators, s.Iterator()) + } + return &dedupChunksIterator{ + iterators: iterators, + } }, } } @@ -43,20 +47,12 @@ type dedupChunksIterator struct { curr chunks.Meta } -func newDedupChunksIterator(replicas ...storage.ChunkSeries) chunks.Iterator { - iterators := make([]chunks.Iterator, 0, len(replicas)) - for _, s := range replicas { - iterators = append(iterators, s.Iterator()) - } - return &dedupChunksIterator{ - iterators: iterators, - } -} - func (d *dedupChunksIterator) At() chunks.Meta { return d.curr } +// Next method is almost the same as https://github.com/prometheus/prometheus/blob/v2.27.1/storage/merge.go#L615. +// The difference is that it handles both XOR and Aggr chunk Encoding. func (d *dedupChunksIterator) Next() bool { if d.h == nil { for _, iter := range d.iterators { @@ -78,6 +74,7 @@ func (d *dedupChunksIterator) Next() bool { var ( om = newOverlappingMerger() oMaxTime = d.curr.MaxTime + prev = d.curr ) // Detect overlaps to compact. @@ -89,11 +86,18 @@ func (d *dedupChunksIterator) Next() bool { break } - // We operate on same series, so labels does not matter here. - om.addChunk(next) + if next.MinTime == prev.MinTime && + next.MaxTime == prev.MaxTime && + bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) { + // 1:1 duplicates, skip it. + } else { + // We operate on same series, so labels does not matter here. + om.addChunk(next) - if next.MaxTime > oMaxTime { - oMaxTime = next.MaxTime + if next.MaxTime > oMaxTime { + oMaxTime = next.MaxTime + } + prev = next } iter := heap.Pop(&d.h).(chunks.Iterator) @@ -261,7 +265,6 @@ func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator { }}}).Iterator() case downsample.ChunkEncAggr: - // If Aggr encoding, each aggregated chunks need to be expanded and deduplicated, // then re-encoded into Aggr chunks. 
aggrChk := baseChk.Chunk.(*downsample.AggrChunk) diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go new file mode 100644 index 00000000000..4eab9666eb0 --- /dev/null +++ b/pkg/dedup/chunk_iter_test.go @@ -0,0 +1,141 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package dedup + +import ( + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/stretchr/testify/require" +) + +func TestDedupChunkSeriesMerger(t *testing.T) { + m := NewDedupChunkSeriesMerger() + + for _, tc := range []struct { + name string + input []storage.ChunkSeries + expected storage.ChunkSeries + }{ + { + name: "single empty series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + { + name: "single series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, + { + name: "two empty series", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil), + }, + { + name: "two non overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}), + 
storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + { + name: "two overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}, []tsdbutil.Sample{sample{10, 10}}), + }, + { + name: "two overlapping with large time diff", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{2, 2}, sample{5008, 5008}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + }, + // sample{5008, 5008} is added to the result due to its large timestamp. 
+ expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{5008, 5008}}), + }, + { + name: "two duplicated", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + { + name: "three overlapping", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + }, + // only samples from the last series are retained due to high penalty. + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + }, + { + name: "three in chained overlap", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}), + }, + // only samples from the first series are retained due to high penalty. 
+ expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + }, + { + name: "three in chained overlap complex", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}), + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, + []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}, + ), + }, + { + name: "110 overlapping samples", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 110)), // [0 - 110) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 50)), // [60 - 110) + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 110), + ), + }, + { + name: "150 overlapping samples, split chunk", + input: []storage.ChunkSeries{ + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)), // [0 - 90) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150) + }, + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + tsdbutil.GenerateSamples(0, 90), + ), + }, + } { + t.Run(tc.name, func(t *testing.T) { + merged := m(tc.input...) 
+ require.Equal(t, tc.expected.Labels(), merged.Labels()) + actChks, actErr := storage.ExpandChunks(merged.Iterator()) + expChks, expErr := storage.ExpandChunks(tc.expected.Iterator()) + + require.Equal(t, expErr, actErr) + require.Equal(t, expChks, actChks) + }) + } +} diff --git a/pkg/dedup/iter_test.go b/pkg/dedup/iter_test.go index b3f52f5bde8..79bd5de3c83 100644 --- a/pkg/dedup/iter_test.go +++ b/pkg/dedup/iter_test.go @@ -22,6 +22,14 @@ type sample struct { v float64 } +func (s sample) T() int64 { + return s.t +} + +func (s sample) V() float64 { + return s.v +} + type series struct { lset labels.Labels samples []sample