Skip to content

Commit

Permalink
add merge unit test
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed May 18, 2021
1 parent 6884bbb commit 5a71d03
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 21 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/prometheus/common v0.23.0
github.com/prometheus/exporter-toolkit v0.5.1
github.com/prometheus/prometheus v1.8.2-0.20210518163837-0a8912433a45
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.28.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624
Expand Down
15 changes: 13 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/dedup"
"io/ioutil"
"os"
"path"
Expand Down Expand Up @@ -167,7 +169,16 @@ func MetricCount(c prometheus.Collector) int {
return mCount
}

func TestGroup_Compact_e2e(t *testing.T) {
// TestGroupCompactE2E runs the shared group-compaction e2e scenario
// (testGroupCompactE2e) with a nil merge function.
func TestGroupCompactE2E(t *testing.T) {
testGroupCompactE2e(t, nil)
}

// Penalty based merger should get the same result as the blocks don't have overlap.
func TestGroupCompactPenaltyDedupE2E(t *testing.T) {
testGroupCompactE2e(t, dedup.NewDedupChunkSeriesMerger())
}

func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMergeFunc) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
Expand All @@ -194,7 +205,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
testutil.Ok(t, err)

planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000})
Expand Down
41 changes: 22 additions & 19 deletions pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dedup

import (
"bytes"
"container/heap"
"math"

Expand All @@ -23,13 +24,16 @@ func NewDedupChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc {
if len(series) == 0 {
return nil
}
if len(series) == 0 {
return series[0]
}
return &storage.ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
return newDedupChunksIterator(series...)
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return &dedupChunksIterator{
iterators: iterators,
}
},
}
}
Expand All @@ -43,20 +47,12 @@ type dedupChunksIterator struct {
curr chunks.Meta
}

func newDedupChunksIterator(replicas ...storage.ChunkSeries) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(replicas))
for _, s := range replicas {
iterators = append(iterators, s.Iterator())
}
return &dedupChunksIterator{
iterators: iterators,
}
}

func (d *dedupChunksIterator) At() chunks.Meta {
return d.curr
}

// Next method is almost the same as https://github.com/prometheus/prometheus/blob/v2.27.1/storage/merge.go#L615.
// The difference is that it handles both XOR and Aggr chunk Encoding.
func (d *dedupChunksIterator) Next() bool {
if d.h == nil {
for _, iter := range d.iterators {
Expand All @@ -78,6 +74,7 @@ func (d *dedupChunksIterator) Next() bool {
var (
om = newOverlappingMerger()
oMaxTime = d.curr.MaxTime
prev = d.curr
)

// Detect overlaps to compact.
Expand All @@ -89,11 +86,18 @@ func (d *dedupChunksIterator) Next() bool {
break
}

// We operate on the same series, so labels do not matter here.
om.addChunk(next)
if next.MinTime == prev.MinTime &&
next.MaxTime == prev.MaxTime &&
bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
// 1:1 duplicate, skip it.
} else {
// We operate on the same series, so labels do not matter here.
om.addChunk(next)

if next.MaxTime > oMaxTime {
oMaxTime = next.MaxTime
if next.MaxTime > oMaxTime {
oMaxTime = next.MaxTime
}
prev = next
}

iter := heap.Pop(&d.h).(chunks.Iterator)
Expand Down Expand Up @@ -261,7 +265,6 @@ func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator {
}}}).Iterator()

case downsample.ChunkEncAggr:

// If Aggr encoding, each aggregated chunk needs to be expanded and deduplicated,
// then re-encoded into Aggr chunks.
aggrChk := baseChk.Chunk.(*downsample.AggrChunk)
Expand Down
141 changes: 141 additions & 0 deletions pkg/dedup/chunk_iter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package dedup

import (
"testing"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/require"
)

// TestDedupChunkSeriesMerger feeds one or more chunk series that share the same
// label set into the merge function returned by NewDedupChunkSeriesMerger and
// checks that the merged series has the expected labels and, once its chunk
// iterator is expanded, the expected chunks and error.
func TestDedupChunkSeriesMerger(t *testing.T) {
	m := NewDedupChunkSeriesMerger()

	// newSeries builds a chunk series labelled {bar="baz"} holding one chunk per
	// sample slice. Labels are constructed per call so series never share them.
	newSeries := func(chks ...[]tsdbutil.Sample) storage.ChunkSeries {
		return storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), chks...)
	}

	for _, tc := range []struct {
		name     string
		input    []storage.ChunkSeries
		expected storage.ChunkSeries
	}{
		{
			name:     "single empty series",
			input:    []storage.ChunkSeries{newSeries(nil)},
			expected: newSeries(nil),
		},
		{
			name: "single series",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
			},
			expected: newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
		},
		{
			name: "two empty series",
			input: []storage.ChunkSeries{
				newSeries(nil),
				newSeries(nil),
			},
			expected: newSeries(nil),
		},
		{
			name: "two non overlapping",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}),
				newSeries([]tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
			},
			expected: newSeries(
				[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
				[]tsdbutil.Sample{sample{3, 3}, sample{5, 5}},
				[]tsdbutil.Sample{sample{7, 7}, sample{9, 9}},
				[]tsdbutil.Sample{sample{10, 10}},
			),
		},
		{
			name: "two overlapping",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}),
				newSeries([]tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
			},
			expected: newSeries(
				[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
				[]tsdbutil.Sample{sample{3, 3}, sample{8, 8}},
				[]tsdbutil.Sample{sample{10, 10}},
			),
		},
		{
			name: "two overlapping with large time diff",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{2, 2}, sample{5008, 5008}}),
				newSeries([]tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
			},
			// sample{5008, 5008} is added to the result due to its large timestamp.
			expected: newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{5008, 5008}}),
		},
		{
			name: "two duplicated",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
				newSeries([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}),
			},
			expected: newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
		},
		{
			name: "three overlapping",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
				newSeries([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}),
				newSeries([]tsdbutil.Sample{sample{0, 0}, sample{4, 4}}),
			},
			// only samples from the last series are retained due to high penalty.
			expected: newSeries([]tsdbutil.Sample{sample{0, 0}, sample{4, 4}}),
		},
		{
			name: "three in chained overlap",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
				newSeries([]tsdbutil.Sample{sample{4, 4}, sample{6, 66}}),
				newSeries([]tsdbutil.Sample{sample{6, 6}, sample{10, 10}}),
			},
			// only samples from the first series are retained due to high penalty.
			expected: newSeries([]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
		},
		{
			name: "three in chained overlap complex",
			input: []storage.ChunkSeries{
				newSeries([]tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}),
				newSeries([]tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}),
				newSeries([]tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}),
			},
			expected: newSeries(
				[]tsdbutil.Sample{sample{0, 0}, sample{5, 5}},
				[]tsdbutil.Sample{sample{31, 31}, sample{35, 35}},
			),
		},
		{
			name: "110 overlapping samples",
			input: []storage.ChunkSeries{
				newSeries(tsdbutil.GenerateSamples(0, 110)), // [0 - 110)
				newSeries(tsdbutil.GenerateSamples(60, 50)), // [60 - 110)
			},
			expected: newSeries(
				tsdbutil.GenerateSamples(0, 110),
			),
		},
		{
			name: "150 overlapping samples, split chunk",
			input: []storage.ChunkSeries{
				newSeries(tsdbutil.GenerateSamples(0, 90)),  // [0 - 90)
				newSeries(tsdbutil.GenerateSamples(60, 90)), // [60 - 150)
			},
			expected: newSeries(
				tsdbutil.GenerateSamples(0, 90),
			),
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			merged := m(tc.input...)
			require.Equal(t, tc.expected.Labels(), merged.Labels())

			// Expand both iterators so chunks and iteration errors can be compared directly.
			actChks, actErr := storage.ExpandChunks(merged.Iterator())
			expChks, expErr := storage.ExpandChunks(tc.expected.Iterator())

			require.Equal(t, expErr, actErr)
			require.Equal(t, expChks, actChks)
		})
	}
}
8 changes: 8 additions & 0 deletions pkg/dedup/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type sample struct {
v float64
}

// T returns the sample's t field (presumably its timestamp). Together with V it
// lets sample be used where a tsdbutil.Sample is expected, as the chunk merger
// tests in this package do.
func (s sample) T() int64 {
return s.t
}

// V returns the sample's v field, i.e. its float64 value.
func (s sample) V() float64 {
return s.v
}

type series struct {
lset labels.Labels
samples []sample
Expand Down

0 comments on commit 5a71d03

Please sign in to comment.