Skip to content

Commit

Permalink
Fix potential data race in task processing (#1386)
Browse files Browse the repository at this point in the history
* Make sure task processing logic does not release lock while still holding a reference protected by the lock

Mutable state is protected by sync.Mutex against concurrent access.
The current implementation of task processing does not fully respect the lock and the data protected by the lock, e.g. after releasing the lock early, some logic still holds a reference to the execution info or child workflow
  • Loading branch information
wxing1292 authored Mar 19, 2021
1 parent 3331307 commit 5c53b4d
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 23 deletions.
1 change: 1 addition & 0 deletions service/history/timerQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask(
}
scheduleToStartTimeout := timestamp.DurationValue(activityInfo.ScheduleToStartTimeout)

// NOTE: do not access anything related mutable state after this lock release
release(nil) // release earlier as we don't need the lock anymore

ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout)
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func (t *timerQueueStandbyTaskExecutor) processTimer(
return err
}

// NOTE: do not access anything related mutable state after this lock release
release(nil)
return postActionFn(timerTask, historyResendInfo, t.logger)
}
Expand Down
61 changes: 43 additions & 18 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"fmt"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -158,6 +159,8 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask(
}

timeout := timestamp.DurationValue(ai.ScheduleToStartTimeout)

// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
Expand Down Expand Up @@ -220,6 +223,7 @@ func (t *transferQueueActiveTaskExecutor) processWorkflowTask(
taskScheduleToStartTimeoutSeconds = int64(workflowRunTimeout.Round(time.Second).Seconds())
}

// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
Expand Down Expand Up @@ -273,18 +277,20 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
workflowCloseTime := wfCloseTime
workflowStatus := executionState.Status
workflowHistoryLength := mutableState.GetNextEventID() - 1
taskQueue := executionInfo.TaskQueue

startEvent, err := mutableState.GetStartEvent()
if err != nil {
return err
}
workflowStartTime := timestamp.TimeValue(startEvent.GetEventTime())
workflowExecutionTime := getWorkflowExecutionTime(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := getSearchAttributes(executionInfo.SearchAttributes)
visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo))
searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes))
namespace := mutableState.GetNamespaceEntry().GetInfo().Name
children := mutableState.GetPendingChildExecutionInfos()
children := copyChildWorkflowInfos(mutableState.GetPendingChildExecutionInfos())

// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
Expand All @@ -300,7 +306,7 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
workflowHistoryLength,
task.GetTaskId(),
visibilityMemo,
executionInfo.TaskQueue,
taskQueue,
searchAttr,
)
if err != nil {
Expand All @@ -311,7 +317,7 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
if replyToParentWorkflow {
ctx, cancel := context.WithTimeout(context.Background(), transferActiveTaskDefaultTimeout)
defer cancel()
_, err = t.historyClient.RecordChildExecutionCompleted(ctx, &historyservice.RecordChildExecutionCompletedRequest{
_, err := t.historyClient.RecordChildExecutionCompleted(ctx, &historyservice.RecordChildExecutionCompletedRequest{
NamespaceId: parentNamespaceID,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: parentWorkflowID,
Expand All @@ -324,17 +330,16 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
},
CompletionEvent: completionEvent,
})

// Check to see if the error is non-transient, in which case reset the error and continue with processing
if _, ok := err.(*serviceerror.NotFound); ok {
err = nil
switch err.(type) {
case nil:
// noop
case *serviceerror.NotFound:
// parent gone, noop
default:
return err
}
}

if err != nil {
return err
}

return t.processParentClosePolicy(task.GetNamespaceId(), namespace, children)
}

Expand Down Expand Up @@ -514,6 +519,8 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution(
return err
}

signalRequestID := signalInfo.GetRequestId()

// release the weContext lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(retError)
Expand All @@ -526,7 +533,7 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution(
WorkflowId: task.GetTargetWorkflowId(),
RunId: task.GetTargetRunId(),
},
RequestId: signalInfo.GetRequestId(),
RequestId: signalRequestID,
})
return err
}
Expand Down Expand Up @@ -685,18 +692,21 @@ func (t *transferQueueActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHe

executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
executionStatus := executionState.GetStatus()
runTimeout := executionInfo.WorkflowRunTimeout
wfTypeName := executionInfo.WorkflowTypeName
taskQueue := executionInfo.TaskQueue

startEvent, err := mutableState.GetStartEvent()
if err != nil {
return err
}
startTimestamp := timestamp.TimeValue(startEvent.GetEventTime())
executionTimestamp := getWorkflowExecutionTime(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo))
searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes))

// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
Expand All @@ -711,7 +721,7 @@ func (t *transferQueueActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHe
executionTimestamp.UnixNano(),
runTimeout,
task.GetTaskId(),
executionInfo.TaskQueue,
taskQueue,
visibilityMemo,
searchAttr,
)
Expand All @@ -725,8 +735,8 @@ func (t *transferQueueActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHe
executionTimestamp.UnixNano(),
runTimeout,
task.GetTaskId(),
executionState.GetStatus(),
executionInfo.TaskQueue,
executionStatus,
taskQueue,
visibilityMemo,
searchAttr,
)
Expand Down Expand Up @@ -1434,3 +1444,18 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
}
}
}

// copyChildWorkflowInfos returns a new map holding a proto.Clone of every
// ChildExecutionInfo in input, keyed by the same IDs as input.
//
// The caller takes the copy while it still holds the mutable state lock, so
// the result can be used after the lock is released without keeping
// references into the lock-protected state.
//
// The returned map is never nil: a nil or empty input yields an empty map.
func copyChildWorkflowInfos(
	input map[int64]*persistencespb.ChildExecutionInfo,
) map[int64]*persistencespb.ChildExecutionInfo {

	// len of a nil map is 0 and ranging over a nil map is a no-op, so no
	// explicit nil check is needed; pre-size to avoid rehashing while filling.
	result := make(map[int64]*persistencespb.ChildExecutionInfo, len(input))
	for k, v := range input {
		result[k] = proto.Clone(v).(*persistencespb.ChildExecutionInfo)
	}
	return result
}
3 changes: 2 additions & 1 deletion service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (t *transferQueueStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertH
startTime := timestamp.TimeValue(startEvent.GetEventTime())
executionTimestamp := getWorkflowExecutionTime(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes))
searchAttr := getSearchAttributes(executionInfo.SearchAttributes)

if isRecordStart {
return t.recordWorkflowStarted(
Expand Down Expand Up @@ -530,6 +530,7 @@ func (t *transferQueueStandbyTaskExecutor) processTransfer(
return err
}

// NOTE: do not access anything related mutable state after this lock release
release(nil)
return postActionFn(taskInfo, historyResendInfo, t.logger)
}
Expand Down
24 changes: 20 additions & 4 deletions service/history/visibilityQueueTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ func (t *visibilityQueueTaskExecutor) processStartOrUpsertExecution(
}
startTimestamp := timestamp.TimeValue(startEvent.GetEventTime())
executionTimestamp := getWorkflowExecutionTime(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
// TODO (alex): remove copy?
visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo))
searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes))
executionStatus := executionState.GetStatus()
taskQueue := executionInfo.TaskQueue

// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
Expand Down Expand Up @@ -366,10 +366,11 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution(
}
workflowStartTime := timestamp.TimeValue(startEvent.GetEventTime())
workflowExecutionTime := getWorkflowExecutionTime(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := getSearchAttributes(executionInfo.SearchAttributes)
visibilityMemo := getWorkflowMemo(copyMemo(executionInfo.Memo))
searchAttr := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes))
taskQueue := executionInfo.TaskQueue

// NOTE: do not access anything related mutable state after this lock release
// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release(nil)
Expand Down Expand Up @@ -501,6 +502,21 @@ func getWorkflowMemo(
return &commonpb.Memo{Fields: memoFields}
}

// copyMemo returns a new map holding a proto.Clone of every Payload in
// memoFields, keyed by the same field names.
//
// The caller takes the copy while it still holds the mutable state lock, so
// the result can be used after the lock is released without keeping
// references into the lock-protected state.
//
// A nil input is returned as nil; a non-nil (possibly empty) input always
// yields a non-nil map.
func copyMemo(
	memoFields map[string]*commonpb.Payload,
) map[string]*commonpb.Payload {

	if memoFields == nil {
		return nil
	}

	// The final size is known up front, so pre-size to avoid rehashing.
	result := make(map[string]*commonpb.Payload, len(memoFields))
	for k, v := range memoFields {
		result[k] = proto.Clone(v).(*commonpb.Payload)
	}
	return result
}

func getSearchAttributes(
indexedFields map[string]*commonpb.Payload,
) *commonpb.SearchAttributes {
Expand Down

0 comments on commit 5c53b4d

Please sign in to comment.