Skip to content

Commit

Permalink
Retry "non retryable" error on worker long poll (#1034)
Browse files Browse the repository at this point in the history
Retry "non retryable" error on worker long poll
  • Loading branch information
Quinn-With-Two-Ns authored Feb 9, 2023
1 parent ac60814 commit f554827
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 6 deletions.
7 changes: 7 additions & 0 deletions internal/common/backoff/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func (c *ConcurrentRetrier) Throttle(doneCh <-chan struct{}) {
c.throttleInternal(doneCh)
}

// GetElapsedTime gets the amount of time since that last ConcurrentRetrier.Succeeded call
func (c *ConcurrentRetrier) GetElapsedTime() time.Duration {
c.Lock()
defer c.Unlock()
return c.retrier.GetElapsedTime()
}

func (c *ConcurrentRetrier) throttleInternal(doneCh <-chan struct{}) time.Duration {
next := done

Expand Down
7 changes: 5 additions & 2 deletions internal/common/backoff/retrypolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type (

// Retrier manages the state of retry operation
Retrier interface {
GetElapsedTime() time.Duration
NextBackOff() time.Duration
Reset()
}
Expand Down Expand Up @@ -195,13 +196,15 @@ func (r *retrierImpl) Reset() {

// NextBackOff returns the next delay interval. This is used by Retry to delay calling the operation again
func (r *retrierImpl) NextBackOff() time.Duration {
nextInterval := r.policy.ComputeNextDelay(r.getElapsedTime(), r.currentAttempt)
nextInterval := r.policy.ComputeNextDelay(r.GetElapsedTime(), r.currentAttempt)

// Now increment the current attempt
r.currentAttempt++
return nextInterval
}

func (r *retrierImpl) getElapsedTime() time.Duration {
// GetElapsedTime returns the amount of time since the retrier was created or the last reset,
// whatever was sooner.
func (r *retrierImpl) GetElapsedTime() time.Duration {
return r.clock.Now().Sub(r.startTime)
}
16 changes: 15 additions & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
var (
pollOperationRetryPolicy = createPollRetryPolicy()
pollResourceExhaustedRetryPolicy = createPollResourceExhaustedRetryPolicy()
retryLongPollGracePeriod = 2 * time.Minute
)

var errStop = errors.New("worker stopping")
Expand Down Expand Up @@ -193,6 +194,17 @@ type (
}
)

// SetRetryLongPollGracePeriod sets the amount of time a long poller retrys on
// fatal errors before it actually fails. For test use only,
// not safe to call with a running worker.
func SetRetryLongPollGracePeriod(period time.Duration) {
retryLongPollGracePeriod = period
}

func getRetryLongPollGracePeriod() time.Duration {
return retryLongPollGracePeriod
}

func createPollRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(retryPollOperationInitialInterval)
policy.SetMaximumInterval(retryPollOperationMaxInterval)
Expand Down Expand Up @@ -332,7 +344,9 @@ func (bw *baseWorker) pollTask() {
task, err = bw.options.taskWorker.PollTask()
bw.logPollTaskError(err)
if err != nil {
if isNonRetriableError(err) {
// We retry "non retriable" errors while long polling for a while, because some proxies return
// unexpected values causing unnecessary downtime.
if isNonRetriableError(err) && bw.retrier.GetElapsedTime() > getRetryLongPollGracePeriod() {
bw.logger.Error("Worker received non-retriable error. Shutting down.", tagError, err)
if bw.fatalErrCb != nil {
bw.fatalErrCb(err)
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
TaskToken: []byte("token"),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "wID",
RunId: "rID"},
RunId: "rID",
},
ActivityType: &commonpb.ActivityType{Name: "test"},
ActivityId: uuid.New(),
ScheduledTime: &now,
Expand Down Expand Up @@ -224,7 +225,6 @@ func (s *WorkersTestSuite) TestLongRunningWorkflowTask() {
}
ctx = WithLocalActivityOptions(ctx, lao)
err := ExecuteLocalActivity(ctx, localActivitySleep, time.Second).Get(ctx, nil)

if err != nil {
return err
}
Expand Down Expand Up @@ -290,6 +290,7 @@ func (s *WorkersTestSuite) TestLongRunningWorkflowTask() {
History: &historypb.History{Events: testEvents[0:3]},
NextPageToken: nil,
}
s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInvalidArgument("")).Times(1)
s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(task, nil).Times(1)
s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInternal("")).AnyTimes()
s.service.EXPECT().PollActivityTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollActivityTaskQueueResponse{}, nil).AnyTimes()
Expand Down Expand Up @@ -362,7 +363,6 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
}
ctx = WithLocalActivityOptions(ctx, lao)
err := ExecuteLocalActivity(ctx, localActivitySleep, time.Second).Get(ctx, nil)

if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"go.temporal.io/sdk/client"
contribtally "go.temporal.io/sdk/contrib/tally"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/internal/common"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/interceptortest"
Expand Down Expand Up @@ -2276,6 +2277,8 @@ func (ts *IntegrationTestSuite) TestWorkerFatalErrorOnStart() {
}

func (ts *IntegrationTestSuite) testWorkerFatalError(useWorkerRun bool) {
// Allow the worker to fail faster so the test does not take 2 minutes.
internal.SetRetryLongPollGracePeriod(5 * time.Second)
// Make a new client that will fail a poll with a namespace not found
c, err := client.Dial(client.Options{
HostPort: ts.config.ServiceAddr,
Expand Down

0 comments on commit f554827

Please sign in to comment.