Skip to content

Commit

Permalink
fix: still run tasks even if node is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Apr 8, 2023
1 parent 3bb0d1e commit d687e0e
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 141 deletions.
6 changes: 0 additions & 6 deletions interfaces/task_scheduler_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ type TaskSchedulerService interface {
TaskBaseService
// Enqueue task into the task queue
Enqueue(t Task) (err error)
// DequeueAndSchedule continuously dequeue task and schedule to corresponding node
DequeueAndSchedule()
// Dequeue task with node info from the task queue
Dequeue() (tasks []Task, err error)
// Schedule task to corresponding node
Schedule(tasks []Task) (err error)
// Cancel task to corresponding node
Cancel(id primitive.ObjectID, args ...interface{}) (err error)
// SetInterval set the interval or duration between two adjacent fetches
Expand Down
5 changes: 5 additions & 0 deletions task/handler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (svc *Service) Fetch() {
continue
}

// skip if node is not active or enabled
if !n.GetActive() || !n.GetEnabled() {
continue
}

// validate if there are available runners
if svc.getRunnerCount() >= n.GetMaxRunners() {
continue
Expand Down
135 changes: 0 additions & 135 deletions task/scheduler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/crawlab-team/crawlab-db/mongo"
grpc "github.com/crawlab-team/crawlab-grpc"
"github.com/crawlab-team/go-trace"
"github.com/joeshaw/multierror"
"github.com/spf13/viper"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand Down Expand Up @@ -93,140 +92,6 @@ func (svc *Service) Enqueue(t interfaces.Task) (err error) {
return nil
}

func (svc *Service) DequeueAndSchedule() {
for {
if svc.IsStopped() {
return
}

// wait
time.Sleep(svc.interval)

if err := mongo.RunTransaction(func(sc mongo2.SessionContext) error {
// dequeue tasks
tasks, err := svc.Dequeue()
if err != nil {
return trace.TraceError(err)
}

// skip if no tasks available
if tasks == nil || len(tasks) == 0 {
return nil
}

// schedule tasks
if err := svc.Schedule(tasks); err != nil {
return trace.TraceError(err)
}

return nil
}); err != nil {
trace.PrintError(err)
}
}
}

func (svc *Service) Dequeue() (tasks []interfaces.Task, err error) {
// get task queue items
tqList, err := svc.getTaskQueueItems()
if err != nil {
return nil, err
}
if tqList == nil {
return nil, nil
}

// match resources
tasks, nodesMap, err := svc.matchResources(tqList)
if err != nil {
return nil, err
}
if tasks == nil {
return nil, nil
}

// update resources
if err := svc.updateResources(nodesMap); err != nil {
return nil, err
}

// update tasks
if err := svc.updateTasks(tasks); err != nil {
return nil, err
}

// delete task queue items
if err := svc.deleteTaskQueueItems(tasks); err != nil {
return nil, err
}

return tasks, nil
}

func (svc *Service) Schedule(tasks []interfaces.Task) (err error) {
var e multierror.Errors

// nodes cache
nodesCache := sync.Map{}

// wait group
wg := sync.WaitGroup{}
wg.Add(len(tasks))

// iterate tasks and execute each of them
for _, t := range tasks {
go func(t interfaces.Task) {
var err error

// node of the task
var n interfaces.Node
res, ok := nodesCache.Load(t.GetNodeId())
if !ok {
// not exists in cache
n, err = svc.modelSvc.GetNodeById(t.GetNodeId())
if err == nil {
nodesCache.Store(n.GetId(), n)
}
} else {
// exists in cache
n, ok = res.(interfaces.Node)
if !ok {
err = errors.ErrorTaskInvalidType
}
}
if err != nil {
e = append(e, err)
svc.handleTaskError(n, t, err)
wg.Done()
return
}

// schedule task
if n.GetIsMaster() {
// execute task on master
err = svc.handlerSvc.Run(t.GetId())
} else {
// send to execute task on worker nodes
err = svc.svr.SendStreamMessageWithData("node:"+n.GetKey(), grpc.StreamMessageCode_RUN_TASK, t)
}
if err != nil {
e = append(e, err)
svc.handleTaskError(n, t, err)
wg.Done()
return
}

// success
wg.Done()
}(t)
}

// wait
wg.Wait()

return e.Err()
}

func (svc *Service) Cancel(id primitive.ObjectID, args ...interface{}) (err error) {
// user
u := utils.GetUserFromArgs(args...)
Expand Down

0 comments on commit d687e0e

Please sign in to comment.