From b7716d8930ff2b130d9759559cafe68e92836259 Mon Sep 17 00:00:00 2001
From: Bartlomiej Plotka
Date: Fri, 10 Jan 2020 19:27:58 +0000
Subject: [PATCH] store: Refetch series if longer than expected.

Fixes: https://github.com/thanos-io/thanos/issues/1983

Signed-off-by: Bartlomiej Plotka
---
 CHANGELOG.md                |  1 +
 pkg/objstore/inmem/inmem.go | 11 ++++--
 pkg/store/bucket.go         | 47 ++++++++++++++++-------
 pkg/store/bucket_test.go    | 75 +++++++++++++++++++++++++++++++++++++
 4 files changed, 116 insertions(+), 18 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 985ba42fa4..657e7f2826 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 
 ### Fixed
 
+- [#1985](https://github.com/thanos-io/thanos/pull/1985) store gateway: Fixed case where a series entry is larger than 64KB in the index.
 - [#1919](https://github.com/thanos-io/thanos/issues/1919) Compactor: Fixed potential data loss when uploading older blocks, or upload taking long time while compactor is running.
 - [#1937](https://github.com/thanos-io/thanos/pull/1937) Compactor: Improved synchronization of meta JSON files.
 
diff --git a/pkg/objstore/inmem/inmem.go b/pkg/objstore/inmem/inmem.go
index 8b04974272..8dd391d570 100644
--- a/pkg/objstore/inmem/inmem.go
+++ b/pkg/objstore/inmem/inmem.go
@@ -1,14 +1,13 @@
 package inmem
 
 import (
+	"bytes"
 	"context"
 	"io"
-	"sort"
-	"sync"
-
-	"bytes"
 	"io/ioutil"
+	"sort"
 	"strings"
+	"sync"
 
 	"github.com/pkg/errors"
 	"github.com/thanos-io/thanos/pkg/objstore"
@@ -123,6 +122,10 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
 		return ioutil.NopCloser(bytes.NewReader(file[off:])), nil
 	}
 
+	if length <= 0 {
+		return ioutil.NopCloser(bytes.NewReader(nil)), errors.New("length cannot be smaller than or equal to 0")
+	}
+
 	if int64(len(file)) <= off+length {
 		// Just return maximum of what we have.
 		length = int64(len(file)) - off
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 835147a32d..d4a711e192 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -57,6 +57,8 @@ const (
 
 	maxChunkSize = 16000
 
+	maxSeriesSize = 64 * 1024
+
 	// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
 	// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
 	// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
@@ -87,6 +89,7 @@ type bucketStoreMetrics struct {
 	chunkSizeBytes  prometheus.Histogram
 	queriesDropped  prometheus.Counter
 	queriesLimit    prometheus.Gauge
+	seriesRefetches prometheus.Counter
 }
 
 func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -166,6 +169,10 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 		Name: "thanos_bucket_store_queries_concurrent_max",
 		Help: "Number of maximum concurrent queries.",
 	})
+	m.seriesRefetches = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "thanos_bucket_store_series_refetches_total",
+		Help: fmt.Sprintf("Total number of cases where %v bytes was not enough to fetch a series from the index, resulting in a refetch.", maxSeriesSize),
+	})
 
 	if reg != nil {
 		reg.MustRegister(
@@ -185,6 +192,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 			m.chunkSizeBytes,
 			m.queriesDropped,
 			m.queriesLimit,
+			m.seriesRefetches,
 		)
 	}
 	return &m
@@ -454,6 +462,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 		s.chunkPool,
 		jr,
 		s.partitioner,
+		s.metrics.seriesRefetches,
 	)
 	if err != nil {
 		return errors.Wrap(err, "new bucket block")
@@ -601,8 +610,6 @@ func (s *bucketSeriesSet) Err() error {
 }
 
 func blockSeries(
-	ctx context.Context,
-	ulid ulid.ULID,
 	extLset map[string]string,
 	indexr *bucketIndexReader,
 	chunkr *bucketChunkReader,
@@ -845,8 +852,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 		defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")
 
 		g.Go(func() error {
-			part, pstats, err := blockSeries(ctx,
-				b.meta.ULID,
+			part, pstats, err := blockSeries(
 				b.meta.Thanos.Labels,
 				indexr,
 				chunkr,
@@ -1156,6 +1162,8 @@ type bucketBlock struct {
 	pendingReaders sync.WaitGroup
 
 	partitioner partitioner
+
+	seriesRefetches prometheus.Counter
 }
 
 func newBucketBlock(
@@ -1168,6 +1176,7 @@ func newBucketBlock(
 	chunkPool *pool.BytesPool,
 	indexHeadReader indexheader.Reader,
 	p partitioner,
+	seriesRefetches prometheus.Counter,
 ) (b *bucketBlock, err error) {
 	b = &bucketBlock{
 		logger:            logger,
@@ -1178,6 +1187,7 @@ func newBucketBlock(
 		partitioner:       p,
 		meta:              meta,
 		indexHeaderReader: indexHeadReader,
+		seriesRefetches:   seriesRefetches,
 	}
 
 	// Get object handles for all chunk files.
@@ -1484,12 +1494,11 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 			fetchTime := time.Since(begin)
 
 			r.mtx.Lock()
-			defer r.mtx.Unlock()
-
 			r.stats.postingsFetchCount++
 			r.stats.postingsFetched += j - i
 			r.stats.postingsFetchDurationSum += fetchTime
 			r.stats.postingsFetchedSizeSum += int(length)
+			r.mtx.Unlock()
 
 			for _, p := range ptrs[i:j] {
 				c := b[p.ptr.Start-start : p.ptr.End-start]
@@ -1499,6 +1508,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 					return errors.Wrap(err, "read postings list")
 				}
 
+				r.mtx.Lock()
 				// Return postings and fill LRU cache.
 				groups[p.groupID].Fill(p.keyID, fetchedPostings)
 				r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
@@ -1506,6 +1516,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 				// If we just fetched it we still have to update the stats for touched postings.
 				r.stats.postingsTouched++
 				r.stats.postingsTouchedSizeSum += len(c)
+				r.mtx.Unlock()
 			}
 			return nil
 		})
@@ -1515,8 +1526,6 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 }
 
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
-	const maxSeriesSize = 64 * 1024
-
 	// Load series from cache, overwriting the list of ids to preload
 	// with the missing ones.
 	fromCache, ids := r.cache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
@@ -1533,13 +1542,13 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
 		i, j := p.elemRng[0], p.elemRng[1]
 
 		g.Go(func() error {
-			return r.loadSeries(ctx, ids[i:j], s, e)
+			return r.loadSeries(ctx, ids[i:j], false, s, e)
 		})
 	}
 	return g.Wait()
 }
 
-func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error {
+func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetch bool, start, end uint64) error {
 	begin := time.Now()
 
 	b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
@@ -1548,14 +1557,13 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
 	}
 
 	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
 	r.stats.seriesFetchCount++
 	r.stats.seriesFetched += len(ids)
 	r.stats.seriesFetchDurationSum += time.Since(begin)
 	r.stats.seriesFetchedSizeSum += int(end - start)
+	r.mtx.Unlock()
 
-	for _, id := range ids {
+	for i, id := range ids {
 		c := b[id-start:]
 
 		l, n := binary.Uvarint(c)
@@ -1563,11 +1571,22 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
 			return errors.New("reading series length failed")
 		}
 		if len(c) < n+int(l) {
-			return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l))
+			if i == 0 && refetch {
+				return errors.Errorf("invalid remaining size, even after refetch, remaining: %d, expected: %d", len(c), n+int(l))
+			}
+
+			// Inefficient, but should be rare.
+			r.block.seriesRefetches.Inc()
+			level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)
+
+			// Fetch one byte more than the series length so we can also read the size of the next series, if it exists.
+			return r.loadSeries(ctx, ids[i:], true, id, id+uint64(n+int(l)+1))
 		}
 		c = c[n : n+int(l)]
+		r.mtx.Lock()
 		r.loadedSeries[id] = c
 		r.cache.StoreSeries(r.ctx, r.block.meta.ULID, id, c)
+		r.mtx.Unlock()
 	}
 	return nil
 }
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 913714f655..cb861347a2 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -1,6 +1,7 @@
 package store
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -20,8 +21,11 @@ import (
 	"github.com/leanovate/gopter/gen"
 	"github.com/leanovate/gopter/prop"
 	"github.com/oklog/ulid"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/encoding"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -760,3 +764,74 @@ func expectedTouchedBlockOps(all []ulid.ULID, expected []ulid.ULID, cached []uli
 	sort.Strings(ops)
 	return ops
 }
+
+// Regression test against: https://github.com/thanos-io/thanos/issues/1983.
+func TestReadIndexCache_LoadSeries(t *testing.T) {
+	bkt := inmem.NewBucket()
+
+	s := newBucketStoreMetrics(nil)
+	b := &bucketBlock{
+		meta: &metadata.Meta{
+			BlockMeta: tsdb.BlockMeta{ULID: ulid.MustNew(1, nil)},
+		},
+		bucket:          bkt,
+		seriesRefetches: s.seriesRefetches,
+		logger:          log.NewNopLogger(),
+	}
+
+	buf := encoding.Encbuf{}
+	buf.PutByte(0)
+	buf.PutByte(0)
+	buf.PutUvarint(10)
+	buf.PutString("aaaaaaaaaa")
+	buf.PutUvarint(10)
+	buf.PutString("bbbbbbbbbb")
+	buf.PutUvarint(10)
+	buf.PutString("cccccccccc")
+	testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))
+
+	r := bucketIndexReader{
+		block:        b,
+		stats:        &queryStats{},
+		loadedSeries: map[uint64][]byte{},
+		cache:        noopCache{},
+		logger:       log.NewNopLogger(),
+	}
+
+	// Success with no refetches.
+	testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 2, 100))
+	testutil.Equals(t, map[uint64][]byte{
+		2:  []byte("aaaaaaaaaa"),
+		13: []byte("bbbbbbbbbb"),
+		24: []byte("cccccccccc"),
+	}, r.loadedSeries)
+	testutil.Equals(t, float64(0), promtest.ToFloat64(s.seriesRefetches))
+
+	// Success with 2 refetches.
+	r.loadedSeries = map[uint64][]byte{}
+	testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 2, 15))
+	testutil.Equals(t, map[uint64][]byte{
+		2:  []byte("aaaaaaaaaa"),
+		13: []byte("bbbbbbbbbb"),
+		24: []byte("cccccccccc"),
+	}, r.loadedSeries)
+	testutil.Equals(t, float64(2), promtest.ToFloat64(s.seriesRefetches))
+
+	// Success with refetch on first element.
+	r.loadedSeries = map[uint64][]byte{}
+	testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2}, false, 2, 5))
+	testutil.Equals(t, map[uint64][]byte{
+		2: []byte("aaaaaaaaaa"),
+	}, r.loadedSeries)
+	testutil.Equals(t, float64(3), promtest.ToFloat64(s.seriesRefetches))
+
+	buf.Reset()
+	buf.PutByte(0)
+	buf.PutByte(0)
+	buf.PutUvarint(10)
+	buf.PutString("aaaaaaa")
+	testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))
+
+	// Fail, but no endless recursion at least.
+	testutil.NotOk(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 1, 15))
+}
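
Illustration (not part of the patch above): the refetch logic in loadSeries boils down to reading a fixed-size window, decoding the uvarint length prefix, and issuing a second, exactly-sized read when the entry overflows the window. The following standalone Go sketch shows that pattern under simplified assumptions; getRange, readEntry and the window parameter are hypothetical stand-ins for bucketBlock.readIndexRange, loadSeries and maxSeriesSize, not names from the Thanos codebase.

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// getRange simulates an object-store range read over an index byte slice,
// clamping the request to the data that actually exists (a stand-in, not a Thanos API).
func getRange(index []byte, off, length int) []byte {
	if off+length > len(index) {
		length = len(index) - off
	}
	return index[off : off+length]
}

// readEntry returns the uvarint-length-prefixed entry starting at off.
// If the fixed window turns out to be too small, it refetches exactly the
// number of bytes the prefix declared, mirroring the refetch idea above.
func readEntry(index []byte, off, window int) ([]byte, error) {
	b := getRange(index, off, window)
	l, n := binary.Uvarint(b)
	if n < 1 {
		return nil, errors.New("reading entry length failed")
	}
	if len(b) < n+int(l) {
		// Entry is longer than the window: refetch with the exact size.
		b = getRange(index, off, n+int(l))
		if len(b) < n+int(l) {
			return nil, fmt.Errorf("invalid remaining size, even after refetch: %d, expected: %d", len(b), n+int(l))
		}
	}
	return b[n : n+int(l)], nil
}

func main() {
	// Build a tiny "index": a single 10-byte entry with a uvarint length prefix.
	var index []byte
	index = binary.AppendUvarint(index, 10)
	index = append(index, []byte("aaaaaaaaaa")...)

	// A 4-byte window is too small for the 11-byte entry, forcing the refetch path.
	entry, err := readEntry(index, 0, 4)
	fmt.Println(string(entry), err) // prints: aaaaaaaaaa <nil>
}

Unlike the patch, the sketch refetches at most once per entry and omits caching, locking, stats and the extra byte used to peek at the next entry's size.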