Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sets default WorkflowExecutionTimeout and WorkflowRunTimeout on the request itself. #763

Merged
merged 6 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ const (
MaxWorkflowRetentionPeriod = 30 * time.Hour * 24
)

const (
// DefaultWorkflowExecutionTimeout is the Default Workflow Execution timeout applied to a Workflow when
// this value is not explicitly set by the user on a Start Workflow request
// Intention is 10 years
DefaultWorkflowExecutionTimeout = 24 * 365 * 10 * time.Hour

// DefaultWorkflowRunTimeout is the Default Workflow Run timeout applied to a Workflow when
// this value is not explicitly set by the user on a Start Workflow request
// Intention is 10 years
DefaultWorkflowRunTimeout = 24 * 365 * 10 * time.Hour

// DefaultWorkflowTaskTimeout sets the Default Workflow Task timeout for a Workflow
// when the value is not explicitly set by the user. Intention is 10 seconds.
DefaultWorkflowTaskTimeout = 10 * time.Second
)

const (
// DefaultTransactionSizeLimit is the largest allowed transaction size to persistence
DefaultTransactionSizeLimit = 14 * 1024 * 1024
Expand Down
52 changes: 52 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/service/dynamicconfig"
serviceerrors "go.temporal.io/server/common/serviceerror"
)

Expand Down Expand Up @@ -668,3 +669,54 @@ func GetPayloadsMapSize(data map[string]*commonpb.Payloads) int {

return size
}

// GetWorkflowExecutionTimeout gets the default allowed execution timeout or truncates the requested value to the maximum allowed timeout
func GetWorkflowExecutionTimeout(
mastermanu marked this conversation as resolved.
Show resolved Hide resolved
namespace string,
requestedTimeout time.Duration,
getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter,
getMaxAllowedTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter) time.Duration {

if requestedTimeout == 0 {
requestedTimeout = getDefaultTimeoutFunc(namespace)
}

return timestamp.MinDuration(
requestedTimeout,
getMaxAllowedTimeoutFunc(namespace),
)
}

// GetWorkflowRunTimeout gets the default allowed run timeout or truncates the requested value to the maximum allowed timeout
func GetWorkflowRunTimeout(
namespace string,
requestedTimeout time.Duration,
executionTimeout time.Duration,
getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter,
getMaxAllowedTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter) time.Duration {

if requestedTimeout == 0 {
requestedTimeout = getDefaultTimeoutFunc(namespace)
}

return timestamp.MinDuration(
timestamp.MinDuration(
requestedTimeout,
executionTimeout,
),
getMaxAllowedTimeoutFunc(namespace),
)
}

// GetWorkflowTaskTimeout gets the default allowed execution timeout or truncates the requested value to the maximum allowed timeout
func GetWorkflowTaskTimeout(
namespace string,
requestedTimeout time.Duration,
getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter) time.Duration {

if requestedTimeout == 0 {
return getDefaultTimeoutFunc(namespace)
}

return requestedTimeout
}
18 changes: 18 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ type Config struct {

EnableRPCReplication dynamicconfig.BoolPropertyFn
EnableCleanupReplicationTask dynamicconfig.BoolPropertyFn

// The execution timeout a workflow execution defaults to if not specified
DefaultWorkflowExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// The run timeout a workflow run defaults to if not specified
DefaultWorkflowRunTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter

// The execution timeout a workflow execution defaults to if not specified
MaxWorkflowExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// The run timeout a workflow run defaults to if not specified
MaxWorkflowRunTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter

// DefaultWorkflowTaskTimeout the default workflow task timeout
DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -149,6 +162,11 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false),
EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true),
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
}
}

Expand Down
114 changes: 94 additions & 20 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,16 +452,8 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
return nil, err
}

if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
if err := wh.validateStartWorkflowTimeouts(scope, request); err != nil {
return nil, err
}

if request.GetRequestId() == "" {
Expand Down Expand Up @@ -2144,16 +2136,8 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
return nil, wh.error(errRequestIDTooLong, scope)
}

if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
if err := wh.validateSignalWithStartWorkflowTimeouts(scope, request); err != nil {
return nil, err
}

if err := wh.validateRetryPolicy(request.GetNamespace(), request.RetryPolicy); err != nil {
Expand Down Expand Up @@ -3755,3 +3739,93 @@ func (wh *WorkflowHandler) validateRetryPolicy(namespace string, retryPolicy *co
common.EnsureRetryPolicyDefaults(retryPolicy, defaultWorkflowRetrySettings)
return common.ValidateRetryPolicy(retryPolicy)
}

func (wh *WorkflowHandler) validateStartWorkflowTimeouts(
scope metrics.Scope,
request *workflowservice.StartWorkflowExecutionRequest) error {
if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 {
return wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 {
return wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 {
return wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
}

request.WorkflowExecutionTimeout = timestamp.DurationPtr(
common.GetWorkflowExecutionTimeout(
request.GetNamespace(),
timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
wh.config.DefaultWorkflowExecutionTimeout,
wh.config.MaxWorkflowExecutionTimeout,
),
)

request.WorkflowRunTimeout = timestamp.DurationPtr(
common.GetWorkflowRunTimeout(
request.GetNamespace(),
timestamp.DurationValue(request.GetWorkflowRunTimeout()),
timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
wh.config.DefaultWorkflowRunTimeout,
wh.config.MaxWorkflowRunTimeout,
),
)

request.WorkflowTaskTimeout = timestamp.DurationPtr(
common.GetWorkflowTaskTimeout(
request.GetNamespace(),
timestamp.DurationValue(request.GetWorkflowTaskTimeout()),
wh.config.DefaultWorkflowTaskTimeout,
),
)

return nil
}

func (wh *WorkflowHandler) validateSignalWithStartWorkflowTimeouts(
scope metrics.Scope,
request *workflowservice.SignalWithStartWorkflowExecutionRequest) error {
if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 {
return wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 {
return wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 {
return wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
}

request.WorkflowExecutionTimeout = timestamp.DurationPtr(
common.GetWorkflowExecutionTimeout(
request.GetNamespace(),
timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
wh.config.DefaultWorkflowExecutionTimeout,
wh.config.MaxWorkflowExecutionTimeout,
),
)

request.WorkflowRunTimeout = timestamp.DurationPtr(
common.GetWorkflowRunTimeout(
request.GetNamespace(),
timestamp.DurationValue(request.GetWorkflowRunTimeout()),
timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
wh.config.DefaultWorkflowRunTimeout,
wh.config.MaxWorkflowRunTimeout,
),
)

request.WorkflowTaskTimeout = timestamp.DurationPtr(
common.GetWorkflowTaskTimeout(
request.GetNamespace(),
timestamp.DurationValue(request.GetWorkflowTaskTimeout()),
wh.config.DefaultWorkflowTaskTimeout,
),
)

return nil
}
10 changes: 5 additions & 5 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
VisibilityOpenMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryVisibilityOpenMaxQPS, 300),
VisibilityClosedMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryVisibilityClosedMaxQPS, 300),
MaxAutoResetPoints: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryMaxAutoResetPoints, defaultHistoryMaxAutoResetPoints),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, time.Second*10),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
MaxWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowTaskTimeout, time.Second*60),
AdvancedVisibilityWritingMode: dc.GetStringProperty(dynamicconfig.AdvancedVisibilityWritingMode, common.GetDefaultAdvancedVisibilityWritingMode(isAdvancedVisConfigExist)),
EmitShardDiffLog: dc.GetBoolProperty(dynamicconfig.EmitShardDiffLog, false),
Expand Down Expand Up @@ -387,10 +387,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
StickyTTL: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.StickyTTL, time.Hour*24*365),
WorkflowTaskHeartbeatTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.WorkflowTaskHeartbeatTimeout, time.Minute*30),
DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, time.Hour*24*365*10),
DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, time.Hour*24*365*10),
MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, time.Hour*24*365*10),
MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, time.Hour*24*365*10),
DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 1),
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 0.15),
Expand Down
30 changes: 13 additions & 17 deletions service/history/workflowExecutionUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,26 +230,22 @@ func terminateWorkflow(
}

func getWorkflowExecutionTimeout(namespace string, requestedTimeout time.Duration, serviceConfig *Config) time.Duration {
executionTimeoutSeconds := requestedTimeout
if executionTimeoutSeconds == 0 {
executionTimeoutSeconds = timestamp.RoundUp(serviceConfig.DefaultWorkflowExecutionTimeout(namespace))
}
maxWorkflowExecutionTimeout := timestamp.RoundUp(serviceConfig.MaxWorkflowExecutionTimeout(namespace))
executionTimeoutSeconds = timestamp.MinDuration(executionTimeoutSeconds, maxWorkflowExecutionTimeout)

return executionTimeoutSeconds
return common.GetWorkflowExecutionTimeout(
namespace,
requestedTimeout,
serviceConfig.DefaultWorkflowExecutionTimeout,
serviceConfig.MaxWorkflowExecutionTimeout,
)
}

func getWorkflowRunTimeout(namespace string, requestedTimeout, executionTimeout time.Duration, serviceConfig *Config) time.Duration {
runTimeoutSeconds := requestedTimeout
if runTimeoutSeconds == 0 {
runTimeoutSeconds = timestamp.RoundUp(serviceConfig.DefaultWorkflowRunTimeout(namespace))
}
maxWorkflowRunTimeout := timestamp.RoundUp(serviceConfig.MaxWorkflowRunTimeout(namespace))
runTimeoutSeconds = timestamp.MinDuration(runTimeoutSeconds, maxWorkflowRunTimeout)
runTimeoutSeconds = timestamp.MinDuration(runTimeoutSeconds, executionTimeout)

return runTimeoutSeconds
return common.GetWorkflowRunTimeout(
namespace,
requestedTimeout,
executionTimeout,
serviceConfig.DefaultWorkflowRunTimeout,
serviceConfig.MaxWorkflowRunTimeout,
)
}

// FindAutoResetPoint returns the auto reset point
Expand Down