Skip to content

Commit

Permalink
feature: add prebind for scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: googs1025 <[email protected]>
  • Loading branch information
googs1025 committed Jul 30, 2024
1 parent 7dc3db9 commit 5e4c5db
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
}
}

// before binding to a node, run prebind fn
if err := ssn.PreBindFn(task, bestNode.Name); err != nil {
klog.V(3).Infof("PreBind for task %s/%s failed for: %v", task.Namespace, task.Name, err)
// If prebind failed, we need to unallocate the task
err = ssn.Unallocate(task)
if err != nil {
klog.V(3).Infof("Unallocate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
}
continue
}

// Allocate idle resource to the task.
if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, bestNode.Name)
Expand Down
11 changes: 11 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ func (backfill *Action) Execute(ssn *framework.Session) {
}
}

// before binding to a node, run prebind fn
if err := ssn.PreBindFn(task, node.Name); err != nil {
klog.V(3).Infof("PreBind for task %s/%s failed for: %v", task.Namespace, task.Name, err)
// If prebind failed, we need to unallocate the task
err = ssn.Unallocate(task)
if err != nil {
klog.V(3).Infof("Unallocate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
}
continue
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,6 @@ type VictimTasksFn func([]*TaskInfo) []*TaskInfo

// AllocatableFn is the func declaration used to check whether the task can be allocated
type AllocatableFn func(*QueueInfo, *TaskInfo) bool

// PreBindFn is the func declaration used to do pre-bind operations.
type PreBindFn func(task *TaskInfo, nodeName string) error
36 changes: 36 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Session struct {
reservedNodesFns map[string]api.ReservedNodesFn
victimTasksFns map[string][]api.VictimTasksFn
jobStarvingFns map[string]api.ValidateFn
preBindFns map[string]api.PreBindFn
}

func openSession(cache cache.Cache) *Session {
Expand Down Expand Up @@ -437,6 +438,41 @@ func (ssn *Session) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
return nil
}

// Unallocate is called when the prebind function fails
func (ssn *Session) Unallocate(task *api.TaskInfo) error {
ssn.cache.RevertVolumes(task, task.PodVolumes)

// Update status in session
job, found := ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
klog.Errorf("Failed to update task <%v/%v> status to %v when unallocating in Session <%v>: %v",
task.Namespace, task.Name, api.Pending, ssn.UID, err)
}
} else {
klog.Errorf("Failed to find Job <%s> in Session <%s> index when unallocating.",
task.Job, ssn.UID)
}

if node, found := ssn.Nodes[task.NodeName]; found {
klog.V(3).Infof("Remove Task <%v> on node <%v>", task.Name, task.NodeName)
err := node.RemoveTask(task)
if err != nil {
klog.Errorf("Failed to remove Task <%v> on node <%v> when unallocating: %s", task.Name, task.NodeName, err.Error())
}
}

for _, eh := range ssn.eventHandlers {
if eh.DeallocateFunc != nil {
eh.DeallocateFunc(&Event{
Task: task,
})
}
}
task.NodeName = ""
return nil
}

func (ssn *Session) dispatch(task *api.TaskInfo) error {
if err := ssn.cache.AddBindTask(task); err != nil {
return err
Expand Down
23 changes: 23 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (ssn *Session) AddJobStarvingFns(name string, fn api.ValidateFn) {
ssn.jobStarvingFns[name] = fn
}

// AddPreBindFns add preBindFns function
func (ssn *Session) AddPreBindFns(name string, fn api.PreBindFn) {
ssn.preBindFns[name] = fn
}

// Reclaimable invoke reclaimable function of the plugins
func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
Expand Down Expand Up @@ -807,3 +812,21 @@ func (ssn *Session) BuildVictimsPriorityQueue(victims []*api.TaskInfo) *util.Pri
}
return victimsQueue
}

// PreBindFn invoke pre-bind function of the plugins
// is called before binding a task to a node.
func (ssn *Session) PreBindFn(task *api.TaskInfo, node string) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
pfn, found := ssn.preBindFns[plugin.Name]
if !found {
continue
}
err := pfn(task, node)
if err != nil {
return err
}
}
}
return nil
}

0 comments on commit 5e4c5db

Please sign in to comment.