
index cache: Cache expanded postings #6420

Merged · 9 commits · Jun 12, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6163](https://github.com/thanos-io/thanos/pull/6163) Receiver: Add hidden flag `--receive-forward-max-backoff` to configure the max backoff for forwarding requests.
- [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor.
- [#6352](https://github.com/thanos-io/thanos/pull/6352) Store: Expose store gateway query stats in series response hints.
- [#6420](https://github.com/thanos-io/thanos/pull/6420) Index Cache: Cache expanded postings.

### Fixed
- [#6427](https://github.com/thanos-io/thanos/pull/6427) Receive: increasing log level for failed uploads to error
182 changes: 133 additions & 49 deletions pkg/store/bucket.go
@@ -374,6 +374,11 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label
	return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte) {}
func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher) ([]byte, bool) {
	return []byte{}, false
}

func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte) {}
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
	return map[storage.SeriesRef][]byte{}, ids
@@ -2151,6 +2156,23 @@ func (r *bucketIndexReader) reset() {
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
	// Sort matchers to make sure we generate the same cache key.
Contributor: Maybe we can move this new section into a function to reduce some nesting, with early returns for !hit and err != nil.

Contributor Author: Updated with the new commit.

	sort.Slice(ms, func(i, j int) bool {
		if ms[i].Type == ms[j].Type {
			if ms[i].Name == ms[j].Name {
				return ms[i].Value < ms[j].Value
			}
			return ms[i].Name < ms[j].Name
		}
		return ms[i].Type < ms[j].Type
	})
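	// For example, the matcher sets {a="1", b!="2"} and {b!="2", a="1"} now
	// serialize in the same canonical order, so both orderings hit the same
	// cache entry.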
	hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
	if err != nil {
		return nil, err
	}
	if hit {
		return postings, nil
	}
	var (
		postingGroups []*postingGroup
		allRequested  = false
@@ -2246,18 +2268,29 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
		return nil, errors.Wrap(err, "expand")
	}

	// As of version two all series entries are 16 byte padded. All references
	// we get have to account for that to get the correct offset.
	version, err := r.block.indexHeaderReader.IndexVersion()
	if err != nil {
		return nil, errors.Wrap(err, "get index version")
	}
	if version >= 2 {
		for i, id := range ps {
			ps[i] = id * 16
	// Encode postings to cache. We compress and cache postings before adding
	// 16 bytes padding in order to make compressed size smaller.
	dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(index.NewListPostings(ps), len(ps))
	r.stats.cachedPostingsCompressions++
	r.stats.cachedPostingsCompressionErrors += compressionErrors
	r.stats.CachedPostingsCompressionTimeSum += compressionDuration
	r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
	r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(ps) * 4) // Estimate the posting list size.
	r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache)

	if len(ps) > 0 {
		// As of version two all series entries are 16 byte padded. All references
		// we get have to account for that to get the correct offset.
		version, err := r.block.indexHeaderReader.IndexVersion()
		if err != nil {
			return nil, errors.Wrap(err, "get index version")
		}
		if version >= 2 {
			for i, id := range ps {
				ps[i] = id * 16
			}
		}
	}

	return ps, nil
}
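The comment above says postings are compressed and cached before the 16-byte padding multiply. A rough, standalone illustration of why that ordering helps; the ids below are made up and the program only approximates the diff-varint layout. Delta encoding stores gaps between sorted ids, and multiplying every id by 16 also multiplies every gap by 16, which can push gaps into wider varints:

package main

import (
	"encoding/binary"
	"fmt"
)

// uvarintLen reports how many bytes v occupies as a uvarint.
func uvarintLen(v uint64) int {
	var buf [binary.MaxVarintLen64]byte
	return binary.PutUvarint(buf[:], v)
}

func main() {
	ids := []uint64{1000, 1100, 1205, 1310} // hypothetical sorted series ids
	rawBytes, paddedBytes := 0, 0
	var prevRaw, prevPadded uint64
	for _, id := range ids {
		rawBytes += uvarintLen(id - prevRaw)          // gap before padding
		paddedBytes += uvarintLen(id*16 - prevPadded) // gap after 16-byte padding
		prevRaw, prevPadded = id, id*16
	}
	fmt.Println(rawBytes, paddedBytes) // the padded gaps cost more bytes (5 vs 8 here)
}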

@@ -2374,6 +2407,51 @@ type postingPtr struct {
	ptr index.Range
}

// fetchExpandedPostingsFromCache looks up expanded postings for the given
// matchers in the index cache. It returns (hit, postings, error); decode or
// expand failures are logged and treated as cache misses so the caller falls
// back to resolving postings from the index.
func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) {
	dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms)
	if !hit {
		return false, nil, nil
	}
	if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
		return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
	}
	r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
	r.stats.postingsTouched++
	r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache))
	p, closeFns, err := r.decodeCachedPostings(dataFromCache)
	defer func() {
		for _, closeFn := range closeFns {
			closeFn()
		}
	}()
	// If decoding or expanding the cached postings fails, treat it as a cache
	// miss and fall back to fetching and expanding postings again.
	if err != nil {
		level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
		return false, nil, nil
	}

	ps, err := index.ExpandPostings(p)
	if err != nil {
		level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
		return false, nil, nil
	}

	if len(ps) > 0 {
		// As of version two all series entries are 16 byte padded. All references
		// we get have to account for that to get the correct offset.
		version, err := r.block.indexHeaderReader.IndexVersion()
		if err != nil {
			return false, nil, errors.Wrap(err, "get index version")
		}
		if version >= 2 {
			for i, id := range ps {
				ps[i] = id * 16
			}
		}
	}
	return true, ps, nil
}

// fetchPostings fills postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for a given key are not fetched, the entry at that index will be nil.
@@ -2405,32 +2483,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
			r.stats.postingsTouched++
			r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b))

			// Even if this instance is not using compression, there may be compressed
			// entries in the cache written by other stores.
			var (
				l   index.Postings
				err error
			)
			if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
				s := time.Now()
				clPostings, err := decodePostings(b)
				r.stats.cachedPostingsDecompressions += 1
				r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
				if err != nil {
					r.stats.cachedPostingsDecompressionErrors += 1
				} else {
					closeFns = append(closeFns, clPostings.close)
					l = clPostings
				}
			} else {
				_, l, err = r.dec.Postings(b)
			}

			l, closer, err := r.decodeCachedPostings(b)
			if err != nil {
				return nil, closeFns, errors.Wrap(err, "decode postings")
			}

			output[ix] = l
			closeFns = append(closeFns, closer...)
			continue
		}

@@ -2502,27 +2560,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
				return err
			}

			dataToCache := pBytes

			compressionTime := time.Duration(0)
			compressions, compressionErrors, compressedSize := 0, 0, 0

			// Reencode postings before storing to cache. If that fails, we store original bytes.
			// This can only fail if postings data was somehow corrupted,
			// and there is nothing we can do about it.
			// Errors from corrupted postings will be reported when postings are used.
			compressions++
			s := time.Now()
			bep := newBigEndianPostings(pBytes[4:])
			data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
			compressionTime = time.Since(s)
			if err == nil {
				dataToCache = data
				compressedSize = len(data)
			} else {
				compressionErrors = 1
			}

			dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length())
			r.mtx.Lock()
			// Return postings and fill LRU cache.
			// Truncate first 4 bytes which are length of posting.
@@ -2533,7 +2576,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
			// If we just fetched it we still have to update the stats for touched postings.
			r.stats.postingsTouched++
			r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
			r.stats.cachedPostingsCompressions += 1
			r.stats.cachedPostingsCompressionErrors += compressionErrors
			r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
			r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
@@ -2547,6 +2590,47 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
	return output, closeFns, g.Wait()
}

// decodeCachedPostings decodes a cached postings entry, handling both
// compressed and uncompressed formats. It returns the postings together with
// any close functions the caller must invoke once done.
func (r *bucketIndexReader) decodeCachedPostings(b []byte) (index.Postings, []func(), error) {
	// Even if this instance is not using compression, there may be compressed
	// entries in the cache written by other stores.
	var (
		l        index.Postings
		err      error
		closeFns []func()
	)
	if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
		s := time.Now()
		clPostings, err := decodePostings(b)
		r.stats.cachedPostingsDecompressions += 1
		r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
		if err != nil {
			r.stats.cachedPostingsDecompressionErrors += 1
		} else {
			closeFns = append(closeFns, clPostings.close)
			l = clPostings
		}
	} else {
		_, l, err = r.dec.Postings(b)
	}
	return l, closeFns, err
}

// encodePostingsToCache encodes and compresses the given postings for
// caching. It returns the bytes to cache, the time spent compressing, the
// number of compression errors (0 or 1), and the compressed size in bytes.
func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int) ([]byte, time.Duration, int, int) {
Member: Small nit, but maybe instead of returning multiple ints we could pass r.stats.cachedPostingsCompressionErrors and r.stats.CachedPostingsCompressedSizeSum directly? Perhaps even the whole r.stats? I think this would make the code clearer.

Contributor Author: Yeah, that makes sense, and I wanted to do the same at the beginning. But since encodePostingsToCache will be called concurrently, passing r.stats means we would need to take the lock inside https://github.com/thanos-io/thanos/pull/6420/files#diff-3e2896fafa6ff73509c77df2c4389b68828e02575bb4fb78b6c34bcfb922a7ceR2554. Not all call sites need to hold the lock, so I wanted to keep the logic simple and let the caller decide how to update the stats and where to lock. WDYT?

Member: Ok, that makes sense. Maybe it is worth converting those fields to atomic types in a separate PR to avoid holding a lock; that would also simplify this function's return values.

	var dataToCache []byte
	compressionTime := time.Duration(0)
	compressionErrors, compressedSize := 0, 0
	s := time.Now()
	data, err := diffVarintSnappyStreamedEncode(p, length)
	compressionTime = time.Since(s)
	if err == nil {
		dataToCache = data
		compressedSize = len(data)
	} else {
		compressionErrors = 1
	}
	return dataToCache, compressionTime, compressionErrors, compressedSize
}
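For reference, a minimal sketch of the atomic-counter idea floated in the review thread above. The type and field names are hypothetical and not part of this PR; the point is that encodePostingsToCache could then update shared stats without the caller holding r.mtx:

// Hypothetical stats holder using atomics (requires "sync/atomic", Go 1.19+).
type encodeStats struct {
	compressions      atomic.Int64
	compressionErrors atomic.Int64
	compressedBytes   atomic.Int64
}

// record updates the counters safely from any goroutine, no extra locking.
func (s *encodeStats) record(err error, compressedSize int) {
	s.compressions.Add(1)
	if err != nil {
		s.compressionErrors.Add(1)
		return
	}
	s.compressedBytes.Add(int64(compressedSize))
}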

func resizePostings(b []byte) ([]byte, error) {
	d := encoding.Decbuf{B: b}
	n := d.Be32int()
8 changes: 8 additions & 0 deletions pkg/store/bucket_e2e_test.go
@@ -66,6 +66,14 @@ func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.UL
	return c.ptr.FetchMultiPostings(ctx, blockID, keys)
}

func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
	c.ptr.StoreExpandedPostings(blockID, matchers, v)
}

func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
	return c.ptr.FetchExpandedPostings(ctx, blockID, matchers)
}

func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
	c.ptr.StoreSeries(blockID, id, v)
}
38 changes: 36 additions & 2 deletions pkg/store/cache/cache.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/base64"
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
"strconv"
"strings"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
@@ -15,8 +16,9 @@ )
)

const (
	cacheTypePostings string = "Postings"
	cacheTypeSeries   string = "Series"
	cacheTypePostings         string = "Postings"
	cacheTypeExpandedPostings string = "ExpandedPostings"
	cacheTypeSeries           string = "Series"

	sliceHeaderSize = 16
)
@@ -38,6 +40,12 @@ type IndexCache interface {
	// and returns a map containing cache hits, along with a list of missing keys.
	FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label)

	// StoreExpandedPostings stores expanded postings for a set of label matchers.
	StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte)

	// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
	FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool)

	// StoreSeries stores a single series.
	StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte)
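A minimal round-trip sketch of the two new methods. The constructor, block ID, matchers, and encoded bytes below are placeholders; in the store gateway the bytes come from encodePostingsToCache:

var cache IndexCache = newInMemoryTestCache() // hypothetical constructor
cache.StoreExpandedPostings(blockID, matchers, encodedPostings)
if data, hit := cache.FetchExpandedPostings(ctx, blockID, matchers); hit {
	// Decode data (e.g. via decodeCachedPostings) and expand it, skipping
	// the regular per-label postings lookups entirely.
	_ = data
}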

@@ -59,6 +67,8 @@ func (c cacheKey) keyType() string {
		return cacheTypePostings
	case cacheKeySeries:
		return cacheTypeSeries
	case cacheKeyExpandedPostings:
		return cacheTypeExpandedPostings
	}
	return "<unknown>"
}
@@ -68,6 +78,8 @@ func (c cacheKey) size() uint64 {
	case cacheKeyPostings:
		// ULID + 2 slice headers + number of chars in value and name.
		return ulidSize + 2*sliceHeaderSize + uint64(len(k.Value)+len(k.Name))
	case cacheKeyExpandedPostings:
		return ulidSize + sliceHeaderSize + uint64(len(k))
	case cacheKeySeries:
		return ulidSize + 8 // ULID + uint64.
	}
@@ -86,12 +98,34 @@ func (c cacheKey) string() string {
			key += ":" + c.compression
		}
		return key
	case cacheKeyExpandedPostings:
		// Use a cryptographic hash function to avoid hash collisions,
		// which would result in wrong query results.
		matchers := c.key.(cacheKeyExpandedPostings)
		matchersHash := blake2b.Sum256([]byte(matchers))
		key := "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:])
GiedriusS (Member), Jun 12, 2023: Should we also encode the index header version (https://github.com/thanos-io/thanos/pull/6420/files#diff-3e2896fafa6ff73509c77df2c4389b68828e02575bb4fb78b6c34bcfb922a7ceR2442), since that leads to different results? 🤔 This is probably a theoretical problem, but it still makes sense to be safer than sorry 😄

yeya24 (Contributor Author): I didn't get it. Blocks are immutable, so since we include the block ID here, the version should be the same?

GiedriusS (Member): Right, forgot about that invariant. My idea was that if another index version comes out in the future, it would be enough to only rewrite the index file. But this works too; our tooling is written with that assumption in mind 👍

yeya24 (Contributor Author), Jun 12, 2023: Oh I see, you mean in case we ever changed the format of the postings, like switching the posting list to a roaring bitmap or something else? Yeah, then I think it is a good point.

		if len(c.compression) > 0 {
			key += ":" + c.compression
		}
		return key
	case cacheKeySeries:
		return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
	default:
		return ""
	}
}
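If a new index version ever required it, a hypothetical variant of the key construction above could fold the version in. This is purely illustrative and not what the PR does; indexVersion is an assumed input:

	// Hypothetical only: fold the index version into the cache key.
	key := "EP:" + strconv.Itoa(indexVersion) + ":" + c.block + ":" +
		base64.RawURLEncoding.EncodeToString(matchersHash[0:])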

// labelMatchersToString serializes matchers into a single semicolon-separated
// string; this string is what gets hashed into the expanded-postings cache key.
func labelMatchersToString(matchers []*labels.Matcher) string {
	sb := strings.Builder{}
	for i, lbl := range matchers {
		sb.WriteString(lbl.String())
		if i < len(matchers)-1 {
			sb.WriteRune(';')
		}
	}
	return sb.String()
}
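To make the key scheme concrete, a sketch of how an expanded-postings key is assembled. The block ULID and matchers are made up, and the cacheKey literal relies on this package's unexported fields as shown above:

matchers := []*labels.Matcher{
	labels.MustNewMatcher(labels.MatchEqual, "cluster", "us-east"),
	labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up.*"),
}
// labelMatchersToString(matchers) == `cluster="us-east";__name__=~"up.*"`
ck := cacheKey{
	block: blockID.String(), // blockID is a hypothetical ulid.ULID
	key:   cacheKeyExpandedPostings(labelMatchersToString(matchers)),
}
// ck.string() == "EP:" + blockID.String() + ":" + base64url(blake2b-256 of the matcher string)
_ = ck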

type cacheKeyPostings labels.Label
type cacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not hashable, which breaks the in-memory cache.
type cacheKeySeries uint64