Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry "non retryable" error on worker long poll #1034

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/common/backoff/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func (c *ConcurrentRetrier) Throttle(doneCh <-chan struct{}) {
c.throttleInternal(doneCh)
}

// GetElapsedTime reports the amount of time elapsed since the last
// ConcurrentRetrier.Succeeded call. The value is read from the wrapped
// retrier while the ConcurrentRetrier's lock is held.
func (c *ConcurrentRetrier) GetElapsedTime() time.Duration {
	c.Lock()
	defer c.Unlock()

	elapsed := c.retrier.GetElapsedTime()
	return elapsed
}

func (c *ConcurrentRetrier) throttleInternal(doneCh <-chan struct{}) time.Duration {
next := done

Expand Down
7 changes: 5 additions & 2 deletions internal/common/backoff/retrypolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type (

// Retrier manages the state of retry operation
Retrier interface {
GetElapsedTime() time.Duration
NextBackOff() time.Duration
Reset()
}
Expand Down Expand Up @@ -195,13 +196,15 @@ func (r *retrierImpl) Reset() {

// NextBackOff returns the next delay interval. This is used by Retry to delay calling the operation again
func (r *retrierImpl) NextBackOff() time.Duration {
nextInterval := r.policy.ComputeNextDelay(r.getElapsedTime(), r.currentAttempt)
nextInterval := r.policy.ComputeNextDelay(r.GetElapsedTime(), r.currentAttempt)

// Now increment the current attempt
r.currentAttempt++
return nextInterval
}

func (r *retrierImpl) getElapsedTime() time.Duration {
// GetElapsedTime reports how much time has passed since the retrier was
// created or last reset, whichever happened more recently. It is computed
// as the retrier clock's current time minus the recorded start time.
func (r *retrierImpl) GetElapsedTime() time.Duration {
	now := r.clock.Now()
	return now.Sub(r.startTime)
}
5 changes: 4 additions & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
retryPollOperationMaxInterval = 10 * time.Second
retryPollResourceExhaustedInitialInterval = time.Second
retryPollResourceExhaustedMaxInterval = 10 * time.Second
retryLongPollGracePeriod = 2 * time.Minute
// How long the same poll task error can remain suppressed
lastPollTaskErrSuppressTime = 1 * time.Minute
)
Expand Down Expand Up @@ -332,7 +333,9 @@ func (bw *baseWorker) pollTask() {
task, err = bw.options.taskWorker.PollTask()
bw.logPollTaskError(err)
if err != nil {
if isNonRetriableError(err) {
// While long polling, we keep retrying "non retriable" errors until the retrier's elapsed time
// exceeds retryLongPollGracePeriod, because some proxies return unexpected values that would
// otherwise cause unnecessary downtime.
if isNonRetriableError(err) && bw.retrier.GetElapsedTime() > retryLongPollGracePeriod {
bw.logger.Error("Worker received non-retriable error. Shutting down.", tagError, err)
if bw.fatalErrCb != nil {
bw.fatalErrCb(err)
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
TaskToken: []byte("token"),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "wID",
RunId: "rID"},
RunId: "rID",
},
ActivityType: &commonpb.ActivityType{Name: "test"},
ActivityId: uuid.New(),
ScheduledTime: &now,
Expand Down Expand Up @@ -224,7 +225,6 @@ func (s *WorkersTestSuite) TestLongRunningWorkflowTask() {
}
ctx = WithLocalActivityOptions(ctx, lao)
err := ExecuteLocalActivity(ctx, localActivitySleep, time.Second).Get(ctx, nil)

if err != nil {
return err
}
Expand Down Expand Up @@ -290,6 +290,7 @@ func (s *WorkersTestSuite) TestLongRunningWorkflowTask() {
History: &historypb.History{Events: testEvents[0:3]},
NextPageToken: nil,
}
s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInvalidArgument("")).Times(1)
s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(task, nil).Times(1)
s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInternal("")).AnyTimes()
s.service.EXPECT().PollActivityTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollActivityTaskQueueResponse{}, nil).AnyTimes()
Expand Down Expand Up @@ -362,7 +363,6 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
}
ctx = WithLocalActivityOptions(ctx, lao)
err := ExecuteLocalActivity(ctx, localActivitySleep, time.Second).Get(ctx, nil)

if err != nil {
return err
}
Expand Down