Skip to content

Commit

Permalink
Unify cassandra error conversion (#1345)
Browse files Browse the repository at this point in the history
* Unify cassandra error conversion
* Fix initialization issue of new gocql wrapper
  • Loading branch information
wxing1292 authored Mar 3, 2021
1 parent 03bbfc2 commit ad8342b
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 305 deletions.
8 changes: 4 additions & 4 deletions common/persistence/cassandra/cassandraClusterMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (m *cassandraClusterMetadata) GetClusterMetadata() (*p.InternalGetClusterMe
var version int64
err := query.Scan(&clusterMetadata, &encoding, &version)
if err != nil {
return nil, convertCommonErrors("GetClusterMetadata", err)
return nil, gocql.ConvertError("GetClusterMetadata", err)
}
return &p.InternalGetClusterMetadataResponse{
ClusterMetadata: p.NewDataBlob(clusterMetadata, encoding),
Expand Down Expand Up @@ -130,7 +130,7 @@ func (m *cassandraClusterMetadata) SaveClusterMetadata(request *p.InternalSaveCl
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
return false, convertCommonErrors("SaveClusterMetadata", err)
return false, gocql.ConvertError("SaveClusterMetadata", err)
}
if !applied {
return false, serviceerror.NewInternal("SaveClusterMetadata operation encountered concurrent write.")
Expand Down Expand Up @@ -212,7 +212,7 @@ func (m *cassandraClusterMetadata) GetClusterMembers(request *p.GetClusterMember
}

if err := iter.Close(); err != nil {
return nil, convertCommonErrors("GetClusterMembers", err)
return nil, gocql.ConvertError("GetClusterMembers", err)
}

return &p.GetClusterMembersResponse{ActiveMembers: clusterMembers, NextPageToken: pagingTokenCopy}, nil
Expand All @@ -224,7 +224,7 @@ func (m *cassandraClusterMetadata) UpsertClusterMembership(request *p.UpsertClus
err := query.Exec()

if err != nil {
return convertCommonErrors("UpsertClusterMembership", err)
return gocql.ConvertError("UpsertClusterMembership", err)
}

return nil
Expand Down
19 changes: 0 additions & 19 deletions common/persistence/cassandra/cassandraCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ package cassandra

import (
"fmt"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
)

type (
Expand Down Expand Up @@ -69,17 +64,3 @@ func newPersistedTypeMismatchError(
Msg: fmt.Sprintf("Field '%s' is of type '%T' but expected type '%T' in payload - '%v'",
fieldName, received, expectedType, payload)}
}

func convertCommonErrors(
operation string,
err error,
) error {
if gocql.IsNotFoundError(err) {
return serviceerror.NewNotFound(fmt.Sprintf("%v failed. Error: %v ", operation, err))
} else if gocql.IsTimeoutError(err) {
return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. Error: %v", operation, err)}
} else if gocql.IsThrottlingError(err) {
return serviceerror.NewResourceExhausted(fmt.Sprintf("%v operation failed. Error: %v", operation, err))
}
return serviceerror.NewInternal(fmt.Sprintf("%v operation failed. Error: %v", operation, err))
}
16 changes: 8 additions & 8 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
query := h.session.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
if err := query.Exec(); err != nil {
return convertCommonErrors("AppendHistoryNodes", err)
return gocql.ConvertError("AppendHistoryNodes", err)
}
return nil
}
Expand All @@ -124,7 +124,7 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
Info: request.Info,
})
if err != nil {
return convertCommonErrors("AppendHistoryNodes", err)
return gocql.ConvertError("AppendHistoryNodes", err)
}

batch := h.session.NewBatch(gocql.LoggedBatch)
Expand All @@ -133,7 +133,7 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
batch.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
if err = h.session.ExecuteBatch(batch); err != nil {
return convertCommonErrors("AppendHistoryNodes", err)
return gocql.ConvertError("AppendHistoryNodes", err)
}
return nil
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (h *cassandraHistoryV2Persistence) ForkHistoryBranch(
query := h.session.Query(v2templateInsertTree, cqlTreeID, cqlNewBranchID, datablob.Data, datablob.EncodingType.String())
err = query.Exec()
if err != nil {
return nil, convertCommonErrors("ForkHistoryBranch", err)
return nil, gocql.ConvertError("ForkHistoryBranch", err)
}

return &p.InternalForkHistoryBranchResponse{
Expand Down Expand Up @@ -378,7 +378,7 @@ func (h *cassandraHistoryV2Persistence) DeleteHistoryBranch(

err = h.session.ExecuteBatch(batch)
if err != nil {
return convertCommonErrors("DeleteHistoryBranch", err)
return gocql.ConvertError("DeleteHistoryBranch", err)
}
return nil
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func (h *cassandraHistoryV2Persistence) GetAllHistoryTreeBranches(
for iter.Scan(&treeUUID, &branchUUID, &data, &encoding) {
hti, err := serialization.HistoryTreeInfoFromBlob(data, encoding)
if err != nil {
return nil, convertCommonErrors("GetAllHistoryTreeBranches", err)
return nil, gocql.ConvertError("GetAllHistoryTreeBranches", err)
}

branchDetail := p.HistoryBranchDetail{
Expand Down Expand Up @@ -475,7 +475,7 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(
for iter.Scan(&branchUUID, &data, &encoding) {
br, err := serialization.HistoryTreeInfoFromBlob(data, encoding)
if err != nil {
return nil, convertCommonErrors("GetHistoryTree", err)
return nil, gocql.ConvertError("GetHistoryTree", err)
}

branches = append(branches, br.BranchInfo)
Expand All @@ -486,7 +486,7 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(
}

if err := iter.Close(); err != nil {
return nil, convertCommonErrors("GetHistoryTree", err)
return nil, gocql.ConvertError("GetHistoryTree", err)
}

if len(pagingToken) == 0 {
Expand Down
Loading

0 comments on commit ad8342b

Please sign in to comment.