Skip to content

Commit

Permalink
tsdb: Implement limit in block querier
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 committed Sep 26, 2024
1 parent 5d8f0ef commit cf9110e
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 6 deletions.
50 changes: 50 additions & 0 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,56 @@ func (it *ListPostings) Err() error {
return nil
}

// LimitedPostings wraps Postings and limits its iteration.
type LimitedPostings struct {
	p     Postings // underlying postings being wrapped
	limit int      // maximum number of successful Next advances allowed
	i     int      // number of Next calls consumed so far
}

// NewLimitedPostings returns Postings that can only be iterated to the limit. 0 means limit is disabled.
func NewLimitedPostings(p Postings, limit int) Postings {
	if limit > 0 {
		return newLimitedPostings(p, limit)
	}
	// A non-positive limit disables limiting entirely: hand back the
	// original postings unwrapped.
	return p
}

// newLimitedPostings constructs a LimitedPostings over p that permits at
// most l advances.
func newLimitedPostings(p Postings, l int) *LimitedPostings {
	lp := &LimitedPostings{
		p:     p,
		limit: l,
	}
	return lp
}

// At returns the current series ref of the underlying postings.
func (it *LimitedPostings) At() storage.SeriesRef {
	return it.p.At()
}

// Next advances the underlying postings, reporting false once the limit
// has been exhausted or the wrapped postings run out.
func (it *LimitedPostings) Next() bool {
	if it.i < it.limit {
		// Budget is consumed per attempt, whether or not the underlying
		// iterator still has elements.
		it.i++
		return it.p.Next()
	}
	return false
}

// Seek advances the iterator until the current value is >= x, drawing from
// the same budget as Next so the limit stays consistent across mixed
// Seek/Next usage.
func (it *LimitedPostings) Seek(x storage.SeriesRef) bool {
	// If the current value satisfies, then return.
	// NOTE(review): At() is called here even before the first Next(); this
	// relies on the wrapped Postings returning a value (typically 0) in that
	// state — confirm against the Postings contract.
	if it.At() >= x {
		return true
	}

	// Advance linearly via Next (rather than delegating to p.Seek) so that
	// every step is counted against the limit.
	for it.Next() {
		if it.At() >= x {
			return true
		}
	}

	return false
}

// Err returns any error encountered by the underlying postings.
func (it *LimitedPostings) Err() error {
	// Delegate to the wrapped iterator instead of returning nil
	// unconditionally: hitting the limit is not an error, but a failure in
	// the underlying postings (e.g. a decode error in bigEndianPostings)
	// must still surface to callers, matching the decorator pattern used by
	// the other Postings wrappers in this file.
	return it.p.Err()
}

// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
Expand Down
128 changes: 128 additions & 0 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,134 @@ func TestListPostings(t *testing.T) {
})
}

// TestLimitPostings exercises LimitedPostings over empty, single-element and
// multi-element postings, covering Next and Seek with and without an
// effective limit. The exact call order within each subtest is significant:
// the wrapper is stateful and Seek shares its budget with Next.
func TestLimitPostings(t *testing.T) {
	t.Run("empty postings", func(t *testing.T) {
		p := NewListPostings(nil)
		p = NewLimitedPostings(p, 10)
		require.False(t, p.Next())
		require.False(t, p.Seek(10))
		require.False(t, p.Next())
		require.NoError(t, p.Err())
	})

	t.Run("one posting", func(t *testing.T) {
		t.Run("next", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10})
			p = NewLimitedPostings(p, 10)
			require.True(t, p.Next())
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
		t.Run("seek less", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10})
			p = NewLimitedPostings(p, 1)
			require.True(t, p.Seek(5))
			require.Equal(t, storage.SeriesRef(10), p.At())
			// A repeated Seek to a smaller target must not consume budget.
			require.True(t, p.Seek(5))
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
		t.Run("seek equal", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10})
			p = NewLimitedPostings(p, 1)
			require.True(t, p.Seek(10))
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
		t.Run("seek more", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10})
			p = NewLimitedPostings(p, 1)
			require.False(t, p.Seek(15))
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
		t.Run("seek after next", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10})
			p = NewLimitedPostings(p, 1)
			require.True(t, p.Next())
			require.False(t, p.Seek(15))
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
	})

	t.Run("multiple postings", func(t *testing.T) {
		t.Run("next no limit", func(t *testing.T) {
			// Limit 0 disables limiting: all entries must be reachable.
			p := NewListPostings([]storage.SeriesRef{10, 20})
			p = NewLimitedPostings(p, 0)
			require.True(t, p.Next())
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.True(t, p.Next())
			require.Equal(t, storage.SeriesRef(20), p.At())
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
		t.Run("next with limit", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10, 20})
			p = NewLimitedPostings(p, 1)
			require.True(t, p.Next())
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
		t.Run("seek", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10, 20})
			p = NewLimitedPostings(p, 2)
			require.True(t, p.Seek(5))
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.True(t, p.Seek(5))
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.True(t, p.Seek(10))
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.True(t, p.Next())
			require.Equal(t, storage.SeriesRef(20), p.At())
			require.True(t, p.Seek(10))
			require.Equal(t, storage.SeriesRef(20), p.At())
			require.True(t, p.Seek(20))
			require.Equal(t, storage.SeriesRef(20), p.At())
			require.False(t, p.Next())
			require.NoError(t, p.Err())
		})
		t.Run("seek less than last", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
			p = NewLimitedPostings(p, 5)
			require.True(t, p.Seek(45))
			require.Equal(t, storage.SeriesRef(50), p.At())
			require.False(t, p.Next())
		})
		t.Run("seek exactly last", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
			p = NewLimitedPostings(p, 5)
			require.True(t, p.Seek(50))
			require.Equal(t, storage.SeriesRef(50), p.At())
			require.False(t, p.Next())
		})
		t.Run("seek more than last", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
			p = NewLimitedPostings(p, 6)
			require.False(t, p.Seek(60))
			require.False(t, p.Next())
		})
		t.Run("seek with limit", func(t *testing.T) {
			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
			p = NewLimitedPostings(p, 3)
			require.True(t, p.Seek(10))
			require.Equal(t, storage.SeriesRef(10), p.At())
			require.True(t, p.Seek(20))
			require.Equal(t, storage.SeriesRef(20), p.At())
			require.True(t, p.Seek(30))
			require.Equal(t, storage.SeriesRef(30), p.At())
			// Seeking backwards keeps the current position.
			require.True(t, p.Seek(10))
			require.Equal(t, storage.SeriesRef(30), p.At())
			// The limit of 3 is exhausted, so 40 is unreachable.
			require.False(t, p.Seek(40))
			require.False(t, p.Next())
		})
	})
}

// BenchmarkListPostings benchmarks ListPostings by iterating Next/At sequentially.
// See also BenchmarkIntersect as it performs more `At` calls than `Next` calls when intersecting.
func BenchmarkListPostings(b *testing.B) {
Expand Down
22 changes: 16 additions & 6 deletions tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er

// LabelValues returns the sorted values for the given label name, truncated
// to the limit carried in hints (if any).
func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	values, err := q.index.SortedLabelValues(ctx, name, matchers...)
	return truncateToLimit(values, hints), nil, err
}

// LabelNames returns the label names in the block, truncated to the limit
// carried in hints (if any).
func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	names, err := q.index.LabelNames(ctx, matchers...)
	return truncateToLimit(names, hints), nil, err
}

Expand All @@ -101,6 +103,13 @@ func (q *blockBaseQuerier) Close() error {
return errs.Err()
}

// truncateToLimit caps s at hints.Limit entries. A nil hints or a
// non-positive limit leaves s untouched.
func truncateToLimit(s []string, hints *storage.LabelHints) []string {
	if hints == nil || hints.Limit <= 0 {
		return s
	}
	if len(s) <= hints.Limit {
		return s
	}
	return s[:hints.Limit]
}

// blockQuerier provides querying access to a single block database.
type blockQuerier struct {
	*blockBaseQuerier
}
Expand All @@ -119,33 +128,34 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
}

// selectSeriesSet resolves the matchers against the index and returns a
// SeriesSet over the matching series in [mint, maxt]. The order of postings
// transformations matters: sharding first, then sorting, then the limit.
func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
	ir IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
) storage.SeriesSet {
	disableTrimming := false
	sharded := hints != nil && hints.ShardCount > 0

	p, err := PostingsForMatchers(ctx, ir, ms...)
	if err != nil {
		return storage.ErrSeriesSet(err)
	}
	if sharded {
		p = ir.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
	}
	if sortSeries {
		p = ir.SortedPostings(p)
	}

	if hints != nil {
		mint = hints.Start
		maxt = hints.End
		disableTrimming = hints.DisableTrimming
		// `index` here is the tsdb/index package — the IndexReader parameter
		// is named ir precisely to avoid shadowing it.
		p = index.NewLimitedPostings(p, hints.Limit)
		if hints.Func == "series" {
			// When you're only looking up metadata (for example series API), you don't need to load any chunks.
			return newBlockSeriesSet(ir, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
		}
	}

	return newBlockSeriesSet(ir, chunks, tombstones, p, mint, maxt, disableTrimming)
}

// blockChunkQuerier provides chunk querying access to a single block database.
Expand Down
90 changes: 90 additions & 0 deletions tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,96 @@ func TestBlockQuerierDelete(t *testing.T) {
}
}

// TestBlockQuerierLimit verifies that the Limit hint is honored by
// LabelValues, LabelNames and Select on a block querier, both when the
// limit is unset (zero) and when it truncates the result.
func TestBlockQuerierLimit(t *testing.T) {
	tmpdir := t.TempDir()
	ctx := context.Background()
	var (
		allValues     []string
		allNames      = []string{"__name__"}
		seriesEntries []storage.Series
	)

	// Create 5 series, each with a unique __name__ value and one unique
	// extra label name, so every query surface has a deterministic, sorted
	// result set to truncate.
	for i := 0; i < 5; i++ {
		value := fmt.Sprintf("value%d", i)
		name := fmt.Sprintf("labelName%d", i)
		allValues = append(allValues, value)
		allNames = append(allNames, name)

		seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings(
			"__name__", value, name, value,
		), []chunks.Sample{sample{100, 0, nil, nil}}))
	}

	blockDir := createBlock(t, tmpdir, seriesEntries)

	// Check open err.
	block, err := OpenBlock(nil, blockDir, nil)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, block.Close()) })

	q, err := NewBlockQuerier(block, 0, 100)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, q.Close()) })

	type testCase struct {
		limit               int
		expectedSeries      []storage.Series
		expectedLabelValues []string
		expectedLabelNames  []string
	}

	testCases := map[string]testCase{
		"without limit": {
			expectedSeries:      seriesEntries,
			expectedLabelValues: allValues,
			expectedLabelNames:  allNames,
		},
		"with limit": {
			limit:               2,
			expectedSeries:      seriesEntries[:2],
			expectedLabelValues: allValues[:2],
			expectedLabelNames:  allNames[:2],
		},
	}

	for tName, tc := range testCases {
		t.Run(fmt.Sprintf("label values %s", tName), func(t *testing.T) {
			values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{
				Limit: tc.limit,
			})
			require.NoError(t, err)
			require.Equal(t, tc.expectedLabelValues, values)
		})

		t.Run(fmt.Sprintf("label names %s", tName), func(t *testing.T) {
			names, _, err := q.LabelNames(ctx, &storage.LabelHints{
				Limit: tc.limit,
			})
			require.NoError(t, err)
			require.Equal(t, tc.expectedLabelNames, names)
		})

		t.Run(fmt.Sprintf("select %s", tName), func(t *testing.T) {
			matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*")
			set := q.Select(ctx, true, &storage.SelectHints{
				Start: 0,
				End:   100,
				Limit: tc.limit,
			}, matcher)

			var s []storage.Series
			for set.Next() {
				s = append(s, set.At())
			}
			// Fix: check the series set's iteration error. The original
			// asserted the stale `err` from NewBlockQuerier above, so a
			// failure during iteration would have gone unnoticed.
			require.NoError(t, set.Err())
			require.Equal(t, len(tc.expectedSeries), len(s))
			for i, exp := range tc.expectedSeries {
				require.True(t, labels.Equal(exp.Labels(), s[i].Labels()))
			}
		})
	}
}

type fakeChunksReader struct {
ChunkReader
chks map[chunks.ChunkRef]chunkenc.Chunk
Expand Down

0 comments on commit cf9110e

Please sign in to comment.