Skip to content

Commit

Permalink
Convert ChildExecutionInfo Persistence POGOs to Proto (#616)
Browse files Browse the repository at this point in the history
  • Loading branch information
shawnhathaway authored Jul 28, 2020
1 parent 36643c5 commit f2bf9b2
Show file tree
Hide file tree
Showing 29 changed files with 547 additions and 814 deletions.
777 changes: 341 additions & 436 deletions api/persistenceblobs/v1/message.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,15 +1148,15 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecut
}
state.TimerInfos = timerInfos

childExecutionInfos := make(map[int64]*p.InternalChildExecutionInfo)
childExecutionInfos := make(map[int64]*persistenceblobs.ChildExecutionInfo)
cMap := result["child_executions_map"].(map[int64][]byte)
cMapEncoding := result["child_executions_map_encoding"].(string)
for key, value := range cMap {
cInfo, err := serialization.ChildExecutionInfoFromBlob(value, cMapEncoding)
if err != nil {
return nil, err
}
childExecutionInfos[key] = p.ProtoChildExecutionInfoToInternal(cInfo)
childExecutionInfos[key] = cInfo
}
state.ChildExecutionInfos = childExecutionInfos

Expand Down
24 changes: 8 additions & 16 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ func resetTimerInfos(

func updateChildExecutionInfos(
batch *gocql.Batch,
childExecutionInfos []*p.InternalChildExecutionInfo,
childExecutionInfos []*persistenceblobs.ChildExecutionInfo,
deleteInfo *int64,
shardID int,
namespaceID string,
Expand All @@ -1237,17 +1237,13 @@ func updateChildExecutionInfos(
) error {

for _, c := range childExecutionInfos {
if c.StartedEvent != nil && c.InitiatedEvent.Encoding != c.StartedEvent.Encoding {
return p.NewSerializationError(fmt.Sprintf("expect to have the same encoding, but %v != %v", c.InitiatedEvent.Encoding, c.StartedEvent.Encoding))
}

datablob, err := serialization.ChildExecutionInfoToBlob(c.ToProto())
datablob, err := serialization.ChildExecutionInfoToBlob(c)
if err != nil {
return nil
}

batch.Query(templateUpdateChildExecutionInfoQuery,
c.InitiatedID,
c.InitiatedId,
datablob.Data,
datablob.Encoding,
shardID,
Expand Down Expand Up @@ -1276,7 +1272,7 @@ func updateChildExecutionInfos(

func resetChildExecutionInfos(
batch *gocql.Batch,
childExecutionInfos []*p.InternalChildExecutionInfo,
childExecutionInfos []*persistenceblobs.ChildExecutionInfo,
shardID int,
namespaceID string,
workflowID string,
Expand Down Expand Up @@ -1606,21 +1602,17 @@ func resetTimerInfoMap(
}

func resetChildExecutionInfoMap(
childExecutionInfos []*p.InternalChildExecutionInfo,
childExecutionInfos []*persistenceblobs.ChildExecutionInfo,
) (map[int64][]byte, common.EncodingType, error) {

cMap := make(map[int64][]byte)
encoding := common.EncodingTypeUnknown
for _, c := range childExecutionInfos {
if c.StartedEvent != nil && c.InitiatedEvent.Encoding != c.StartedEvent.Encoding {
return nil, common.EncodingTypeUnknown, p.NewSerializationError(fmt.Sprintf("expect to have the same encoding, but %v != %v", c.InitiatedEvent.Encoding, c.StartedEvent.Encoding))
}

datablob, err := serialization.ChildExecutionInfoToBlob(c.ToProto())
datablob, err := serialization.ChildExecutionInfoToBlob(c)
if err != nil {
return nil, common.EncodingTypeUnknown, p.NewSerializationError(fmt.Sprintf("failed to serialize child execution infos - Execution: %v", c.InitiatedID))
return nil, common.EncodingTypeUnknown, p.NewSerializationError(fmt.Sprintf("failed to serialize child execution infos - Execution: %v", c.InitiatedId))
}
cMap[c.InitiatedID] = datablob.Data
cMap[c.InitiatedId] = datablob.Data
encoding = datablob.Encoding
}

Expand Down
22 changes: 3 additions & 19 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ type (
WorkflowMutableState struct {
ActivityInfos map[int64]*persistenceblobs.ActivityInfo
TimerInfos map[string]*persistenceblobs.TimerInfo
ChildExecutionInfos map[int64]*ChildExecutionInfo
ChildExecutionInfos map[int64]*persistenceblobs.ChildExecutionInfo
RequestCancelInfos map[int64]*persistenceblobs.RequestCancelInfo
SignalInfos map[int64]*persistenceblobs.SignalInfo
SignalRequestedIDs map[string]struct{}
Expand All @@ -506,22 +506,6 @@ type (
TaskStatus int64
}

// ChildExecutionInfo has details for pending child executions.
ChildExecutionInfo struct {
Version int64
InitiatedID int64
InitiatedEventBatchID int64
InitiatedEvent *historypb.HistoryEvent
StartedID int64
StartedWorkflowID string
StartedRunID string
StartedEvent *historypb.HistoryEvent
CreateRequestID string
Namespace string
WorkflowTypeName string
ParentClosePolicy enumspb.ParentClosePolicy
}

// CreateShardRequest is used to create a shard in executions table
CreateShardRequest struct {
ShardInfo *persistenceblobs.ShardInfo
Expand Down Expand Up @@ -682,7 +666,7 @@ type (
DeleteActivityInfos []int64
UpsertTimerInfos []*persistenceblobs.TimerInfo
DeleteTimerInfos []string
UpsertChildExecutionInfos []*ChildExecutionInfo
UpsertChildExecutionInfos []*persistenceblobs.ChildExecutionInfo
DeleteChildExecutionInfo *int64
UpsertRequestCancelInfos []*persistenceblobs.RequestCancelInfo
DeleteRequestCancelInfo *int64
Expand Down Expand Up @@ -710,7 +694,7 @@ type (

ActivityInfos []*persistenceblobs.ActivityInfo
TimerInfos []*persistenceblobs.TimerInfo
ChildExecutionInfos []*ChildExecutionInfo
ChildExecutionInfos []*persistenceblobs.ChildExecutionInfo
RequestCancelInfos []*persistenceblobs.RequestCancelInfo
SignalInfos []*persistenceblobs.SignalInfo
SignalRequestedIDs []string
Expand Down
101 changes: 10 additions & 91 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,17 @@ func (m *executionManagerImpl) GetWorkflowExecution(
}
newResponse := &GetWorkflowExecutionResponse{
State: &WorkflowMutableState{
ActivityInfos: response.State.ActivityInfos,
TimerInfos: response.State.TimerInfos,
RequestCancelInfos: response.State.RequestCancelInfos,
SignalInfos: response.State.SignalInfos,
SignalRequestedIDs: response.State.SignalRequestedIDs,
ReplicationState: response.State.ReplicationState,
Checksum: response.State.Checksum,
ActivityInfos: response.State.ActivityInfos,
TimerInfos: response.State.TimerInfos,
RequestCancelInfos: response.State.RequestCancelInfos,
SignalInfos: response.State.SignalInfos,
SignalRequestedIDs: response.State.SignalRequestedIDs,
ReplicationState: response.State.ReplicationState,
Checksum: response.State.Checksum,
ChildExecutionInfos: response.State.ChildExecutionInfos,
},
}

newResponse.State.ChildExecutionInfos, err = m.DeserializeChildExecutionInfos(response.State.ChildExecutionInfos)
if err != nil {
return nil, err
}
newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
if err != nil {
return nil, err
Expand Down Expand Up @@ -201,41 +198,6 @@ func (m *executionManagerImpl) DeserializeBufferedEvents(
return events, nil
}

func (m *executionManagerImpl) DeserializeChildExecutionInfos(
infos map[int64]*InternalChildExecutionInfo,
) (map[int64]*ChildExecutionInfo, error) {

newInfos := make(map[int64]*ChildExecutionInfo, 0)
for k, v := range infos {
initiatedEvent, err := m.serializer.DeserializeEvent(v.InitiatedEvent)
if err != nil {
return nil, err
}
startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
if err != nil {
return nil, err
}
c := &ChildExecutionInfo{
InitiatedEvent: initiatedEvent,
StartedEvent: startedEvent,

Version: v.Version,
InitiatedID: v.InitiatedID,
InitiatedEventBatchID: v.InitiatedEventBatchID,
StartedID: v.StartedID,
StartedWorkflowID: v.StartedWorkflowID,
StartedRunID: v.StartedRunID,
CreateRequestID: v.CreateRequestID,
Namespace: v.Namespace,
WorkflowTypeName: v.WorkflowTypeName,
ParentClosePolicy: v.ParentClosePolicy,
}

newInfos[k] = c
}
return newInfos, nil
}

func (m *executionManagerImpl) UpdateWorkflowExecution(
request *UpdateWorkflowExecutionRequest,
) (*UpdateWorkflowExecutionResponse, error) {
Expand Down Expand Up @@ -266,41 +228,6 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
return &UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, err1
}

func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
infos []*ChildExecutionInfo,
encoding common.EncodingType,
) ([]*InternalChildExecutionInfo, error) {

newInfos := make([]*InternalChildExecutionInfo, 0)
for _, v := range infos {
initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
if err != nil {
return nil, err
}
startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
if err != nil {
return nil, err
}
i := &InternalChildExecutionInfo{
InitiatedEvent: initiatedEvent,
StartedEvent: startedEvent,

Version: v.Version,
InitiatedID: v.InitiatedID,
InitiatedEventBatchID: v.InitiatedEventBatchID,
CreateRequestID: v.CreateRequestID,
StartedID: v.StartedID,
StartedWorkflowID: v.StartedWorkflowID,
StartedRunID: v.StartedRunID,
Namespace: v.Namespace,
WorkflowTypeName: v.WorkflowTypeName,
ParentClosePolicy: v.ParentClosePolicy,
}
newInfos = append(newInfos, i)
}
return newInfos, nil
}

func (m *executionManagerImpl) SerializeExecutionInfo(
info *WorkflowExecutionInfo,
stats *ExecutionStats,
Expand Down Expand Up @@ -499,10 +426,6 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
if err != nil {
return nil, err
}
serializedUpsertChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.UpsertChildExecutionInfos, encoding)
if err != nil {
return nil, err
}
var serializedNewBufferedEvents *serialization.DataBlob
if input.NewBufferedEvents != nil {
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
Expand Down Expand Up @@ -531,7 +454,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
DeleteActivityInfos: input.DeleteActivityInfos,
UpsertTimerInfos: input.UpsertTimerInfos,
DeleteTimerInfos: input.DeleteTimerInfos,
UpsertChildExecutionInfos: serializedUpsertChildExecutionInfos,
UpsertChildExecutionInfos: input.UpsertChildExecutionInfos,
DeleteChildExecutionInfo: input.DeleteChildExecutionInfo,
UpsertRequestCancelInfos: input.UpsertRequestCancelInfos,
DeleteRequestCancelInfo: input.DeleteRequestCancelInfo,
Expand Down Expand Up @@ -568,10 +491,6 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
if err != nil {
return nil, err
}
serializedChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.ChildExecutionInfos, encoding)
if err != nil {
return nil, err
}

startVersion, err := getStartVersion(input.VersionHistories, input.ReplicationState)
if err != nil {
Expand All @@ -591,7 +510,7 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(

ActivityInfos: input.ActivityInfos,
TimerInfos: input.TimerInfos,
ChildExecutionInfos: serializedChildExecutionInfos,
ChildExecutionInfos: input.ChildExecutionInfos,
RequestCancelInfos: input.RequestCancelInfos,
SignalInfos: input.SignalInfos,
SignalRequestedIDs: input.SignalRequestedIDs,
Expand Down
Loading

0 comments on commit f2bf9b2

Please sign in to comment.