From 5c53b4d5b49ae8609b605d8a365f542cbbc54789 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Fri, 19 Mar 2021 13:50:38 -0700 Subject: [PATCH] Fix potential data race in task processing (#1386) * Make sure task processing logic does not release lock while still holding a reference protected by the lock Mutable state is protected by sync.Mutex against concurrent access. Current implementation of task processing does not really respect the lock & date within the dock, e.g. after earlier release the lock, some logic will hold the reference to the execution info or child workflow --- .../history/timerQueueActiveTaskExecutor.go | 1 + .../history/timerQueueStandbyTaskExecutor.go | 1 + .../transferQueueActiveTaskExecutor.go | 61 +++++++++++++------ .../transferQueueStandbyTaskExecutor.go | 3 +- .../history/visibilityQueueTaskExecutor.go | 24 ++++++-- 5 files changed, 67 insertions(+), 23 deletions(-) diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index 9d5a1e20845..e4033a4bdf1 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -443,6 +443,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask( } scheduleToStartTimeout := timestamp.DurationValue(activityInfo.ScheduleToStartTimeout) + // NOTE: do not access anything related mutable state after this lock release release(nil) // release earlier as we don't need the lock anymore ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout) diff --git a/service/history/timerQueueStandbyTaskExecutor.go b/service/history/timerQueueStandbyTaskExecutor.go index 71973354a10..4d65cf4f5aa 100644 --- a/service/history/timerQueueStandbyTaskExecutor.go +++ b/service/history/timerQueueStandbyTaskExecutor.go @@ -433,6 +433,7 @@ func (t *timerQueueStandbyTaskExecutor) processTimer( return err } + // NOTE: do not access anything related mutable state after this lock release release(nil) return postActionFn(timerTask, historyResendInfo, t.logger) } diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index c81e4d563f8..76f0ea24d01 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -29,6 +29,7 @@ import ( "fmt" "time" + "github.com/gogo/protobuf/proto" "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -158,6 +159,8 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask( } timeout := timestamp.DurationValue(ai.ScheduleToStartTimeout) + + // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) @@ -220,6 +223,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask( taskScheduleToStartTimeoutSeconds = int64(workflowRunTimeout.Round(time.Second).Seconds()) } + // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) @@ -273,6 +277,7 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution( workflowCloseTime := wfCloseTime workflowStatus := executionState.Status workflowHistoryLength := mutableState.GetNextEventID() - 1 + taskQueue := executionInfo.TaskQueue startEvent, err := mutableState.GetStartEvent() if err != nil { @@ -280,11 +285,12 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution( } workflowStartTime := timestamp.TimeValue(startEvent.GetEventTime()) workflowExecutionTime := getWorkflowExecutionTime(mutableState, startEvent) - visibilityMemo := getWorkflowMemo(executionInfo.Memo) - searchAttr := getSearchAttributes(executionInfo.SearchAttributes) + visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo)) + searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)) namespace := mutableState.GetNamespaceEntry().GetInfo().Name - children := mutableState.GetPendingChildExecutionInfos() + children := copyChildWorkflowInfos(mutableState.GetPendingChildExecutionInfos()) + // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) @@ -300,7 +306,7 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution( workflowHistoryLength, task.GetTaskId(), visibilityMemo, - executionInfo.TaskQueue, + taskQueue, searchAttr, ) if err != nil { @@ -311,7 +317,7 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution( if replyToParentWorkflow { ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout) defer cancel() - _, err = t.historyClient.RecordChildExecutionCompleted(ctx, &historyservice.RecordChildExecutionCompletedRequest{ + _, err := t.historyClient.RecordChildExecutionCompleted(ctx, &historyservice.RecordChildExecutionCompletedRequest{ NamespaceId: parentNamespaceID, WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: parentWorkflowID, @@ -324,17 +330,16 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution( }, CompletionEvent: completionEvent, }) - - // Check to see if the error is non-transient, in which case reset the error and continue with processing - if _, ok := err.(*serviceerror.NotFound); ok { - err = nil + switch err.(type) { + case nil: + // noop + case *serviceerror.NotFound: + // parent gone, noop + default: + return err } } - if err != nil { - return err - } - return t.processParentClosePolicy(task.GetNamespaceId(), namespace, children) } @@ -514,6 +519,8 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution( return err } + signalRequestID := signalInfo.GetRequestId() + // release the weContext lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(retError) @@ -526,7 +533,7 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution( WorkflowId: task.GetTargetWorkflowId(), RunId: task.GetTargetRunId(), }, - RequestId: signalInfo.GetRequestId(), + RequestId: signalRequestID, }) return err } @@ -685,8 +692,10 @@ func (t *transferQueueActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHe executionInfo := mutableState.GetExecutionInfo() executionState := mutableState.GetExecutionState() + executionStatus := executionState.GetStatus() runTimeout := executionInfo.WorkflowRunTimeout wfTypeName := executionInfo.WorkflowTypeName + taskQueue := executionInfo.TaskQueue startEvent, err := mutableState.GetStartEvent() if err != nil { @@ -694,9 +703,10 @@ func (t *transferQueueActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHe } startTimestamp := timestamp.TimeValue(startEvent.GetEventTime()) executionTimestamp := getWorkflowExecutionTime(mutableState, startEvent) - visibilityMemo := getWorkflowMemo(executionInfo.Memo) + visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo)) searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)) + // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) @@ -711,7 +721,7 @@ func (t *transferQueueActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHe executionTimestamp.UnixNano(), runTimeout, task.GetTaskId(), - executionInfo.TaskQueue, + taskQueue, visibilityMemo, searchAttr, ) @@ -725,8 +735,8 @@ func (t *transferQueueActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHe executionTimestamp.UnixNano(), runTimeout, task.GetTaskId(), - executionState.GetStatus(), - executionInfo.TaskQueue, + executionStatus, + taskQueue, visibilityMemo, searchAttr, ) @@ -1434,3 +1444,18 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy( } } } + +func copyChildWorkflowInfos( + input map[int64]*persistencespb.ChildExecutionInfo, +) map[int64]*persistencespb.ChildExecutionInfo { + + result := make(map[int64]*persistencespb.ChildExecutionInfo) + if input == nil { + return result + } + + for k, v := range input { + result[k] = proto.Clone(v).(*persistencespb.ChildExecutionInfo) + } + return result +} diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go index 64ea8367c4e..9ff3385046f 100644 --- a/service/history/transferQueueStandbyTaskExecutor.go +++ b/service/history/transferQueueStandbyTaskExecutor.go @@ -459,7 +459,7 @@ func (t *transferQueueStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertH startTime := timestamp.TimeValue(startEvent.GetEventTime()) executionTimestamp := getWorkflowExecutionTime(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) - searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)) + searchAttr := getSearchAttributes(executionInfo.SearchAttributes) if isRecordStart { return t.recordWorkflowStarted( @@ -530,6 +530,7 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer( return err } + // NOTE: do not access anything related mutable state after this lock release release(nil) return postActionFn(taskInfo, historyResendInfo, t.logger) } diff --git a/service/history/visibilityQueueTaskExecutor.go b/service/history/visibilityQueueTaskExecutor.go index 92dfcc1c0ca..4c38fdc587b 100644 --- a/service/history/visibilityQueueTaskExecutor.go +++ b/service/history/visibilityQueueTaskExecutor.go @@ -171,12 +171,12 @@ func (t *visibilityQueueTaskExecutor) processStartOrUpsertExecution( } startTimestamp := timestamp.TimeValue(startEvent.GetEventTime()) executionTimestamp := getWorkflowExecutionTime(mutableState, startEvent) - visibilityMemo := getWorkflowMemo(executionInfo.Memo) - // TODO (alex): remove copy? + visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo)) searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)) executionStatus := executionState.GetStatus() taskQueue := executionInfo.TaskQueue + // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) @@ -366,10 +366,11 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution( } workflowStartTime := timestamp.TimeValue(startEvent.GetEventTime()) workflowExecutionTime := getWorkflowExecutionTime(mutableState, startEvent) - visibilityMemo := getWorkflowMemo(executionInfo.Memo) - searchAttr := getSearchAttributes(executionInfo.SearchAttributes) + visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo)) + searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)) taskQueue := executionInfo.TaskQueue + // NOTE: do not access anything related mutable state after this lock release // release the context lock since we no longer need mutable state builder and // the rest of logic is making RPC call, which takes time. release(nil) @@ -501,6 +502,21 @@ func getWorkflowMemo( return &commonpb.Memo{Fields: memoFields} } +func copyMemo( + memoFields map[string]*commonpb.Payload, +) map[string]*commonpb.Payload { + + if memoFields == nil { + return nil + } + + result := make(map[string]*commonpb.Payload) + for k, v := range memoFields { + result[k] = proto.Clone(v).(*commonpb.Payload) + } + return result +} + func getSearchAttributes( indexedFields map[string]*commonpb.Payload, ) *commonpb.SearchAttributes {