Skip to content

Commit

Permalink
Create sonic scheduler for test
Browse files Browse the repository at this point in the history
  • Loading branch information
losha-ms committed Sep 27, 2022
1 parent d831125 commit ea799e2
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 3 deletions.
10 changes: 10 additions & 0 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,26 @@ import (
"sigs.k8s.io/scheduler-plugins/pkg/podstate"
"sigs.k8s.io/scheduler-plugins/pkg/preemptiontoleration"
"sigs.k8s.io/scheduler-plugins/pkg/qos"
"sigs.k8s.io/scheduler-plugins/pkg/sonic"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking"

// Ensure scheme package is initialized.
"fmt"
"runtime/debug"

_ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
)

func main() {
// Register custom plugins to the scheduler framework.
// Later they can consist of scheduler profile(s) and hence
// used by various kinds of workloads.
defer func() {
if r := recover(); r != nil {
fmt.Println("stacktrace from panic: \n" + string(debug.Stack()))
}
}()
command := app.NewSchedulerCommand(
app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
app.WithPlugin(coscheduling.Name, coscheduling.New),
Expand All @@ -52,6 +61,7 @@ func main() {
// app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
app.WithPlugin(podstate.Name, podstate.New),
app.WithPlugin(qos.Name, qos.New),
app.WithPlugin(sonic.Name, sonic.New),
)

code := cli.Run(command)
Expand Down
6 changes: 3 additions & 3 deletions manifests/install/charts/as-a-second-scheduler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@

scheduler:
name: scheduler-plugins-scheduler
image: k8s.gcr.io/scheduler-plugins/kube-scheduler:v0.23.10
image: localhost:5000/scheduler-plugins/kube-scheduler:latest
namespace: scheduler-plugins
replicaCount: 1
leaderElect: false

controller:
name: scheduler-plugins-controller
image: k8s.gcr.io/scheduler-plugins/controller:v0.23.10
image: localhost:5000/scheduler-plugins/controller:latest
namespace: scheduler-plugins
replicaCount: 1

# LoadVariationRiskBalancing and TargetLoadPacking are not enabled by default
# as they need extra RBAC privileges on metrics.k8s.io.

plugins:
enabled: ["Coscheduling","CapacityScheduling","NodeResourceTopologyMatch","NodeResourcesAllocatable"]
enabled: ["Coscheduling","CapacityScheduling","NodeResourceTopologyMatch","NodeResourcesAllocatable", "SonicScheduling"]
disabled: ["PrioritySort"] # only in-tree plugins need to be defined here

# Customize the enabled plugins' config.
Expand Down
37 changes: 37 additions & 0 deletions pkg/sonic/pod_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sonic

import (
"time"

v1 "k8s.io/api/core/v1"
)

// DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified.
const DefaultWaitTime = 60 * time.Second

// GetPodGroupLabel get pod group from pod annotations
func GetPodLabel(pod *v1.Pod, label_name string) string {
return pod.Labels[label_name]

}

// GetPodGroupLabel get pod group from pod annotations
func GetSchedulerName(pod *v1.Pod) string {
return pod.Spec.SchedulerName
}
80 changes: 80 additions & 0 deletions pkg/sonic/pod_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package sonic

/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import (
"fmt"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
informerv1 "k8s.io/client-go/informers/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

type Status string

const (
Success Status = "Success"
Wait Status = "Wait"
)

// PodGroupManager defines the scheduling operation called
type PodManager struct {
// snapshotSharedLister is pod shared list
snapshotSharedLister framework.SharedLister
// scheduleTimeout is the default timeout for podgroup scheduling.
// If podgroup's scheduleTimeoutSeconds is set, it will be used.
scheduleTimeout *time.Duration
// podLister is pod lister
podLister listerv1.PodLister
// reserveResourcePercentage is the reserved resource for the max finished group, range (0,100]
reserveResourcePercentage int32
sync.RWMutex
}

// NewPodGroupManager creates a new operation object.
func NewPodManager(snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration,
podInformer informerv1.PodInformer) *PodManager {
podMgr := &PodManager{
snapshotSharedLister: snapshotSharedLister,
scheduleTimeout: scheduleTimeout,
podLister: podInformer.Lister(),
}
return podMgr
}

// GetNamespacedName returns the namespaced name.
func (pm *PodManager) GetPodCount(ns string) int {
pods, err := pm.podLister.Pods(ns).List(
labels.SelectorFromSet(labels.Set{"test": "liveness"}),
)
if err != nil {
klog.ErrorS(err, "Failed to obtain pods with label, test:liveness")
return 0
}

return len(pods)
}

// GetNamespacedName returns the namespaced name.
func GetNamespacedName(obj metav1.Object) string {
return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName())
}
148 changes: 148 additions & 0 deletions pkg/sonic/sonic_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sonic

import (
"context"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/apis/scheduling"
)

// Coscheduling is a plugin that schedules pods in a group.
type SonicScheduling struct {
frameworkHandler framework.Handle
podMgr *PodManager
scheduleTimeout *time.Duration
}

var _ framework.PreFilterPlugin = &SonicScheduling{}
var _ framework.PostFilterPlugin = &SonicScheduling{}
var _ framework.PermitPlugin = &SonicScheduling{}
var _ framework.ReservePlugin = &SonicScheduling{}
var _ framework.PreBindPlugin = &SonicScheduling{}
var _ framework.PostBindPlugin = &SonicScheduling{}
var _ framework.EnqueueExtensions = &SonicScheduling{}

const (
// Name is the name of the plugin used in Registry and configurations.
Name = "SonicScheduling"
)

// New initializes and returns a new Coscheduling plugin.
func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
args, ok := obj.(*config.CoschedulingArgs)
if !ok {
klog.Error(fmt.Errorf("want args to be of type CoschedulingArgs, got %T", obj))
}

podInformer := handle.SharedInformerFactory().Core().V1().Pods()
scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second

podMgr := NewPodManager(handle.SnapshotSharedLister(), &scheduleTimeDuration, podInformer)
plugin := &SonicScheduling{
frameworkHandler: handle,
podMgr: podMgr,
scheduleTimeout: &scheduleTimeDuration,
}

return plugin, nil
}

func (ss *SonicScheduling) EventsToRegister() []framework.ClusterEvent {
// To register a custom event, follow the naming convention at:
// https://git.k8s.io/kubernetes/pkg/scheduler/eventhandlers.go#L403-L410
pgGVK := fmt.Sprintf("podgroups.v1alpha1.%v", scheduling.GroupName)
return []framework.ClusterEvent{
{Resource: framework.Pod, ActionType: framework.Add},
{Resource: framework.GVK(pgGVK), ActionType: framework.Add | framework.Update},
}
}

// Name returns name of the plugin. It is used in logs, etc.
func (ss *SonicScheduling) Name() string {
return Name
}

// PreFilter performs the following validations.
// 1. Whether the PodGroup that the Pod belongs to is on the deny list.
// 2. Whether the total number of pods in a PodGroup is less than its `minMember`.
func (ss *SonicScheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid
// any preemption attempts.

klog.V(4).InfoS("PreFilter", "pod", klog.KObj(pod))
return nil, framework.NewStatus(framework.Success, "")
}

// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter.
func (ss *SonicScheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod,
filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
ss.log("PostFilter", "PostFilter is called for pod ", pod, "")
count := ss.podMgr.GetPodCount(pod.Namespace)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("There are %v pod in namespace %v,Pod %v is unschedulable even after PostFilter", count, pod.Namespace, pod.Name))
}

// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one.
func (ss *SonicScheduling) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}

// Permit is the functions invoked by the framework at "Permit" extension point.
func (ss *SonicScheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
waitTime := *ss.scheduleTimeout
var retStatus *framework.Status
retStatus = framework.NewStatus(framework.Success, "")
waitTime = 0
ss.log("Permit", "Permit is called for pod for precheck", pod, nodeName)
return retStatus, waitTime
}

// Reserve is the functions invoked by the framework at "reserve" extension point.
func (ss *SonicScheduling) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
ss.log("Reserve", "Reserve is called for pod", pod, nodeName)
return nil
}

// Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out.
func (ss *SonicScheduling) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
ss.log("Unreserve", "Unreserve is called for pod", pod, nodeName)
return
}

// PostBind is called after a pod is successfully bound. These plugins are used update PodGroup when pod is bound.
func (ss *SonicScheduling) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
ss.log("PreBind", "pod pre-upgrade: lock the device ", pod, nodeName)
return framework.NewStatus(framework.Success, "")
}

// PostBind is called after a pod is successfully bound.
func (ss *SonicScheduling) PostBind(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) {
ss.log("PostBind", "pod post-upgrade: release device lock", pod, nodeName)
}

func (ss *SonicScheduling) log(method, msg string, pod *v1.Pod, nodeName string) {
klog.ErrorS(nil, method, msg+" pod: ", klog.KObj(pod))
}

0 comments on commit ea799e2

Please sign in to comment.