Skip to content

Commit

Permalink
fix: guard against failed ingester requests for detected fields
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Sep 20, 2024
1 parent f8d9143 commit fbf9018
Showing 1 changed file with 53 additions and 7 deletions.
60 changes: 53 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -1076,7 +1077,7 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
if err != nil {
return nil, err
}
// just incject the header to categorize labels
// just inject the header to categorize labels
ctx = httpreq.InjectHeader(ctx, httpreq.LokiEncodingFlagsHeader, (string)(httpreq.FlagCategorizeLabels))
params := logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Expand All @@ -1091,14 +1092,59 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
},
}

iters, err := q.SelectLogs(ctx, params)
if err != nil {
return nil, err
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)
sp := opentracing.SpanFromContext(ctx)
iters := []iter.EntryIterator{}
errs := multierror.New()
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
// because the initial request is used below to query stores
queryRequestCopy := *params.QueryRequest
newParams := logql.SelectLogParams{
QueryRequest: &queryRequestCopy,
}
newParams.Start = ingesterQueryInterval.start
newParams.End = ingesterQueryInterval.end
if sp != nil {
sp.LogKV(
"msg", "querying ingester for detected fields",
"params", newParams)
}
ingesterIters, err := q.ingesterQuerier.SelectLogs(ctx, newParams)
if err != nil {
errs.Add(err)
} else {
iters = append(iters, ingesterIters...)
}
}

if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
params.Start = storeQueryInterval.start
params.End = storeQueryInterval.end
if sp != nil {
sp.LogKV(
"msg", "querying store for detected fields",
"params", params)
}
storeIter, err := q.store.SelectLogs(ctx, params)
if err != nil {
errs.Add(err)
} else {
iters = append(iters, storeIter)
}
}

//return an empty response instead of an error, so if there are results in other splits, they can be returned
if len(iters) == 0 {
level.Warn(q.logger).Log("msg", "no detected field results from store or ingester", "error", errs.Err())
return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{},
FieldLimit: req.GetFieldLimit(),
}, nil
}

// TODO(twhitney): converting from a step to a duration should be abstracted and reused,
// doing this in a few places now.
streams, err := streamsForFieldDetection(iters, req.LineLimit)
mergedIter := iter.NewMergeEntryIterator(ctx, iters, logproto.BACKWARD)
streams, err := streamsForFieldDetection(mergedIter, req.LineLimit)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fbf9018

Please sign in to comment.