From 2736cf9368c7d3459c85f82cdec7898811498f08 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 1 May 2024 09:19:08 -0700 Subject: [PATCH] fix reader getting wrong posting offsets when querying multiple values (#7301) --- CHANGELOG.md | 1 + pkg/block/indexheader/binary_reader.go | 12 ++- pkg/block/indexheader/header_test.go | 122 +++++++++++++++++++++++-- pkg/store/lazy_postings.go | 11 ++- 4 files changed, 134 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6915f47c30..f9653befd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types. - [#7289](https://github.com/thanos-io/thanos/pull/7289) Query Frontend: show warnings from downstream queries. - [#7308](https://github.com/thanos-io/thanos/pull/7308) Store: Batch TSDB Infos for blocks. +- [#7301](https://github.com/thanos-io/thanos/pull/7301) Store Gateway: fix index header reader `PostingsOffsets` returning wrong values. ### Added diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index c86185bfbf..1afaabb786 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -875,6 +875,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra // Iterate on the offset table. newSameRngs = newSameRngs[:0] + Iter: for d.Err() == nil { // Posting format entry is as follows: // │ ┌────────────────────────────────────────┐ │ @@ -916,6 +917,15 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra break } wantedValue = values[valueIndex] + // Only do this if there is no new range added. If there is an existing + // range we want to continue iterating the offset table to get the end. + if len(newSameRngs) == 0 && i+1 < len(e.offsets) { + // We want to limit this loop within e.offsets[i, i+1). So when the wanted value + // is >= e.offsets[i+1], go out of the loop and binary search again. + if wantedValue >= e.offsets[i+1].value { + break Iter + } + } } if i+1 == len(e.offsets) { @@ -942,7 +952,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra if len(newSameRngs) > 0 { // We added some ranges in this iteration. Use next posting offset as the end of our ranges. - // We know it exists as we never go further in this loop than e.offsets[i, i+1]. + // We know it exists as we never go further in this loop than e.offsets[i, i+1). skipNAndName(&d, &buf) d.UvarintBytes() // Label value. diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 4130157a96..7a797c4e08 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -7,9 +7,12 @@ import ( "context" "fmt" "math" + "math/rand" "path/filepath" + "sort" "strconv" "testing" + "time" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -18,6 +21,7 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" @@ -53,6 +57,9 @@ func TestReaders(t *testing.T) { {{Name: "a", Value: "13"}}, {{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}}, {{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}}, + {{Name: "cluster", Value: "a-eu-west-1"}}, + {{Name: "cluster", Value: "b-eu-west-1"}}, + {{Name: "cluster", Value: "c-eu-west-1"}}, }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) testutil.Ok(t, err) @@ -108,15 +115,15 @@ func TestReaders(t *testing.T) { if id == id1 { testutil.Equals(t, 1, br.version) testutil.Equals(t, 2, br.indexVersion) - testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 70}, br.toc) - testutil.Equals(t, int64(710), br.indexLastPostingEnd) + testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 114}, br.toc) + testutil.Equals(t, int64(905), br.indexLastPostingEnd) testutil.Equals(t, 8, br.symbols.Size()) testutil.Equals(t, 0, len(br.postingsV1)) - testutil.Equals(t, 2, len(br.nameSymbols)) + testutil.Equals(t, 3, len(br.nameSymbols)) testutil.Equals(t, map[string]*postingValueOffsets{ "": { offsets: []postingOffset{{value: "", tableOff: 4}}, - lastValOffset: 440, + lastValOffset: 576, }, "a": { offsets: []postingOffset{ @@ -126,14 +133,21 @@ func TestReaders(t *testing.T) { {value: "7", tableOff: 75}, {value: "9", tableOff: 89}, }, - lastValOffset: 640, + lastValOffset: 776, + }, + "cluster": { + offsets: []postingOffset{ + {value: "a-eu-west-1", tableOff: 96}, + {value: "c-eu-west-1", tableOff: 142}, + }, + lastValOffset: 824, }, "longer-string": { offsets: []postingOffset{ - {value: "1", tableOff: 96}, - {value: "2", tableOff: 115}, + {value: "1", tableOff: 165}, + {value: "2", tableOff: 184}, }, - lastValOffset: 706, + lastValOffset: 901, }, }, br.postings) @@ -173,6 +187,17 @@ func TestReaders(t *testing.T) { testutil.Assert(t, rngs[2].End > rngs[2].Start) testutil.Equals(t, NotFoundRange, rngs[1]) + // 3 values exist and 3 values don't exist. + rngs, err = br.PostingsOffsets("cluster", "a-eu-west-1", "a-us-west-2", "b-eu-west-1", "b-us-east-1", "c-eu-west-1", "c-us-east-2") + testutil.Ok(t, err) + for i := 0; i < len(rngs); i++ { + if i%2 == 0 { + testutil.Assert(t, rngs[i].End > rngs[i].Start) + } else { + testutil.Equals(t, NotFoundRange, rngs[i]) + } + } + // Regression tests for https://github.com/thanos-io/thanos/issues/2213. // Most of not existing value was working despite bug, except in certain unlucky cases // it was causing "invalid size" errors. @@ -521,3 +546,84 @@ func readSymbols(bs index.ByteSlice, version, off int) ([]string, map[uint32]str } return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") } + +// The idea of this test case is to make sure that reader.PostingsOffsets and +// reader.PostingsOffset get the same index ranges for required label values. +func TestReaderPostingsOffsets(t *testing.T) { + ctx := context.Background() + + tmpDir := t.TempDir() + + possibleClusters := []string{"us-west-2", "us-east-1", "us-east-2", "eu-west-1", "eu-central-1", "ap-southeast-1", "ap-south-1"} + possiblePrefixes := []string{"a", "b", "c", "d", "1", "2", "3", "4"} + totalValues := []string{} + for i := 0; i < len(possibleClusters); i++ { + for j := 0; j < len(possiblePrefixes); j++ { + totalValues = append(totalValues, fmt.Sprintf("%s-%s", possiblePrefixes[j], possibleClusters[i])) + } + } + + rnd := rand.New(rand.NewSource(time.Now().Unix())) + // Pick 5 label values to be used in the block. + clusterLbls := make([]labels.Labels, 0) + valueSet := map[int]struct{}{} + for i := 0; i < 5; { + idx := rnd.Intn(len(totalValues)) + if _, ok := valueSet[idx]; ok { + continue + } + valueSet[idx] = struct{}{} + clusterLbls = append(clusterLbls, []labels.Label{ + {Name: "cluster", Value: totalValues[idx]}, + }) + i++ + } + + // Add additional labels. + lbls := append([]labels.Labels{ + {{Name: "job", Value: "1"}}, + {{Name: "job", Value: "2"}}, + {{Name: "job", Value: "3"}}, + {{Name: "job", Value: "4"}}, + {{Name: "job", Value: "5"}}, + {{Name: "job", Value: "6"}}, + {{Name: "job", Value: "7"}}, + {{Name: "job", Value: "8"}}, + {{Name: "job", Value: "9"}}}, clusterLbls...) + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + id, err := e2eutil.CreateBlock(ctx, tmpDir, lbls, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id.String()), metadata.NoneFunc)) + + fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) + _, err = WriteBinary(ctx, bkt, id, fn) + testutil.Ok(t, err) + + br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewBinaryReaderMetrics(nil)) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, br.Close()) }() + + for i := 0; i < 100; i++ { + vals := make([]string, 0, 15) + for j := 0; j < 15; j++ { + vals = append(vals, totalValues[rnd.Intn(len(totalValues))]) + } + sort.Strings(vals) + rngs, err := br.PostingsOffsets("cluster", vals...) + require.NoError(t, err) + rngs2 := make([]index.Range, 0) + for _, val := range vals { + rng2, err2 := br.PostingsOffset("cluster", val) + if err2 == NotFoundRangeErr { + rngs2 = append(rngs2, NotFoundRange) + } else { + rngs2 = append(rngs2, rng2) + } + } + require.Equal(t, rngs2, rngs, "Got mismatched results from batched and non-batched API.\nInput cluster labels: %v.\nValues queried: %v", clusterLbls, vals) + } +} diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 9469be9b47..cfcf987e14 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -8,6 +8,7 @@ import ( "math" "strings" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" @@ -54,14 +55,18 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name) } - for _, r := range rngs { - if r == indexheader.NotFoundRange { + for _, rng := range rngs { + if rng == indexheader.NotFoundRange { continue } + if rng.End <= rng.Start { + level.Error(r.block.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization") + return postingGroups, false, nil + } // Each range starts from the #entries field which is 4 bytes. // Need to subtract it when calculating number of postings. // https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md. - pg.cardinality += (r.End - r.Start - 4) / 4 + pg.cardinality += (rng.End - rng.Start - 4) / 4 } // If the posting group adds keys, 0 cardinality means the posting doesn't exist. // If the posting group removes keys, no posting ranges found is fine as it is a noop.