store: Refetch series if longer than expected. #1985

Merged
merged 1 commit into from
Jan 12, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Fixed

- [#1985](https://github.com/thanos-io/thanos/pull/1985) store gateway: Fixed case where series entry is larger than 64KB in index.
- [#1919](https://github.com/thanos-io/thanos/issues/1919) Compactor: Fixed potential data loss when uploading older blocks, or upload taking long time while compactor is
running.
- [#1937](https://github.com/thanos-io/thanos/pull/1937) Compactor: Improved synchronization of meta JSON files.
11 changes: 7 additions & 4 deletions pkg/objstore/inmem/inmem.go
@@ -1,14 +1,13 @@
package inmem

import (
"bytes"
"context"
"io"
"io/ioutil"
"sort"
"strings"
"sync"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"
@@ -123,6 +122,10 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
return ioutil.NopCloser(bytes.NewReader(file[off:])), nil
}

if length <= 0 {
Member (review comment):
I think the error message does not agree with the condition here. Message says “smaller than 0” but the condition is <= 0.

Member Author (reply):
Done

return ioutil.NopCloser(bytes.NewReader(nil)), errors.New("length cannot be smaller or equal 0")
}

if int64(len(file)) <= off+length {
// Just return maximum of what we have.
length = int64(len(file)) - off
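Not part of the diff, but for illustration: a minimal sketch of how the new guard behaves from a caller's point of view, using the in-memory bucket touched above (inmem.NewBucket, Upload, and GetRange are the existing objstore APIs); the expected outputs are noted in comments.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"

	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	ctx := context.Background()
	bkt := inmem.NewBucket()

	// Store a small 10-byte object.
	if err := bkt.Upload(ctx, "obj", bytes.NewReader([]byte("0123456789"))); err != nil {
		panic(err)
	}

	// A non-positive length is now rejected explicitly.
	if _, err := bkt.GetRange(ctx, "obj", 0, 0); err != nil {
		fmt.Println(err) // prints: length cannot be smaller or equal 0
	}

	// A range running past the end of the object is still truncated to what is available.
	if rc, err := bkt.GetRange(ctx, "obj", 5, 100); err == nil {
		defer rc.Close()
		b, _ := ioutil.ReadAll(rc)
		fmt.Println(string(b)) // prints: 56789
	}
}
```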
47 changes: 33 additions & 14 deletions pkg/store/bucket.go
@@ -57,6 +57,8 @@ const (

maxChunkSize = 16000

maxSeriesSize = 64 * 1024

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
@@ -87,6 +89,7 @@ type bucketStoreMetrics struct {
chunkSizeBytes prometheus.Histogram
queriesDropped prometheus.Counter
queriesLimit prometheus.Gauge
seriesRefetches prometheus.Counter
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -166,6 +169,10 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
})
m.seriesRefetches = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_series_refetches_total",
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
})

if reg != nil {
reg.MustRegister(
@@ -185,6 +192,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
m.chunkSizeBytes,
m.queriesDropped,
m.queriesLimit,
m.seriesRefetches,
)
}
return &m
@@ -454,6 +462,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
s.chunkPool,
jr,
s.partitioner,
s.metrics.seriesRefetches,
)
if err != nil {
return errors.Wrap(err, "new bucket block")
@@ -601,8 +610,6 @@ func (s *bucketSeriesSet) Err() error {
}

func blockSeries(
ctx context.Context,
ulid ulid.ULID,
extLset map[string]string,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
@@ -845,8 +852,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")

g.Go(func() error {
part, pstats, err := blockSeries(ctx,
b.meta.ULID,
part, pstats, err := blockSeries(
b.meta.Thanos.Labels,
indexr,
chunkr,
@@ -1156,6 +1162,8 @@ type bucketBlock struct {
pendingReaders sync.WaitGroup

partitioner partitioner

seriesRefetches prometheus.Counter
}

func newBucketBlock(
@@ -1168,6 +1176,7 @@
chunkPool *pool.BytesPool,
indexHeadReader indexheader.Reader,
p partitioner,
seriesRefetches prometheus.Counter,
) (b *bucketBlock, err error) {
b = &bucketBlock{
logger: logger,
@@ -1178,6 +1187,7 @@
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
seriesRefetches: seriesRefetches,
}

// Get object handles for all chunk files.
@@ -1484,12 +1494,11 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
fetchTime := time.Since(begin)

r.mtx.Lock()
defer r.mtx.Unlock()

r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.postingsFetchDurationSum += fetchTime
r.stats.postingsFetchedSizeSum += int(length)
r.mtx.Unlock()

for _, p := range ptrs[i:j] {
c := b[p.ptr.Start-start : p.ptr.End-start]
Expand All @@ -1499,13 +1508,15 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
return errors.Wrap(err, "read postings list")
}

r.mtx.Lock()
// Return postings and fill LRU cache.
groups[p.groupID].Fill(p.keyID, fetchedPostings)
r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(c)
r.mtx.Unlock()
}
return nil
})
@@ -1515,8 +1526,6 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
const maxSeriesSize = 64 * 1024

// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
fromCache, ids := r.cache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
@@ -1533,13 +1542,13 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
i, j := p.elemRng[0], p.elemRng[1]

g.Go(func() error {
return r.loadSeries(ctx, ids[i:j], s, e)
return r.loadSeries(ctx, ids[i:j], false, s, e)
})
}
return g.Wait()
}

func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error {
func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetch bool, start, end uint64) error {
begin := time.Now()

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
@@ -1548,26 +1557,36 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
}

r.mtx.Lock()
defer r.mtx.Unlock()

r.stats.seriesFetchCount++
r.stats.seriesFetched += len(ids)
r.stats.seriesFetchDurationSum += time.Since(begin)
r.stats.seriesFetchedSizeSum += int(end - start)
r.mtx.Unlock()

for _, id := range ids {
for i, id := range ids {
c := b[id-start:]

l, n := binary.Uvarint(c)
if n < 1 {
return errors.New("reading series length failed")
}
if len(c) < n+int(l) {
return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l))
if i == 0 && refetch {
return errors.Errorf("invalid remaining size, even after refetch, remaining: %d, expected %d", len(c), n+int(l))
}

// Inefficient, but should be rare.
r.block.seriesRefetches.Inc()
level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, id, id+uint64(n+int(l)+1))
}
c = c[n : n+int(l)]
r.mtx.Lock()
r.loadedSeries[id] = c
r.cache.StoreSeries(r.ctx, r.block.meta.ULID, id, c)
r.mtx.Unlock()
}
return nil
}
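A self-contained sketch (not Thanos code; the names below are illustrative) of the idea behind the refetch above: the reader treats each series entry as a uvarint length followed by that many bytes, so when the fixed 64KB window turns out to be too short it can compute exactly how many bytes the entry needs and fetch the range again.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// entryRange reports whether buf fully contains the series entry that starts
// at its beginning, and how many bytes (length prefix + payload) that entry
// actually needs.
func entryRange(buf []byte) (complete bool, need int, err error) {
	l, n := binary.Uvarint(buf) // assumed entry layout: uvarint(length) | <length> bytes
	if n < 1 {
		return false, 0, fmt.Errorf("reading series length failed")
	}
	need = n + int(l)
	return len(buf) >= need, need, nil
}

func main() {
	// Build a fake entry: a 10-byte payload prefixed with its uvarint length.
	payload := []byte("aaaaaaaaaa")
	lenBuf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(lenBuf, uint64(len(payload)))
	entry := append(lenBuf[:n], payload...)

	// Simulate a fixed-size window (the 64KB guess) that cuts the entry short.
	window := entry[:4]
	if ok, need, err := entryRange(window); err == nil && !ok {
		// A caller would now refetch from the entry's offset with an end of
		// offset+need (plus one byte to learn the next entry's size), which is
		// what the refetching loadSeries above does with ids[i:].
		fmt.Printf("entry truncated: have %d bytes, need %d bytes\n", len(window), need)
	}
}
```

In the actual change, hitting this case increments thanos_bucket_store_series_refetches_total and logs a warning before the single retried loadSeries call; the refetch flag ensures the retry itself cannot recurse further.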
75 changes: 75 additions & 0 deletions pkg/store/bucket_test.go
@@ -1,6 +1,7 @@
package store

import (
"bytes"
"context"
"fmt"
"io"
@@ -20,8 +21,11 @@ import (
"github.com/leanovate/gopter/gen"
"github.com/leanovate/gopter/prop"
"github.com/oklog/ulid"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -760,3 +764,74 @@ func expectedTouchedBlockOps(all []ulid.ULID, expected []ulid.ULID, cached []uli
sort.Strings(ops)
return ops
}

// Regression tests against: https://github.com/thanos-io/thanos/issues/1983.
func TestReadIndexCache_LoadSeries(t *testing.T) {
bkt := inmem.NewBucket()

s := newBucketStoreMetrics(nil)
b := &bucketBlock{
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: ulid.MustNew(1, nil)},
},
bucket: bkt,
seriesRefetches: s.seriesRefetches,
logger: log.NewNopLogger(),
}

buf := encoding.Encbuf{}
buf.PutByte(0)
buf.PutByte(0)
buf.PutUvarint(10)
buf.PutString("aaaaaaaaaa")
buf.PutUvarint(10)
buf.PutString("bbbbbbbbbb")
buf.PutUvarint(10)
buf.PutString("cccccccccc")
testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))

r := bucketIndexReader{
block: b,
stats: &queryStats{},
loadedSeries: map[uint64][]byte{},
cache: noopCache{},
logger: log.NewNopLogger(),
}

// Success with no refetches.
testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 2, 100))
testutil.Equals(t, map[uint64][]byte{
2: []byte("aaaaaaaaaa"),
13: []byte("bbbbbbbbbb"),
24: []byte("cccccccccc"),
}, r.loadedSeries)
testutil.Equals(t, float64(0), promtest.ToFloat64(s.seriesRefetches))

// Success with 2 refetches.
r.loadedSeries = map[uint64][]byte{}
testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 2, 15))
testutil.Equals(t, map[uint64][]byte{
2: []byte("aaaaaaaaaa"),
13: []byte("bbbbbbbbbb"),
24: []byte("cccccccccc"),
}, r.loadedSeries)
testutil.Equals(t, float64(2), promtest.ToFloat64(s.seriesRefetches))

// Success with refetch on first element.
r.loadedSeries = map[uint64][]byte{}
testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2}, false, 2, 5))
testutil.Equals(t, map[uint64][]byte{
2: []byte("aaaaaaaaaa"),
}, r.loadedSeries)
testutil.Equals(t, float64(3), promtest.ToFloat64(s.seriesRefetches))

buf.Reset()
buf.PutByte(0)
buf.PutByte(0)
buf.PutUvarint(10)
buf.PutString("aaaaaaa")
testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))

// Fail, but no recursion at least.
testutil.NotOk(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 1, 15))
}