Skip to content

Commit

Permalink
Add generic retry in query frontend (cortexproject#5561)
Browse files Browse the repository at this point in the history
* Add generic retry in query frontend

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Remove retry middleware

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Update Changelog

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Fail fast if context errors out

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

* Fix test

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>

---------

Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 authored Sep 14, 2023
1 parent f560115 commit a897070
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 280 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
* [ENHANCEMENT] Querier: Retry store gateway client connection closing gRPC error. #5558
* [ENHANCEMENT] QueryFrontend: Add generic retry for all APIs. #5561.
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
3 changes: 2 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
}

func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
retry := transport.NewRetry(t.Cfg.QueryRange.MaxRetries, prometheus.DefaultRegisterer)
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(t.Cfg.Frontend, t.Overrides, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer, retry)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) {
func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) {
switch {
case cfg.DownstreamURL != "":
// If the user has specified a downstream Prometheus, then we should use that.
Expand All @@ -59,12 +59,12 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i
cfg.FrontendV2.Port = grpcListenPort
}

fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg)
fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err

default:
// No scheduler = use original frontend.
fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
fr, err := v1.New(cfg.FrontendV1, limits, log, reg, retry)
if err != nil {
return nil, nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil)
rt, v1, v2, err := InitFrontend(config, frontendv1.MockLimits{}, 0, logger, nil, transport.NewRetry(0, nil))
require.NoError(t, err)
require.NotNil(t, rt)
// v1 will be nil if DownstreamURL is defined.
Expand Down
56 changes: 56 additions & 0 deletions pkg/frontend/transport/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package transport

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
)

type Retry struct {
maxRetries int
retriesCount prometheus.Histogram
}

func NewRetry(maxRetries int, reg prometheus.Registerer) *Retry {
return &Retry{
maxRetries: maxRetries,
retriesCount: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_frontend_retries",
Help: "Number of times a request is retried.",
Buckets: []float64{0, 1, 2, 3, 4, 5},
}),
}
}

func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)) (*httpgrpc.HTTPResponse, error) {
if r.maxRetries == 0 {
// Retries are disabled. Try only once.
return f()
}

tries := 0
defer func() { r.retriesCount.Observe(float64(tries)) }()

var (
resp *httpgrpc.HTTPResponse
err error
)
for ; tries < r.maxRetries; tries++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}

resp, err = f()
if err != nil && err != context.Canceled {
continue // Retryable
} else if resp != nil && resp.Code/100 == 5 {
continue // Retryable
} else {
break
}
}
return resp, err
}
31 changes: 31 additions & 0 deletions pkg/frontend/transport/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package transport

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
)

func TestRetry(t *testing.T) {
tries := atomic.NewInt64(3)
r := NewRetry(3, nil)
ctx := context.Background()
res, err := r.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
try := tries.Dec()
if try > 1 {
return &httpgrpc.HTTPResponse{
Code: 500,
}, nil
}
return &httpgrpc.HTTPResponse{
Code: 200,
}, nil

})

require.NoError(t, err)
require.Equal(t, int32(200), res.Code)
}
47 changes: 26 additions & 21 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
Expand Down Expand Up @@ -66,6 +67,7 @@ type Frontend struct {
cfg Config
log log.Logger
limits Limits
retry *transport.Retry

requestQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
Expand All @@ -92,11 +94,12 @@ type request struct {
}

// New creates a new frontend. Frontend implements service, and must be started and stopped.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
limits: limits,
retry: retry,
queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
Expand Down Expand Up @@ -173,31 +176,33 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
}
}

request := request{
request: req,
originalCtx: ctx,
return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
request := request{
request: req,
originalCtx: ctx,

// Buffer of 1 to ensure response can be written by the server side
// of the Process stream, even if this goroutine goes away due to
// client context cancellation.
err: make(chan error, 1),
response: make(chan *httpgrpc.HTTPResponse, 1),
}
// Buffer of 1 to ensure response can be written by the server side
// of the Process stream, even if this goroutine goes away due to
// client context cancellation.
err: make(chan error, 1),
response: make(chan *httpgrpc.HTTPResponse, 1),
}

if err := f.queueRequest(ctx, &request); err != nil {
return nil, err
}
if err := f.queueRequest(ctx, &request); err != nil {
return nil, err
}

select {
case <-ctx.Done():
return nil, ctx.Err()
select {
case <-ctx.Done():
return nil, ctx.Err()

case resp := <-request.response:
return resp, nil
case resp := <-request.response:
return resp, nil

case err := <-request.err:
return nil, err
}
case err := <-request.err:
return nil, err
}
})
}

// Process allows backends to pull requests from the frontend.
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
require.NoError(t, err)

limits := MockLimits{MockLimits: queue.MockLimits{MaxOutstanding: 100}}
v1, err := New(config, limits, logger, reg)
v1, err := New(config, limits, logger, reg, transport.NewRetry(0, nil))
require.NoError(t, err)
require.NotNil(t, v1)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/v1/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand All @@ -25,7 +26,7 @@ func setupFrontend(t *testing.T, config Config) (*Frontend, error) {
logger := log.NewNopLogger()

limits := MockLimits{Queriers: 3, MockLimits: queue.MockLimits{MaxOutstanding: 100}}
frontend, err := New(config, limits, logger, nil)
frontend, err := New(config, limits, logger, nil, transport.NewRetry(0, nil))
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
Loading

0 comments on commit a897070

Please sign in to comment.