From 2a8c41e03d87414f4483ba5a67d4a79f866ef8e4 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Wed, 23 Sep 2020 08:27:29 -0700 Subject: [PATCH 1/3] removed current workflow CAS from persistence used by 2dc only --- .../cassandra/cassandraPersistence.go | 26 +--------- common/persistence/dataInterfaces.go | 12 ----- common/persistence/executionStore.go | 6 +-- .../persistence-tests/persistenceTestBase.go | 5 -- common/persistence/persistenceInterface.go | 4 -- common/persistence/sql/sqlExecutionManager.go | 22 +-------- .../sql/sqlExecutionManagerUtil.go | 47 ------------------- .../nDCTransactionMgrForExistingWorkflow.go | 3 -- ...CTransactionMgrForExistingWorkflow_test.go | 6 --- service/history/workflowExecutionContext.go | 3 -- .../history/workflowExecutionContext_mock.go | 8 ++-- 11 files changed, 7 insertions(+), 135 deletions(-) diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index ce1e6e12c58..671c9f71ad4 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -1560,31 +1560,7 @@ func (d *cassandraPersistence) ConflictResolveWorkflowExecution(request *p.Inter return serviceerror.NewInternal(fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Error: %v", err)) } - if request.CurrentWorkflowCAS != nil { - prevRunID = request.CurrentWorkflowCAS.PrevRunID - prevLastWriteVersion := request.CurrentWorkflowCAS.PrevLastWriteVersion - prevState := request.CurrentWorkflowCAS.PrevState - - batch.Query(templateUpdateCurrentWorkflowExecutionForNewQuery, - runID, - executionStateDatablob.Data, - executionStateDatablob.Encoding.String(), - replicationVersions.Data, - replicationVersions.Encoding.String(), - lastWriteVersion, - state, - shardID, - rowTypeExecution, - namespaceID, - workflowID, - permanentRunID, - defaultVisibilityTimestamp, - rowTypeExecutionTaskID, - prevRunID, - prevLastWriteVersion, - prevState, - ) - } else if currentWorkflow != nil { + if currentWorkflow != nil { prevRunID = currentWorkflow.ExecutionInfo.ExecutionState.RunId batch.Query(templateUpdateCurrentWorkflowExecutionQuery, diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index e7a016186eb..af48921d434 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -583,18 +583,6 @@ type ( // current workflow CurrentWorkflowMutation *WorkflowMutation - - // TODO deprecate this once nDC migration is completed - // basically should use CurrentWorkflowMutation instead - CurrentWorkflowCAS *CurrentWorkflowCAS - } - - // CurrentWorkflowCAS represent a compare and swap on current record - // TODO deprecate this once nDC migration is completed - CurrentWorkflowCAS struct { - PrevRunID string - PrevLastWriteVersion int64 - PrevState enumsspb.WorkflowExecutionState } // ResetWorkflowExecutionRequest is used to reset workflow execution state for current run and create new run diff --git a/common/persistence/executionStore.go b/common/persistence/executionStore.go index b565904546b..d9f0b1a8a54 100644 --- a/common/persistence/executionStore.go +++ b/common/persistence/executionStore.go @@ -310,7 +310,7 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution( } } - if request.CurrentWorkflowMutation != nil && request.CurrentWorkflowCAS != nil { + if request.CurrentWorkflowMutation != nil { return serviceerror.NewInternal("ConflictResolveWorkflowExecution: current workflow & current workflow CAS both set") } @@ -324,10 +324,6 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution( NewWorkflowSnapshot: serializedNewWorkflowMutation, CurrentWorkflowMutation: serializedCurrentWorkflowMutation, - - // TODO deprecate this once nDC migration is completed - // basically should use CurrentWorkflowMutation instead - CurrentWorkflowCAS: request.CurrentWorkflowCAS, } return m.persistence.ConflictResolveWorkflowExecution(newRequest) } diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index 9879af5563e..34148285325 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -876,11 +876,6 @@ func (s *TestBase) ConflictResolveWorkflowExecution(prevRunID string, prevLastWr requestCancelInfos []*persistenceblobs.RequestCancelInfo, signalInfos []*persistenceblobs.SignalInfo, ids []string) error { return s.ExecutionManager.ConflictResolveWorkflowExecution(&p.ConflictResolveWorkflowExecutionRequest{ RangeID: s.ShardInfo.GetRangeId(), - CurrentWorkflowCAS: &p.CurrentWorkflowCAS{ - PrevRunID: prevRunID, - PrevLastWriteVersion: prevLastWriteVersion, - PrevState: prevState, - }, ResetWorkflowSnapshot: p.WorkflowSnapshot{ ExecutionInfo: info, ExecutionStats: stats, diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 2b735966018..db1be82c8ee 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -241,10 +241,6 @@ type ( // current workflow CurrentWorkflowMutation *InternalWorkflowMutation - - // TODO deprecate this once nDC migration is completed - // basically should use CurrentWorkflowMutation instead - CurrentWorkflowCAS *CurrentWorkflowCAS } // InternalResetWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index a87003993b7..479eb10a213 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -569,27 +569,7 @@ func (m *sqlExecutionManager) conflictResolveWorkflowExecutionTx( state := executionInfo.GetExecutionState().State status := executionInfo.ExecutionState.Status - if request.CurrentWorkflowCAS != nil { - prevRunID := primitives.MustParseUUID(request.CurrentWorkflowCAS.PrevRunID) - prevLastWriteVersion := request.CurrentWorkflowCAS.PrevLastWriteVersion - prevState := request.CurrentWorkflowCAS.PrevState - - if err := assertAndUpdateCurrentExecution(tx, - m.shardID, - namespaceID, - workflowID, - runID, - prevRunID, - prevLastWriteVersion, - prevState, - createRequestID, - state, - status, - startVersion, - lastWriteVersion); err != nil { - return serviceerror.NewInternal(fmt.Sprintf("ConflictResolveWorkflowExecution. Failed to comare and swap the current record. Error: %v", err)) - } - } else if currentWorkflow != nil { + if currentWorkflow != nil { prevRunID := primitives.MustParseUUID(currentWorkflow.ExecutionInfo.ExecutionState.RunId) if err := assertRunIDAndUpdateCurrentExecution(tx, diff --git a/common/persistence/sql/sqlExecutionManagerUtil.go b/common/persistence/sql/sqlExecutionManagerUtil.go index 6df52956cd5..8880dca3a1a 100644 --- a/common/persistence/sql/sqlExecutionManagerUtil.go +++ b/common/persistence/sql/sqlExecutionManagerUtil.go @@ -968,53 +968,6 @@ func assertRunIDAndUpdateCurrentExecution( return updateCurrentExecution(tx, shardID, namespaceID, workflowID, newRunID, createRequestID, state, status, startVersion, lastWriteVersion) } -func assertAndUpdateCurrentExecution( - tx sqlplugin.Tx, - shardID int, - namespaceID primitives.UUID, - workflowID string, - newRunID primitives.UUID, - previousRunID primitives.UUID, - previousLastWriteVersion int64, - previousState enumsspb.WorkflowExecutionState, - createRequestID string, - state enumsspb.WorkflowExecutionState, - status enumspb.WorkflowExecutionStatus, - startVersion int64, - lastWriteVersion int64, -) error { - - assertFn := func(currentRow *sqlplugin.CurrentExecutionsRow) error { - if !bytes.Equal(currentRow.RunID, previousRunID) { - return &p.ConditionFailedError{Msg: fmt.Sprintf( - "assertAndUpdateCurrentExecution failed. Current run ID was %v, expected %v", - currentRow.RunID, - previousRunID, - )} - } - if currentRow.LastWriteVersion != previousLastWriteVersion { - return &p.ConditionFailedError{Msg: fmt.Sprintf( - "assertAndUpdateCurrentExecution failed. Current last write version was %v, expected %v", - currentRow.LastWriteVersion, - previousLastWriteVersion, - )} - } - if currentRow.State != previousState { - return &p.ConditionFailedError{Msg: fmt.Sprintf( - "assertAndUpdateCurrentExecution failed. Current state %v, expected %v", - currentRow.State, - previousState, - )} - } - return nil - } - if err := assertCurrentExecution(tx, shardID, namespaceID, workflowID, assertFn); err != nil { - return err - } - - return updateCurrentExecution(tx, shardID, namespaceID, workflowID, newRunID, createRequestID, state, status, startVersion, lastWriteVersion) -} - func assertCurrentExecution( tx sqlplugin.Tx, shardID int, diff --git a/service/history/nDCTransactionMgrForExistingWorkflow.go b/service/history/nDCTransactionMgrForExistingWorkflow.go index bcef8172765..3f7354bf85c 100644 --- a/service/history/nDCTransactionMgrForExistingWorkflow.go +++ b/service/history/nDCTransactionMgrForExistingWorkflow.go @@ -355,7 +355,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCur currentWorkflow.getContext(), currentWorkflow.getMutableState(), currentWorkflowPolicy.ptr(), - nil, ) } @@ -382,7 +381,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsCurrent( nil, nil, nil, - nil, ) } @@ -454,7 +452,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsZombie( nil, nil, nil, - nil, ) } diff --git a/service/history/nDCTransactionMgrForExistingWorkflow_test.go b/service/history/nDCTransactionMgrForExistingWorkflow_test.go index 498ede673a8..aec8e84fc99 100644 --- a/service/history/nDCTransactionMgrForExistingWorkflow_test.go +++ b/service/history/nDCTransactionMgrForExistingWorkflow_test.go @@ -198,7 +198,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf currentContext, currentMutableState, currentWorkflowPolicy.ptr(), - (*persistence.CurrentWorkflowCAS)(nil), ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) @@ -272,7 +271,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf currentContext, currentMutableState, currentWorkflowPolicy.ptr(), - (*persistence.CurrentWorkflowCAS)(nil), ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) @@ -469,7 +467,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf (workflowExecutionContext)(nil), (mutableState)(nil), (*transactionPolicy)(nil), - (*persistence.CurrentWorkflowCAS)(nil), ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) @@ -541,7 +538,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf currentContext, currentMutableState, currentWorkflowPolicy.ptr(), - (*persistence.CurrentWorkflowCAS)(nil), ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) @@ -614,7 +610,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf (workflowExecutionContext)(nil), (mutableState)(nil), (*transactionPolicy)(nil), - (*persistence.CurrentWorkflowCAS)(nil), ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) @@ -687,7 +682,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf (workflowExecutionContext)(nil), (mutableState)(nil), (*transactionPolicy)(nil), - (*persistence.CurrentWorkflowCAS)(nil), ).Return(nil).Times(1) err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index db7825c4cf6..eb4269a1ac7 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -99,7 +99,6 @@ type ( currentContext workflowExecutionContext, currentMutableState mutableState, currentTransactionPolicy *transactionPolicy, - workflowCAS *persistence.CurrentWorkflowCAS, ) error updateWorkflowExecutionAsActive( now time.Time, @@ -414,7 +413,6 @@ func (c *workflowExecutionContextImpl) conflictResolveWorkflowExecution( currentContext workflowExecutionContext, currentMutableState mutableState, currentTransactionPolicy *transactionPolicy, - workflowCAS *persistence.CurrentWorkflowCAS, ) (retError error) { defer func() { @@ -523,7 +521,6 @@ func (c *workflowExecutionContextImpl) conflictResolveWorkflowExecution( CurrentWorkflowMutation: currentWorkflow, - CurrentWorkflowCAS: workflowCAS, // Encoding, this is set by shard context }); err != nil { return err diff --git a/service/history/workflowExecutionContext_mock.go b/service/history/workflowExecutionContext_mock.go index ef098dc80c5..7f6e857aaac 100644 --- a/service/history/workflowExecutionContext_mock.go +++ b/service/history/workflowExecutionContext_mock.go @@ -272,17 +272,17 @@ func (mr *MockworkflowExecutionContextMockRecorder) createWorkflowExecution(newW } // conflictResolveWorkflowExecution mocks base method. -func (m *MockworkflowExecutionContext) conflictResolveWorkflowExecution(now time.Time, conflictResolveMode persistence.ConflictResolveWorkflowMode, resetMutableState mutableState, newContext workflowExecutionContext, newMutableState mutableState, currentContext workflowExecutionContext, currentMutableState mutableState, currentTransactionPolicy *transactionPolicy, workflowCAS *persistence.CurrentWorkflowCAS) error { +func (m *MockworkflowExecutionContext) conflictResolveWorkflowExecution(now time.Time, conflictResolveMode persistence.ConflictResolveWorkflowMode, resetMutableState mutableState, newContext workflowExecutionContext, newMutableState mutableState, currentContext workflowExecutionContext, currentMutableState mutableState, currentTransactionPolicy *transactionPolicy) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "conflictResolveWorkflowExecution", now, conflictResolveMode, resetMutableState, newContext, newMutableState, currentContext, currentMutableState, currentTransactionPolicy, workflowCAS) + ret := m.ctrl.Call(m, "conflictResolveWorkflowExecution", now, conflictResolveMode, resetMutableState, newContext, newMutableState, currentContext, currentMutableState, currentTransactionPolicy) ret0, _ := ret[0].(error) return ret0 } // conflictResolveWorkflowExecution indicates an expected call of conflictResolveWorkflowExecution. -func (mr *MockworkflowExecutionContextMockRecorder) conflictResolveWorkflowExecution(now, conflictResolveMode, resetMutableState, newContext, newMutableState, currentContext, currentMutableState, currentTransactionPolicy, workflowCAS interface{}) *gomock.Call { +func (mr *MockworkflowExecutionContextMockRecorder) conflictResolveWorkflowExecution(now, conflictResolveMode, resetMutableState, newContext, newMutableState, currentContext, currentMutableState, currentTransactionPolicy interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "conflictResolveWorkflowExecution", reflect.TypeOf((*MockworkflowExecutionContext)(nil).conflictResolveWorkflowExecution), now, conflictResolveMode, resetMutableState, newContext, newMutableState, currentContext, currentMutableState, currentTransactionPolicy, workflowCAS) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "conflictResolveWorkflowExecution", reflect.TypeOf((*MockworkflowExecutionContext)(nil).conflictResolveWorkflowExecution), now, conflictResolveMode, resetMutableState, newContext, newMutableState, currentContext, currentMutableState, currentTransactionPolicy) } // updateWorkflowExecutionAsActive mocks base method. From ac017542428d76eeadada20de05121d99c0a69b4 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Fri, 25 Sep 2020 09:54:37 -0700 Subject: [PATCH 2/3] remove invalid validation --- common/persistence/executionStore.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/common/persistence/executionStore.go b/common/persistence/executionStore.go index d9f0b1a8a54..047a24b5372 100644 --- a/common/persistence/executionStore.go +++ b/common/persistence/executionStore.go @@ -295,13 +295,16 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution( if err != nil { return err } + + if request.CurrentWorkflowMutation == nil { + return serviceerror.NewInternal("ConflictResolveWorkflowExecution: current workflow mutation not set on request") + } var serializedCurrentWorkflowMutation *InternalWorkflowMutation - if request.CurrentWorkflowMutation != nil { - serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation) - if err != nil { - return err - } + serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation) + if err != nil { + return err } + var serializedNewWorkflowMutation *InternalWorkflowSnapshot if request.NewWorkflowSnapshot != nil { serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot) @@ -310,10 +313,6 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution( } } - if request.CurrentWorkflowMutation != nil { - return serviceerror.NewInternal("ConflictResolveWorkflowExecution: current workflow & current workflow CAS both set") - } - newRequest := &InternalConflictResolveWorkflowExecutionRequest{ RangeID: request.RangeID, From 37f57ce54c9cfcf66d9f54ea882fd162fed30eba Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Fri, 25 Sep 2020 10:38:22 -0700 Subject: [PATCH 3/3] revert current workflow mutation nil check --- common/persistence/executionStore.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/common/persistence/executionStore.go b/common/persistence/executionStore.go index 047a24b5372..d47247ad4b0 100644 --- a/common/persistence/executionStore.go +++ b/common/persistence/executionStore.go @@ -27,7 +27,6 @@ package persistence import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" - "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/server/api/persistenceblobs/v1" @@ -295,16 +294,13 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution( if err != nil { return err } - - if request.CurrentWorkflowMutation == nil { - return serviceerror.NewInternal("ConflictResolveWorkflowExecution: current workflow mutation not set on request") - } var serializedCurrentWorkflowMutation *InternalWorkflowMutation - serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation) - if err != nil { - return err + if request.CurrentWorkflowMutation != nil { + serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation) + if err != nil { + return err + } } - var serializedNewWorkflowMutation *InternalWorkflowSnapshot if request.NewWorkflowSnapshot != nil { serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot)