Skip to content

Commit

Permalink
Create CLI command to describe Timer Task (#408)
Browse files Browse the repository at this point in the history
* Create CLI admin command to describe Timer Task
  • Loading branch information
feedmeapples authored Jun 1, 2020
1 parent 4b25273 commit bd7a944
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 0 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ const (
PersistenceDeleteReplicationTaskFromDLQScope
// PersistenceRangeDeleteReplicationTaskFromDLQScope tracks PersistenceRangeDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
PersistenceRangeDeleteReplicationTaskFromDLQScope
// PersistenceGetTimerTaskScope tracks GetTimerTask calls made by service to persistence layer
PersistenceGetTimerTaskScope
// PersistenceGetTimerIndexTasksScope tracks GetTimerIndexTasks calls made by service to persistence layer
PersistenceGetTimerIndexTasksScope
// PersistenceCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1079,6 +1081,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetReplicationTasksFromDLQScope: {operation: "GetReplicationTasksFromDLQ"},
PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"},
PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"},
PersistenceGetTimerTaskScope: {operation: "GetTimerTask"},
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"},
Expand Down
23 changes: 23 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,29 @@ func (_m *ExecutionManager) RangeDeleteReplicationTaskFromDLQ(
return r0
}

// GetTimerTask provides a mock function with given fields: request
func (_m *ExecutionManager) GetTimerTask(request *persistence.GetTimerTaskRequest) (*persistence.GetTimerTaskResponse, error) {
ret := _m.Called(request)

var r0 *persistence.GetTimerTaskResponse
if rf, ok := ret.Get(0).(func(*persistence.GetTimerTaskRequest) *persistence.GetTimerTaskResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetTimerTaskResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetTimerTaskRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// GetTimerIndexTasks provides a mock function with given fields: request
func (_m *ExecutionManager) GetTimerIndexTasks(request *persistence.GetTimerIndexTasksRequest) (*persistence.GetTimerIndexTasksResponse, error) {
ret := _m.Called(request)
Expand Down
38 changes: 38 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,16 @@ workflow_state = ? ` +

templateRangeCompleteReplicationTaskQuery = templateRangeCompleteTransferTaskQuery

templateGetTimerTaskQuery = `SELECT timer, timer_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and namespace_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? `

templateGetTimerTasksQuery = `SELECT timer, timer_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -2558,6 +2568,34 @@ func (d *cassandraPersistence) CompleteTasksLessThan(request *p.CompleteTasksLes
return p.UnknownNumRowsAffected, nil
}

func (d *cassandraPersistence) GetTimerTask(request *p.GetTimerTaskRequest) (*p.GetTimerTaskResponse, error) {
shardID := d.shardID
taskID := request.TaskID
visibilityTs := request.VisibilityTimestamp
query := d.session.Query(templateGetTimerTaskQuery,
shardID,
rowTypeTimerTask,
rowTypeTimerNamespaceID,
rowTypeTimerWorkflowID,
rowTypeTimerRunID,
visibilityTs,
taskID)

var data []byte
var encoding string
if err := query.Scan(&data, &encoding); err != nil {
return nil, convertCommonErrors("GetTimerTask", err)
}

info, err := serialization.TimerTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, convertCommonErrors("GetTimerTask", err)
}

return &p.GetTimerTaskResponse{TimerTaskInfo: info}, nil
}

func (d *cassandraPersistence) GetTimerIndexTasks(request *p.GetTimerIndexTasksRequest) (*p.GetTimerIndexTasksResponse,
error) {
// Reading timer tasks need to be quorum level consistent, otherwise we could loose task
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,18 @@ type (
Limit int // Limit on the max number of tasks that can be completed. Required param
}

// GetTimerTaskRequest is the request for GetTimerTask
GetTimerTaskRequest struct {
ShardID int32
TaskID int64
VisibilityTimestamp time.Time
}

// GetTimerTaskResponse is the response to GetTimerTask
GetTimerTaskResponse struct {
TimerTaskInfo *persistenceblobs.TimerTaskInfo
}

// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
// TODO: replace this with an iterator that can configure min and max index.
GetTimerIndexTasksRequest struct {
Expand Down Expand Up @@ -1342,6 +1354,7 @@ type (
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error

// Timer related methods.
GetTimerTask(request *GetTimerTaskRequest) (*GetTimerTaskResponse, error)
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(request *RangeCompleteTimerTaskRequest) error
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,12 @@ func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
}

// Timer related methods.
func (m *executionManagerImpl) GetTimerTask(
request *GetTimerTaskRequest,
) (*GetTimerTaskResponse, error) {
return m.persistence.GetTimerTask(request)
}

func (m *executionManagerImpl) GetTimerIndexTasks(
request *GetTimerIndexTasksRequest,
) (*GetTimerIndexTasksResponse, error) {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type (
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error

// Timer related methods.
GetTimerTask(request *GetTimerTaskRequest) (*GetTimerTaskResponse, error)
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(request *RangeCompleteTimerTaskRequest) error
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,20 @@ func (p *workflowExecutionPersistenceClient) RangeDeleteReplicationTaskFromDLQ(
return nil
}

func (p *workflowExecutionPersistenceClient) GetTimerTask(request *GetTimerTaskRequest) (*GetTimerTaskResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetTimerTaskScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceGetTimerTaskScope, metrics.PersistenceLatency)
response, err := p.persistence.GetTimerTask(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceGetTimerTaskScope, err)
}

return response, err
}

func (p *workflowExecutionPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceRequests)

Expand Down
9 changes: 9 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,15 @@ func (p *workflowExecutionRateLimitedPersistenceClient) RangeDeleteReplicationTa
return p.persistence.RangeDeleteReplicationTaskFromDLQ(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerTask(request *GetTimerTaskRequest) (*GetTimerTaskResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}

response, err := p.persistence.GetTimerTask(request)
return response, err
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
Expand Down
25 changes: 25 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/temporalio/temporal/common/collection"
"github.com/temporalio/temporal/common/convert"
"github.com/temporalio/temporal/common/log"
"github.com/temporalio/temporal/common/persistence"
p "github.com/temporalio/temporal/common/persistence"
"github.com/temporalio/temporal/common/persistence/serialization"
"github.com/temporalio/temporal/common/persistence/sql/sqlplugin"
Expand Down Expand Up @@ -958,6 +959,30 @@ func (t *timerTaskPageToken) deserialize(payload []byte) error {
return json.Unmarshal(payload, t)
}

func (m *sqlExecutionManager) GetTimerTask(request *persistence.GetTimerTaskRequest) (*persistence.GetTimerTaskResponse, error) {
rows, err := m.db.SelectFromTimerTasks(&sqlplugin.TimerTasksFilter{ShardID: int(request.ShardID), TaskID: int64(request.TaskID), VisibilityTimestamp: &request.VisibilityTimestamp})
if err != nil {
if err == sql.ErrNoRows {
return nil, serviceerror.NewNotFound(fmt.Sprintf("GetTimerTask operation failed. Task with ID %v not found. Error: %v", request.TaskID, err))
}
return nil, serviceerror.NewInternal(fmt.Sprintf("GetTimerTask operation failed. Failed to get record. TaskId: %v. Error: %v", request.TaskID, err))
}

if len(rows) == 0 {
return nil, serviceerror.NewInternal(fmt.Sprintf("GetTimerTask operation failed. Failed to get record. TaskId: %v", request.TaskID))
}

timerRow := rows[0]
timerInfo, err := serialization.TimerTaskInfoFromBlob(timerRow.Data, timerRow.DataEncoding)
if err != nil {
return nil, err
}

resp := &persistence.GetTimerTaskResponse{TimerTaskInfo: timerInfo}

return resp, nil
}

func (m *sqlExecutionManager) GetTimerIndexTasks(
request *p.GetTimerIndexTasksRequest,
) (*p.GetTimerIndexTasksResponse, error) {
Expand Down
34 changes: 34 additions & 0 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,40 @@ func newAdminShardManagementCommands() []cli.Command {
AdminDescribeShard(c)
},
},
{
Name: "describe-task",
Aliases: []string{"dt"},
Usage: "Describe a task based on task Id, task type, shard Id and task visibility timestamp",
Flags: append(
getDBFlags(),
cli.IntFlag{
Name: FlagShardID,
Usage: "The ID of the shard",
},
cli.IntFlag{
Name: FlagTaskID,
Usage: "The ID of the timer task to describe",
},
cli.IntFlag{
Name: FlagTaskType,
Value: 2,
Usage: "Task type: 2 - transfer task, 3 - timer task, 4 - replication task",
},
cli.Int64Flag{
Name: FlagTaskVisibilityTimestamp,
Usage: "Task visibility timestamp in nano",
},
cli.StringFlag{
Name: FlagTargetCluster,
Value: "active",
Usage: "Temporal cluster to use",
},
),
Action: func(c *cli.Context) {
AdminDescribeTask(c)
},
},

{
Name: "closeShard",
Aliases: []string{"clsh"},
Expand Down
18 changes: 18 additions & 0 deletions tools/cli/adminCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,24 @@ func AdminGetShardID(c *cli.Context) {
fmt.Printf("ShardId for workflowId: %v is %v \n", wid, shardID)
}

// AdminDescribeTask outputs the details of a task given Task Id, Task Type, Shard Id and Visibility Timestamp
func AdminDescribeTask(c *cli.Context) {
sid := getRequiredIntOption(c, FlagShardID)
tid := getRequiredIntOption(c, FlagTaskID)
vis := getRequiredInt64Option(c, FlagTaskVisibilityTimestamp)

pFactory := CreatePersistenceFactory(c)
executionManager, err := pFactory.NewExecutionManager(sid)
if err != nil {
ErrorAndExit("Failed to initialize execution manager", err)
}

req := &persistence.GetTimerTaskRequest{ShardID: int32(sid), TaskID: int64(tid), VisibilityTimestamp: time.Unix(0, vis)}
timerTask, err := executionManager.GetTimerTask(req)

prettyPrintJSONObject(timerTask)
}

// AdminRemoveTask describes history host
func AdminRemoveTask(c *cli.Context) {
adminClient := cFactory.AdminClient(c)
Expand Down

0 comments on commit bd7a944

Please sign in to comment.