Skip to content

Commit

Permalink
fix(regression): reverts #13039 to prevent use-after-free corruptions (
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Jun 6, 2024
1 parent 9823f20 commit 41c5ee2
Show file tree
Hide file tree
Showing 21 changed files with 98 additions and 680 deletions.
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize)
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize)
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,50 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
}
})

t.Run("request cancellation does not result in channel locking", func(t *testing.T) {
now := mktime("2024-01-25 10:00")

// replace store implementation and re-initialize workers and sub-services
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore.delay = 2000 * time.Millisecond

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})

chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)

// saturate workers
// then send additional request
for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ {
expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
require.NoError(t, err)

req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}

ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond)
ctx = user.InjectOrgID(ctx, tenantID)
t.Cleanup(cancelFn)

res, err := gw.FilterChunkRefs(ctx, req)
require.ErrorContainsf(t, err, context.DeadlineExceeded.Error(), "%+v", res)
}
})

t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// }
// }
querier := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize),
BlockRef: blockRef,
}
queriers = append(queriers, querier)
Expand Down
15 changes: 0 additions & 15 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (

"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/types"

"github.com/grafana/loki/v3/pkg/analytics"
Expand Down Expand Up @@ -80,7 +79,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/limiter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/util/querylimits"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
Expand Down Expand Up @@ -732,19 +730,6 @@ func (t *Loki) initBloomStore() (services.Service, error) {
reg := prometheus.DefaultRegisterer
bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{}
case "dynamic":
bloomshipper.BloomPageAllocator = v1.BloomPagePool
case "fixed":
bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// do nothing
bloomshipper.BloomPageAllocator = nil
}

var metasCache cache.Cache
if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)
Expand Down
8 changes: 2 additions & 6 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ type BlockQuerier struct {
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier {
func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
series: NewLazySeriesIter(b),
blooms: NewLazyBloomIter(b, alloc, maxPageSize),
blooms: NewLazyBloomIter(b, noCapture, maxPageSize),
}
}

Expand Down Expand Up @@ -173,7 +173,3 @@ func (bq *BlockQuerier) Err() error {

return bq.blooms.Err()
}

func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}
43 changes: 15 additions & 28 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Bloom struct {

func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8)))
buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8)))

// TODO(owen-d): have encoder implement writer directly so we don't need
// to indirect via a buffer
Expand All @@ -36,6 +36,7 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
data := buf.Bytes()
enc.PutUvarint(len(data)) // length of bloom filter
enc.PutBytes(data)
BloomPagePool.Put(data[:0]) // release to pool
return nil
}

Expand Down Expand Up @@ -63,14 +64,11 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
return nil
}

func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}
defer alloc.Put(data)
func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data := BloomPagePool.Get(page.Len)[:page.Len]
defer BloomPagePool.Put(data)

_, err = io.ReadFull(r, data)
_, err := io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand All @@ -86,10 +84,7 @@ func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool,
}
defer pool.PutReader(decompressor)

b, err := alloc.Get(page.DecompressedLen)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}
b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen]

if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
Expand All @@ -101,18 +96,14 @@ func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool,
}

// shortcut to skip allocations when we know the page is not compressed
func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) {
// data + checksum
if page.Len != page.DecompressedLen+4 {
return nil, errors.New("the Len and DecompressedLen of the page do not match")
}
data := BloomPagePool.Get(page.Len)[:page.Len]

data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}

_, err = io.ReadFull(r, data)
_, err := io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand Down Expand Up @@ -167,16 +158,12 @@ type BloomPageDecoder struct {
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder:
// on reads in the bloom-gw but not in the bloom-compactor
func (d *BloomPageDecoder) Relinquish(alloc Allocator) {
if d == nil {
return
}

func (d *BloomPageDecoder) Relinquish() {
data := d.data
d.data = nil

if cap(data) > 0 {
_ = alloc.Put(data)
BloomPagePool.Put(data)
}
}

Expand Down Expand Up @@ -290,7 +277,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
// BloomPageDecoder returns a decoder for the given page index.
// It may skip the page if it's too large.
// NB(owen-d): if `skip` is true, err _must_ be nil.
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
Expand All @@ -313,9 +300,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx
}

if b.schema.encoding == chunkenc.EncNone {
res, err = LazyDecodeBloomPageNoCompression(r, alloc, page)
res, err = LazyDecodeBloomPageNoCompression(r, page)
} else {
res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page)
res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page)
}

if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ type BloomQuerier interface {
}

type LazyBloomIter struct {
usePool bool

b *Block
m int // max page size in bytes

alloc Allocator

// state
initialized bool
err error
Expand All @@ -24,11 +24,11 @@ type LazyBloomIter struct {
// will be returned to the pool for efficiency.
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder.
func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter {
func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter {
return &LazyBloomIter{
b: b,
m: maxSize,
alloc: alloc,
usePool: pool,
b: b,
m: maxSize,
}
}

Expand All @@ -53,14 +53,16 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) {

// drop the current page if it exists and
// we're using the pool
it.curPage.Relinquish(it.alloc)
if it.curPage != nil && it.usePool {
it.curPage.Relinquish()
}

r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
return false
}
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics)
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return false
Expand Down Expand Up @@ -104,7 +106,6 @@ func (it *LazyBloomIter) next() bool {
var skip bool
it.curPage, skip, err = it.b.blooms.BloomPageDecoder(
r,
it.alloc,
it.curPageIndex,
it.m,
it.b.metrics,
Expand All @@ -129,8 +130,11 @@ func (it *LazyBloomIter) next() bool {

// we've exhausted the current page, progress to next
it.curPageIndex++
// drop the current page if it exists
it.curPage.Relinquish(it.alloc)
// drop the current page if it exists and
// we're using the pool
if it.usePool {
it.curPage.Relinquish()
}
it.curPage = nil
continue
}
Expand All @@ -157,7 +161,3 @@ func (it *LazyBloomIter) Err() error {
return nil
}
}

func (it *LazyBloomIter) Close() {
it.curPage.Relinquish(it.alloc)
}
12 changes: 6 additions & 6 deletions pkg/storage/bloom/v1/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
}

block := NewBlock(tc.reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

err = block.LoadHeaders()
require.Nil(t, err)
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) {
itr := NewSliceIter[SeriesWithBloom](data[min:max])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize)))
blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)))
}

// We're not testing the ability to extend a bloom in this test
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

EqualIterators[*SeriesWithBloom](
t,
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

rounds := make([][]model.Fingerprint, 2)

Expand Down Expand Up @@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

// rather than use the block querier directly, collect it's data
// so we can use it in a few places later
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize)
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)
sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs))

EqualIterators[*SeriesWithBloom](
Expand Down
Loading

0 comments on commit 41c5ee2

Please sign in to comment.