Skip to content

Commit

Permalink
fix: add a retry middleware to all the stats handlers (#13584)
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Welch <[email protected]>
  • Loading branch information
slim-bean authored Jul 19, 2024
1 parent 3a1a3a2 commit 7232795
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 25 deletions.
24 changes: 24 additions & 0 deletions pkg/querier/queryrange/queryrangebase/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queryrangebase

import (
"context"
"reflect"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -81,20 +82,28 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
query := req.GetQuery()

for ; tries < r.maxRetries; tries++ {
// Make sure the context isn't done before sending the request
if ctx.Err() != nil {
return nil, ctx.Err()
}

resp, err := r.next.Do(ctx, req)
if err == nil {
return resp, nil
}

// Make sure the context isn't done before retrying the request
if ctx.Err() != nil {
return nil, ctx.Err()
}

// Retry if we get a HTTP 500 or an unknown error.
if code := grpcutil.ErrorToStatusCode(err); code == codes.Unknown || code/100 == 5 {
lastErr = err
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
"try", tries,
"type", logImplementingType(req),
"query", query,
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
Expand All @@ -113,3 +122,18 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
}
return nil, lastErr
}

func logImplementingType(i Request) string {
if i == nil {
return "nil"
}

t := reflect.TypeOf(i)

// Check if it's a pointer and get the underlying type if so
if t.Kind() == reflect.Ptr {
t = t.Elem()
}

return t.String()
}
23 changes: 14 additions & 9 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func NewQueryShardMiddleware(
limits Limits,
maxShards int,
statsHandler queryrangebase.Handler,
retryNextHandler queryrangebase.Handler,
shardAggregation []string,
) queryrangebase.Middleware {
noshards := !hasShards(confs)
Expand All @@ -56,7 +57,7 @@ func NewQueryShardMiddleware(
}

mapperware := queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return newASTMapperware(confs, engineOpts, next, statsHandler, logger, shardingMetrics, limits, maxShards, shardAggregation)
return newASTMapperware(confs, engineOpts, next, retryNextHandler, statsHandler, logger, shardingMetrics, limits, maxShards, shardAggregation)
})

return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
Expand All @@ -76,6 +77,7 @@ func newASTMapperware(
confs ShardingConfigs,
engineOpts logql.EngineOpts,
next queryrangebase.Handler,
retryNextHandler queryrangebase.Handler,
statsHandler queryrangebase.Handler,
logger log.Logger,
metrics *logql.MapperMetrics,
Expand All @@ -88,6 +90,7 @@ func newASTMapperware(
logger: log.With(logger, "middleware", "QueryShard.astMapperware"),
limits: limits,
next: next,
retryNextHandler: retryNextHandler,
statsHandler: next,
ng: logql.NewDownstreamEngine(engineOpts, DownstreamHandler{next: next, limits: limits}, limits, logger),
metrics: metrics,
Expand All @@ -103,14 +106,15 @@ func newASTMapperware(
}

type astMapperware struct {
confs ShardingConfigs
logger log.Logger
limits Limits
next queryrangebase.Handler
statsHandler queryrangebase.Handler
ng *logql.DownstreamEngine
metrics *logql.MapperMetrics
maxShards int
confs ShardingConfigs
logger log.Logger
limits Limits
next queryrangebase.Handler
retryNextHandler queryrangebase.Handler
statsHandler queryrangebase.Handler
ng *logql.DownstreamEngine
metrics *logql.MapperMetrics
maxShards int

// Feature flag for sharding range and vector aggregations such as
// quantile_ver_time with probabilistic data structures.
Expand Down Expand Up @@ -191,6 +195,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
ast.maxShards,
r,
ast.statsHandler,
ast.retryNextHandler,
ast.next,
ast.limits,
)
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/queryrange/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func Test_astMapper(t *testing.T) {
},
testEngineOpts,
handler,
handler,
nil,
log.NewNopLogger(),
nilShardingMetrics,
Expand Down Expand Up @@ -307,6 +308,7 @@ func Test_astMapper_QuerySizeLimits(t *testing.T) {
},
testEngineOpts,
handler,
handler,
nil,
log.NewNopLogger(),
nilShardingMetrics,
Expand Down Expand Up @@ -352,6 +354,7 @@ func Test_ShardingByPass(t *testing.T) {
},
testEngineOpts,
handler,
handler,
nil,
log.NewNopLogger(),
nilShardingMetrics,
Expand Down Expand Up @@ -439,6 +442,7 @@ func Test_InstantSharding(t *testing.T) {
},
0,
nil,
nil,
[]string{},
)
response, err := sharding.Wrap(queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
Expand Down Expand Up @@ -718,6 +722,7 @@ func TestShardingAcrossConfigs_ASTMapper(t *testing.T) {
confs,
testEngineOpts,
handler,
handler,
nil,
log.NewNopLogger(),
nilShardingMetrics,
Expand Down Expand Up @@ -853,6 +858,7 @@ func Test_ASTMapper_MaxLookBackPeriod(t *testing.T) {
testSchemasTSDB,
engineOpts,
queryHandler,
queryHandler,
statsHandler,
log.NewNopLogger(),
nilShardingMetrics,
Expand Down
32 changes: 32 additions & 0 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ func NewMiddleware(
func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.Logger, l Limits, schema config.SchemaConfig, metrics *Metrics, mw base.Middleware, namespace string, merger base.Merger, limits Limits, iqo util.IngesterQueryOptions) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := mw.Wrap(next)
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace)
statsHandler = rm.Wrap(statsHandler)
}
splitter := newDefaultSplitter(limits, iqo)

queryRangeMiddleware := []base.Middleware{
Expand Down Expand Up @@ -553,6 +557,12 @@ func getOperation(path string) string {
func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
QueryMetricsMiddleware(metrics.QueryMetrics),
Expand Down Expand Up @@ -592,6 +602,7 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
limits,
0, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand All @@ -618,6 +629,12 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo
func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, merger base.Merger, iqo util.IngesterQueryOptions, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
StatsCollectorMiddleware(),
Expand All @@ -639,6 +656,7 @@ func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logg
// and overwhelming the frontend, therefore we fix the number of shards to prevent this.
32, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand Down Expand Up @@ -854,6 +872,12 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge

return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
QueryMetricsMiddleware(metrics.QueryMetrics),
Expand Down Expand Up @@ -895,6 +919,7 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
limits,
0, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand Down Expand Up @@ -976,6 +1001,12 @@ func NewInstantMetricTripperware(

return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
retryNextHandler := next
if cfg.MaxRetries > 0 {
rm := base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace)
statsHandler = rm.Wrap(statsHandler)
retryNextHandler = rm.Wrap(next)
}

queryRangeMiddleware := []base.Middleware{
StatsCollectorMiddleware(),
Expand Down Expand Up @@ -1003,6 +1034,7 @@ func NewInstantMetricTripperware(
limits,
0, // 0 is unlimited shards
statsHandler,
retryNextHandler,
cfg.ShardAggregations,
),
)
Expand Down
35 changes: 19 additions & 16 deletions pkg/querier/queryrange/shard_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,22 @@ func shardResolverForConf(
maxParallelism int,
maxShards int,
r queryrangebase.Request,
statsHandler, next queryrangebase.Handler,
statsHandler, next, retryNext queryrangebase.Handler,
limits Limits,
) (logql.ShardResolver, bool) {
if conf.IndexType == types.TSDBType {
return &dynamicShardResolver{
ctx: ctx,
logger: logger,
statsHandler: statsHandler,
next: next,
limits: limits,
from: model.Time(r.GetStart().UnixMilli()),
through: model.Time(r.GetEnd().UnixMilli()),
maxParallelism: maxParallelism,
maxShards: maxShards,
defaultLookback: defaultLookback,
ctx: ctx,
logger: logger,
statsHandler: statsHandler,
retryNextHandler: retryNext,
next: next,
limits: limits,
from: model.Time(r.GetStart().UnixMilli()),
through: model.Time(r.GetEnd().UnixMilli()),
maxParallelism: maxParallelism,
maxShards: maxShards,
defaultLookback: defaultLookback,
}, true
}
if conf.RowShards < 2 {
Expand All @@ -64,10 +65,11 @@ type dynamicShardResolver struct {
ctx context.Context
// TODO(owen-d): shouldn't have to fork handlers here -- one should just transparently handle the right logic
// depending on the underlying type?
statsHandler queryrangebase.Handler // index stats handler (hooked up to results cache, etc)
next queryrangebase.Handler // next handler in the chain (used for non-stats reqs)
logger log.Logger
limits Limits
statsHandler queryrangebase.Handler // index stats handler (hooked up to results cache, etc)
retryNextHandler queryrangebase.Handler // next handler wrapped with retries
next queryrangebase.Handler // next handler in the chain (used for non-stats reqs)
logger log.Logger
limits Limits

from, through model.Time
maxParallelism int
Expand Down Expand Up @@ -251,7 +253,8 @@ func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerSh
exprStr := expr.String()
// try to get shards for the given expression
// if it fails, fallback to linearshards based on stats
resp, err := r.next.Do(r.ctx, &logproto.ShardsRequest{
// use the retry handler here to retry transient errors
resp, err := r.retryNextHandler.Do(r.ctx, &logproto.ShardsRequest{
From: adjustedFrom,
Through: r.through,
Query: expr.String(),
Expand Down

0 comments on commit 7232795

Please sign in to comment.