index cache: Cache expanded postings (thanos-io#6420)
* cache expanded postings in index cache

Signed-off-by: Ben Ye <[email protected]>

* update changelog

Signed-off-by: Ben Ye <[email protected]>

* fix

Signed-off-by: Ben Ye <[email protected]>

* fix lint

Signed-off-by: Ben Ye <[email protected]>

* rebase main and added compression name to key

Signed-off-by: Ben Ye <[email protected]>

* update key

Signed-off-by: Ben Ye <[email protected]>

* add e2e test for memcached

Signed-off-by: Ben Ye <[email protected]>

* fix cache config

Signed-off-by: Ben Ye <[email protected]>

* address review comments

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
yeya24 authored and GiedriusS committed Jun 14, 2023
1 parent 5dba71e commit fec633b
Showing 14 changed files with 521 additions and 61 deletions.
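
The core of the change is a read-through pattern in bucketIndexReader.ExpandedPostings: look up previously expanded postings in the index cache under a key derived from the (sorted) matchers, and only on a miss fetch and intersect raw postings, storing the compressed result for next time. Below is a minimal, self-contained sketch of that flow; ExpandedPostingsCache, mapCache, and expandedPostings are illustrative stand-ins for this sketch, not the actual Thanos types added in this commit.

package main

import (
	"context"
	"fmt"
)

// ExpandedPostingsCache is a hypothetical stand-in for the two methods this
// commit adds to the IndexCache interface (StoreExpandedPostings / FetchExpandedPostings).
type ExpandedPostingsCache interface {
	StoreExpandedPostings(blockID, matchersKey string, v []byte)
	FetchExpandedPostings(ctx context.Context, blockID, matchersKey string) ([]byte, bool)
}

// mapCache is a toy in-memory implementation used only for this sketch.
type mapCache map[string][]byte

func (c mapCache) StoreExpandedPostings(blockID, matchersKey string, v []byte) {
	c[blockID+"/"+matchersKey] = v
}

func (c mapCache) FetchExpandedPostings(_ context.Context, blockID, matchersKey string) ([]byte, bool) {
	v, ok := c[blockID+"/"+matchersKey]
	return v, ok
}

// expandedPostings mirrors the read-through flow: return the cached value on a
// hit, otherwise compute, store, and return.
func expandedPostings(ctx context.Context, cache ExpandedPostingsCache, blockID, matchersKey string, compute func() []byte) []byte {
	if v, ok := cache.FetchExpandedPostings(ctx, blockID, matchersKey); ok {
		return v // Cache hit: the raw per-label postings are never touched.
	}
	v := compute()
	cache.StoreExpandedPostings(blockID, matchersKey, v)
	return v
}

func main() {
	cache := mapCache{}
	compute := func() []byte { return []byte{0x01, 0x02, 0x03} }
	fmt.Println(expandedPostings(context.Background(), cache, "block-1", `{job="api"}`, compute)) // miss: computes and stores
	fmt.Println(expandedPostings(context.Background(), cache, "block-1", `{job="api"}`, compute)) // hit: served from the cache
}
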
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6163](https://github.com/thanos-io/thanos/pull/6163) Receiver: Add hidden flag `--receive-forward-max-backoff` to configure the max backoff for forwarding requests.
- [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor.
- [#6352](https://github.com/thanos-io/thanos/pull/6352) Store: Expose store gateway query stats in series response hints.
- [#6420](https://github.com/thanos-io/thanos/pull/6420) Index Cache: Cache expanded postings.

### Fixed

182 changes: 133 additions & 49 deletions pkg/store/bucket.go
@@ -379,6 +379,11 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label
return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte) {}
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher) ([]byte, bool) {
return []byte{}, false
}

func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte) {}
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
return map[storage.SeriesRef][]byte{}, ids
@@ -2185,6 +2190,23 @@ func (r *bucketIndexReader) reset() {
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
// Sort matchers to make sure we generate the same cache key.
sort.Slice(ms, func(i, j int) bool {
if ms[i].Type == ms[j].Type {
if ms[i].Name == ms[j].Name {
return ms[i].Value < ms[j].Value
}
return ms[i].Name < ms[j].Name
}
return ms[i].Type < ms[j].Type
})
hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
if err != nil {
return nil, err
}
if hit {
return postings, nil
}
var (
postingGroups []*postingGroup
allRequested = false
@@ -2280,18 +2302,29 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
return nil, errors.Wrap(err, "expand")
}

// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.block.indexHeaderReader.IndexVersion()
if err != nil {
return nil, errors.Wrap(err, "get index version")
}
if version >= 2 {
for i, id := range ps {
ps[i] = id * 16
// Encode postings for the cache. We compress and cache postings before adding
// the 16-byte padding so that the compressed size stays smaller.
dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(index.NewListPostings(ps), len(ps))
r.stats.cachedPostingsCompressions++
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsCompressionTimeSum += compressionDuration
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(ps) * 4) // Estimate the posting list size.
r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache)

if len(ps) > 0 {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.block.indexHeaderReader.IndexVersion()
if err != nil {
return nil, errors.Wrap(err, "get index version")
}
if version >= 2 {
for i, id := range ps {
ps[i] = id * 16
}
}
}

return ps, nil
}

@@ -2408,6 +2441,51 @@ type postingPtr struct {
ptr index.Range
}

func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) {
dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms)
if !hit {
return false, nil, nil
}
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache))
p, closeFns, err := r.decodeCachedPostings(dataFromCache)
defer func() {
for _, closeFn := range closeFns {
closeFn()
}
}()
// If we failed to decode or expand the cached postings, return a miss and expand postings again.
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
return false, nil, nil
}

ps, err := index.ExpandPostings(p)
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
return false, nil, nil
}

if len(ps) > 0 {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.block.indexHeaderReader.IndexVersion()
if err != nil {
return false, nil, errors.Wrap(err, "get index version")
}
if version >= 2 {
for i, id := range ps {
ps[i] = id * 16
}
}
}
return true, ps, nil
}

// fetchPostings fills postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for a given key are not fetched, the entry at that index will be nil.
@@ -2439,32 +2517,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b))

// Even if this instance is not using compression, there may be compressed
// entries in the cache written by other stores.
var (
l index.Postings
err error
)
if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
s := time.Now()
clPostings, err := decodePostings(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
r.stats.cachedPostingsDecompressionErrors += 1
} else {
closeFns = append(closeFns, clPostings.close)
l = clPostings
}
} else {
_, l, err = r.dec.Postings(b)
}

l, closer, err := r.decodeCachedPostings(b)
if err != nil {
return nil, closeFns, errors.Wrap(err, "decode postings")
}

output[ix] = l
closeFns = append(closeFns, closer...)
continue
}

@@ -2536,27 +2594,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return err
}

dataToCache := pBytes

compressionTime := time.Duration(0)
compressions, compressionErrors, compressedSize := 0, 0, 0

// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
compressions++
s := time.Now()
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
compressedSize = len(data)
} else {
compressionErrors = 1
}

dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
@@ -2567,7 +2610,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.cachedPostingsCompressions += compressions
r.stats.cachedPostingsCompressions += 1
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
Expand All @@ -2581,6 +2624,47 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return output, closeFns, g.Wait()
}

func (r *bucketIndexReader) decodeCachedPostings(b []byte) (index.Postings, []func(), error) {
// Even if this instance is not using compression, there may be compressed
// entries in the cache written by other stores.
var (
l index.Postings
err error
closeFns []func()
)
if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
s := time.Now()
clPostings, err := decodePostings(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
r.stats.cachedPostingsDecompressionErrors += 1
} else {
closeFns = append(closeFns, clPostings.close)
l = clPostings
}
} else {
_, l, err = r.dec.Postings(b)
}
return l, closeFns, err
}

func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int) ([]byte, time.Duration, int, int) {
var dataToCache []byte
compressionTime := time.Duration(0)
compressionErrors, compressedSize := 0, 0
s := time.Now()
data, err := diffVarintSnappyStreamedEncode(p, length)
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
compressedSize = len(data)
} else {
compressionErrors = 1
}
return dataToCache, compressionTime, compressionErrors, compressedSize
}

func resizePostings(b []byte) ([]byte, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
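
The sort at the top of ExpandedPostings exists so that equivalent matcher sets always map to the same cache key, regardless of the order in which a query passes them. Below is a standalone sketch of that ordering using the Prometheus labels package; the comparison function is the one from the diff, while the surrounding program is only illustrative.

package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	ms := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchRegexp, "instance", "10.*"),
		labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
		labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
	}

	// Same ordering as bucketIndexReader.ExpandedPostings: by type, then name,
	// then value, so the same set of matchers always hashes to the same key.
	sort.Slice(ms, func(i, j int) bool {
		if ms[i].Type == ms[j].Type {
			if ms[i].Name == ms[j].Name {
				return ms[i].Value < ms[j].Value
			}
			return ms[i].Name < ms[j].Name
		}
		return ms[i].Type < ms[j].Type
	})

	for _, m := range ms {
		fmt.Println(m.String()) // env="prod", then job="api", then instance=~"10.*"
	}
}
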
8 changes: 8 additions & 0 deletions pkg/store/bucket_e2e_test.go
@@ -66,6 +66,14 @@ func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.UL
return c.ptr.FetchMultiPostings(ctx, blockID, keys)
}

func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
c.ptr.StoreExpandedPostings(blockID, matchers, v)
}

func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
return c.ptr.FetchExpandedPostings(ctx, blockID, matchers)
}

func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.ptr.StoreSeries(blockID, id, v)
}
38 changes: 36 additions & 2 deletions pkg/store/cache/cache.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/base64"
"strconv"
"strings"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
@@ -15,8 +16,9 @@ import (
)

const (
cacheTypePostings string = "Postings"
cacheTypeSeries string = "Series"
cacheTypePostings string = "Postings"
cacheTypeExpandedPostings string = "ExpandedPostings"
cacheTypeSeries string = "Series"

sliceHeaderSize = 16
)
@@ -38,6 +40,12 @@ type IndexCache interface {
// and returns a map containing cache hits, along with a list of missing keys.
FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label)

// StoreExpandedPostings stores expanded postings for a set of label matchers.
StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte)

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool)

// StoreSeries stores a single series.
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte)

@@ -59,6 +67,8 @@ func (c cacheKey) keyType() string {
return cacheTypePostings
case cacheKeySeries:
return cacheTypeSeries
case cacheKeyExpandedPostings:
return cacheTypeExpandedPostings
}
return "<unknown>"
}
@@ -68,6 +78,8 @@ func (c cacheKey) size() uint64 {
case cacheKeyPostings:
// ULID + 2 slice headers + number of chars in value and name.
return ulidSize + 2*sliceHeaderSize + uint64(len(k.Value)+len(k.Name))
case cacheKeyExpandedPostings:
return ulidSize + sliceHeaderSize + uint64(len(k))
case cacheKeySeries:
return ulidSize + 8 // ULID + uint64.
}
@@ -86,12 +98,34 @@ func (c cacheKey) string() string {
key += ":" + c.compression
}
return key
case cacheKeyExpandedPostings:
// Use a cryptographic hash function to avoid hash collisions,
// which would end up in wrong query results.
matchers := c.key.(cacheKeyExpandedPostings)
matchersHash := blake2b.Sum256([]byte(matchers))
key := "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:])
if len(c.compression) > 0 {
key += ":" + c.compression
}
return key
case cacheKeySeries:
return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
default:
return ""
}
}

func labelMatchersToString(matchers []*labels.Matcher) string {
sb := strings.Builder{}
for i, lbl := range matchers {
sb.WriteString(lbl.String())
if i < len(matchers)-1 {
sb.WriteRune(';')
}
}
return sb.String()
}

type cacheKeyPostings labels.Label
type cacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not a hashable type, so it would fail in the in-memory cache.
type cacheKeySeries uint64
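
For expanded postings the matcher set can be arbitrarily long, so cacheKey.string() above hashes the ";"-joined matcher strings (see labelMatchersToString) with BLAKE2b and base64-encodes the digest, giving a short, collision-resistant key of the form EP:<block>:<hash>[:<compression>]. Below is a rough, self-contained sketch of the same construction; the helper name expandedPostingsKey is illustrative, not part of the Thanos code.

package main

import (
	"encoding/base64"
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
	"golang.org/x/crypto/blake2b"
)

// expandedPostingsKey mimics the "EP:<block>:<hash>[:<compression>]" layout
// built by cacheKey.string() in this commit.
func expandedPostingsKey(blockID string, matchers []*labels.Matcher, compression string) string {
	parts := make([]string, 0, len(matchers))
	for _, m := range matchers {
		parts = append(parts, m.String())
	}
	// Hash the joined matcher strings so the key stays short and distinct
	// matcher sets cannot collide on truncation.
	sum := blake2b.Sum256([]byte(strings.Join(parts, ";")))
	key := "EP:" + blockID + ":" + base64.RawURLEncoding.EncodeToString(sum[:])
	if compression != "" {
		key += ":" + compression
	}
	return key
}

func main() {
	ms := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
		labels.MustNewMatcher(labels.MatchNotEqual, "env", ""),
	}
	fmt.Println(expandedPostingsKey("01ARZ3NDEKTSV4RRFFQ69G5FAV", ms, "snappy"))
}
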