Skip to content

Commit

Permalink
Read path for RF1 generator blocks [3 of 3] (#3723)
Browse files Browse the repository at this point in the history
* TraceQL metrics - RF1 read path

* Maintain old metrics read path

* Fix req time alignment

* Address review comments

* Update docs

* Fix module optionalstore check, it was inverted

---------

Co-authored-by: Martin Disibio <[email protected]>
  • Loading branch information
mapno and mdisibio authored May 31, 2024
1 parent f9bd4fc commit b4514b9
Show file tree
Hide file tree
Showing 18 changed files with 861 additions and 224 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## main / unreleased

* [FEATURE] TraceQL support for event scope and event:name intrinsic [#3708](https://github.com/grafana/tempo/pull/3708) (@stoewer)
* [FEATURE] Flush blocks to storage from the metrics-generator [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) (@mapno)
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] Improve use of OTEL semantic conventions on the service graph [#3711](https://github.com/grafana/tempo/pull/3711) (@zalegrala)
* [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno)
Expand Down
5 changes: 2 additions & 3 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,8 @@ func (t *App) initIngester() (services.Service, error) {
}

func (t *App) initGenerator() (services.Service, error) {
if t.cfg.Target == MetricsGenerator &&
t.cfg.StorageConfig.Trace.Backend != "" &&
t.cfg.Generator.Processor.LocalBlocks.FlushToStorage {
if t.cfg.Generator.Processor.LocalBlocks.FlushToStorage &&
t.store == nil {
return nil, fmt.Errorf("generator.processor.local-blocks.flush-to-storage is enabled but no storage backend is configured")
}

Expand Down
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ query_frontend:
# If set to a non-zero value, its value will be used to decide if the query is within SLO or not.
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_bytes_slo: <float> | default = 0 ]

# If set to true, TraceQL metric queries will use RF1 blocks built and flushed by the metrics-generator.
[rf1_read_path: <bool> | default = false]
```
## Querier
Expand Down
21 changes: 17 additions & 4 deletions docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ server:
log_format: logfmt
log_level: info
log_source_ips_enabled: false
log_source_ips_full: false
log_source_ips_header: ""
log_source_ips_regex: ""
log_request_headers: false
Expand Down Expand Up @@ -129,6 +130,7 @@ internal_server:
log_format: logfmt
log_level: info
log_source_ips_enabled: false
log_source_ips_full: false
log_source_ips_header: ""
log_source_ips_regex: ""
log_request_headers: false
Expand Down Expand Up @@ -254,6 +256,9 @@ querier:
external_endpoints: []
trace_by_id:
query_timeout: 10s
metrics:
concurrent_blocks: 2
time_overlap_cutoff: 0.2
max_concurrent_queries: 20
frontend_worker:
frontend_address: 127.0.0.1:9095
Expand Down Expand Up @@ -285,11 +290,14 @@ querier:
connect_timeout: 0s
connect_backoff_base_delay: 0s
connect_backoff_max_delay: 0s
shuffle_sharding_ingesters_enabled: false
shuffle_sharding_ingesters_lookback_period: 1h0m0s
query_relevant_ingesters: false
query_frontend:
max_outstanding_per_tenant: 2000
querier_forget_delay: 0s
max_batch_size: 5
log_query_request_headers: ""
max_retries: 2
search:
concurrent_jobs: 1000
Expand All @@ -299,13 +307,14 @@ query_frontend:
max_duration: 168h0m0s
query_backend_after: 15m0s
query_ingesters_until: 30m0s
ingester_shards: 1
trace_by_id:
query_shards: 50
metrics:
concurrent_jobs: 1000
target_bytes_per_job: 104857600
max_duration: 0s
query_backend_after: 1h0m0s
max_duration: 3h0m0s
query_backend_after: 30m0s
interval: 5m0s
multi_tenant_queries_enabled: true
compactor:
Expand Down Expand Up @@ -410,7 +419,6 @@ ingester:
min_ready_duration: 15s
interface_names:
- en0
- bridge100
enable_inet6: false
final_sleep: 0s
tokens_file_path: ""
Expand Down Expand Up @@ -492,6 +500,7 @@ metrics_generator:
- db.name
- db.system
span_multiplier_key: ""
enable_virtual_node_label: false
span_metrics:
histogram_buckets:
- 0.002
Expand Down Expand Up @@ -550,8 +559,10 @@ metrics_generator:
max_block_bytes: 500000000
complete_block_timeout: 1h0m0s
max_live_traces: 0
concurrent_blocks: 10
filter_server_spans: true
flush_to_storage: false
concurrent_blocks: 10
time_overlap_cutoff: 0.2
registry:
collection_interval: 15s
stale_duration: 15m0s
Expand Down Expand Up @@ -620,6 +631,8 @@ storage:
blocklist_poll_stale_tenant_index: 0s
blocklist_poll_jitter_ms: 0
blocklist_poll_tolerate_consecutive_errors: 1
empty_tenant_deletion_enabled: false
empty_tenant_deletion_age: 0s
backend: local
local:
path: /var/tempo/traces
Expand Down
1 change: 1 addition & 0 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
Interval: 5 * time.Minute,
RF1ReadPath: false,
},
SLO: slo,
}
Expand Down
148 changes: 130 additions & 18 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
)

type queryRangeSharder struct {
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
replicationFactor uint32
}

type QueryRangeSharderConfig struct {
Expand All @@ -38,10 +39,15 @@ type QueryRangeSharderConfig struct {
MaxDuration time.Duration `yaml:"max_duration"`
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
RF1ReadPath bool `yaml:"rf1_read_path,omitempty"`
}

// newAsyncQueryRangeSharder creates a sharding middleware for search
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
var replicationFactor uint32
if cfg.RF1ReadPath {
replicationFactor = 1
}
return pipeline.AsyncMiddlewareFunc[combiner.PipelineResponse](func(next pipeline.AsyncRoundTripper[combiner.PipelineResponse]) pipeline.AsyncRoundTripper[combiner.PipelineResponse] {
return queryRangeSharder{
next: next,
Expand All @@ -50,6 +56,8 @@ func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg

cfg: cfg,
logger: logger,

replicationFactor: replicationFactor,
}
})
}
Expand Down Expand Up @@ -110,7 +118,19 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin
reqCh <- generatorReq
}

totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
var (
totalJobs, totalBlocks uint32
totalBlockBytes uint64
)
if s.cfg.RF1ReadPath {
totalJobs, totalBlocks, totalBlockBytes = s.backendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh)
} else {
totalJobs, totalBlocks, totalBlockBytes = s.shardedBackendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
}

span.SetTag("totalJobs", totalJobs)
span.SetTag("totalBlocks", totalBlocks)
span.SetTag("totalBlockBytes", totalBlockBytes)

// send a job to communicate the search metrics. this is consumed by the combiner to calculate totalblocks/bytes/jobs
var jobMetricsResponse pipeline.Responses[combiner.PipelineResponse]
Expand Down Expand Up @@ -142,15 +162,16 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.UnixNano() <= end &&
m.EndTime.UnixNano() >= start {
m.EndTime.UnixNano() >= start &&
m.ReplicationFactor == s.replicationFactor {
metas = append(metas, m)
}
}

return metas
}

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
Expand Down Expand Up @@ -218,13 +239,13 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
}

go func() {
s.buildBackendRequests(ctx, tenantID, parent, backendReq, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
s.buildShardedBackendRequests(ctx, tenantID, parent, backendReq, samplingRate, targetBytesPerRequest, interval, reqCh)
}()

return
}

func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) {
func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request) {
defer close(reqCh)

var (
Expand Down Expand Up @@ -260,7 +281,6 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
shardR.End = thisEnd
shardR.ShardID = i
shardR.ShardCount = shards

httpReq := s.toUpstreamRequest(ctx, shardR, parent, tenantID)
if samplingRate != 1.0 {
shardR.ShardID *= uint32(1.0 / samplingRate)
Expand All @@ -283,6 +303,105 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
}
}

// backendRequests builds the RF1 (non-sharded) backend job set for a TraceQL
// metrics query. It selects the blocks overlapping the backend portion of the
// requested time range, returns aggregate job/block/byte counts for
// instrumentation, and asynchronously feeds one sub-request per block page
// range into reqCh. reqCh is always closed exactly once: either here on an
// early exit, or by the goroutine running buildBackendRequests.
func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan *http.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
	// request without start or end, search only in generator
	if searchReq.Start == 0 || searchReq.End == 0 {
		close(reqCh)
		return
	}

	// Make a copy and limit to backend time range.
	backendReq := searchReq
	backendReq.Start, backendReq.End = s.backendRange(now, backendReq.Start, backendReq.End, s.cfg.QueryBackendAfter)
	alignTimeRange(&backendReq)

	// If empty window then no need to search backend
	if backendReq.Start == backendReq.End {
		close(reqCh)
		return
	}

	// Blocks within overall time range. This is just for instrumentation, more precise time
	// range is checked for each window.
	blocks := s.blockMetas(int64(backendReq.Start), int64(backendReq.End), tenantID)
	if len(blocks) == 0 {
		// no need to search backend
		close(reqCh)
		return
	}

	// calculate metrics to return to the caller
	totalBlocks = uint32(len(blocks))
	for _, b := range blocks {
		p := pagesPerRequest(b, targetBytesPerRequest)
		if p == 0 {
			// Mirror the pages==0 guard in buildBackendRequests: such a
			// block contributes no jobs, and dividing by p below would
			// otherwise panic with a divide-by-zero.
			continue
		}

		// Whole jobs plus one extra for any remainder of records.
		totalJobs += b.TotalRecords / uint32(p)
		if int(b.TotalRecords)%p != 0 {
			totalJobs++
		}
		totalBlockBytes += b.Size
	}

	go func() {
		s.buildBackendRequests(ctx, tenantID, parent, backendReq, blocks, targetBytesPerRequest, reqCh)
	}()

	return
}

// buildBackendRequests fans a metrics query out into one sub-request per
// page range of each block and sends them on reqCh. It is intended to run
// in its own goroutine (see backendRequests); reqCh is closed on return so
// the consumer can detect completion, and production stops early if ctx is
// cancelled.
func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- *http.Request) {
	defer close(reqCh)

	for _, m := range metas {
		pages := pagesPerRequest(m, targetBytesPerRequest)
		if pages == 0 {
			// Block yields no searchable pages at this target size; skip it.
			continue
		}

		// Walk the block's records in chunks of `pages`, one sub-request per chunk.
		for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
			subR := parent.Clone(ctx)

			dc, err := m.DedicatedColumns.ToTempopb()
			if err != nil {
				// Conversion failure silently skips this chunk; the error
				// reporting hook is currently disabled.
				// errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
				continue
			}

			queryRangeReq := &tempopb.QueryRangeRequest{
				Query: searchReq.Query,
				// Clamp the sub-request window to the block's own time bounds.
				Start: max(searchReq.Start, uint64(m.StartTime.UnixNano())),
				End:   min(searchReq.End, uint64(m.EndTime.UnixNano())),
				Step:  searchReq.Step,
				// ShardID: uint32, // No sharding with RF=1
				// ShardCount: uint32, // No sharding with RF=1
				QueryMode: searchReq.QueryMode,
				// New RF1 fields
				BlockID:          m.BlockID.String(),
				StartPage:        uint32(startPage),
				PagesToSearch:    uint32(pages),
				Version:          m.Version,
				Encoding:         m.Encoding.String(),
				Size_:            m.Size,
				FooterSize:       m.FooterSize,
				DedicatedColumns: dc,
			}
			alignTimeRange(queryRangeReq)
			// Extend the end by one step after alignment — presumably to keep
			// the final (partial) interval inside the queried range; confirm
			// against the querier's range handling.
			queryRangeReq.End += queryRangeReq.Step

			subR = api.BuildQueryRangeRequest(subR, queryRangeReq)

			prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query())
			// TODO: Handle sampling rate

			// Send the sub-request, or bail out if the caller gave up;
			// the deferred close still runs on early return.
			select {
			case reqCh <- subR:
			case <-ctx.Done():
				return
			}
		}
	}
}

func (s *queryRangeSharder) backendRange(now time.Time, start, end uint64, queryBackendAfter time.Duration) (uint64, uint64) {
backendAfter := uint64(now.Add(-queryBackendAfter).UnixNano())

Expand Down Expand Up @@ -319,13 +438,6 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest

searchReq.QueryMode = querier.QueryModeRecent

// No sharding on the generators (unnecessary), but we do apply sampling
// rates. In this case we execute a single arbitrary shard. Choosing
// the last shard works. The first shard should be avoided because it is
// weighted slightly off due to int63/128 sharding boundaries.
searchReq.ShardID = uint32(1.0 / samplingRate)
searchReq.ShardCount = uint32(1.0 / samplingRate)

return s.toUpstreamRequest(parent.Context(), searchReq, parent, tenantID)
}

Expand Down
8 changes: 2 additions & 6 deletions modules/frontend/search_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ type SearchSharderConfig struct {
IngesterShards int `yaml:"ingester_shards,omitempty"`
}

type backendReqMsg struct {
req *http.Request
err error
}

type asyncSearchSharder struct {
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
Expand Down Expand Up @@ -147,7 +142,8 @@ func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*ba
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
m.EndTime.Unix() >= start {
m.EndTime.Unix() >= start &&
m.ReplicationFactor == backend.DefaultReplicationFactor { // This check skips generator blocks (RF=1)
metas = append(metas, m)
}
}
Expand Down
2 changes: 1 addition & 1 deletion modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func (p *Processor) resetHeadBlock() error {
BlockID: uuid.New(),
TenantID: p.tenant,
DedicatedColumns: p.overrides.DedicatedColumns(p.tenant),
ReplicationFactor: 1,
ReplicationFactor: backend.MetricsGeneratorReplicationFactor,
}
block, err := p.wal.NewBlock(meta, model.CurrentEncoding)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
span.LogFields(ot_log.String("timeEnd", fmt.Sprint(timeEnd)))

opts := common.DefaultSearchOptionsWithMaxBytes(maxBytes)
opts.BlockReplicationFactor = backend.DefaultReplicationFactor
partialTraces, blockErrs, err := q.store.Find(ctx, userID, req.TraceID, req.BlockStart, req.BlockEnd, timeStart, timeEnd, opts)
if err != nil {
retErr := fmt.Errorf("error querying store in Querier.FindTraceByID: %w", err)
Expand Down
Loading

0 comments on commit b4514b9

Please sign in to comment.