Skip to content

Commit

Permalink
perf: Introduce fixed size memory pool for bloom querier
Browse files Browse the repository at this point in the history
---

Revert "fix(regression):  reverts #13039 to prevent use-after-free corruptions (#13162)"

This reverts commit 41c5ee2.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jun 7, 2024
1 parent 691b174 commit 093edd5
Show file tree
Hide file tree
Showing 21 changed files with 703 additions and 99 deletions.
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
44 changes: 0 additions & 44 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,50 +215,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
}
})

t.Run("request cancellation does not result in channel locking", func(t *testing.T) {
now := mktime("2024-01-25 10:00")

// replace store implementation and re-initialize workers and sub-services
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore.delay = 2000 * time.Millisecond

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})

chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)

// saturate workers
// then send additional request
for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ {
expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
require.NoError(t, err)

req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}

ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond)
ctx = user.InjectOrgID(ctx, tenantID)
t.Cleanup(cancelFn)

res, err := gw.FilterChunkRefs(ctx, req)
require.ErrorContainsf(t, err, context.DeadlineExceeded.Error(), "%+v", res)
}
})

t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// }
// }
querier := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockRef: blockRef,
}
queriers = append(queriers, querier)
Expand Down
15 changes: 15 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/types"

"github.com/grafana/loki/v3/pkg/analytics"
Expand Down Expand Up @@ -79,6 +80,7 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/limiter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/util/querylimits"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
Expand Down Expand Up @@ -730,6 +732,19 @@ func (t *Loki) initBloomStore() (services.Service, error) {
reg := prometheus.DefaultRegisterer
bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{}
case "dynamic":
bloomshipper.BloomPageAllocator = v1.BloomPagePool
case "fixed":
bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// do nothing
bloomshipper.BloomPageAllocator = nil
}

var metasCache cache.Cache
if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)
Expand Down
21 changes: 13 additions & 8 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,18 @@ type BlockQuerier struct {
}

// NewBlockQuerier returns a new BlockQuerier for the given block.
// WARNING: If noCapture is true, the underlying byte slice of the bloom page
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
// When usePool is true, the bloom MUST NOT be captured by the caller. Rather,
// it should be discarded before another call to Next().
func NewBlockQuerier(b *Block, usePool bool, maxPageSize int) *BlockQuerier {
// WARNING: You can pass an implementation of Allocator that is responsibe for
// whether the underlying byte slice of the bloom page will be returned to the
// pool for efficiency or not. Returning to the pool can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e. when loading
// blooms for querying (bloom-gateway), but not for writing (bloom-compactor).
// Therefore, when calling NewBlockQuerier on the write path, you should always
// pass the SimpleHeapAllocator implementation of the Allocator interface.
func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
LazySeriesIter: NewLazySeriesIter(b),
blooms: NewLazyBloomIter(b, usePool, maxPageSize),
blooms: NewLazyBloomIter(b, alloc, maxPageSize),
}
}

Expand All @@ -144,6 +145,10 @@ func (bq *BlockQuerier) Err() error {
return bq.blooms.Err()
}

func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}

type BlockQuerierIter struct {
*BlockQuerier
}
Expand Down
43 changes: 28 additions & 15 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Bloom struct {

func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8)))
buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8)))

// TODO(owen-d): have encoder implement writer directly so we don't need
// to indirect via a buffer
Expand All @@ -36,7 +36,6 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
data := buf.Bytes()
enc.PutUvarint(len(data)) // length of bloom filter
enc.PutBytes(data)
BloomPagePool.Put(data[:0]) // release to pool
return nil
}

Expand Down Expand Up @@ -64,11 +63,14 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
return nil
}

func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data := BloomPagePool.Get(page.Len)[:page.Len]
defer BloomPagePool.Put(data)
func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}
defer alloc.Put(data)

_, err := io.ReadFull(r, data)
_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand All @@ -84,7 +86,10 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
}
defer pool.PutReader(decompressor)

b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen]
b, err := alloc.Get(page.DecompressedLen)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}

if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
Expand All @@ -96,14 +101,18 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
}

// shortcut to skip allocations when we know the page is not compressed
func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) {
func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
// data + checksum
if page.Len != page.DecompressedLen+4 {
return nil, errors.New("the Len and DecompressedLen of the page do not match")
}
data := BloomPagePool.Get(page.Len)[:page.Len]

_, err := io.ReadFull(r, data)
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}

_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand Down Expand Up @@ -158,12 +167,16 @@ type BloomPageDecoder struct {
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder:
// on reads in the bloom-gw but not in the bloom-compactor
func (d *BloomPageDecoder) Relinquish() {
func (d *BloomPageDecoder) Relinquish(alloc Allocator) {
if d == nil {
return
}

data := d.data
d.data = nil

if cap(data) > 0 {
BloomPagePool.Put(data)
_ = alloc.Put(data)
}
}

Expand Down Expand Up @@ -271,7 +284,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
// BloomPageDecoder returns a decoder for the given page index.
// It may skip the page if it's too large.
// NB(owen-d): if `skip` is true, err _must_ be nil.
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
Expand All @@ -294,9 +307,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
}

if b.schema.encoding == chunkenc.EncNone {
res, err = LazyDecodeBloomPageNoCompression(r, page)
res, err = LazyDecodeBloomPageNoCompression(r, alloc, page)
} else {
res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page)
res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page)
}

if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ type BloomQuerier interface {
}

type LazyBloomIter struct {
usePool bool

b *Block
m int // max page size in bytes

alloc Allocator

// state
initialized bool
err error
Expand All @@ -24,11 +24,11 @@ type LazyBloomIter struct {
// will be returned to the pool for efficiency.
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder.
func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter {
func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter {
return &LazyBloomIter{
usePool: pool,
b: b,
m: maxSize,
b: b,
m: maxSize,
alloc: alloc,
}
}

Expand All @@ -53,16 +53,14 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) {

// drop the current page if it exists and
// we're using the pool
if it.curPage != nil && it.usePool {
it.curPage.Relinquish()
}
it.curPage.Relinquish(it.alloc)

r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
return false
}
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return false
Expand Down Expand Up @@ -106,6 +104,7 @@ func (it *LazyBloomIter) next() bool {
var skip bool
it.curPage, skip, err = it.b.blooms.BloomPageDecoder(
r,
it.alloc,
it.curPageIndex,
it.m,
it.b.metrics,
Expand All @@ -130,11 +129,8 @@ func (it *LazyBloomIter) next() bool {

// we've exhausted the current page, progress to next
it.curPageIndex++
// drop the current page if it exists and
// we're using the pool
if it.usePool {
it.curPage.Relinquish()
}
// drop the current page if it exists
it.curPage.Relinquish(it.alloc)
it.curPage = nil
continue
}
Expand All @@ -161,3 +157,7 @@ func (it *LazyBloomIter) Err() error {
return nil
}
}

func (it *LazyBloomIter) Close() {
it.curPage.Relinquish(it.alloc)
}
Loading

0 comments on commit 093edd5

Please sign in to comment.