Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds automatic population of unset retrypolicy field when starting Workflows #654

Merged
merged 14 commits into from
Aug 4, 2020
10 changes: 0 additions & 10 deletions common/defaultActivityRetrySettings.go

This file was deleted.

13 changes: 13 additions & 0 deletions common/defaultRetrySettings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

import "time"

// DefaultRetrySettings indicates what the "default" retry settings
// are if it is not specified on an Activity or for any unset fields
// if a policy is explicitly set on a workflow
type DefaultRetrySettings struct {
InitialInterval time.Duration
MaximumIntervalCoefficient float64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it called MaximumIntervalCoefficient? I think we should just call it MaximumInterval and use Duration as type.

Copy link
Member Author

@mastermanu mastermanu Aug 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We needed to initialize maximum interval in the case maximum interval was not set by the user, but a default maximum value might be less than the initial interval, which would then fail validation.

Btw, this change is already part of code-complete (this is just moving code around) and Maxim and I walked through this specific choice to use Coefficient vs. Interval before we landed this (it was initially an Interval).

Happy to discuss offline if we want to make further changes here though.

BackoffCoefficient float64
MaximumAttempts int32
}
15 changes: 15 additions & 0 deletions common/service/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type MapPropertyFn func(opts ...FilterOption) map[string]interface{}
// StringPropertyFnWithNamespaceFilter is a wrapper to get string property from dynamic config
type StringPropertyFnWithNamespaceFilter func(namespace string) string

// MapPropertyFnWithNamespaceFilter is a wrapper to get map property from dynamic config
type MapPropertyFnWithNamespaceFilter func(namespace string) map[string]interface{}

// BoolPropertyFnWithNamespaceFilter is a wrapper to get bool property from dynamic config
type BoolPropertyFnWithNamespaceFilter func(namespace string) bool

Expand Down Expand Up @@ -358,6 +361,18 @@ func (c *Collection) GetStringPropertyFnWithNamespaceFilter(key Key, defaultValu
}
}

// GetMapPropertyFnWithNamespaceFilter gets property and asserts that it's a map
func (c *Collection) GetMapPropertyFnWithNamespaceFilter(key Key, defaultValue map[string]interface{}) MapPropertyFnWithNamespaceFilter {
return func(namespace string) map[string]interface{} {
val, err := c.client.GetMapValue(key, getFilterMap(NamespaceFilter(namespace)), defaultValue)
if err != nil {
c.logError(key, err)
}
c.logValue(key, val, defaultValue, reflect.DeepEqual)
return val
}
}

// GetBoolPropertyFnWithNamespaceFilter gets property with namespace filter and asserts that its namespace
func (c *Collection) GetBoolPropertyFnWithNamespaceFilter(key Key, defaultValue bool) BoolPropertyFnWithNamespaceFilter {
return func(namespace string) bool {
Expand Down
5 changes: 5 additions & 0 deletions common/service/dynamicconfig/config_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,8 @@ func GetStringPropertyFn(value string) func(opts ...FilterOption) string {
func GetMapPropertyFn(value map[string]interface{}) func(opts ...FilterOption) map[string]interface{} {
return func(...FilterOption) map[string]interface{} { return value }
}

// GetMapPropertyFnWithNamespaceFilter returns value as MapPropertyFn
func GetMapPropertyFnWithNamespaceFilter(value map[string]interface{}) func(namespace string) map[string]interface{} {
return func(namespace string) map[string]interface{} { return value }
}
4 changes: 4 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ var keys = map[Key]string{
EnableDropStuckTaskByNamespaceID: "history.DropStuckTaskByNamespace",
SkipReapplicationByNamespaceId: "history.SkipReapplicationByNamespaceId",
DefaultActivityRetryPolicy: "history.defaultActivityRetryPolicy",
DefaultWorkflowRetryPolicy: "history.defaultWorkflowRetryPolicy",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS",
Expand Down Expand Up @@ -635,6 +636,9 @@ const (
// DefaultActivityRetryPolicy represents the out-of-box retry policy for activities where
// the user has not specified an explicit RetryPolicy
DefaultActivityRetryPolicy
// DefaultWorkflowRetryPolicy represents the out-of-box retry policy for unset fields
// where the user has set an explicit RetryPolicy, but not specified all the fields
DefaultWorkflowRetryPolicy

// EnableAdminProtection is whether to enable admin checking
EnableAdminProtection
Expand Down
61 changes: 58 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
Expand Down Expand Up @@ -79,6 +80,16 @@ const (
retryKafkaOperationMaxInterval = 10 * time.Second
retryKafkaOperationExpirationInterval = 30 * time.Second

defaultInitialInterval = time.Second
defaultMaximumIntervalCoefficient = 100.0
defaultBackoffCoefficient = 2.0
defaultMaximumAttempts = 0

initialIntervalInSecondsConfigKey = "InitialIntervalInSeconds"
maximumIntervalCoefficientConfigKey = "MaximumIntervalCoefficient"
backoffCoefficientConfigKey = "BackoffCoefficient"
maximumAttemptsConfigKey = "MaximumAttempts"

contextExpireThreshold = 10 * time.Millisecond

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
Expand Down Expand Up @@ -370,13 +381,13 @@ func SortInt64Slice(slice []int64) {
}

// EnsureRetryPolicyDefaults ensures the policy subfields, if not explicitly set, are set to the specified defaults
func EnsureRetryPolicyDefaults(originalPolicy *commonpb.RetryPolicy, defaultSettings DefaultActivityRetrySettings) {
func EnsureRetryPolicyDefaults(originalPolicy *commonpb.RetryPolicy, defaultSettings DefaultRetrySettings) {
if originalPolicy.GetMaximumAttempts() == 0 {
originalPolicy.MaximumAttempts = defaultSettings.MaximumAttempts
}

if timestamp.DurationValue(originalPolicy.GetInitialInterval()) == 0 {
originalPolicy.InitialInterval = timestamp.DurationPtr(time.Duration(defaultSettings.InitialIntervalInSeconds) * time.Second)
originalPolicy.InitialInterval = timestamp.DurationPtr(defaultSettings.InitialInterval)
}

if timestamp.DurationValue(originalPolicy.GetMaximumInterval()) == 0 {
Expand All @@ -394,6 +405,7 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error {
// nil policy is valid which means no retry
return nil
}

if policy.GetMaximumAttempts() == 1 {
// One maximum attempt effectively disable retries. Validating the
// rest of the arguments is pointless
Expand All @@ -417,18 +429,61 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error {
return nil
}

func GetDefaultRetryPolicyConfigOptions() map[string]interface{} {
return map[string]interface{}{
initialIntervalInSecondsConfigKey: int(defaultInitialInterval.Seconds()),
maximumIntervalCoefficientConfigKey: defaultMaximumIntervalCoefficient,
backoffCoefficientConfigKey: defaultBackoffCoefficient,
maximumAttemptsConfigKey: defaultMaximumAttempts,
}
}

func FromConfigToDefaultRetrySettings(options map[string]interface{}) DefaultRetrySettings {
defaultSettings := DefaultRetrySettings{
InitialInterval: defaultInitialInterval,
MaximumIntervalCoefficient: defaultMaximumIntervalCoefficient,
BackoffCoefficient: defaultBackoffCoefficient,
MaximumAttempts: defaultMaximumAttempts,
}

initialIntervalInSeconds, ok := options[initialIntervalInSecondsConfigKey]
if ok {
defaultSettings.InitialInterval = time.Duration(initialIntervalInSeconds.(int)) * time.Second
}

maximumIntervalCoefficient, ok := options[maximumIntervalCoefficientConfigKey]
if ok {
defaultSettings.MaximumIntervalCoefficient = maximumIntervalCoefficient.(float64)
}

backoffCoefficient, ok := options[backoffCoefficientConfigKey]
if ok {
defaultSettings.BackoffCoefficient = backoffCoefficient.(float64)
}

maximumAttempts, ok := options[maximumAttemptsConfigKey]
if ok {
defaultSettings.MaximumAttempts = int32(maximumAttempts.(int))
}

return defaultSettings
}

// CreateHistoryStartWorkflowRequest create a start workflow request for history
func CreateHistoryStartWorkflowRequest(
namespaceID string,
startRequest *workflowservice.StartWorkflowExecutionRequest,
parentExecutionInfo *workflowspb.ParentExecutionInfo,
now time.Time,
) *historyservice.StartWorkflowExecutionRequest {
now := time.Now()
histRequest := &historyservice.StartWorkflowExecutionRequest{
NamespaceId: namespaceID,
StartRequest: startRequest,
ContinueAsNewInitiator: enumspb.CONTINUE_AS_NEW_INITIATOR_WORKFLOW,
Attempt: 1,
ParentExecutionInfo: parentExecutionInfo,
}

if timestamp.DurationValue(startRequest.GetWorkflowExecutionTimeout()) > 0 {
deadline := now.Add(timestamp.DurationValue(startRequest.GetWorkflowExecutionTimeout()))
histRequest.WorkflowExecutionExpirationTime = timestamp.TimePtr(deadline.Round(time.Millisecond))
Expand Down
21 changes: 18 additions & 3 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func TestValidateRetryPolicy(t *testing.T) {
}

func TestEnsureRetryPolicyDefaults(t *testing.T) {
defaultActivityRetrySettings := DefaultActivityRetrySettings{
InitialIntervalInSeconds: 1,
defaultRetrySettings := DefaultRetrySettings{
InitialInterval: time.Second,
MaximumIntervalCoefficient: 100,
BackoffCoefficient: 2.0,
MaximumAttempts: 120,
Expand Down Expand Up @@ -181,8 +181,23 @@ func TestEnsureRetryPolicyDefaults(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
EnsureRetryPolicyDefaults(tt.input, defaultActivityRetrySettings)
EnsureRetryPolicyDefaults(tt.input, defaultRetrySettings)
assert.Equal(t, tt.want, tt.input)
})
}
}

func Test_FromConfigToRetryPolicy(t *testing.T) {
options := map[string]interface{}{
initialIntervalInSecondsConfigKey: 2,
maximumIntervalCoefficientConfigKey: 100.0,
backoffCoefficientConfigKey: 4.0,
maximumAttemptsConfigKey: 5,
}

defaultSettings := FromConfigToDefaultRetrySettings(options)
assert.Equal(t, 2*time.Second, defaultSettings.InitialInterval)
assert.Equal(t, 100.0, defaultSettings.MaximumIntervalCoefficient)
assert.Equal(t, 4.0, defaultSettings.BackoffCoefficient)
assert.Equal(t, int32(5), defaultSettings.MaximumAttempts)
}
6 changes: 6 additions & 0 deletions config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ history.defaultActivityRetryPolicy:
MaximumIntervalCoefficient: 100.0
BackoffCoefficient: 2.0
MaximumAttempts: 0
history.defaultWorkflowRetryPolicy:
- value:
InitialIntervalInSeconds: 1
MaximumIntervalCoefficient: 100.0
BackoffCoefficient: 2.0
MaximumAttempts: 0
2 changes: 1 addition & 1 deletion host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (s *integrationSuite) TestWorkflowRetry() {
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
RetryPolicy: &commonpb.RetryPolicy{
InitialInterval: &initialInterval,
// Intentionally test server-initialization of Initial Interval value (which should be 1 second)
MaximumAttempts: int32(maximumAttempts),
MaximumInterval: timestamp.DurationPtr(1 * time.Second),
NonRetryableErrorTypes: []string{"bad-bug"},
Expand Down
5 changes: 5 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type Config struct {
SearchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
SearchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter

// DefaultWorkflowRetryPolicy represents default values for unset fields on a Workflow's
// specified RetryPolicy
DefaultWorkflowRetryPolicy dynamicconfig.MapPropertyFnWithNamespaceFilter

// VisibilityArchival system protection
VisibilityArchivalQueryMaxPageSize dynamicconfig.IntPropertyFn

Expand Down Expand Up @@ -144,6 +148,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
SendRawWorkflowHistory: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.SendRawWorkflowHistory, false),
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false),
EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true),
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
}
}

Expand Down
38 changes: 26 additions & 12 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/service/dynamicconfig"
)

const (
Expand All @@ -87,15 +88,16 @@ type (
WorkflowHandler struct {
resource.Resource

shuttingDown int32
healthStatus int32
tokenSerializer common.TaskTokenSerializer
rateLimiter quotas.Policy
config *Config
versionChecker headers.VersionChecker
namespaceHandler namespace.Handler
visibilityQueryValidator *validator.VisibilityQueryValidator
searchAttributesValidator *validator.SearchAttributesValidator
shuttingDown int32
healthStatus int32
tokenSerializer common.TaskTokenSerializer
rateLimiter quotas.Policy
config *Config
versionChecker headers.VersionChecker
namespaceHandler namespace.Handler
visibilityQueryValidator *validator.VisibilityQueryValidator
searchAttributesValidator *validator.SearchAttributesValidator
getDefaultWorkflowRetrySettings dynamicconfig.MapPropertyFnWithNamespaceFilter
}

// HealthStatus is an enum that refers to the rpc handler health status
Expand Down Expand Up @@ -151,6 +153,7 @@ func NewWorkflowHandler(
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit,
),
getDefaultWorkflowRetrySettings: config.DefaultWorkflowRetryPolicy,
}

return handler
Expand Down Expand Up @@ -425,7 +428,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
return nil, wh.error(errWorkflowIDTooLong, scope)
}

if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
if err := wh.validateRetryPolicy(request.GetNamespace(), request.RetryPolicy); err != nil {
return nil, wh.error(err, scope)
}

Expand Down Expand Up @@ -505,7 +508,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
}

wh.GetLogger().Debug("Start workflow execution request namespaceID", tag.WorkflowNamespaceID(namespaceID))
resp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, common.CreateHistoryStartWorkflowRequest(namespaceID, request))
resp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, common.CreateHistoryStartWorkflowRequest(namespaceID, request, nil, time.Now()))

if err != nil {
return nil, wh.error(err, scope)
Expand Down Expand Up @@ -2153,7 +2156,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
return nil, wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
}

if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
if err := wh.validateRetryPolicy(request.GetNamespace(), request.RetryPolicy); err != nil {
return nil, wh.error(err, scope)
}

Expand Down Expand Up @@ -3741,3 +3744,14 @@ func (hs HealthStatus) String() string {
return "unknown"
}
}

func (wh *WorkflowHandler) validateRetryPolicy(namespace string, retryPolicy *commonpb.RetryPolicy) error {
if retryPolicy == nil {
// By default, if the user does not explicitly set a retry policy for a Workflow, do not perform any retries.
return nil
}

defaultWorkflowRetrySettings := common.FromConfigToDefaultRetrySettings(wh.getDefaultWorkflowRetrySettings(namespace))
common.EnsureRetryPolicyDefaults(retryPolicy, defaultWorkflowRetrySettings)
return common.ValidateRetryPolicy(retryPolicy)
}
Loading