Skip to content

Commit

Permalink
Fixes blockage of Transfer Queue if UpsertSearchAttributes is invoked…
Browse files Browse the repository at this point in the history
… on a Temporal Server w/o Elastic Search #727

The current implementations of UpsertWorkflowExecution on the Cassandra/SQL visibility persistence stores return a Service Error since Query Operations are not supported for those stores.

The problem is that we do allow customers to invoke UpsertWorkflowExecution successfully even if Elastic Search isn't enabled. This results in our Transfer Task Queues getting clogged because it keeps retrying a persistence operation that will always fail.

The fix is to just have all attempts to UpsertWorkflowExecution on Cassandra/SQL to function as a no-op so that it does not "fail" the transfer task perpetually. We will still fail with explicit errors on List/Scan/Count, so the user should be able to easily tell that their application logic is broken if it does depend on Elastic Search.

The change was verified as follows:

Existing Unit test was modified to ensure that no-op is always returned for UpsertWorkflowExecution
Running Bench Test on Temporal Server w/o ES cluster stopped spamming the "Critical error processing task" error log once this change was made.

This is a very low risk change.
  • Loading branch information
mastermanu authored Sep 15, 2020
1 parent f286feb commit a09867d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,8 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
}

func (v *cassandraVisibilityPersistence) UpsertWorkflowExecution(
request *p.InternalUpsertWorkflowExecutionRequest) error {
if p.IsNopUpsertWorkflowRequest(request) {
return nil
}
return p.NewOperationNotSupportErrorForVis()
_ *p.InternalUpsertWorkflowExecutionRequest) error {
return nil
}

func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,10 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
Memo: nil,
SearchAttributes: nil,
},
expected: p.NewOperationNotSupportErrorForVis(),
// To avoid blocking the task queue processors on non-ElasticSearch visibility stores
// we simply treat any attempts to perform Upserts as "no-ops"
// Attempts to Scan, Count or List will still fail for non-ES stores.
expected: nil,
},
}

Expand Down
7 changes: 2 additions & 5 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,8 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(request *p.InternalRe
return nil
}

func (s *sqlVisibilityStore) UpsertWorkflowExecution(request *p.InternalUpsertWorkflowExecutionRequest) error {
if p.IsNopUpsertWorkflowRequest(request) {
return nil
}
return p.NewOperationNotSupportErrorForVis()
func (s *sqlVisibilityStore) UpsertWorkflowExecution(_ *p.InternalUpsertWorkflowExecutionRequest) error {
return nil
}

func (s *sqlVisibilityStore) ListOpenWorkflowExecutions(request *p.ListWorkflowExecutionsRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
Expand Down

0 comments on commit a09867d

Please sign in to comment.