Skip to content

Commit

Permalink
feat: reduce fetch interval time to 1s
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jun 19, 2023
1 parent a3f94f2 commit 94f5d1d
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 164 deletions.
9 changes: 1 addition & 8 deletions spider/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,13 @@ func (svc *Service) scheduleTasks(s *models.Spider, opts *interfaces.SpiderRunOp

if svc.isMultiTask(opts) {
// multi tasks
// TODO: implement associated tasks
//mainTask.HasSub = true
//if err := delegate.NewModelDelegate(mainTask).Add(); err != nil {
// return err
//}
nodeIds, err := svc.getNodeIds(opts)
if err != nil {
return nil, err
}
for _, nodeId := range nodeIds {
t := &models.Task{
SpiderId: s.Id,
// TODO: implement associated tasks
//ParentId: mainTask.Id,
SpiderId: s.Id,
Mode: opts.Mode,
Cmd: opts.Cmd,
Param: opts.Param,
Expand Down
2 changes: 1 addition & 1 deletion task/handler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func NewTaskHandlerService(opts ...Option) (svc2 interfaces.TaskHandlerService,
svc := &Service{
TaskBaseService: baseSvc,
exitWatchDuration: 60 * time.Second,
fetchInterval: 5 * time.Second,
fetchInterval: 1 * time.Second,
fetchTimeout: 15 * time.Second,
reportInterval: 5 * time.Second,
cancelTimeout: 5 * time.Second,
Expand Down
155 changes: 0 additions & 155 deletions task/scheduler/service.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package scheduler

import (
"fmt"
"github.com/apex/log"
config2 "github.com/crawlab-team/crawlab-core/config"
"github.com/crawlab-team/crawlab-core/constants"
"github.com/crawlab-team/crawlab-core/errors"
"github.com/crawlab-team/crawlab-core/grpc/server"
"github.com/crawlab-team/crawlab-core/interfaces"
"github.com/crawlab-team/crawlab-core/models/client"
"github.com/crawlab-team/crawlab-core/models/delegate"
"github.com/crawlab-team/crawlab-core/models/models"
"github.com/crawlab-team/crawlab-core/models/service"
Expand All @@ -24,7 +21,6 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
mongo2 "go.mongodb.org/mongo-driver/mongo"
"go.uber.org/dig"
"math/rand"
"sync"
"time"
)
Expand Down Expand Up @@ -158,157 +154,6 @@ func (svc *Service) SetInterval(interval time.Duration) {
svc.interval = interval
}

func (svc *Service) getTaskQueueItems() (tqList []models.TaskQueueItem, err error) {
opts := &mongo.FindOptions{
Sort: bson.D{
{"p", 1},
{"_id", 1},
},
}
if err := mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).Find(nil, opts).All(&tqList); err != nil {
if err == mongo2.ErrNoDocuments {
return nil, nil
}
return nil, err
}
return tqList, nil
}

func (svc *Service) getResourcesAndNodesMap() (resources map[string]models.Node, nodesMap map[primitive.ObjectID]models.Node, err error) {
nodesMap = map[primitive.ObjectID]models.Node{}
resources = map[string]models.Node{}
query := bson.M{
// enabled: true
"enabled": true,
// active: true
"active": true,
// available_runners > 0
"available_runners": bson.M{
"$gt": 0,
},
}
nodes, err := svc.modelSvc.GetNodeList(query, nil)
if err != nil {
if err == mongo2.ErrNoDocuments {
return nil, nil, nil
}
return nil, nil, err
}
for _, n := range nodes {
nodesMap[n.Id] = n
for i := 0; i < n.AvailableRunners; i++ {
key := fmt.Sprintf("%s:%d", n.Id.Hex(), i)
resources[key] = n
}
}
return resources, nodesMap, nil
}

func (svc *Service) matchResources(tqList []models.TaskQueueItem) (tasks []interfaces.Task, nodesMap map[primitive.ObjectID]models.Node, err error) {
// get resources and nodes map
resources, nodesMap, err := svc.getResourcesAndNodesMap()
if err != nil {
return nil, nil, err
}
if resources == nil || len(resources) == 0 {
return nil, nil, nil
}

// resources list
var resourcesList []models.Node
for _, r := range resources {
resourcesList = append(resourcesList, r)
}

// shuffle resources list
rand.Seed(time.Now().Unix())
rand.Shuffle(len(resourcesList), func(i, j int) {
resourcesList[i], resourcesList[j] = resourcesList[j], resourcesList[i]
})

// iterate task queue items
for _, tq := range tqList {
// task
t, err := svc.modelSvc.GetTaskById(tq.GetId())
if err != nil {
// remove task queue item if it is not found in tasks
_ = mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).DeleteId(tq.GetId())

// set task status as abnormal
t.Status = constants.TaskStatusAbnormal
t.Error = err.Error()
_ = delegate.NewModelDelegate(t, nil).Save()
continue
}

// iterate shuffled resources to match a resource
for i, r := range resourcesList {
// If node id is unset or node id of task matches with resource id (node id),
// assign corresponding resource id to the task
if t.GetNodeId().IsZero() ||
t.GetNodeId() == r.GetId() {
// assign resource id
t.NodeId = r.GetId()

// append to tasks
tasks = append(tasks, t)

// delete from resources list
resourcesList = append(resourcesList[:i], resourcesList[(i+1):]...)

// decrement available runners
n := nodesMap[r.GetId()]
n.DecrementAvailableRunners()

// break loop
break
}
}
}

return tasks, nodesMap, nil
}

func (svc *Service) updateResources(nodesMap map[primitive.ObjectID]models.Node) (err error) {
for _, n := range nodesMap {
if err := delegate.NewModelNodeDelegate(&n).Save(); err != nil {
return err
}
}
return nil
}

func (svc *Service) updateTasks(tasks []interfaces.Task) (err error) {
for _, t := range tasks {
// save task with node id
if err := delegate.NewModelDelegate(t).Save(); err != nil {
return err
}
}
return nil
}

func (svc *Service) deleteTaskQueueItems(tasks []interfaces.Task) (err error) {
for _, t := range tasks {
if err := mongo.GetMongoCol(interfaces.ModelColNameTaskQueue).DeleteId(t.GetId()); err != nil {
log.Warnf("task[id: %s] missing task queue: %s", t.GetId(), err.Error())
continue
}
}
return nil
}

func (svc *Service) handleTaskError(n interfaces.Node, t interfaces.Task, err error) {
trace.PrintError(err)
t.SetStatus(constants.TaskStatusError)
t.SetError(err.Error())
if n.GetIsMaster() {
_ = delegate.NewModelDelegate(t).Save()
} else {
_ = client.NewModelDelegate(t).Save()
}
}

// initTaskStatus initialize task status of existing tasks
func (svc *Service) initTaskStatus() {
// set status of running tasks as TaskStatusAbnormal
Expand Down

0 comments on commit 94f5d1d

Please sign in to comment.