diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index fc49a63ab..3a56120a0 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -51,7 +51,6 @@ const ( retryPollOperationMaxInterval = 10 * time.Second retryPollResourceExhaustedInitialInterval = time.Second retryPollResourceExhaustedMaxInterval = 10 * time.Second - retryLongPollGracePeriod = 2 * time.Minute // How long the same poll task error can remain suppressed lastPollTaskErrSuppressTime = 1 * time.Minute ) @@ -59,6 +58,7 @@ const ( var ( pollOperationRetryPolicy = createPollRetryPolicy() pollResourceExhaustedRetryPolicy = createPollResourceExhaustedRetryPolicy() + retryLongPollGracePeriod = 2 * time.Minute ) var errStop = errors.New("worker stopping") @@ -194,6 +194,17 @@ type ( } ) +// SetRetryLongPollGracePeriod sets the amount of time a long poller retys on +// fatal errors before it actually fails. For test use only, +// not safe to call with a running worker. +func SetRetryLongPollGracePeriod(period time.Duration) { + retryLongPollGracePeriod = period +} + +func getRetryLongPollGracePeriod() time.Duration { + return retryLongPollGracePeriod +} + func createPollRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(retryPollOperationInitialInterval) policy.SetMaximumInterval(retryPollOperationMaxInterval) @@ -335,7 +346,7 @@ func (bw *baseWorker) pollTask() { if err != nil { // We retry "non retriable" errors while long polling for a while, because some proxies return // unexpected values causing unnecessary downtime. - if isNonRetriableError(err) && bw.retrier.GetElapsedTime() > retryLongPollGracePeriod { + if isNonRetriableError(err) && bw.retrier.GetElapsedTime() > getRetryLongPollGracePeriod() { bw.logger.Error("Worker received non-retriable error. Shutting down.", tagError, err) if bw.fatalErrCb != nil { bw.fatalErrCb(err) diff --git a/test/integration_test.go b/test/integration_test.go index bf0d4756d..9705dfde7 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -62,6 +62,7 @@ import ( "go.temporal.io/sdk/client" contribtally "go.temporal.io/sdk/contrib/tally" "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/internal" "go.temporal.io/sdk/internal/common" "go.temporal.io/sdk/internal/common/metrics" "go.temporal.io/sdk/internal/interceptortest" @@ -2276,6 +2277,8 @@ func (ts *IntegrationTestSuite) TestWorkerFatalErrorOnStart() { } func (ts *IntegrationTestSuite) testWorkerFatalError(useWorkerRun bool) { + // Allow the worker to fail faster so the test does not take 2 minutes. + internal.SetRetryLongPollGracePeriod(5 * time.Second) // Make a new client that will fail a poll with a namespace not found c, err := client.Dial(client.Options{ HostPort: ts.config.ServiceAddr,