Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removed current workflow CAS from persistence used by 2dc only #755

Merged
merged 6 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,31 +1560,7 @@ func (d *cassandraPersistence) ConflictResolveWorkflowExecution(request *p.Inter
return serviceerror.NewInternal(fmt.Sprintf("ConflictResolveWorkflowExecution operation failed. Error: %v", err))
}

if request.CurrentWorkflowCAS != nil {
prevRunID = request.CurrentWorkflowCAS.PrevRunID
prevLastWriteVersion := request.CurrentWorkflowCAS.PrevLastWriteVersion
prevState := request.CurrentWorkflowCAS.PrevState

batch.Query(templateUpdateCurrentWorkflowExecutionForNewQuery,
runID,
executionStateDatablob.Data,
executionStateDatablob.Encoding.String(),
replicationVersions.Data,
replicationVersions.Encoding.String(),
lastWriteVersion,
state,
shardID,
rowTypeExecution,
namespaceID,
workflowID,
permanentRunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
prevRunID,
prevLastWriteVersion,
prevState,
)
} else if currentWorkflow != nil {
if currentWorkflow != nil {
prevRunID = currentWorkflow.ExecutionInfo.ExecutionState.RunId

batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
Expand Down
12 changes: 0 additions & 12 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,18 +583,6 @@ type (

// current workflow
CurrentWorkflowMutation *WorkflowMutation

// TODO deprecate this once nDC migration is completed
// basically should use CurrentWorkflowMutation instead
CurrentWorkflowCAS *CurrentWorkflowCAS
}

// CurrentWorkflowCAS represent a compare and swap on current record
// TODO deprecate this once nDC migration is completed
CurrentWorkflowCAS struct {
PrevRunID string
PrevLastWriteVersion int64
PrevState enumsspb.WorkflowExecutionState
}

// ResetWorkflowExecutionRequest is used to reset workflow execution state for current run and create new run
Expand Down
6 changes: 1 addition & 5 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
}
}

if request.CurrentWorkflowMutation != nil && request.CurrentWorkflowCAS != nil {
if request.CurrentWorkflowMutation != nil {
return serviceerror.NewInternal("ConflictResolveWorkflowExecution: current workflow & current workflow CAS both set")
}

Expand All @@ -324,10 +324,6 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
NewWorkflowSnapshot: serializedNewWorkflowMutation,

CurrentWorkflowMutation: serializedCurrentWorkflowMutation,

// TODO deprecate this once nDC migration is completed
// basically should use CurrentWorkflowMutation instead
CurrentWorkflowCAS: request.CurrentWorkflowCAS,
}
return m.persistence.ConflictResolveWorkflowExecution(newRequest)
}
Expand Down
5 changes: 0 additions & 5 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,11 +876,6 @@ func (s *TestBase) ConflictResolveWorkflowExecution(prevRunID string, prevLastWr
requestCancelInfos []*persistenceblobs.RequestCancelInfo, signalInfos []*persistenceblobs.SignalInfo, ids []string) error {
return s.ExecutionManager.ConflictResolveWorkflowExecution(&p.ConflictResolveWorkflowExecutionRequest{
RangeID: s.ShardInfo.GetRangeId(),
CurrentWorkflowCAS: &p.CurrentWorkflowCAS{
PrevRunID: prevRunID,
PrevLastWriteVersion: prevLastWriteVersion,
PrevState: prevState,
},
ResetWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: info,
ExecutionStats: stats,
Expand Down
4 changes: 0 additions & 4 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,6 @@ type (

// current workflow
CurrentWorkflowMutation *InternalWorkflowMutation

// TODO deprecate this once nDC migration is completed
// basically should use CurrentWorkflowMutation instead
CurrentWorkflowCAS *CurrentWorkflowCAS
}

// InternalResetWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
Expand Down
22 changes: 1 addition & 21 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,27 +569,7 @@ func (m *sqlExecutionManager) conflictResolveWorkflowExecutionTx(
state := executionInfo.GetExecutionState().State
status := executionInfo.ExecutionState.Status

if request.CurrentWorkflowCAS != nil {
prevRunID := primitives.MustParseUUID(request.CurrentWorkflowCAS.PrevRunID)
prevLastWriteVersion := request.CurrentWorkflowCAS.PrevLastWriteVersion
prevState := request.CurrentWorkflowCAS.PrevState

if err := assertAndUpdateCurrentExecution(tx,
m.shardID,
namespaceID,
workflowID,
runID,
prevRunID,
prevLastWriteVersion,
prevState,
createRequestID,
state,
status,
startVersion,
lastWriteVersion); err != nil {
return serviceerror.NewInternal(fmt.Sprintf("ConflictResolveWorkflowExecution. Failed to comare and swap the current record. Error: %v", err))
}
} else if currentWorkflow != nil {
if currentWorkflow != nil {
prevRunID := primitives.MustParseUUID(currentWorkflow.ExecutionInfo.ExecutionState.RunId)

if err := assertRunIDAndUpdateCurrentExecution(tx,
Expand Down
47 changes: 0 additions & 47 deletions common/persistence/sql/sqlExecutionManagerUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,53 +968,6 @@ func assertRunIDAndUpdateCurrentExecution(
return updateCurrentExecution(tx, shardID, namespaceID, workflowID, newRunID, createRequestID, state, status, startVersion, lastWriteVersion)
}

func assertAndUpdateCurrentExecution(
tx sqlplugin.Tx,
shardID int,
namespaceID primitives.UUID,
workflowID string,
newRunID primitives.UUID,
previousRunID primitives.UUID,
previousLastWriteVersion int64,
previousState enumsspb.WorkflowExecutionState,
createRequestID string,
state enumsspb.WorkflowExecutionState,
status enumspb.WorkflowExecutionStatus,
startVersion int64,
lastWriteVersion int64,
) error {

assertFn := func(currentRow *sqlplugin.CurrentExecutionsRow) error {
if !bytes.Equal(currentRow.RunID, previousRunID) {
return &p.ConditionFailedError{Msg: fmt.Sprintf(
"assertAndUpdateCurrentExecution failed. Current run ID was %v, expected %v",
currentRow.RunID,
previousRunID,
)}
}
if currentRow.LastWriteVersion != previousLastWriteVersion {
return &p.ConditionFailedError{Msg: fmt.Sprintf(
"assertAndUpdateCurrentExecution failed. Current last write version was %v, expected %v",
currentRow.LastWriteVersion,
previousLastWriteVersion,
)}
}
if currentRow.State != previousState {
return &p.ConditionFailedError{Msg: fmt.Sprintf(
"assertAndUpdateCurrentExecution failed. Current state %v, expected %v",
currentRow.State,
previousState,
)}
}
return nil
}
if err := assertCurrentExecution(tx, shardID, namespaceID, workflowID, assertFn); err != nil {
return err
}

return updateCurrentExecution(tx, shardID, namespaceID, workflowID, newRunID, createRequestID, state, status, startVersion, lastWriteVersion)
}

func assertCurrentExecution(
tx sqlplugin.Tx,
shardID int,
Expand Down
3 changes: 0 additions & 3 deletions service/history/nDCTransactionMgrForExistingWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCur
currentWorkflow.getContext(),
currentWorkflow.getMutableState(),
currentWorkflowPolicy.ptr(),
nil,
)
}

Expand All @@ -382,7 +381,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsCurrent(
nil,
nil,
nil,
nil,
)
}

Expand Down Expand Up @@ -454,7 +452,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsZombie(
nil,
nil,
nil,
nil,
)
}

Expand Down
6 changes: 0 additions & 6 deletions service/history/nDCTransactionMgrForExistingWorkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
currentContext,
currentMutableState,
currentWorkflowPolicy.ptr(),
(*persistence.CurrentWorkflowCAS)(nil),
).Return(nil).Times(1)

err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow)
Expand Down Expand Up @@ -272,7 +271,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
currentContext,
currentMutableState,
currentWorkflowPolicy.ptr(),
(*persistence.CurrentWorkflowCAS)(nil),
).Return(nil).Times(1)

err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow)
Expand Down Expand Up @@ -469,7 +467,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
(workflowExecutionContext)(nil),
(mutableState)(nil),
(*transactionPolicy)(nil),
(*persistence.CurrentWorkflowCAS)(nil),
).Return(nil).Times(1)

err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow)
Expand Down Expand Up @@ -541,7 +538,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
currentContext,
currentMutableState,
currentWorkflowPolicy.ptr(),
(*persistence.CurrentWorkflowCAS)(nil),
).Return(nil).Times(1)

err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow)
Expand Down Expand Up @@ -614,7 +610,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
(workflowExecutionContext)(nil),
(mutableState)(nil),
(*transactionPolicy)(nil),
(*persistence.CurrentWorkflowCAS)(nil),
).Return(nil).Times(1)

err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow)
Expand Down Expand Up @@ -687,7 +682,6 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
(workflowExecutionContext)(nil),
(mutableState)(nil),
(*transactionPolicy)(nil),
(*persistence.CurrentWorkflowCAS)(nil),
).Return(nil).Times(1)

err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow)
Expand Down
3 changes: 0 additions & 3 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ type (
currentContext workflowExecutionContext,
currentMutableState mutableState,
currentTransactionPolicy *transactionPolicy,
workflowCAS *persistence.CurrentWorkflowCAS,
) error
updateWorkflowExecutionAsActive(
now time.Time,
Expand Down Expand Up @@ -414,7 +413,6 @@ func (c *workflowExecutionContextImpl) conflictResolveWorkflowExecution(
currentContext workflowExecutionContext,
currentMutableState mutableState,
currentTransactionPolicy *transactionPolicy,
workflowCAS *persistence.CurrentWorkflowCAS,
) (retError error) {

defer func() {
Expand Down Expand Up @@ -523,7 +521,6 @@ func (c *workflowExecutionContextImpl) conflictResolveWorkflowExecution(

CurrentWorkflowMutation: currentWorkflow,

CurrentWorkflowCAS: workflowCAS,
// Encoding, this is set by shard context
}); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflowExecutionContext_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.