Parquet: Make FindTraceByID honor buffer and caching settings #1697

Merged (5 commits) on Aug 29, 2022
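In short, this change threads a common.SearchOptions value through every FindTraceByID implementation so that trace-by-ID lookups honor the same read-buffer and cache settings as search instead of hardcoding them. Below is a minimal sketch of the new call shape; block stands in for any common.Finder, and the option values are illustrative only (512 KB x 32 mirrors the constants the old code hardcoded, and the CacheControl fields mirror the old hardcoded defaults).

// Sketch only: values are illustrative, not Tempo's configured defaults.
opts := common.SearchOptions{
	ReadBufferSize:  512 * 1024, // bytes per read buffer
	ReadBufferCount: 32,         // number of read buffers
	CacheControl: common.CacheControl{
		Footer:      false, // cache the parquet footer
		ColumnIndex: false, // cache column indexes
		OffsetIndex: false, // cache offset indexes
	},
}

tr, err := block.FindTraceByID(ctx, traceID, opts)
if err != nil {
	return nil, err
}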
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
* [ENHANCEMENT] metrics-generator: expose span size as a metric [#1662](https://github.com/grafana/tempo/pull/1662) (@ie-pham)
* [ENHANCEMENT] Set Max Idle connections to 100 for Azure, should reduce DNS errors in Azure [#1632](https://github.com/grafana/tempo/pull/1632) (@electron0zero)
* [ENHANCEMENT] Add PodDisruptionBudget to ingesters in jsonnet [#1691](https://github.com/grafana/tempo/pull/1691) (@joe-elliott)
* [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott)

## v1.5.0 / 2022-08-17

6 changes: 5 additions & 1 deletion cmd/tempo-cli/cmd-query-blocks.go
@@ -12,6 +12,7 @@ import (
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
@@ -148,7 +149,10 @@ func queryBlock(ctx context.Context, r backend.Reader, c backend.Compactor, bloc
return nil, err
}

trace, err := block.FindTraceByID(ctx, traceID)
searchOpts := common.SearchOptions{}
tempodb.SearchConfig{}.ApplyToOptions(&searchOpts)

trace, err := block.FindTraceByID(ctx, traceID, searchOpts)
if err != nil {
return nil, err
}
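tempo-cli has no running tempodb instance to borrow settings from, so it applies a zero-value tempodb.SearchConfig to pick up the package defaults. A hedged sketch of that pattern pulled into a helper; the helper itself is hypothetical, only SearchConfig and ApplyToOptions come from this diff.

// hypothetical convenience wrapper, for illustration only
func defaultSearchOptions() common.SearchOptions {
	opts := common.SearchOptions{}
	tempodb.SearchConfig{}.ApplyToOptions(&opts) // zero-value config applies the defaults
	return opts
}

// usage mirrors the call above
trace, err := block.FindTraceByID(ctx, traceID, defaultSearchOptions())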
5 changes: 3 additions & 2 deletions modules/ingester/instance.go
@@ -452,7 +452,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace
combiner := trace.NewCombiner()
combiner.Consume(completeTrace)
for _, c := range i.completeBlocks {
found, err := c.FindTraceByID(ctx, id)
found, err := c.FindTraceByID(ctx, id, common.SearchOptions{})
if err != nil {
return nil, fmt.Errorf("completeBlock.FindTraceByID failed: %w", err)
}
@@ -480,7 +480,8 @@ func (i *instance) AddCompletingBlock(b *wal.AppendBlock, s *search.StreamingSea
}

// getOrCreateTrace will return a new trace object for the given request
// It must be called under the i.tracesMtx lock
//
// It must be called under the i.tracesMtx lock
func (i *instance) getOrCreateTrace(traceID []byte) *liveTrace {
fp := i.tokenForTraceID(traceID)
trace, ok := i.traces[fp]
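Note that the ingester passes a zero-value common.SearchOptions for its complete blocks. With the vparquet change further down, ReadBufferCount is 0 in that case, so the block reads straight from its backing reader with no extra buffering and no cache control; the sketch below just spells out what the zero value selects.

// zero value -> ReadBufferCount == 0 -> plain reader path in vparquet
// (see the readerAt selection in block_findtracebyid.go below)
found, err := c.FindTraceByID(ctx, id, common.SearchOptions{})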
2 changes: 1 addition & 1 deletion tempodb/encoding/common/interfaces.go
@@ -11,7 +11,7 @@ import (
)

type Finder interface {
FindTraceByID(ctx context.Context, id ID) (*tempopb.Trace, error)
FindTraceByID(ctx context.Context, id ID, opts SearchOptions) (*tempopb.Trace, error)
}

type Searcher interface {
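Every Finder implementation now has to accept the options, even when it ignores them. A minimal sketch of a conforming implementation; the type below is made up for illustration, only the interface comes from this diff.

// hypothetical no-op implementation of the updated interface
type nopFinder struct{}

func (nopFinder) FindTraceByID(ctx context.Context, id ID, opts SearchOptions) (*tempopb.Trace, error) {
	// implementations that never buffer or cache can ignore opts entirely,
	// as the v2 backend block does in the next file
	return nil, nil
}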
2 changes: 1 addition & 1 deletion tempodb/encoding/v2/backend_block.go
@@ -138,7 +138,7 @@ func (b *BackendBlock) BlockMeta() *backend.BlockMeta {
return b.meta
}

func (b *BackendBlock) FindTraceByID(ctx context.Context, id common.ID) (*tempopb.Trace, error) {
func (b *BackendBlock) FindTraceByID(ctx context.Context, id common.ID, _ common.SearchOptions) (*tempopb.Trace, error) {
obj, err := b.find(ctx, id)
if err != nil {
return nil, err
17 changes: 11 additions & 6 deletions tempodb/encoding/vparquet/block_findtracebyid.go
@@ -156,7 +156,7 @@ func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool
return filter.Test(id), nil
}

func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID) (_ *tempopb.Trace, err error) {
func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opts common.SearchOptions) (_ *tempopb.Trace, err error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.FindTraceByID",
opentracing.Tags{
"blockID": b.meta.BlockID,
@@ -173,15 +173,20 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID) (_
return nil, nil
}

// todo: combine with open logic from the other search functions
var readerAt io.ReaderAt
rr := NewBackendReaderAt(derivedCtx, b.r, DataFileName, b.meta.BlockID, b.meta.TenantID)
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead) }()
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }()

br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), 512*1024, 32)
readerAt = rr
if opts.ReadBufferCount > 0 {
br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)

// todo: disabling by default but we should make cache settings configurable here
or := newParquetOptimizedReaderAt(br, rr, int64(b.meta.Size), b.meta.FooterSize, common.CacheControl{Footer: false, ColumnIndex: false, OffsetIndex: false})
or := newParquetOptimizedReaderAt(br, rr, int64(b.meta.Size), b.meta.FooterSize, opts.CacheControl)
readerAt = or
}

pf, err := parquet.OpenFile(or, int64(b.meta.Size))
pf, err := parquet.OpenFile(readerAt, int64(b.meta.Size))
if err != nil {
return nil, errors.Wrap(err, "error opening file in FindTraceByID")
}
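Restated without the interleaved old lines, the new reader selection in FindTraceByID reads roughly as follows (same identifiers as the diff; a sketch of the flow, not additional code): when no read buffers are requested, the parquet file is opened directly over the backend reader; otherwise the buffered reader and the parquet-optimized reader are layered on top, with caching driven by the caller's options rather than hardcoded off.

// sketch of the flow introduced above
var readerAt io.ReaderAt = rr // rr reads the block's data file from the backend

if opts.ReadBufferCount > 0 {
	// buffer size and count now come from SearchOptions instead of 512*1024 and 32
	br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)
	// footer/index caching is now the caller's choice via opts.CacheControl
	readerAt = newParquetOptimizedReaderAt(br, rr, int64(b.meta.Size), b.meta.FooterSize, opts.CacheControl)
}

pf, err := parquet.OpenFile(readerAt, int64(b.meta.Size))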
4 changes: 2 additions & 2 deletions tempodb/encoding/vparquet/block_findtracebyid_test.go
@@ -95,7 +95,7 @@ func TestBackendBlockFindTraceByID(t *testing.T) {
wantProto, err := parquetTraceToTempopbTrace(tr)
require.NoError(t, err)

gotProto, err := b.FindTraceByID(ctx, tr.TraceID)
gotProto, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{})
require.NoError(t, err)

require.Equal(t, wantProto, gotProto)
@@ -134,7 +134,7 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) {
// fmt.Println(tr)
// fmt.Println("going to search for traceID", util.TraceIDToHexString(tr.TraceID))

protoTr, err := b.FindTraceByID(ctx, tr.TraceID)
protoTr, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{})
require.NoError(t, err)
require.NotNil(t, protoTr)
}
4 changes: 2 additions & 2 deletions tempodb/encoding/vparquet/block_search.go
@@ -43,7 +43,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o
defer span.Finish()

rr := NewBackendReaderAt(derivedCtx, b.r, DataFileName, b.meta.BlockID, b.meta.TenantID)
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead) }()
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }()

br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount)

@@ -73,7 +73,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o
// TODO: error handling
results := searchParquetFile(derivedCtx, pf, req, rgs)
results.Metrics.InspectedBlocks++
results.Metrics.InspectedBytes += rr.TotalBytesRead
results.Metrics.InspectedBytes += rr.TotalBytesRead.Load()

return results, nil
}
7 changes: 4 additions & 3 deletions tempodb/encoding/vparquet/readers.go
@@ -6,6 +6,7 @@ import (
"io"

"github.com/google/uuid"
"go.uber.org/atomic"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
@@ -18,17 +19,17 @@ type BackendReaderAt struct {
blockID uuid.UUID
tenantID string

TotalBytesRead uint64
TotalBytesRead atomic.Uint64
}

var _ io.ReaderAt = (*BackendReaderAt)(nil)

func NewBackendReaderAt(ctx context.Context, r backend.Reader, name string, blockID uuid.UUID, tenantID string) *BackendReaderAt {
return &BackendReaderAt{ctx, r, name, blockID, tenantID, 0}
return &BackendReaderAt{ctx, r, name, blockID, tenantID, atomic.Uint64{}}
}

func (b *BackendReaderAt) ReadAt(p []byte, off int64) (int, error) {
b.TotalBytesRead += uint64(len(p))
b.TotalBytesRead.Add(uint64(len(p)))
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, false)
return len(p), err
}
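TotalBytesRead becomes an atomic counter (go.uber.org/atomic), presumably because ReadAt can be called from multiple goroutines while the deferred span tag reads the running total; the call sites above were updated to use Add and Load accordingly. An illustrative sketch of why the atomic matters; goroutine count, offsets, and surrounding variables are arbitrary assumptions.

// illustration only: concurrent ReadAt calls bump the counter without a data race
rr := NewBackendReaderAt(ctx, r, DataFileName, blockID, tenantID)

var wg sync.WaitGroup
for i := 0; i < 4; i++ {
	wg.Add(1)
	go func(off int64) {
		defer wg.Done()
		buf := make([]byte, 1024)
		_, _ = rr.ReadAt(buf, off) // adds len(buf) to TotalBytesRead atomically
	}(int64(i) * 1024)
}
wg.Wait()

fmt.Println("inspected bytes:", rr.TotalBytesRead.Load())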
10 changes: 8 additions & 2 deletions tempodb/tempodb.go
@@ -316,6 +316,11 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID,
return nil, nil, nil
}

opts := common.SearchOptions{}
if rw.cfg != nil && rw.cfg.Search != nil {
rw.cfg.Search.ApplyToOptions(&opts)
}

curTime := time.Now()
partialTraces, funcErrs, err := rw.pool.RunJobs(ctx, copiedBlocklist, func(ctx context.Context, payload interface{}) (interface{}, error) {
meta := payload.(*backend.BlockMeta)
@@ -325,7 +330,7 @@
return nil, errors.Wrap(err, fmt.Sprintf("error opening block for reading, blockID: %s", meta.BlockID.String()))
}

foundObject, err := block.FindTraceByID(ctx, id)
foundObject, err := block.FindTraceByID(ctx, id, opts)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error finding trace by id, blockID: %s", meta.BlockID.String()))
}
@@ -390,7 +395,8 @@ func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharde
}

// EnablePolling activates the polling loop. Pass nil if this component
// should never be a tenant index builder.
//
// should never be a tenant index builder.
func (rw *readerWriter) EnablePolling(sharder blocklist.JobSharder) {
if sharder == nil {
sharder = blocklist.OwnsNothingSharder
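The effect of the new guard in Find: when the readerWriter was built with a Search config, its buffer and cache settings are copied onto the options handed to every block visited by the job pool; with no Search config, Find falls back to a zero-value SearchOptions, which is the unbuffered path shown in the vparquet block above. A short sketch with explanatory comments; it restates the lines above rather than adding behavior, and SearchConfig's own field names are not shown in this diff.

opts := common.SearchOptions{} // ReadBufferCount == 0 unless a Search config fills it in
if rw.cfg != nil && rw.cfg.Search != nil {
	// buffer sizes and CacheControl come from the configured search settings
	rw.cfg.Search.ApplyToOptions(&opts)
}
// the same opts value is reused for every candidate block
foundObject, err := block.FindTraceByID(ctx, id, opts)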
2 changes: 1 addition & 1 deletion tempodb/tempodb_test.go
@@ -592,7 +592,7 @@ func TestCompleteBlock(t *testing.T) {
require.NoError(t, err, "unexpected error completing block")

for i, id := range ids {
found, err := complete.FindTraceByID(context.TODO(), id)
found, err := complete.FindTraceByID(context.TODO(), id, common.SearchOptions{})
require.NoError(t, err)
require.True(t, proto.Equal(found, reqs[i]))
}
7 changes: 4 additions & 3 deletions tempodb/wal/local_block.go
@@ -48,12 +48,13 @@ func NewLocalBlock(ctx context.Context, existingBlock common.BackendBlock, l *lo
return c, nil
}

func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID) (*tempopb.Trace, error) {
return c.BackendBlock.FindTraceByID(ctx, id)
func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
return c.BackendBlock.FindTraceByID(ctx, id, opts)
}

// FlushedTime returns the time the block was flushed. Will return 0
// if the block was never flushed
//
// if the block was never flushed
func (c *LocalBlock) FlushedTime() time.Time {
unixTime := c.flushedTime.Load()
if unixTime == 0 {