From fbf9018fb0f574fec30d82cfb9209a3f18844e01 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 20 Sep 2024 09:39:33 -0600 Subject: [PATCH] fix: guard against failed ingester requests for detected fields --- pkg/querier/querier.go | 60 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 997a4bf7731c..55e3fe3ec29b 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log/level" "github.com/google/uuid" "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -1076,7 +1077,7 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. if err != nil { return nil, err } - // just incject the header to categorize labels + // just inject the header to categorize labels ctx = httpreq.InjectHeader(ctx, httpreq.LokiEncodingFlagsHeader, (string)(httpreq.FlagCategorizeLabels)) params := logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ @@ -1091,14 +1092,59 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. 
}, } - iters, err := q.SelectLogs(ctx, params) - if err != nil { - return nil, err + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) + sp := opentracing.SpanFromContext(ctx) + iters := []iter.EntryIterator{} + errs := multierror.New() + if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { + // Make a copy of the request before modifying + // because the initial request is used below to query stores + queryRequestCopy := *params.QueryRequest + newParams := logql.SelectLogParams{ + QueryRequest: &queryRequestCopy, + } + newParams.Start = ingesterQueryInterval.start + newParams.End = ingesterQueryInterval.end + if sp != nil { + sp.LogKV( + "msg", "querying ingester for detected fields", + "params", newParams) + } + ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams) + if err != nil { + errs.Add(err) + } else { + iters = append(iters, ingesterIters...) + } + } + + if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { + params.Start = storeQueryInterval.start + params.End = storeQueryInterval.end + if sp != nil { + sp.LogKV( + "msg", "querying store for detected fields", + "params", params) + } + storeIter, err := q.store.SelectLogs(ctx, params) + if err != nil { + errs.Add(err) + } else { + iters = append(iters, storeIter) + } + } + + //return an empty response instead of an error, so if there are results in other splits, they can be returned + if len(iters) == 0 { + level.Warn(q.logger).Log("msg", "no detected field results from store or ingester", "error", errs.Err()) + return &logproto.DetectedFieldsResponse{ + Fields: []*logproto.DetectedField{}, + FieldLimit: req.GetFieldLimit(), + }, nil } - // TODO(twhitney): converting from a step to a duration should be abstracted and reused, - // doing this in a few places now. 
- streams, err := streamsForFieldDetection(iters, req.LineLimit) + mergedIter := iter.NewMergeEntryIterator(ctx, iters, logproto.BACKWARD) + streams, err := streamsForFieldDetection(mergedIter, req.LineLimit) if err != nil { return nil, err }