TraceQL metrics - RF1 read path
mapno committed May 28, 2024
1 parent 7c68b25 commit 9984f35
Showing 9 changed files with 732 additions and 256 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,7 +1,7 @@
## main / unreleased

* [FEATURE] TraceQL support for event scope and event:name intrinsic [#3708](https://github.com/grafana/tempo/pull/3708) (@stoewer)
* [FEATURE] Flush blocks to storage from the metrics-generator [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) (@mapno)
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)

## v2.5.0-rc.1

141 changes: 54 additions & 87 deletions modules/frontend/metrics_query_range_sharder.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"net/http"
"time"

@@ -110,7 +109,11 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin
reqCh <- generatorReq
}

totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh)

span.SetTag("totalJobs", totalJobs)
span.SetTag("totalBlocks", totalBlocks)
span.SetTag("totalBlockBytes", totalBlockBytes)

// send a job to communicate the search metrics. this is consumed by the combiner to calculate totalblocks/bytes/jobs
var jobMetricsResponse pipeline.Responses[combiner.PipelineResponse]
@@ -142,15 +145,16 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.UnixNano() <= end &&
m.EndTime.UnixNano() >= start {
m.EndTime.UnixNano() >= start &&
m.ReplicationFactor == 1 { // Only consider blocks with replication factor 1
metas = append(metas, m)
}
}

return metas
}

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan *http.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
@@ -177,109 +181,71 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
return
}

// count blocks
// calculate metrics to return to the caller
totalBlocks = uint32(len(blocks))
for _, b := range blocks {
totalBlockBytes += b.Size
}

// count jobs. same loops as below
var (
start = backendReq.Start
end = backendReq.End
timeWindowSize = uint64(interval.Nanoseconds())
)

for start < end {
thisStart := start
thisEnd := start + timeWindowSize
if thisEnd > end {
thisEnd = end
}
p := pagesPerRequest(b, targetBytesPerRequest)

blocks := s.blockMetas(int64(thisStart), int64(thisEnd), tenantID)
if len(blocks) == 0 {
start = thisEnd
continue
}

totalBlockSize := uint64(0)
for _, b := range blocks {
totalBlockSize += b.Size
}

shards := uint32(math.Ceil(float64(totalBlockSize) / float64(targetBytesPerRequest)))

for i := uint32(1); i <= shards; i++ {
totalJobs += b.TotalRecords / uint32(p)
if int(b.TotalRecords)%p != 0 {
totalJobs++
}

start = thisEnd
totalBlockBytes += b.Size
}

go func() {
s.buildBackendRequests(ctx, tenantID, parent, backendReq, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
s.buildBackendRequests(ctx, tenantID, parent, backendReq, blocks, targetBytesPerRequest, reqCh)
}()

return
}

func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) {
func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- *http.Request) {
defer close(reqCh)

var (
start = searchReq.Start
end = searchReq.End
timeWindowSize = uint64(interval.Nanoseconds())
)

for start < end {
thisStart := start
thisEnd := start + timeWindowSize
if thisEnd > end {
thisEnd = end
}

blocks := s.blockMetas(int64(thisStart), int64(thisEnd), tenantID)
if len(blocks) == 0 {
start = thisEnd
for _, m := range metas {
pages := pagesPerRequest(m, targetBytesPerRequest)
if pages == 0 {
continue
}

totalBlockSize := uint64(0)
for _, b := range blocks {
totalBlockSize += b.Size
}

shards := uint32(math.Ceil(float64(totalBlockSize) / float64(targetBytesPerRequest)))

for i := uint32(1); i <= shards; i++ {

shardR := searchReq
shardR.Start = thisStart
shardR.End = thisEnd
shardR.ShardID = i
shardR.ShardCount = shards

httpReq := s.toUpstreamRequest(ctx, shardR, parent, tenantID)
if samplingRate != 1.0 {
shardR.ShardID *= uint32(1.0 / samplingRate)
shardR.ShardCount *= uint32(1.0 / samplingRate)
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)

// Set final sampling rate after integer rounding
samplingRate = float64(shards) / float64(shardR.ShardCount)

httpReq = pipeline.ContextAddAdditionalData(samplingRate, httpReq)
dc, err := m.DedicatedColumns.ToTempopb()
if err != nil {
// errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
continue
}

subR = api.BuildQueryRangeRequest(subR, &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: searchReq.Start,
End: searchReq.End,
Step: searchReq.Step,
// ShardID: uint32, // No sharding with RF=1
// ShardCount: uint32, // No sharding with RF=1
QueryMode: searchReq.QueryMode,
// New RF1 fields
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size,
FooterSize: m.FooterSize,
DedicatedColumns: dc,
})

prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query())
// TODO: Handle sampling rate

select {
case reqCh <- httpReq:
case reqCh <- subR:
case <-ctx.Done():
return
}
}

start = thisEnd
}
}

@@ -319,12 +285,13 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest

searchReq.QueryMode = querier.QueryModeRecent

// No sharding on the generators (unnecessary), but we do apply sampling
// rates. In this case we execute a single arbitrary shard. Choosing
// the last shard works. The first shard should be avoided because it is
// weighted slightly off due to int63/128 sharding boundaries.
searchReq.ShardID = uint32(1.0 / samplingRate)
searchReq.ShardCount = uint32(1.0 / samplingRate)
// TODO: No sharding with RF=1. Remove
//// No sharding on the generators (unnecessary), but we do apply sampling
//// rates. In this case we execute a single arbitrary shard. Choosing
//// the last shard works. The first shard should be avoided because it is
//// weighted slightly off due to int63/128 sharding boundaries.
//searchReq.ShardID = uint32(1.0 / samplingRate)
//searchReq.ShardCount = uint32(1.0 / samplingRate)

return s.toUpstreamRequest(parent.Context(), searchReq, parent, tenantID)
}
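For readers following the sharder change above: the old time-window loop that computed byte-based shards is gone, and job accounting is now done per RF1 block, with each block contributing ceil(TotalRecords / pagesPerRequest) jobs. Below is a minimal, self-contained sketch of that arithmetic. pagesPerRequestSketch is a hypothetical stand-in for Tempo's real pagesPerRequest helper (not shown in this diff), and blockMetaSketch is a trimmed stand-in for backend.BlockMeta.

package sketch

// blockMetaSketch mirrors the two backend.BlockMeta fields the job math needs.
type blockMetaSketch struct {
	TotalRecords uint32 // number of pages/row groups in the block
	Size         uint64 // total block size in bytes
}

// pagesPerRequestSketch guesses how many pages fit into one job of roughly
// targetBytesPerRequest bytes. This is an assumption about what the real
// pagesPerRequest helper does, not Tempo's implementation.
func pagesPerRequestSketch(m blockMetaSketch, targetBytesPerRequest int) int {
	if m.TotalRecords == 0 || m.Size == 0 {
		return 0
	}
	bytesPerPage := m.Size / uint64(m.TotalRecords)
	if bytesPerPage == 0 {
		bytesPerPage = 1
	}
	pages := targetBytesPerRequest / int(bytesPerPage)
	if pages < 1 {
		pages = 1
	}
	return pages
}

// countJobsSketch reproduces the loop in backendRequests: one job per chunk of
// pages, plus one extra job for a partial trailing chunk.
func countJobsSketch(blocks []blockMetaSketch, targetBytesPerRequest int) (totalJobs uint32, totalBytes uint64) {
	for _, b := range blocks {
		p := pagesPerRequestSketch(b, targetBytesPerRequest)
		if p == 0 {
			continue
		}
		totalJobs += b.TotalRecords / uint32(p)
		if int(b.TotalRecords)%p != 0 {
			totalJobs++
		}
		totalBytes += b.Size
	}
	return totalJobs, totalBytes
}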
3 changes: 2 additions & 1 deletion modules/frontend/search_sharder.go
@@ -147,7 +147,8 @@ func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*ba
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
m.EndTime.Unix() >= start {
m.EndTime.Unix() >= start &&
m.ReplicationFactor != 1 { // TODO: hacky
metas = append(metas, m)
}
}
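The one-line filter change above is the counterpart of the metrics sharder's RF=1 filter earlier in this commit: regular trace search now skips replication-factor-1 blocks, while the TraceQL metrics path reads only those blocks, so no block is considered by both paths. A small sketch of the complementary predicates, using a trimmed stand-in for backend.BlockMeta (note that the metrics path compares UnixNano while this search path compares Unix seconds):

package sketch

// blockTimes is a stand-in for the backend.BlockMeta fields used by both filters.
type blockTimes struct {
	StartUnix, EndUnix int64 // block time range, Unix seconds
	ReplicationFactor  uint32
}

func overlaps(m blockTimes, start, end int64) bool {
	return m.StartUnix <= end && m.EndUnix >= start
}

// keepForMetrics mirrors queryRangeSharder.blockMetas: RF1 blocks only.
func keepForMetrics(m blockTimes, start, end int64) bool {
	return overlaps(m, start, end) && m.ReplicationFactor == 1
}

// keepForSearch mirrors asyncSearchSharder.blockMetas: everything except RF1.
func keepForSearch(m blockTimes, start, end int64) bool {
	return overlaps(m, start, end) && m.ReplicationFactor != 1
}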
83 changes: 83 additions & 0 deletions modules/querier/querier_query_range.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
@@ -24,6 +25,10 @@ func (q *Querier) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest
return q.queryRangeRecent(ctx, req)
}

if req.ShardCount == 0 { // RF1 search
return q.queryBlock(ctx, req)
}

// Backend requests go here
return q.queryBackend(ctx, req)
}
@@ -59,6 +64,84 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR
return c.Response(), nil
}

func (q *Querier) queryBlock(ctx context.Context, req *tempopb.QueryRangeRequest) (*tempopb.QueryRangeResponse, error) {
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, fmt.Errorf("error extracting org id in Querier.BackendSearch: %w", err)
}

blockID, err := uuid.Parse(req.BlockID)
if err != nil {
return nil, err
}

enc, err := backend.ParseEncoding(req.Encoding)
if err != nil {
return nil, err
}

dc, err := backend.DedicatedColumnsFromTempopb(req.DedicatedColumns)
if err != nil {
return nil, err
}

meta := &backend.BlockMeta{
Version: req.Version,
TenantID: tenantID,
StartTime: time.Unix(0, int64(req.Start)),
EndTime: time.Unix(0, int64(req.End)),
Encoding: enc,
// IndexPageSize: req.IndexPageSize,
// TotalRecords: req.TotalRecords,
BlockID: blockID,
// DataEncoding: req.DataEncoding,
Size: req.Size_,
FooterSize: req.FooterSize,
DedicatedColumns: dc,
}

opts := common.DefaultSearchOptions()
opts.StartPage = int(req.StartPage)
opts.TotalPages = int(req.PagesToSearch)

unsafe := q.limits.UnsafeQueryHints(tenantID)

expr, err := traceql.Parse(req.Query)
if err != nil {
return nil, err
}

timeOverlapCutoff := q.cfg.Metrics.TimeOverlapCutoff
if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
timeOverlapCutoff = v
}

eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, false, timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}

f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return q.store.Fetch(ctx, meta, req, opts)
})
err = eval.Do(ctx, f, 0, 0)
if err != nil {
return nil, err
}

res := eval.Results()

inspectedBytes, spansTotal, _ := eval.Metrics()

return &tempopb.QueryRangeResponse{
Series: queryRangeTraceQLToProto(res, req),
Metrics: &tempopb.SearchMetrics{
InspectedBytes: inspectedBytes,
InspectedSpans: spansTotal,
},
}, nil
}

func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeRequest) (*tempopb.QueryRangeResponse, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
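Putting both sides together: each frontend sub-request now carries one block's coordinates (BlockID, StartPage, PagesToSearch, Version, Encoding, Size_, FooterSize, DedicatedColumns), and on the querier a ShardCount of zero is the signal to read that single block directly rather than run a sharded backend query. A compact, self-contained sketch of that routing decision follows; the struct and mode constant are trimmed stand-ins for tempopb.QueryRangeRequest and querier.QueryModeRecent, so this is a plausible reading of the dispatch, not the exact Tempo code.

package sketch

type queryRangeRequestSketch struct {
	QueryMode  string
	ShardCount uint32
	BlockID    string
}

const queryModeRecent = "recent"

// route sketches Querier.QueryRange's dispatch after this commit: recent data
// is answered by the generators, ShardCount == 0 selects the new single-block
// RF1 path, and anything else falls back to the sharded backend path.
func route(req queryRangeRequestSketch) string {
	switch {
	case req.QueryMode == queryModeRecent:
		return "queryRangeRecent (generators)"
	case req.ShardCount == 0:
		return "queryBlock (RF1 block " + req.BlockID + ")"
	default:
		return "queryBackend (sharded backend search)"
	}
}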
