Skip to content

Commit

Permalink
Create CLI command to describe Replication Task (#416)
Browse files Browse the repository at this point in the history
* Create CLI command to describe Replication Task
  • Loading branch information
feedmeapples authored Jun 4, 2020
1 parent b16833c commit d06e307
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 4 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ const (
PersistenceCompleteTransferTaskScope
// PersistenceRangeCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer
PersistenceRangeCompleteTransferTaskScope
// PersistenceGetReplicationTaskScope tracks GetReplicationTask calls made by service to persistence layer
PersistenceGetReplicationTaskScope
// PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer
PersistenceGetReplicationTasksScope
// PersistenceCompleteReplicationTaskScope tracks CompleteReplicationTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1074,6 +1076,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
PersistenceRangeCompleteTransferTaskScope: {operation: "RangeCompleteTransferTask"},
PersistenceGetReplicationTaskScope: {operation: "GetReplicationTask"},
PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"},
PersistenceCompleteReplicationTaskScope: {operation: "CompleteReplicationTask"},
PersistenceRangeCompleteReplicationTaskScope: {operation: "RangeCompleteReplicationTask"},
Expand Down
23 changes: 23 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,29 @@ func (_m *ExecutionManager) RangeCompleteTransferTask(request *persistence.Range
return r0
}

// GetReplicationTask provides a mock function with given fields: request
func (_m *ExecutionManager) GetReplicationTask(request *persistence.GetReplicationTaskRequest) (*persistence.GetReplicationTaskResponse, error) {
ret := _m.Called(request)

var r0 *persistence.GetReplicationTaskResponse
if rf, ok := ret.Get(0).(func(*persistence.GetReplicationTaskRequest) *persistence.GetReplicationTaskResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetReplicationTaskResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetReplicationTaskRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// GetReplicationTasks provides a mock function with given fields: request
func (_m *ExecutionManager) GetReplicationTasks(request *persistence.GetReplicationTasksRequest) (*persistence.GetReplicationTasksResponse, error) {
ret := _m.Called(request)
Expand Down
37 changes: 37 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,16 @@ workflow_state = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

templateGetReplicationTaskQuery = `SELECT replication, replication_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and namespace_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? `

templateGetReplicationTasksQuery = `SELECT replication, replication_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -1964,6 +1974,33 @@ func (d *cassandraPersistence) GetTransferTasks(request *p.GetTransferTasksReque
return response, nil
}

func (d *cassandraPersistence) GetReplicationTask(request *p.GetReplicationTaskRequest) (*p.GetReplicationTaskResponse, error) {
shardID := d.shardID
taskID := request.TaskID
query := d.session.Query(templateGetReplicationTaskQuery,
shardID,
rowTypeReplicationTask,
rowTypeReplicationNamespaceID,
rowTypeReplicationWorkflowID,
rowTypeReplicationRunID,
defaultVisibilityTimestamp,
taskID)

var data []byte
var encoding string
if err := query.Scan(&data, &encoding); err != nil {
return nil, convertCommonErrors("GetReplicationTask", err)
}

info, err := serialization.ReplicationTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, convertCommonErrors("GetReplicationTask", err)
}

return &p.GetReplicationTaskResponse{ReplicationTaskInfo: info}, nil
}

func (d *cassandraPersistence) GetReplicationTasks(
request *p.GetReplicationTasksRequest,
) (*p.GetReplicationTasksResponse, error) {
Expand Down
12 changes: 12 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,17 @@ type (
NextPageToken []byte
}

// GetReplicationTaskRequest is the request for GetReplicationTask
GetReplicationTaskRequest struct {
ShardID int32
TaskID int64
}

// GetReplicationTaskResponse is the response to GetReplicationTask
GetReplicationTaskResponse struct {
ReplicationTaskInfo *persistenceblobs.ReplicationTaskInfo
}

// GetReplicationTasksRequest is used to read tasks from the replication task queue
GetReplicationTasksRequest struct {
ReadLevel int64
Expand Down Expand Up @@ -1345,6 +1356,7 @@ type (
RangeCompleteTransferTask(request *RangeCompleteTransferTaskRequest) error

// Replication task related methods
GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error)
GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
CompleteReplicationTask(request *CompleteReplicationTaskRequest) error
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,12 @@ func (m *executionManagerImpl) RangeCompleteTransferTask(
}

// Replication task related methods
func (m *executionManagerImpl) GetReplicationTask(
request *GetReplicationTaskRequest,
) (*GetReplicationTaskResponse, error) {
return m.persistence.GetReplicationTask(request)
}

func (m *executionManagerImpl) GetReplicationTasks(
request *GetReplicationTasksRequest,
) (*GetReplicationTasksResponse, error) {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type (
RangeCompleteTransferTask(request *RangeCompleteTransferTaskRequest) error

// Replication task related methods
GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error)
GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
CompleteReplicationTask(request *CompleteReplicationTaskRequest) error
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,20 @@ func (p *workflowExecutionPersistenceClient) GetTransferTasks(request *GetTransf
return response, err
}

func (p *workflowExecutionPersistenceClient) GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetReplicationTaskScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceGetReplicationTaskScope, metrics.PersistenceLatency)
response, err := p.persistence.GetReplicationTask(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceGetReplicationTaskScope, err)
}

return response, err
}

func (p *workflowExecutionPersistenceClient) GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetReplicationTasksScope, metrics.PersistenceRequests)

Expand Down
9 changes: 9 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,15 @@ func (p *workflowExecutionRateLimitedPersistenceClient) GetTransferTasks(request
return response, err
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTask(request *GetReplicationTaskRequest) (*GetReplicationTaskResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}

response, err := p.persistence.GetReplicationTask(request)
return response, err
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
Expand Down
24 changes: 24 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,30 @@ func (m *sqlExecutionManager) RangeCompleteTransferTask(
return nil
}

func (m *sqlExecutionManager) GetReplicationTask(request *persistence.GetReplicationTaskRequest) (*persistence.GetReplicationTaskResponse, error) {
rows, err := m.db.SelectFromReplicationTasks(&sqlplugin.ReplicationTasksFilter{ShardID: int(request.ShardID), TaskID: request.TaskID})
if err != nil {
if err == sql.ErrNoRows {
return nil, serviceerror.NewNotFound(fmt.Sprintf("GetReplicationTask operation failed. Task with ID %v not found. Error: %v", request.TaskID, err))
}
return nil, serviceerror.NewInternal(fmt.Sprintf("GetReplicationTask operation failed. Failed to get record. TaskId: %v. Error: %v", request.TaskID, err))
}

if len(rows) == 0 {
return nil, serviceerror.NewInternal(fmt.Sprintf("GetReplicationTask operation failed. Failed to get record. TaskId: %v", request.TaskID))
}

replicationRow := rows[0]
replicationInfo, err := serialization.ReplicationTaskInfoFromBlob(replicationRow.Data, replicationRow.DataEncoding)
if err != nil {
return nil, err
}

resp := &persistence.GetReplicationTaskResponse{ReplicationTaskInfo: replicationInfo}

return resp, nil
}

func (m *sqlExecutionManager) GetReplicationTasks(
request *p.GetReplicationTasksRequest,
) (*p.GetReplicationTasksResponse, error) {
Expand Down
22 changes: 18 additions & 4 deletions tools/cli/adminCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,17 +374,31 @@ func AdminDescribeTask(c *cli.Context) {
sid := getRequiredIntOption(c, FlagShardID)
tid := getRequiredIntOption(c, FlagTaskID)
vis := getRequiredInt64Option(c, FlagTaskVisibilityTimestamp)
category := commongenpb.TaskCategory(c.Int(FlagTaskType))

pFactory := CreatePersistenceFactory(c)
executionManager, err := pFactory.NewExecutionManager(sid)
if err != nil {
ErrorAndExit("Failed to initialize execution manager", err)
}

req := &persistence.GetTimerTaskRequest{ShardID: int32(sid), TaskID: int64(tid), VisibilityTimestamp: time.Unix(0, vis)}
timerTask, err := executionManager.GetTimerTask(req)

prettyPrintJSONObject(timerTask)
if category == commongenpb.TaskCategory_TaskCategory_Timer {
req := &persistence.GetTimerTaskRequest{ShardID: int32(sid), TaskID: int64(tid), VisibilityTimestamp: time.Unix(0, vis)}
task, err := executionManager.GetTimerTask(req)
if err != nil {
ErrorAndExit("Failed to get Timer Task", err)
}
prettyPrintJSONObject(task)
} else if category == commongenpb.TaskCategory_TaskCategory_Replication {
req := &persistence.GetReplicationTaskRequest{ShardID: int32(sid), TaskID: int64(tid)}
task, err := executionManager.GetReplicationTask(req)
if err != nil {
ErrorAndExit("Failed to get Replication Task", err)
}
prettyPrintJSONObject(task)
} else {
ErrorAndExit("Failed to describe task", fmt.Errorf("Unrecognized task type, task_type=%v", category))
}
}

// AdminRemoveTask describes history host
Expand Down

0 comments on commit d06e307

Please sign in to comment.