Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor handler for workflowIdReusePolicy #5486

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ func createWorkflowMutationFunction(
return nil, nil
}
currentExecutionState := currentWorkflowLease.GetMutableState().GetExecutionState()
workflowMutationFunc, err := api.ApplyWorkflowIDReusePolicy(
currentExecutionState.CreateRequestId,
workflowMutationFunc, err := api.ResolveDuplicateWorkflowID(
currentWorkflowLease.GetContext().GetWorkflowKey().WorkflowID,
newRunID,
currentExecutionState.RunId,
currentExecutionState.State,
currentExecutionState.Status,
currentWorkflowLease.GetContext().GetWorkflowKey().WorkflowID,
newRunID,
currentExecutionState.CreateRequestId,
workflowIDReusePolicy,
)
return workflowMutationFunc, err
Expand Down
30 changes: 17 additions & 13 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,23 +282,30 @@ func (s *Starter) handleConflict(
creationParams *creationParams,
currentWorkflowConditionFailed *persistence.CurrentWorkflowConditionFailedError,
) (*historyservice.StartWorkflowExecutionResponse, error) {
request := s.request.StartRequest
if currentWorkflowConditionFailed.RequestID == request.GetRequestId() {
if currentWorkflowConditionFailed.RequestID == s.request.StartRequest.GetRequestId() {
return s.respondToRetriedRequest(ctx, currentWorkflowConditionFailed.RunID)
}

if err := s.verifyNamespaceActive(creationParams, currentWorkflowConditionFailed); err != nil {
return nil, err
}
response, err := s.applyWorkflowIDReusePolicy(ctx, currentWorkflowConditionFailed, creationParams)

response, err := s.resolveDuplicateWorkflowID(ctx, currentWorkflowConditionFailed, creationParams)
if err != nil {
return nil, err
} else if response != nil {
return response, nil
}

if err := s.createAsCurrent(ctx, creationParams, currentWorkflowConditionFailed); err != nil {
return nil, err
}
return s.generateResponse(creationParams.runID, creationParams.workflowTaskInfo, extractHistoryEvents(creationParams.workflowEventBatches))

return s.generateResponse(
creationParams.runID,
creationParams.workflowTaskInfo,
extractHistoryEvents(creationParams.workflowEventBatches),
)
stephanos marked this conversation as resolved.
Show resolved Hide resolved
}

// createAsCurrent creates a new workflow execution and sets it to "current".
Expand Down Expand Up @@ -332,24 +339,21 @@ func (s *Starter) verifyNamespaceActive(creationParams *creationParams, currentW
return nil
}

// applyWorkflowIDReusePolicy applies the workflow ID reuse policy in case a workflow start requests fails with a
// duplicate execution.
// At the time of this writing, the only possible action here is to terminate the current execution in case the start
// request's ID reuse policy is TERMINATE_IF_RUNNING.
stephanos marked this conversation as resolved.
Show resolved Hide resolved
// resolveDuplicateWorkflowID determines how to resolve a duplicate workflow ID.
// Returns non-nil response if an action was required and completed successfully resulting in a newly created execution.
func (s *Starter) applyWorkflowIDReusePolicy(
func (s *Starter) resolveDuplicateWorkflowID(
ctx context.Context,
currentWorkflowConditionFailed *persistence.CurrentWorkflowConditionFailedError,
creationParams *creationParams,
) (*historyservice.StartWorkflowExecutionResponse, error) {
workflowID := s.request.StartRequest.WorkflowId
prevExecutionUpdateAction, err := api.ApplyWorkflowIDReusePolicy(
currentWorkflowConditionFailed.RequestID,
prevExecutionUpdateAction, err := api.ResolveDuplicateWorkflowID(
workflowID,
creationParams.runID,
currentWorkflowConditionFailed.RunID,
currentWorkflowConditionFailed.State,
currentWorkflowConditionFailed.Status,
workflowID,
creationParams.runID,
currentWorkflowConditionFailed.RequestID,
s.request.StartRequest.GetWorkflowIdReusePolicy(),
)
if err != nil {
Expand Down
110 changes: 57 additions & 53 deletions service/history/api/workflow_id_reuse_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,71 +36,75 @@ import (
"go.temporal.io/server/service/history/workflow"
)

// ApplyWorkflowIDReusePolicy returns updateWorkflowActionFunc
// for updating the previous execution and an error if the situation is
// not allowed by the workflowIDReusePolicy.
// Both result may be nil, if the case is to allow and no update is needed
// for the previous execution.
func ApplyWorkflowIDReusePolicy(
prevStartRequestID,
prevRunID string,
prevState enumsspb.WorkflowExecutionState,
prevStatus enumspb.WorkflowExecutionStatus,
stephanos marked this conversation as resolved.
Show resolved Hide resolved
workflowID string,
runID string,
// ResolveDuplicateWorkflowID determines how to resolve a workflow ID duplication upon workflow start according
// to the WorkflowIDReusePolicy.
//
// An action (ie "mitigate and allow"), an error (ie "deny") or neither (ie "allow") is returned.
func ResolveDuplicateWorkflowID(
workflowID,
newRunID,
currentRunID string,
currentState enumsspb.WorkflowExecutionState,
currentStatus enumspb.WorkflowExecutionStatus,
currentStartRequestID string,
stephanos marked this conversation as resolved.
Show resolved Hide resolved
wfIDReusePolicy enumspb.WorkflowIdReusePolicy,
) (UpdateWorkflowActionFunc, error) {

// here we know there is some information about the prev workflow, i.e. either running right now
// or has history check if the this workflow is finished
switch prevState {
case enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING:
if wfIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING {
return func(workflowLease WorkflowLease) (*UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}

return UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
mutableState,
"TerminateIfRunning WorkflowIdReusePolicy Policy",
payloads.EncodeString(
fmt.Sprintf("terminated by new runID: %s", runID),
),
consts.IdentityHistoryService,
false,
)
}, nil
stephanos marked this conversation as resolved.
Show resolved Hide resolved
switch currentState {
// *running* workflow
case enumsspb.WORKFLOW_EXECUTION_STATE_CREATED, enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING:
switch wfIDReusePolicy {
case enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING:
return terminateWorkflowAction(newRunID)
default:
msg := "Workflow execution is already running. WorkflowId: %v, RunId: %v."
return nil, generateWorkflowAlreadyStartedError(msg, currentStartRequestID, workflowID, currentRunID)
}

msg := "Workflow execution is already running. WorkflowId: %v, RunId: %v."
return nil, generateWorkflowAlreadyStartedError(msg, prevStartRequestID, workflowID, prevRunID)
// *completed* workflow
case enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED:
// previous workflow completed, proceed
switch wfIDReusePolicy {
case enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING:
// no action or error
case enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY:
if _, ok := consts.FailedWorkflowStatuses[currentStatus]; !ok {
msg := "Workflow execution already finished successfully. WorkflowId: %v, RunId: %v. Workflow Id reuse policy: allow duplicate workflow Id if last run failed."
return nil, generateWorkflowAlreadyStartedError(msg, currentStartRequestID, workflowID, currentRunID)
}
case enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE:
msg := "Workflow execution already finished. WorkflowId: %v, RunId: %v. Workflow Id reuse policy: reject duplicate workflow Id."
return nil, generateWorkflowAlreadyStartedError(msg, currentStartRequestID, workflowID, currentRunID)
default:
return nil, serviceerror.NewInternal(fmt.Sprintf("Failed to process start workflow id reuse policy: %v.", wfIDReusePolicy))
}

default:
// persistence.WorkflowStateZombie or unknown type
return nil, serviceerror.NewInternal(fmt.Sprintf("Failed to process workflow, workflow has invalid state: %v.", prevState))
return nil, serviceerror.NewInternal(fmt.Sprintf("Failed to process workflow, workflow has invalid state: %v.", currentState))
}

switch wfIDReusePolicy {
case enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY:
if _, ok := consts.FailedWorkflowStatuses[prevStatus]; !ok {
msg := "Workflow execution already finished successfully. WorkflowId: %v, RunId: %v. Workflow Id reuse policy: allow duplicate workflow Id if last run failed."
return nil, generateWorkflowAlreadyStartedError(msg, prevStartRequestID, workflowID, prevRunID)
return nil, nil
}

func terminateWorkflowAction(
newRunID string,
) (UpdateWorkflowActionFunc, error) {
return func(workflowLease WorkflowLease) (*UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return nil, consts.ErrWorkflowCompleted
}
case enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING:
// as long as workflow not running, so this case has no check
case enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE:
msg := "Workflow execution already finished. WorkflowId: %v, RunId: %v. Workflow Id reuse policy: reject duplicate workflow Id."
return nil, generateWorkflowAlreadyStartedError(msg, prevStartRequestID, workflowID, prevRunID)
default:
return nil, serviceerror.NewInternal(fmt.Sprintf("Failed to process start workflow reuse policy: %v.", wfIDReusePolicy))
}

return nil, nil
return UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
mutableState,
"TerminateIfRunning WorkflowIdReusePolicy",
stephanos marked this conversation as resolved.
Show resolved Hide resolved
payloads.EncodeString(
fmt.Sprintf("terminated by new runID: %s", newRunID),
),
consts.IdentityHistoryService,
false,
)
}, nil
}

func generateWorkflowAlreadyStartedError(
Expand Down
14 changes: 14 additions & 0 deletions service/history/history_engine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,20 @@ func (s *engine2Suite) TestStartWorkflowExecution_Dedup() {
s.NotEqual(prevRunID, resp.GetRunId())
})

s.Run("and id reuse policy is TERMINATE_IF_RUNNING", func() {
s.mockExecutionMgr.EXPECT().CreateWorkflowExecution(gomock.Any(), brandNewExecutionRequest).
Return(nil, makeCurrentWorkflowConditionFailedError(prevRequestID))
s.mockExecutionMgr.EXPECT().CreateWorkflowExecution(gomock.Any(), updateExecutionRequest).
Return(tests.CreateWorkflowExecutionResponse, nil)

resp, err := s.historyEngine.StartWorkflowExecution(
metrics.AddMetricsContext(context.Background()),
makeStartRequest(enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING))

s.NoError(err)
s.NotEqual(prevRunID, resp.GetRunId())
})

s.Run("and id reuse policy ALLOW_DUPLICATE_FAILED_ONLY", func() {
s.mockExecutionMgr.EXPECT().CreateWorkflowExecution(gomock.Any(), brandNewExecutionRequest).
Return(nil, makeCurrentWorkflowConditionFailedError(prevRequestID))
Expand Down
Loading