Skip to content

Commit

Permalink
Adds validation for workflow task for child start and cancel (#599)
Browse files Browse the repository at this point in the history
Adds validation to fail workflow task when child execution is started
and cancelled in the same workflow task.
  • Loading branch information
samarabbas authored Jul 24, 2020
1 parent 79269f9 commit e0fdc8c
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 12 deletions.
171 changes: 171 additions & 0 deletions host/cancelworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ package host
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -534,3 +536,172 @@ CheckHistoryLoopForCancelSent:

s.True(cancellationSentFailed)
}

func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
id := "integration-immediate-child-cancellation-worflow-task-failed-test"
wt := "integration-immediate-child-cancellation-worflow-task-failed-test-type"
tl := "integration-immediate-child-cancellation-worflow-task-failed-test-taskqueue"
childWorkflowID := "integration-immediate-child-cancellation-worflow-task-failed-child-test"
childTaskQueue := "integration-immediate-child-cancellation-worflow-task-failed-child-test-taskqueue"
identity := "worker1"

workflowType := &commonpb.WorkflowType{Name: wt}

taskQueue := &taskqueuepb.TaskQueue{Name: tl}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Input: nil,
WorkflowRunTimeoutSeconds: 100,
WorkflowTaskTimeoutSeconds: 1,
Identity: identity,
}
we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err0)
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

s.engine.RequestCancelWorkflowExecution(NewContext(), &workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
Identity: identity,
RequestId: uuid.New(),
})

childCancelled := false
var initiatedEvent *historypb.HistoryEvent
var requestCancelEvent *historypb.HistoryEvent
var workflowtaskFailedEvent *historypb.HistoryEvent
workflowComplete := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
if !childCancelled {
startEvent := history.Events[0]
if startEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
return nil, errors.New("first event is not workflow execution started")
}

workflowTaskScheduledEvent := history.Events[1]
if workflowTaskScheduledEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
return nil, errors.New("second event is not workflow task scheduled")
}

cancelRequestedEvent := history.Events[2]
if cancelRequestedEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED {
return nil, errors.New("third event is not cancel requested")
}

// Schedule and cancel child workflow in the same decision
childCancelled = true
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, 1)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: s.namespace,
WorkflowId: childWorkflowID,
WorkflowType: &commonpb.WorkflowType{Name: "childTypeA"},
TaskQueue: &taskqueuepb.TaskQueue{Name: childTaskQueue},
Input: payloads.EncodeBytes(buf.Bytes()),
}},
}, {
CommandType: enumspb.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_RequestCancelExternalWorkflowExecutionCommandAttributes{RequestCancelExternalWorkflowExecutionCommandAttributes: &commandpb.RequestCancelExternalWorkflowExecutionCommandAttributes{
Namespace: s.namespace,
WorkflowId: childWorkflowID,
ChildWorkflowOnly: true,
}},
}}, nil
}

if previousStartedEventID != 0 {
return nil, errors.New("previous started decision moved unexpectedly after first failed workflow task")
}
// Validate child workflow as cancelled
for _, event := range history.Events[previousStartedEventID:] {
s.Logger.Info(fmt.Sprintf("Processing EventID: %v, Event: %v", event.GetEventId(), event))
switch event.GetEventType() {
case enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
initiatedEvent = event
case enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
requestCancelEvent = event
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
workflowtaskFailedEvent = event
}
}

if initiatedEvent != nil {
return nil, errors.New("start child workflow command accepted from previous workflow task")
}

if requestCancelEvent != nil {
return nil, errors.New("request cancel command accepted from previous workflow task")
}

if workflowtaskFailedEvent == nil {
return nil, errors.New("workflow task failed event not found due to previous bad commands")
}

taskFailure := workflowtaskFailedEvent.GetWorkflowTaskFailedEventAttributes().Failure
if taskFailure.GetMessage() != "Start and RequestCancel for child workflow is not allowed in same workflow task." {
return nil, errors.New("Unexpected workflow task failure")
}

workflowComplete = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.Logger,
T: s.T(),
}

s.Logger.Info("Process first workflow task which starts and request cancels child workflow")
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: id,
})

s.Logger.Info("Process second workflow task which observes child workflow is cancelled and completes")
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: id,
})

_, err = s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
},
})
if err == nil {
s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
})
}
s.Logger.Error("Describe error", tag.Error(err))
s.Error(err, "Child workflow execution started instead of getting cancelled")
s.IsType(&serviceerror.NotFound{}, err, "Error is not of type 'NotFound'")

s.True(workflowComplete)
}
4 changes: 4 additions & 0 deletions service/history/commandChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func (v *commandAttrValidator) validateCancelWorkflowExecutionAttributes(
func (v *commandAttrValidator) validateCancelExternalWorkflowExecutionAttributes(
namespaceID string,
targetNamespaceID string,
initiatedChildExecutionsInSession map[string]struct{},
attributes *commandpb.RequestCancelExternalWorkflowExecutionCommandAttributes,
) error {

Expand All @@ -425,6 +426,9 @@ func (v *commandAttrValidator) validateCancelExternalWorkflowExecutionAttributes
if runID != "" && uuid.Parse(runID) == nil {
return serviceerror.NewInvalidArgument("Invalid RunId set on command.")
}
if _, ok := initiatedChildExecutionsInSession[attributes.GetWorkflowId()]; ok {
return serviceerror.NewInvalidArgument("Start and RequestCancel for child workflow is not allowed in same workflow task.")
}

return nil
}
Expand Down
32 changes: 20 additions & 12 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ type (
namespaceEntry *cache.NamespaceCacheEntry

// internal state
hasBufferedEvents bool
failWorkflowTaskInfo *failWorkflowTaskInfo
activityNotStartedCancelled bool
continueAsNewBuilder mutableState
stopProcessing bool // should stop processing any more commands
mutableState mutableState
hasBufferedEvents bool
failWorkflowTaskInfo *failWorkflowTaskInfo
activityNotStartedCancelled bool
continueAsNewBuilder mutableState
stopProcessing bool // should stop processing any more commands
mutableState mutableState
initiatedChildExecutionsInBatch map[string]struct{} // Set of initiated child executions in the workflow task

// validation
attrValidator *commandAttrValidator
Expand Down Expand Up @@ -96,12 +97,13 @@ func newWorkflowTaskHandler(
namespaceEntry: namespaceEntry,

// internal state
hasBufferedEvents: mutableState.HasBufferedEvents(),
failWorkflowTaskInfo: nil,
activityNotStartedCancelled: false,
continueAsNewBuilder: nil,
stopProcessing: false,
mutableState: mutableState,
hasBufferedEvents: mutableState.HasBufferedEvents(),
failWorkflowTaskInfo: nil,
activityNotStartedCancelled: false,
continueAsNewBuilder: nil,
stopProcessing: false,
mutableState: mutableState,
initiatedChildExecutionsInBatch: make(map[string]struct{}),

// validation
attrValidator: attrValidator,
Expand Down Expand Up @@ -582,6 +584,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl
return handler.attrValidator.validateCancelExternalWorkflowExecutionAttributes(
namespaceID,
targetNamespaceID,
handler.initiatedChildExecutionsInBatch,
attr,
)
},
Expand All @@ -594,6 +597,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl
_, _, err := handler.mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
handler.workflowTaskCompletedID, cancelRequestID, attr,
)

return err
}

Expand Down Expand Up @@ -776,6 +780,10 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow(
_, _, err = handler.mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
handler.workflowTaskCompletedID, requestID, attr,
)
if err == nil {
// Keep track of all child initiated commands in this workflow task to validate request cancel commands
handler.initiatedChildExecutionsInBatch[attr.GetWorkflowId()] = struct{}{}
}
return err
}

Expand Down

0 comments on commit e0fdc8c

Please sign in to comment.