Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bcs-task manager add ListTask #3545

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bcs-common/common/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func (m *TaskManager) GetTaskWithID(ctx context.Context, taskId string) (*types.
return GetGlobalStorage().GetTask(ctx, taskId)
}

// ListTask list tasks with options, returns a paginated list of tasks
func (m *TaskManager) ListTask(ctx context.Context, opt *istore.ListOption) (*istore.Pagination[types.Task], error) {
return GetGlobalStorage().ListTask(ctx, opt)
}

// UpdateTask update task
// ! warning: modify task status will cause task status not consistent
func (m *TaskManager) UpdateTask(ctx context.Context, task *types.Task) error {
Expand Down
57 changes: 18 additions & 39 deletions bcs-common/common/task/stores/iface/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,25 @@ import (

// ListOption ...
type ListOption struct {
// Sort map for sort list results
Sort map[string]int
// Offset offset for list results
Offset int64
// Limit limit for list results
Limit int64
// All for all results
All bool
// Count for index
Count bool
// SkipDecrypt skip data decrypt
SkipDecrypt bool
TaskID string
TaskType string
TaskName string
TaskIndex string
TaskIndexType string
CurrentStep string
Status string
Creator string
StartGte *time.Time // StartGte start time greater or equal to
StartLte *time.Time // StartLte start time less or equal to
Sort map[string]int // Sort map for sort list results
Offset int64 // Offset offset for list results
Limit int64 // Limit limit for list results
}

// UpdateOption ...
type UpdateOption struct {
CurrentStep string `json:"currentStep"`
CommonParams map[string]string `json:"commonParams"`
ExtraJson string `json:"extraJson"`
Status string `json:"status"`
Message string `json:"message"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
ExecutionTime uint32 `json:"executionTime"`
LastUpdate time.Time `json:"lastUpdate"`
Updater string `json:"updater"`
StepOptions map[string]*UpdateStepOption `json:"stepOptions"`
}

// UpdateStepOption ...
type UpdateStepOption struct {
Params map[string]string `json:"params"`
Extras string `json:"extras"`
Status string `json:"status"`
Message string `json:"message"`
RetryCount uint32 `json:"retryCount"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
ExecutionTime uint32 `json:"executionTime"`
LastUpdate time.Time `json:"lastUpdate"`
// Pagination generic pagination for list results
type Pagination[T any] struct {
Count int64 `json:"count"`
Items []*T `json:"items"`
}

// PatchOption 主要实时更新params, payload信息
Expand All @@ -74,7 +53,7 @@ type PatchOption struct {
type Store interface {
EnsureTable(ctx context.Context, dst ...any) error
CreateTask(ctx context.Context, task *types.Task) error
ListTask(ctx context.Context, opt *ListOption) ([]types.Task, error)
ListTask(ctx context.Context, opt *ListOption) (*Pagination[types.Task], error)
GetTask(ctx context.Context, taskID string) (*types.Task, error)
DeleteTask(ctx context.Context, taskID string) error
UpdateTask(ctx context.Context, task *types.Task) error
Expand Down
2 changes: 1 addition & 1 deletion bcs-common/common/task/stores/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *memStore) CreateTask(ctx context.Context, task *types.Task) error {
return nil
}

func (s *memStore) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
func (s *memStore) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
return nil, types.ErrNotImplemented
}

Expand Down
10 changes: 7 additions & 3 deletions bcs-common/common/task/stores/mongo/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (m *ModelTask) GetTask(ctx context.Context, taskID string) (*types.Task, er
}

// ListTask list clusters
func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
taskList := make([]types.Task, 0)
func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
taskList := make([]*types.Task, 0)
finder := m.db.Table(m.tableName).Find(operator.EmptyCondition)
if len(opt.Sort) != 0 {
finder = finder.WithSort(MapInt2MapIf(opt.Sort))
Expand All @@ -182,5 +182,9 @@ func (m *ModelTask) ListTask(ctx context.Context, opt *iface.ListOption) ([]type
if err := finder.All(ctx, &taskList); err != nil {
return nil, err
}
return taskList, nil
result := &iface.Pagination[types.Task]{
Count: int64(len(taskList)),
Items: taskList,
}
return result, nil
}
18 changes: 18 additions & 0 deletions bcs-common/common/task/stores/mysql/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package mysql

import (
"gorm.io/gorm"

"github.com/Tencent/bk-bcs/bcs-common/common/task/types"
)

Expand Down Expand Up @@ -164,3 +166,19 @@ func getUpdateStepRecord(t *types.Step) *StepRecord {
}
return record
}

// FindByPage 分页查询
func FindByPage[T any](db *gorm.DB, offset int, limit int) (result []*T, count int64, err error) {
err = db.Offset(offset).Limit(limit).Find(&result).Error
if err != nil {
return
}

if size := len(result); 0 < limit && 0 < size && size < limit {
count = int64(size + offset)
return
}

err = db.Offset(-1).Limit(-1).Count(&count).Error
return
}
50 changes: 47 additions & 3 deletions bcs-common/common/task/stores/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *mysqlStore) initDsn(raw string) {
s.dsn = u.String()
}

// EnsureTable 创建db表
// EnsureTable implement istore EnsureTable interface
func (s *mysqlStore) EnsureTable(ctx context.Context, dst ...any) error {
// 没有自定义数据, 使用默认表结构
if len(dst) == 0 {
Expand All @@ -86,6 +86,7 @@ func (s *mysqlStore) EnsureTable(ctx context.Context, dst ...any) error {
return s.db.WithContext(ctx).AutoMigrate(dst...)
}

// CreateTask implement istore CreateTask interface
func (s *mysqlStore) CreateTask(ctx context.Context, task *types.Task) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
record := getTaskRecord(task)
Expand All @@ -102,10 +103,50 @@ func (s *mysqlStore) CreateTask(ctx context.Context, task *types.Task) error {
})
}

func (s *mysqlStore) ListTask(ctx context.Context, opt *iface.ListOption) ([]types.Task, error) {
return nil, types.ErrNotImplemented
// ListTask implement istore ListTask interface
func (s *mysqlStore) ListTask(ctx context.Context, opt *iface.ListOption) (*iface.Pagination[types.Task], error) {
tx := s.db.WithContext(ctx)

// 条件过滤 0值gorm自动忽略查询
tx = tx.Where(&TaskRecord{
TaskID: opt.TaskID,
TaskType: opt.TaskType,
TaskName: opt.TaskName,
TaskIndex: opt.TaskIndex,
TaskIndexType: opt.TaskIndexType,
Status: opt.Status,
CurrentStep: opt.CurrentStep,
Creator: opt.Creator,
})

// mysql store 使用创建时间过滤
if opt.StartGte != nil {
tx = tx.Where("created_at >= ?", opt.StartGte)
}
if opt.StartLte != nil {
tx = tx.Where("created_at <= ?", opt.StartLte)
}

// 只使用id排序
tx = tx.Order("id DESC")

result, count, err := FindByPage[TaskRecord](tx, int(opt.Offset), int(opt.Limit))
if err != nil {
return nil, err
}

items := make([]*types.Task, 0, len(result))
for _, record := range result {
items = append(items, toTask(record, []*StepRecord{}))
}

return &iface.Pagination[types.Task]{
Count: count,
Items: items,
}, nil
}

// UpdateTask implement istore UpdateTask interface
func (s *mysqlStore) UpdateTask(ctx context.Context, task *types.Task) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
updateTask := getUpdateTaskRecord(task)
Expand All @@ -132,6 +173,7 @@ func (s *mysqlStore) UpdateTask(ctx context.Context, task *types.Task) error {
})
}

// DeleteTask implement istore DeleteTask interface
func (s *mysqlStore) DeleteTask(ctx context.Context, taskID string) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Where("task_id = ?", taskID).Delete(&TaskRecord{}).Error; err != nil {
Expand All @@ -145,6 +187,7 @@ func (s *mysqlStore) DeleteTask(ctx context.Context, taskID string) error {
})
}

// GetTask implement istore GetTask interface
func (s *mysqlStore) GetTask(ctx context.Context, taskID string) (*types.Task, error) {
tx := s.db.WithContext(ctx)
taskRecord := TaskRecord{}
Expand All @@ -159,6 +202,7 @@ func (s *mysqlStore) GetTask(ctx context.Context, taskID string) (*types.Task, e
return toTask(&taskRecord, stepRecord), nil
}

// PatchTask implement istore PatchTask interface
func (s *mysqlStore) PatchTask(ctx context.Context, opt *iface.PatchOption) error {
return types.ErrNotImplemented
}
Loading