diff --git a/CHANGELOG.md b/CHANGELOG.md index b2265bc9146..5cf168f14fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * [ENHANCEMENT] metrics-generator: expose span size as a metric [#1662](https://github.com/grafana/tempo/pull/1662) (@ie-pham) * [ENHANCEMENT] Set Max Idle connections to 100 for Azure, should reduce DNS errors in Azure [#1632](https://github.com/grafana/tempo/pull/1632) (@electron0zero) * [ENHANCEMENT] Add PodDisruptionBudget to ingesters in jsonnet [#1691](https://github.com/grafana/tempo/pull/1691) (@joe-elliott) +* [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott) ## v1.5.0 / 2022-08-17 diff --git a/cmd/tempo-cli/cmd-query-blocks.go b/cmd/tempo-cli/cmd-query-blocks.go index 34e5dc39fbd..6e695a8be4f 100644 --- a/cmd/tempo-cli/cmd-query-blocks.go +++ b/cmd/tempo-cli/cmd-query-blocks.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" @@ -148,7 +149,10 @@ func queryBlock(ctx context.Context, r backend.Reader, c backend.Compactor, bloc return nil, err } - trace, err := block.FindTraceByID(ctx, traceID) + searchOpts := common.SearchOptions{} + tempodb.SearchConfig{}.ApplyToOptions(&searchOpts) + + trace, err := block.FindTraceByID(ctx, traceID, searchOpts) if err != nil { return nil, err } diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 47b09a01790..fbc25367525 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -452,7 +452,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace combiner := trace.NewCombiner() combiner.Consume(completeTrace) for _, c := range i.completeBlocks { - found, err := c.FindTraceByID(ctx, id) + found, err := c.FindTraceByID(ctx, id, common.SearchOptions{}) if err != nil { return nil, fmt.Errorf("completeBlock.FindTraceByID failed: %w", err) } @@ -480,7 +480,8 @@ func (i *instance) AddCompletingBlock(b *wal.AppendBlock, s *search.StreamingSea } // getOrCreateTrace will return a new trace object for the given request -// It must be called under the i.tracesMtx lock +// +// It must be called under the i.tracesMtx lock func (i *instance) getOrCreateTrace(traceID []byte) *liveTrace { fp := i.tokenForTraceID(traceID) trace, ok := i.traces[fp] diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index 01a005c036e..500ef9d08f6 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -11,7 +11,7 @@ import ( ) type Finder interface { - FindTraceByID(ctx context.Context, id ID) (*tempopb.Trace, error) + FindTraceByID(ctx context.Context, id ID, opts SearchOptions) (*tempopb.Trace, error) } type Searcher interface { diff --git a/tempodb/encoding/v2/backend_block.go b/tempodb/encoding/v2/backend_block.go index 810b3b8ee04..e17ea736ae9 100644 --- a/tempodb/encoding/v2/backend_block.go +++ b/tempodb/encoding/v2/backend_block.go @@ -138,7 +138,7 @@ func (b *BackendBlock) BlockMeta() *backend.BlockMeta { return b.meta } -func (b *BackendBlock) FindTraceByID(ctx context.Context, id common.ID) (*tempopb.Trace, error) { +func (b *BackendBlock) FindTraceByID(ctx context.Context, id common.ID, _ common.SearchOptions) (*tempopb.Trace, error) { obj, err := b.find(ctx, id) if err != nil { return nil, err diff --git a/tempodb/encoding/vparquet/block_findtracebyid.go b/tempodb/encoding/vparquet/block_findtracebyid.go index 0796adbc313..97bf6ba0348 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid.go +++ b/tempodb/encoding/vparquet/block_findtracebyid.go @@ -156,7 +156,7 @@ func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool return filter.Test(id), nil } -func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID) (_ *tempopb.Trace, err error) { +func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opts common.SearchOptions) (_ *tempopb.Trace, err error) { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.FindTraceByID", opentracing.Tags{ "blockID": b.meta.BlockID, @@ -173,15 +173,20 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID) (_ return nil, nil } + // todo: combine with open logic from the other search functions + var readerAt io.ReaderAt rr := NewBackendReaderAt(derivedCtx, b.r, DataFileName, b.meta.BlockID, b.meta.TenantID) - defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead) }() + defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }() - br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), 512*1024, 32) + readerAt = rr + if opts.ReadBufferCount > 0 { + br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount) - // todo: disabling by default but we should make cache settings configurable here - or := newParquetOptimizedReaderAt(br, rr, int64(b.meta.Size), b.meta.FooterSize, common.CacheControl{Footer: false, ColumnIndex: false, OffsetIndex: false}) + or := newParquetOptimizedReaderAt(br, rr, int64(b.meta.Size), b.meta.FooterSize, opts.CacheControl) + readerAt = or + } - pf, err := parquet.OpenFile(or, int64(b.meta.Size)) + pf, err := parquet.OpenFile(readerAt, int64(b.meta.Size)) if err != nil { return nil, errors.Wrap(err, "error opening file in FindTraceByID") } diff --git a/tempodb/encoding/vparquet/block_findtracebyid_test.go b/tempodb/encoding/vparquet/block_findtracebyid_test.go index 7e1610f429d..9e9810673e3 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid_test.go +++ b/tempodb/encoding/vparquet/block_findtracebyid_test.go @@ -95,7 +95,7 @@ func TestBackendBlockFindTraceByID(t *testing.T) { wantProto, err := parquetTraceToTempopbTrace(tr) require.NoError(t, err) - gotProto, err := b.FindTraceByID(ctx, tr.TraceID) + gotProto, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{}) require.NoError(t, err) require.Equal(t, wantProto, gotProto) @@ -134,7 +134,7 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) { // fmt.Println(tr) // fmt.Println("going to search for traceID", util.TraceIDToHexString(tr.TraceID)) - protoTr, err := b.FindTraceByID(ctx, tr.TraceID) + protoTr, err := b.FindTraceByID(ctx, tr.TraceID, common.SearchOptions{}) require.NoError(t, err) require.NotNil(t, protoTr) } diff --git a/tempodb/encoding/vparquet/block_search.go b/tempodb/encoding/vparquet/block_search.go index 43bbe70035c..84d65436b1a 100644 --- a/tempodb/encoding/vparquet/block_search.go +++ b/tempodb/encoding/vparquet/block_search.go @@ -43,7 +43,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o defer span.Finish() rr := NewBackendReaderAt(derivedCtx, b.r, DataFileName, b.meta.BlockID, b.meta.TenantID) - defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead) }() + defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }() br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size), opts.ReadBufferSize, opts.ReadBufferCount) @@ -73,7 +73,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o // TODO: error handling results := searchParquetFile(derivedCtx, pf, req, rgs) results.Metrics.InspectedBlocks++ - results.Metrics.InspectedBytes += rr.TotalBytesRead + results.Metrics.InspectedBytes += rr.TotalBytesRead.Load() return results, nil } diff --git a/tempodb/encoding/vparquet/readers.go b/tempodb/encoding/vparquet/readers.go index 53b49ea7e7d..1e4190d6a72 100644 --- a/tempodb/encoding/vparquet/readers.go +++ b/tempodb/encoding/vparquet/readers.go @@ -6,6 +6,7 @@ import ( "io" "github.com/google/uuid" + "go.uber.org/atomic" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" @@ -18,17 +19,17 @@ type BackendReaderAt struct { blockID uuid.UUID tenantID string - TotalBytesRead uint64 + TotalBytesRead atomic.Uint64 } var _ io.ReaderAt = (*BackendReaderAt)(nil) func NewBackendReaderAt(ctx context.Context, r backend.Reader, name string, blockID uuid.UUID, tenantID string) *BackendReaderAt { - return &BackendReaderAt{ctx, r, name, blockID, tenantID, 0} + return &BackendReaderAt{ctx, r, name, blockID, tenantID, atomic.Uint64{}} } func (b *BackendReaderAt) ReadAt(p []byte, off int64) (int, error) { - b.TotalBytesRead += uint64(len(p)) + b.TotalBytesRead.Add(uint64(len(p))) err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, false) return len(p), err } diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 1e971db0cf4..91d011551df 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -316,6 +316,11 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return nil, nil, nil } + opts := common.SearchOptions{} + if rw.cfg != nil && rw.cfg.Search != nil { + rw.cfg.Search.ApplyToOptions(&opts) + } + curTime := time.Now() partialTraces, funcErrs, err := rw.pool.RunJobs(ctx, copiedBlocklist, func(ctx context.Context, payload interface{}) (interface{}, error) { meta := payload.(*backend.BlockMeta) @@ -325,7 +330,7 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return nil, errors.Wrap(err, fmt.Sprintf("error opening block for reading, blockID: %s", meta.BlockID.String())) } - foundObject, err := block.FindTraceByID(ctx, id) + foundObject, err := block.FindTraceByID(ctx, id, opts) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("error finding trace by id, blockID: %s", meta.BlockID.String())) } @@ -390,7 +395,8 @@ func (rw *readerWriter) EnableCompaction(cfg *CompactorConfig, c CompactorSharde } // EnablePolling activates the polling loop. Pass nil if this component -// should never be a tenant index builder. +// +// should never be a tenant index builder. func (rw *readerWriter) EnablePolling(sharder blocklist.JobSharder) { if sharder == nil { sharder = blocklist.OwnsNothingSharder diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index c951ec3360d..92e3f755dce 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -592,7 +592,7 @@ func TestCompleteBlock(t *testing.T) { require.NoError(t, err, "unexpected error completing block") for i, id := range ids { - found, err := complete.FindTraceByID(context.TODO(), id) + found, err := complete.FindTraceByID(context.TODO(), id, common.SearchOptions{}) require.NoError(t, err) require.True(t, proto.Equal(found, reqs[i])) } diff --git a/tempodb/wal/local_block.go b/tempodb/wal/local_block.go index cc234cbec00..7ba1d9c3362 100644 --- a/tempodb/wal/local_block.go +++ b/tempodb/wal/local_block.go @@ -48,12 +48,13 @@ func NewLocalBlock(ctx context.Context, existingBlock common.BackendBlock, l *lo return c, nil } -func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID) (*tempopb.Trace, error) { - return c.BackendBlock.FindTraceByID(ctx, id) +func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) { + return c.BackendBlock.FindTraceByID(ctx, id, opts) } // FlushedTime returns the time the block was flushed. Will return 0 -// if the block was never flushed +// +// if the block was never flushed func (c *LocalBlock) FlushedTime() time.Time { unixTime := c.flushedTime.Load() if unixTime == 0 {