Read path for RF1 generator blocks [3 of 3] #3723

Merged · 8 commits · May 31, 2024
CHANGELOG.md (1 addition, 1 deletion)
@@ -1,7 +1,7 @@
## main / unreleased

* [FEATURE] TraceQL support for event scope and event:name intrinsic [#3708](https://github.com/grafana/tempo/pull/3708) (@stoewer)
-* [FEATURE] Flush blocks to storage from the metrics-generator [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) (@mapno)
+* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [ENHANCEMENT] Tag value lookup uses protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] Improve use of OTEL semantic conventions on the service graph [#3711](https://github.com/grafana/tempo/pull/3711) (@zalegrala)
* [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno)
cmd/tempo/app/modules.go (2 additions, 3 deletions)
@@ -263,9 +263,8 @@ func (t *App) initIngester() (services.Service, error) {
}

func (t *App) initGenerator() (services.Service, error) {
-if t.cfg.Target == MetricsGenerator &&
-t.cfg.StorageConfig.Trace.Backend != "" &&
-t.cfg.Generator.Processor.LocalBlocks.FlushToStorage {
+if t.cfg.Generator.Processor.LocalBlocks.FlushToStorage &&
+t.store == nil {
return nil, fmt.Errorf("generator.processor.local-blocks.flush-to-storage is enabled but no storage backend is configured")
}

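For context, the check above fails fast when flushing is enabled without a backend. A minimal config sketch that satisfies it (assuming the usual `storage.trace` layout; the `local_blocks` key names appear in the manifest diff below):

```yaml
# Sketch only: flush_to_storage requires a configured trace backend,
# otherwise initGenerator returns the error above at startup.
metrics_generator:
  processor:
    local_blocks:
      flush_to_storage: true
storage:
  trace:
    backend: local            # any configured backend satisfies the t.store != nil check
    local:
      path: /var/tempo/traces
```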
docs/sources/tempo/configuration/_index.md (3 additions)
@@ -553,6 +553,9 @@ query_frontend:
# If set to a non-zero value, its value will be used to decide if query is within SLO or not.
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_bytes_slo: <float> | default = 0 ]

+# If set to true, TraceQL metric queries will use RF1 blocks built and flushed by the metrics-generator.
+[rf1_read_path: <bool> | default = false]
```
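Putting the two new knobs together, a hedged end-to-end sketch of enabling the RF1 path (key paths follow this diff's config structs; everything else left at defaults):

```yaml
# Sketch: the generator writes RF1 blocks to storage, and the frontend's
# metrics sharder reads them instead of the sharded backend path.
metrics_generator:
  processor:
    local_blocks:
      flush_to_storage: true   # write path (PRs #3628/#3691)
query_frontend:
  metrics:
    rf1_read_path: true        # read path (this PR)
```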

## Querier
docs/sources/tempo/configuration/manifest.md (17 additions, 4 deletions)
@@ -74,6 +74,7 @@ server:
log_format: logfmt
log_level: info
log_source_ips_enabled: false
log_source_ips_full: false
log_source_ips_header: ""
log_source_ips_regex: ""
log_request_headers: false
@@ -129,6 +130,7 @@ internal_server:
log_format: logfmt
log_level: info
log_source_ips_enabled: false
log_source_ips_full: false
log_source_ips_header: ""
log_source_ips_regex: ""
log_request_headers: false
@@ -254,6 +256,9 @@ querier:
external_endpoints: []
trace_by_id:
query_timeout: 10s
metrics:
concurrent_blocks: 2
time_overlap_cutoff: 0.2
max_concurrent_queries: 20
frontend_worker:
frontend_address: 127.0.0.1:9095
@@ -285,11 +290,14 @@ querier:
connect_timeout: 0s
connect_backoff_base_delay: 0s
connect_backoff_max_delay: 0s
shuffle_sharding_ingesters_enabled: false
shuffle_sharding_ingesters_lookback_period: 1h0m0s
query_relevant_ingesters: false
query_frontend:
max_outstanding_per_tenant: 2000
querier_forget_delay: 0s
max_batch_size: 5
log_query_request_headers: ""
max_retries: 2
search:
concurrent_jobs: 1000
@@ -299,13 +307,14 @@
max_duration: 168h0m0s
query_backend_after: 15m0s
query_ingesters_until: 30m0s
ingester_shards: 1
trace_by_id:
query_shards: 50
metrics:
concurrent_jobs: 1000
target_bytes_per_job: 104857600
-max_duration: 0s
-query_backend_after: 1h0m0s
+max_duration: 3h0m0s
+query_backend_after: 30m0s
interval: 5m0s
multi_tenant_queries_enabled: true
compactor:
@@ -410,7 +419,6 @@ ingester:
min_ready_duration: 15s
interface_names:
- en0
- bridge100
enable_inet6: false
final_sleep: 0s
tokens_file_path: ""
@@ -492,6 +500,7 @@ metrics_generator:
- db.name
- db.system
span_multiplier_key: ""
enable_virtual_node_label: false
span_metrics:
histogram_buckets:
- 0.002
@@ -550,8 +559,10 @@ metrics_generator:
max_block_bytes: 500000000
complete_block_timeout: 1h0m0s
max_live_traces: 0
-concurrent_blocks: 10
filter_server_spans: true
flush_to_storage: false
+concurrent_blocks: 10
+time_overlap_cutoff: 0.2
registry:
collection_interval: 15s
stale_duration: 15m0s
@@ -620,6 +631,8 @@ storage:
blocklist_poll_stale_tenant_index: 0s
blocklist_poll_jitter_ms: 0
blocklist_poll_tolerate_consecutive_errors: 1
empty_tenant_deletion_enabled: false
empty_tenant_deletion_age: 0s
backend: local
local:
path: /var/tempo/traces
modules/frontend/config.go (1 addition)
@@ -85,6 +85,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
Interval: 5 * time.Minute,
+RF1ReadPath: false,
},
SLO: slo,
}
modules/frontend/metrics_query_range_sharder.go (130 additions, 18 deletions)
@@ -25,11 +25,12 @@ import (
)

type queryRangeSharder struct {
-next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
-reader tempodb.Reader
-overrides overrides.Interface
-cfg QueryRangeSharderConfig
-logger log.Logger
+next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
+reader tempodb.Reader
+overrides overrides.Interface
+cfg QueryRangeSharderConfig
+logger log.Logger
+replicationFactor uint32
}

type QueryRangeSharderConfig struct {
Expand All @@ -38,10 +39,15 @@ type QueryRangeSharderConfig struct {
MaxDuration time.Duration `yaml:"max_duration"`
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
+RF1ReadPath bool `yaml:"rf1_read_path,omitempty"`
}

// newAsyncQueryRangeSharder creates a sharding middleware for search
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
+var replicationFactor uint32
+if cfg.RF1ReadPath {
+replicationFactor = 1
+}
return pipeline.AsyncMiddlewareFunc[combiner.PipelineResponse](func(next pipeline.AsyncRoundTripper[combiner.PipelineResponse]) pipeline.AsyncRoundTripper[combiner.PipelineResponse] {
return queryRangeSharder{
next: next,
@@ -50,6 +56,8 @@ func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg

cfg: cfg,
logger: logger,

+replicationFactor: replicationFactor,
}
})
}
@@ -110,7 +118,19 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) {
reqCh <- generatorReq
}

-totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
+var (
+totalJobs, totalBlocks uint32
+totalBlockBytes uint64
+)
+if s.cfg.RF1ReadPath {
+totalJobs, totalBlocks, totalBlockBytes = s.backendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh)
+} else {
+totalJobs, totalBlocks, totalBlockBytes = s.shardedBackendRequests(ctx, tenantID, r, *req, now, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
+}
+
+span.SetTag("totalJobs", totalJobs)
+span.SetTag("totalBlocks", totalBlocks)
+span.SetTag("totalBlockBytes", totalBlockBytes)

// send a job to communicate the search metrics. this is consumed by the combiner to calculate totalblocks/bytes/jobs
var jobMetricsResponse pipeline.Responses[combiner.PipelineResponse]
@@ -142,15 +162,16 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.UnixNano() <= end &&
-m.EndTime.UnixNano() >= start {
+m.EndTime.UnixNano() >= start &&
+m.ReplicationFactor == s.replicationFactor {
metas = append(metas, m)
}
}

return metas
}

-func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
+func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
@@ -218,13 +239,13 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
}

go func() {
-s.buildBackendRequests(ctx, tenantID, parent, backendReq, samplingRate, targetBytesPerRequest, interval, reqCh, nil)
+s.buildShardedBackendRequests(ctx, tenantID, parent, backendReq, samplingRate, targetBytesPerRequest, interval, reqCh)
}()

return
}

-func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) {
+func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request) {
defer close(reqCh)

var (
@@ -260,7 +281,6 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string
shardR.End = thisEnd
shardR.ShardID = i
shardR.ShardCount = shards

httpReq := s.toUpstreamRequest(ctx, shardR, parent, tenantID)
if samplingRate != 1.0 {
shardR.ShardID *= uint32(1.0 / samplingRate)
@@ -283,6 +303,105 @@
}
}

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, now time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan *http.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
return
}

// Make a copy and limit to backend time range.
backendReq := searchReq
backendReq.Start, backendReq.End = s.backendRange(now, backendReq.Start, backendReq.End, s.cfg.QueryBackendAfter)
alignTimeRange(&backendReq)

// If empty window then no need to search backend
if backendReq.Start == backendReq.End {
close(reqCh)
return
}

// Blocks within overall time range. This is just for instrumentation, more precise time
// range is checked for each window.
blocks := s.blockMetas(int64(backendReq.Start), int64(backendReq.End), tenantID)
if len(blocks) == 0 {
// no need to search backend
close(reqCh)
return
}

// calculate metrics to return to the caller
totalBlocks = uint32(len(blocks))
for _, b := range blocks {
p := pagesPerRequest(b, targetBytesPerRequest)

totalJobs += b.TotalRecords / uint32(p)
if int(b.TotalRecords)%p != 0 {
totalJobs++
}
totalBlockBytes += b.Size
}

go func() {
s.buildBackendRequests(ctx, tenantID, parent, backendReq, blocks, targetBytesPerRequest, reqCh)
}()

return
}

func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- *http.Request) {
defer close(reqCh)

for _, m := range metas {
pages := pagesPerRequest(m, targetBytesPerRequest)
if pages == 0 {
continue
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)

dc, err := m.DedicatedColumns.ToTempopb()
if err != nil {
// errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
continue
}

queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: max(searchReq.Start, uint64(m.StartTime.UnixNano())),
End: min(searchReq.End, uint64(m.EndTime.UnixNano())),
Step: searchReq.Step,
// ShardID: uint32, // No sharding with RF=1
// ShardCount: uint32, // No sharding with RF=1
QueryMode: searchReq.QueryMode,
// New RF1 fields
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size,
FooterSize: m.FooterSize,
DedicatedColumns: dc,
}
alignTimeRange(queryRangeReq)
queryRangeReq.End += queryRangeReq.Step

subR = api.BuildQueryRangeRequest(subR, queryRangeReq)

prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query())
// TODO: Handle sampling rate

select {
case reqCh <- subR:
case <-ctx.Done():
return
}
}
}
}

func (s *queryRangeSharder) backendRange(now time.Time, start, end uint64, queryBackendAfter time.Duration) (uint64, uint64) {
backendAfter := uint64(now.Add(-queryBackendAfter).UnixNano())

@@ -319,13 +438,6 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest

searchReq.QueryMode = querier.QueryModeRecent

-// No sharding on the generators (unnecessary), but we do apply sampling
-// rates. In this case we execute a single arbitrary shard. Choosing
-// the last shard works. The first shard should be avoided because it is
-// weighted slightly off due to int63/128 sharding boundaries.
-searchReq.ShardID = uint32(1.0 / samplingRate)
-searchReq.ShardCount = uint32(1.0 / samplingRate)

return s.toUpstreamRequest(parent.Context(), searchReq, parent, tenantID)
}

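The job accounting in the new `backendRequests` is plain ceil division per block. A standalone sketch of the same math (hypothetical helper, not part of the PR):

```go
package main

import "fmt"

// jobsForBlock mirrors the sharder's accounting: one sub-request per
// pagesPerRequest-sized chunk of a block's records, rounding up so a
// partial final chunk still gets its own job.
func jobsForBlock(totalRecords, pagesPerRequest uint32) uint32 {
	if pagesPerRequest == 0 || totalRecords == 0 {
		return 0
	}
	jobs := totalRecords / pagesPerRequest
	if totalRecords%pagesPerRequest != 0 {
		jobs++
	}
	return jobs
}

func main() {
	// e.g. a block with 1000 records searched 300 pages at a time -> 4 jobs
	fmt.Println(jobsForBlock(1000, 300))
}
```

In the sharder, `totalJobs` sums this over every matching block, and `totalBlockBytes` sums each block's `Size`.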
modules/frontend/search_sharder.go (2 additions, 6 deletions)
@@ -38,11 +38,6 @@ type SearchSharderConfig struct {
IngesterShards int `yaml:"ingester_shards,omitempty"`
}

-type backendReqMsg struct {
-req *http.Request
-err error
-}

type asyncSearchSharder struct {
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
@@ -147,7 +142,8 @@ func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
-m.EndTime.Unix() >= start {
+m.EndTime.Unix() >= start &&
+m.ReplicationFactor == backend.DefaultReplicationFactor { // This check skips generator blocks (RF=1)
metas = append(metas, m)
}
}
modules/generator/processor/localblocks/processor.go (1 addition, 1 deletion)
@@ -715,7 +715,7 @@ func (p *Processor) resetHeadBlock() error {
BlockID: uuid.New(),
TenantID: p.tenant,
DedicatedColumns: p.overrides.DedicatedColumns(p.tenant),
-ReplicationFactor: 1,
+ReplicationFactor: backend.MetricsGeneratorReplicationFactor,
}
block, err := p.wal.NewBlock(meta, model.CurrentEncoding)
if err != nil {
modules/querier/querier.go (1 addition)
@@ -284,6 +284,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest
span.LogFields(ot_log.String("timeEnd", fmt.Sprint(timeEnd)))

opts := common.DefaultSearchOptionsWithMaxBytes(maxBytes)
+opts.BlockReplicationFactor = backend.DefaultReplicationFactor
partialTraces, blockErrs, err := q.store.Find(ctx, userID, req.TraceID, req.BlockStart, req.BlockEnd, timeStart, timeEnd, opts)
if err != nil {
retErr := fmt.Errorf("error querying store in Querier.FindTraceByID: %w", err)
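Across the diff, the two read paths now partition blocks by replication factor: the search sharder and `FindTraceByID` keep ingester-written blocks (`backend.DefaultReplicationFactor`), while the RF1 metrics read path keeps generator-written blocks. A self-contained sketch of that split — the constant values are assumptions (the default RF appears to be the zero value on ingester metas, and the generator stamps RF1, matching `replicationFactor = 1` in the sharder):

```go
package main

import "fmt"

// Stand-ins for tempodb/backend types (sketch; the real code uses
// backend.BlockMeta, backend.DefaultReplicationFactor and
// backend.MetricsGeneratorReplicationFactor).
type blockMeta struct{ ReplicationFactor uint32 }

const (
	defaultReplicationFactor          = 0 // assumed: zero value on ingester-written metas
	metricsGeneratorReplicationFactor = 1 // assumed: matches replicationFactor = 1 in the sharder
)

// isSearchBlock: kept by the search sharder and trace-by-ID lookups.
func isSearchBlock(m blockMeta) bool {
	return m.ReplicationFactor == defaultReplicationFactor
}

// isRF1MetricsBlock: kept by the metrics sharder when rf1_read_path is on.
func isRF1MetricsBlock(m blockMeta) bool {
	return m.ReplicationFactor == metricsGeneratorReplicationFactor
}

func main() {
	ingester, generator := blockMeta{0}, blockMeta{1}
	fmt.Println(isSearchBlock(ingester), isRF1MetricsBlock(ingester))   // true false
	fmt.Println(isSearchBlock(generator), isRF1MetricsBlock(generator)) // false true
}
```

Because the predicates are disjoint, no block is consumed by both read paths.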