From d06e307c2067860a4b52c9ddc4683fb095dc2d97 Mon Sep 17 00:00:00 2001
From: Ruslan <11838981+feedmeapples@users.noreply.github.com>
Date: Thu, 4 Jun 2020 13:18:29 -0700
Subject: [PATCH] Create CLI command to describe Replication Task (#416)

* Create CLI command to describe Replication Task
---
 common/metrics/defs.go                        |  3 ++
 common/mocks/ExecutionManager.go              | 23 ++++++++++++
 .../cassandra/cassandraPersistence.go         | 37 +++++++++++++++++++
 common/persistence/dataInterfaces.go          | 12 ++++++
 common/persistence/executionStore.go          |  6 +++
 common/persistence/persistenceInterface.go    |  1 +
 .../persistence/persistenceMetricClients.go   | 14 +++++++
 .../persistenceRateLimitedClients.go          |  9 +++++
 common/persistence/sql/sqlExecutionManager.go | 24 ++++++++++++
 tools/cli/adminCommands.go                    | 22 +++++++++--
 10 files changed, 147 insertions(+), 4 deletions(-)

diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index f7c7e09dccc..5b0c50cb903 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -160,6 +160,8 @@ const (
 	PersistenceCompleteTransferTaskScope
 	// PersistenceRangeCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer
 	PersistenceRangeCompleteTransferTaskScope
+	// PersistenceGetReplicationTaskScope tracks GetReplicationTask calls made by service to persistence layer
+	PersistenceGetReplicationTaskScope
 	// PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer
 	PersistenceGetReplicationTasksScope
 	// PersistenceCompleteReplicationTaskScope tracks CompleteReplicationTasks calls made by service to persistence layer
@@ -1074,6 +1076,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
 		PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
 		PersistenceRangeCompleteTransferTaskScope: {operation: "RangeCompleteTransferTask"},
+		PersistenceGetReplicationTaskScope: {operation: "GetReplicationTask"},
 		PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"},
 		PersistenceCompleteReplicationTaskScope: {operation: "CompleteReplicationTask"},
 		PersistenceRangeCompleteReplicationTaskScope: {operation: "RangeCompleteReplicationTask"},
diff --git a/common/mocks/ExecutionManager.go b/common/mocks/ExecutionManager.go
index 4f7be96893a..f5119f716af 100644
--- a/common/mocks/ExecutionManager.go
+++ b/common/mocks/ExecutionManager.go
@@ -285,6 +285,29 @@ func (_m *ExecutionManager) RangeCompleteTransferTask(request *persistence.Range
 	return r0
 }
 
+// GetReplicationTask provides a mock function with given fields: request
+func (_m *ExecutionManager) GetReplicationTask(request *persistence.GetReplicationTaskRequest) (*persistence.GetReplicationTaskResponse, error) {
+	ret := _m.Called(request)
+
+	var r0 *persistence.GetReplicationTaskResponse
+	if rf, ok := ret.Get(0).(func(*persistence.GetReplicationTaskRequest) *persistence.GetReplicationTaskResponse); ok {
+		r0 = rf(request)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*persistence.GetReplicationTaskResponse)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(*persistence.GetReplicationTaskRequest) error); ok {
+		r1 = rf(request)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
 // GetReplicationTasks provides a mock function with given fields: request
 func (_m *ExecutionManager) GetReplicationTasks(request *persistence.GetReplicationTasksRequest) (*persistence.GetReplicationTasksResponse, error) {
 	ret := _m.Called(request)
diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go
index a6d46e31839..79fb0c91def 100644
--- a/common/persistence/cassandra/cassandraPersistence.go
+++ b/common/persistence/cassandra/cassandraPersistence.go
@@ -517,6 +517,16 @@ workflow_state = ? ` +
 		`and task_id > ? 
` +
 		`and task_id <= ?`
 
+	templateGetReplicationTaskQuery = `SELECT replication, replication_encoding ` +
+		`FROM executions ` +
+		`WHERE shard_id = ? ` +
+		`and type = ? ` +
+		`and namespace_id = ? ` +
+		`and workflow_id = ? ` +
+		`and run_id = ? ` +
+		`and visibility_ts = ? ` +
+		`and task_id = ? `
+
 	templateGetReplicationTasksQuery = `SELECT replication, replication_encoding ` +
 		`FROM executions ` +
 		`WHERE shard_id = ? ` +
@@ -1964,6 +1974,33 @@ func (d *cassandraPersistence) GetTransferTasks(request *p.GetTransferTasksReque
 	return response, nil
 }
 
+// GetReplicationTask reads the replication task row for request.TaskID in shard d.shardID and decodes its blob; request.ShardID is not read here.
+func (d *cassandraPersistence) GetReplicationTask(request *p.GetReplicationTaskRequest) (*p.GetReplicationTaskResponse, error) {
+	shardID := d.shardID
+	taskID := request.TaskID
+	query := d.session.Query(templateGetReplicationTaskQuery,
+		shardID,
+		rowTypeReplicationTask,
+		rowTypeReplicationNamespaceID,
+		rowTypeReplicationWorkflowID,
+		rowTypeReplicationRunID,
+		defaultVisibilityTimestamp,
+		taskID)
+
+	var data []byte
+	var encoding string
+	if err := query.Scan(&data, &encoding); err != nil {
+		return nil, convertCommonErrors("GetReplicationTask", err)
+	}
+
+	info, err := serialization.ReplicationTaskInfoFromBlob(data, encoding)
+	if err != nil {
+		return nil, convertCommonErrors("GetReplicationTask", err)
+	}
+
+	return &p.GetReplicationTaskResponse{ReplicationTaskInfo: info}, nil
+}
+
 func (d *cassandraPersistence) GetReplicationTasks(
 	request *p.GetReplicationTasksRequest,
 ) (*p.GetReplicationTasksResponse, error) {
diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go
index 14fc92ea260..38054adfbf1 100644
--- a/common/persistence/dataInterfaces.go
+++ b/common/persistence/dataInterfaces.go
@@ -790,6 +790,17 @@ type (
 		NextPageToken []byte
 	}
 
+	// GetReplicationTaskRequest is the request for GetReplicationTask
+	GetReplicationTaskRequest struct {
+		ShardID int32
+		TaskID  int64
+	}
+
+	// GetReplicationTaskResponse is the response to GetReplicationTask
+	GetReplicationTaskResponse struct {
+		ReplicationTaskInfo *persistenceblobs.ReplicationTaskInfo
+	}
+
 	// GetReplicationTasksRequest is used to read tasks from the replication task queue
 	GetReplicationTasksRequest struct {
 		ReadLevel int64
@@ -1345,6 +1356,7 @@ type (
 		RangeCompleteTransferTask(request *RangeCompleteTransferTaskRequest) error
 
 		// Replication task related methods
+		GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error)
 		GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
 		CompleteReplicationTask(request *CompleteReplicationTaskRequest) error
 		RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
diff --git a/common/persistence/executionStore.go b/common/persistence/executionStore.go
index 023188cf07e..afa0e53d290 100644
--- a/common/persistence/executionStore.go
+++ b/common/persistence/executionStore.go
@@ -821,6 +821,12 @@ func (m *executionManagerImpl) RangeCompleteTransferTask(
 }
 
 // Replication task related methods
+func (m *executionManagerImpl) GetReplicationTask(
+	request *GetReplicationTaskRequest,
+) (*GetReplicationTaskResponse, error) {
+	return m.persistence.GetReplicationTask(request)
+}
+
 func (m *executionManagerImpl) GetReplicationTasks(
 	request *GetReplicationTasksRequest,
 ) (*GetReplicationTasksResponse, error) {
diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go
index 35cab9ad96d..bffbd8b54ea 100644
--- a/common/persistence/persistenceInterface.go
+++ b/common/persistence/persistenceInterface.go
@@ -103,6 +103,7 @@ type (
 		RangeCompleteTransferTask(request *RangeCompleteTransferTaskRequest) error
 
 		// Replication task related methods
+		GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error)
 		GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
 		CompleteReplicationTask(request *CompleteReplicationTaskRequest) 
error
 		RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go
index f4f8aa58cab..9d834934508 100644
--- a/common/persistence/persistenceMetricClients.go
+++ b/common/persistence/persistenceMetricClients.go
@@ -379,6 +379,20 @@ func (p *workflowExecutionPersistenceClient) GetTransferTasks(request *GetTransf
 	return response, err
 }
 
+// GetReplicationTask forwards to the wrapped persistence, counting the request, timing the call and passing any error to updateErrorMetric under PersistenceGetReplicationTaskScope.
+func (p *workflowExecutionPersistenceClient) GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error) {
+	p.metricClient.IncCounter(metrics.PersistenceGetReplicationTaskScope, metrics.PersistenceRequests)
+	sw := p.metricClient.StartTimer(metrics.PersistenceGetReplicationTaskScope, metrics.PersistenceLatency)
+	response, err := p.persistence.GetReplicationTask(request)
+	sw.Stop()
+
+	if err != nil {
+		p.updateErrorMetric(metrics.PersistenceGetReplicationTaskScope, err)
+	}
+
+	return response, err
+}
+
 func (p *workflowExecutionPersistenceClient) GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error) {
 	p.metricClient.IncCounter(metrics.PersistenceGetReplicationTasksScope, metrics.PersistenceRequests)
 
diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go
index c1600c09c99..cd1619fa730 100644
--- a/common/persistence/persistenceRateLimitedClients.go
+++ b/common/persistence/persistenceRateLimitedClients.go
@@ -302,6 +302,15 @@ func (p *workflowExecutionRateLimitedPersistenceClient) GetTransferTasks(request
 	return response, err
 }
 
+// GetReplicationTask forwards to the wrapped persistence, or returns ErrPersistenceLimitExceeded when the rate limiter rejects the call.
+func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error) {
+	if ok := p.rateLimiter.Allow(); !ok {
+		return nil, ErrPersistenceLimitExceeded
+	}
+	response, err := p.persistence.GetReplicationTask(request)
+	return response, err
+}
+
 func (p 
*workflowExecutionRateLimitedPersistenceClient) GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error) {
 	if ok := p.rateLimiter.Allow(); !ok {
 		return nil, ErrPersistenceLimitExceeded
diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go
index 1fac5e31d40..45e56b62356 100644
--- a/common/persistence/sql/sqlExecutionManager.go
+++ b/common/persistence/sql/sqlExecutionManager.go
@@ -777,6 +777,30 @@ func (m *sqlExecutionManager) RangeCompleteTransferTask(
 	return nil
 }
 
+// GetReplicationTask returns the replication task with request.TaskID from shard request.ShardID, or a NotFound error when no such row exists.
+func (m *sqlExecutionManager) GetReplicationTask(request *p.GetReplicationTaskRequest) (*p.GetReplicationTaskResponse, error) {
+	rows, err := m.db.SelectFromReplicationTasks(&sqlplugin.ReplicationTasksFilter{ShardID: int(request.ShardID), TaskID: request.TaskID})
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return nil, serviceerror.NewNotFound(fmt.Sprintf("GetReplicationTask operation failed. Task with ID %v not found. Error: %v", request.TaskID, err))
+		}
+		return nil, serviceerror.NewInternal(fmt.Sprintf("GetReplicationTask operation failed. Failed to get record. TaskId: %v. Error: %v", request.TaskID, err))
+	}
+
+	// An empty result set also means the task does not exist; report it as NotFound, like the sql.ErrNoRows case above.
+	if len(rows) == 0 {
+		return nil, serviceerror.NewNotFound(fmt.Sprintf("GetReplicationTask operation failed. Task with ID %v not found.", request.TaskID))
+	}
+
+	replicationRow := rows[0]
+	replicationInfo, err := serialization.ReplicationTaskInfoFromBlob(replicationRow.Data, replicationRow.DataEncoding)
+	if err != nil {
+		return nil, err
+	}
+
+	return &p.GetReplicationTaskResponse{ReplicationTaskInfo: replicationInfo}, nil
+}
+
 func (m *sqlExecutionManager) GetReplicationTasks(
 	request *p.GetReplicationTasksRequest,
 ) (*p.GetReplicationTasksResponse, error) {
diff --git a/tools/cli/adminCommands.go b/tools/cli/adminCommands.go
index 84cf121f700..c1501a534a4 100644
--- a/tools/cli/adminCommands.go
+++ b/tools/cli/adminCommands.go
@@ -374,6 +374,7 @@ func AdminDescribeTask(c *cli.Context) {
 	sid := getRequiredIntOption(c, FlagShardID)
 	tid := getRequiredIntOption(c, FlagTaskID)
 	vis := getRequiredInt64Option(c, FlagTaskVisibilityTimestamp)
+	category := commongenpb.TaskCategory(c.Int(FlagTaskType))
 
 	pFactory := CreatePersistenceFactory(c)
 	executionManager, err := pFactory.NewExecutionManager(sid)
@@ -381,10 +382,23 @@ func AdminDescribeTask(c *cli.Context) {
 		ErrorAndExit("Failed to initialize execution manager", err)
 	}
 
-	req := &persistence.GetTimerTaskRequest{ShardID: int32(sid), TaskID: int64(tid), VisibilityTimestamp: time.Unix(0, vis)}
-	timerTask, err := executionManager.GetTimerTask(req)
-
-	prettyPrintJSONObject(timerTask)
+	switch category {
+	case commongenpb.TaskCategory_TaskCategory_Timer:
+		task, err := executionManager.GetTimerTask(&persistence.GetTimerTaskRequest{ShardID: int32(sid), TaskID: int64(tid), VisibilityTimestamp: time.Unix(0, vis)})
+		if err != nil {
+			ErrorAndExit("Failed to get Timer Task", err)
+		}
+		prettyPrintJSONObject(task)
+	case commongenpb.TaskCategory_TaskCategory_Replication:
+		// NOTE(review): FlagTaskVisibilityTimestamp is still read as a required option above although this branch never uses vis; consider reading it only for timer tasks.
+		task, err := executionManager.GetReplicationTask(&persistence.GetReplicationTaskRequest{ShardID: int32(sid), TaskID: int64(tid)})
+		if err != nil {
+			ErrorAndExit("Failed to get Replication Task", err)
+		}
+		prettyPrintJSONObject(task)
+	default:
+		ErrorAndExit("Failed to describe task", fmt.Errorf("Unrecognized task type, task_type=%v", category))
+	}
 }
 
 // AdminRemoveTask describes history host