Skip to content

Commit

Permalink
Convert ReplicationState/ExecutionState POGO to Proto (#622)
Browse files Browse the repository at this point in the history
  • Loading branch information
shawnhathaway authored Jul 28, 2020
1 parent 463fc8a commit 17e3b67
Show file tree
Hide file tree
Showing 29 changed files with 1,599 additions and 856 deletions.
1,793 changes: 1,265 additions & 528 deletions api/persistenceblobs/v1/message.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2806,7 +2806,7 @@ func (d *cassandraPersistence) RangeDeleteReplicationTaskFromDLQ(
return nil
}

func workflowExecutionFromRow(result map[string]interface{}) (*p.InternalWorkflowExecutionInfo, *p.ReplicationState, error) {
func workflowExecutionFromRow(result map[string]interface{}) (*p.InternalWorkflowExecutionInfo, *persistenceblobs.ReplicationState, error) {
eiBytes, ok := result["execution"].([]byte)
if !ok {
return nil, nil, newPersistedTypeMismatchError("execution", "", eiBytes, result)
Expand Down Expand Up @@ -2834,7 +2834,7 @@ func workflowExecutionFromRow(result map[string]interface{}) (*p.InternalWorkflo

info := p.ProtoWorkflowExecutionToPartialInternalExecution(protoInfo, protoState, nextEventID)

var state *p.ReplicationState
var state *persistenceblobs.ReplicationState
if protoInfo.ReplicationData != nil {
protoReplVersions, err := ProtoReplicationVersionsFromResultMap(result)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func createExecution(
batch *gocql.Batch,
shardID int,
executionInfo *p.InternalWorkflowExecutionInfo,
replicationState *p.ReplicationState,
replicationState *persistenceblobs.ReplicationState,
versionHistories *serialization.DataBlob,
checksum checksum.Checksum,
cqlNowTimestampMillis int64,
Expand Down Expand Up @@ -506,7 +506,7 @@ func createExecution(
checksumDatablob.Data,
checksumDatablob.Encoding.String())
} else if replicationState != nil {
replicationVersions, err := serialization.ReplicationVersionsToBlob(replicationState.GenerateVersionProto())
replicationVersions, err := serialization.ReplicationVersionsToBlob(p.GenerateVersionProto(replicationState))
if err != nil {
return err
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func updateExecution(
batch *gocql.Batch,
shardID int,
executionInfo *p.InternalWorkflowExecutionInfo,
replicationState *p.ReplicationState,
replicationState *persistenceblobs.ReplicationState,
versionHistories *serialization.DataBlob,
cqlNowTimestampMillis int64,
condition int64,
Expand Down Expand Up @@ -620,7 +620,7 @@ func updateExecution(
rowTypeExecutionTaskID,
condition)
} else if replicationState != nil {
replicationVersions, err := serialization.ReplicationVersionsToBlob(replicationState.GenerateVersionProto())
replicationVersions, err := serialization.ReplicationVersionsToBlob(p.GenerateVersionProto(replicationState))
if err != nil {
return err
}
Expand Down Expand Up @@ -1536,12 +1536,12 @@ func updateBufferedEvents(
}
}

func ReplicationStateFromProtos(wei *persistenceblobs.WorkflowExecutionInfo, rv *persistenceblobs.ReplicationVersions) *p.ReplicationState {
func ReplicationStateFromProtos(wei *persistenceblobs.WorkflowExecutionInfo, rv *persistenceblobs.ReplicationVersions) *persistenceblobs.ReplicationState {
if rv == nil && wei.ReplicationData == nil {
return nil
}

info := &p.ReplicationState{}
info := &persistenceblobs.ReplicationState{}
info.CurrentVersion = wei.CurrentVersion

if rv != nil {
Expand All @@ -1551,7 +1551,7 @@ func ReplicationStateFromProtos(wei *persistenceblobs.WorkflowExecutionInfo, rv

if wei.ReplicationData != nil {
info.LastReplicationInfo = wei.ReplicationData.LastReplicationInfo
info.LastWriteEventID = wei.ReplicationData.LastWriteEventId
info.LastWriteEventId = wei.ReplicationData.LastWriteEventId
}

if info.LastReplicationInfo == nil {
Expand Down
29 changes: 7 additions & 22 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,21 +257,6 @@ type (
CronSchedule string
}

// ExecutionStats is the statistics about workflow execution
ExecutionStats struct {
HistorySize int64
}

// ReplicationState represents mutable state information for global namespaces.
// This information is used by replication protocol when applying events from remote clusters
ReplicationState struct {
CurrentVersion int64
StartVersion int64
LastWriteVersion int64
LastWriteEventID int64
LastReplicationInfo map[string]*replicationspb.ReplicationInfo
}

// ReplicationTaskInfoWrapper describes a replication task.
ReplicationTaskInfoWrapper struct {
*persistenceblobs.ReplicationTaskInfo
Expand Down Expand Up @@ -490,8 +475,8 @@ type (
SignalInfos map[int64]*persistenceblobs.SignalInfo
SignalRequestedIDs map[string]struct{}
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
ReplicationState *ReplicationState
ExecutionStats *persistenceblobs.ExecutionStats
ReplicationState *persistenceblobs.ReplicationState
BufferedEvents []*historypb.HistoryEvent
VersionHistories *VersionHistories
Checksum checksum.Checksum
Expand Down Expand Up @@ -658,8 +643,8 @@ type (
// WorkflowMutation is used as generic workflow execution state mutation
WorkflowMutation struct {
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
ReplicationState *ReplicationState
ExecutionStats *persistenceblobs.ExecutionStats
ReplicationState *persistenceblobs.ReplicationState
VersionHistories *VersionHistories

UpsertActivityInfos []*persistenceblobs.ActivityInfo
Expand Down Expand Up @@ -688,8 +673,8 @@ type (
// WorkflowSnapshot is used as generic workflow execution state snapshot
WorkflowSnapshot struct {
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
ReplicationState *ReplicationState
ExecutionStats *persistenceblobs.ExecutionStats
ReplicationState *persistenceblobs.ReplicationState
VersionHistories *VersionHistories

ActivityInfos []*persistenceblobs.ActivityInfo
Expand Down Expand Up @@ -2168,7 +2153,7 @@ func NewGetReplicationTasksFromDLQRequest(
}
}

func (r *ReplicationState) GenerateVersionProto() *persistenceblobs.ReplicationVersions {
func GenerateVersionProto(r *persistenceblobs.ReplicationState) *persistenceblobs.ReplicationVersions {
return &persistenceblobs.ReplicationVersions{
StartVersion: &types.Int64Value{Value: r.StartVersion},
LastWriteVersion: &types.Int64Value{Value: r.LastWriteVersion},
Expand Down
11 changes: 6 additions & 5 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/persistenceblobs/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence/serialization"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(

func (m *executionManagerImpl) DeserializeExecutionInfo(
info *InternalWorkflowExecutionInfo,
) (*WorkflowExecutionInfo, *ExecutionStats, error) {
) (*WorkflowExecutionInfo, *persistenceblobs.ExecutionStats, error) {

completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
if err != nil {
Expand Down Expand Up @@ -177,7 +178,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
SearchAttributes: info.SearchAttributes,
Memo: info.Memo,
}
newStats := &ExecutionStats{
newStats := &persistenceblobs.ExecutionStats{
HistorySize: info.HistorySize,
}
return newInfo, newStats, nil
Expand Down Expand Up @@ -230,7 +231,7 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(

func (m *executionManagerImpl) SerializeExecutionInfo(
info *WorkflowExecutionInfo,
stats *ExecutionStats,
stats *persistenceblobs.ExecutionStats,
encoding common.EncodingType,
) (*InternalWorkflowExecutionInfo, error) {

Expand Down Expand Up @@ -691,7 +692,7 @@ func (m *executionManagerImpl) Close() {

func getStartVersion(
versionHistories *VersionHistories,
replicationState *ReplicationState,
replicationState *persistenceblobs.ReplicationState,
) (int64, error) {

if replicationState == nil && versionHistories == nil {
Expand All @@ -715,7 +716,7 @@ func getStartVersion(

func getLastWriteVersion(
versionHistories *VersionHistories,
replicationState *ReplicationState,
replicationState *persistenceblobs.ReplicationState,
) (int64, error) {

if replicationState == nil && versionHistories == nil {
Expand Down
Loading

0 comments on commit 17e3b67

Please sign in to comment.