Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PodDisruptionBudget to scheduler cache. #53914

Merged
merged 2 commits into from
Oct 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugin/cmd/kube-scheduler/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
Expand Down
3 changes: 3 additions & 0 deletions plugin/cmd/kube-scheduler/app/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
appsinformers "k8s.io/client-go/informers/apps/v1beta1"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -86,6 +87,7 @@ func CreateScheduler(
replicaSetInformer extensionsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
configurator := factory.NewConfigFactory(
Expand All @@ -99,6 +101,7 @@ func CreateScheduler(
replicaSetInformer,
statefulSetInformer,
serviceInformer,
pdbInformer,
s.HardPodAffinitySymmetricWeight,
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
)
Expand Down
1 change: 1 addition & 0 deletions plugin/cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func Run(s *options.SchedulerServer) error {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
recorder,
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ func ClusterRoles() []rbac.ClusterRole {
rbac.NewRule(Read...).Groups(legacyGroup).Resources("services", "replicationcontrollers").RuleOrDie(),
rbac.NewRule(Read...).Groups(extensionsGroup).Resources("replicasets").RuleOrDie(),
rbac.NewRule(Read...).Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
// things that pods use
// things that pods use or applies to them
rbac.NewRule(Read...).Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbac.NewRule(Read...).Groups(legacyGroup).Resources("persistentvolumeclaims", "persistentvolumes").RuleOrDie(),
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,14 @@ items:
- get
- list
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
).CreateFromConfig(policy); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions plugin/pkg/scheduler/factory/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//plugin/pkg/scheduler/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
Expand All @@ -40,10 +41,12 @@ go_library(
"//vendor/k8s.io/client-go/informers/apps/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/informers/policy/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/apps/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
Expand Down
66 changes: 66 additions & 0 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -39,10 +40,12 @@ import (
appsinformers "k8s.io/client-go/informers/apps/v1beta1"
coreinformers "k8s.io/client-go/informers/core/v1"
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
clientset "k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
policylisters "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/helper"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
Expand Down Expand Up @@ -93,6 +96,8 @@ type configFactory struct {
replicaSetLister extensionslisters.ReplicaSetLister
// a means to list all statefulsets
statefulSetLister appslisters.StatefulSetLister
// a means to list all PodDisruptionBudgets
pdbLister policylisters.PodDisruptionBudgetLister

// Close this to stop all reflectors
StopEverything chan struct{}
Expand Down Expand Up @@ -130,6 +135,7 @@ func NewConfigFactory(
replicaSetInformer extensionsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
hardPodAffinitySymmetricWeight int,
enableEquivalenceClassCache bool,
) scheduler.Configurator {
Expand All @@ -146,6 +152,7 @@ func NewConfigFactory(
controllerLister: replicationControllerInformer.Lister(),
replicaSetLister: replicaSetInformer.Lister(),
statefulSetLister: statefulSetInformer.Lister(),
pdbLister: pdbInformer.Lister(),
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: schedulerName,
Expand Down Expand Up @@ -220,6 +227,15 @@ func NewConfigFactory(
)
c.nodeLister = nodeInformer.Lister()

pdbInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addPDBToCache,
UpdateFunc: c.updatePDBInCache,
DeleteFunc: c.deletePDBFromCache,
},
)
c.pdbLister = pdbInformer.Lister()

// On add and delete of PVs, it will affect equivalence cache items
// related to persistent volume
pvInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -654,6 +670,56 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
}
}

func (c *configFactory) addPDBToCache(obj interface{}) {
pdb, ok := obj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", obj)
return
}

if err := c.schedulerCache.AddPDB(pdb); err != nil {
glog.Errorf("scheduler cache AddPDB failed: %v", err)
}
}

func (c *configFactory) updatePDBInCache(oldObj, newObj interface{}) {
oldPDB, ok := oldObj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert oldObj to *v1beta1.PodDisruptionBudget: %v", oldObj)
return
}
newPDB, ok := newObj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert newObj to *v1beta1.PodDisruptionBudget: %v", newObj)
return
}

if err := c.schedulerCache.UpdatePDB(oldPDB, newPDB); err != nil {
glog.Errorf("scheduler cache UpdatePDB failed: %v", err)
}
}

func (c *configFactory) deletePDBFromCache(obj interface{}) {
var pdb *v1beta1.PodDisruptionBudget
switch t := obj.(type) {
case *v1beta1.PodDisruptionBudget:
pdb = t
case cache.DeletedFinalStateUnknown:
var ok bool
pdb, ok = t.Obj.(*v1beta1.PodDisruptionBudget)
if !ok {
glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t.Obj)
return
}
default:
glog.Errorf("cannot convert to *v1beta1.PodDisruptionBudget: %v", t)
return
}
if err := c.schedulerCache.RemovePDB(pdb); err != nil {
glog.Errorf("scheduler cache RemovePDB failed: %v", err)
}
}

// Create creates a scheduler with the default algorithm provider.
func (f *configFactory) Create() (*scheduler.Config, error) {
return f.CreateFromProvider(DefaultProvider)
Expand Down
9 changes: 9 additions & 0 deletions plugin/pkg/scheduler/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestCreate(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand Down Expand Up @@ -96,6 +97,7 @@ func TestCreateFromConfig(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand Down Expand Up @@ -155,6 +157,7 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand Down Expand Up @@ -215,6 +218,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand Down Expand Up @@ -272,6 +276,7 @@ func TestDefaultErrorFunc(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand Down Expand Up @@ -385,6 +390,7 @@ func TestResponsibleForPod(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand All @@ -400,6 +406,7 @@ func TestResponsibleForPod(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand Down Expand Up @@ -470,6 +477,7 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
-1,
enableEquivalenceCache,
)
Expand Down Expand Up @@ -516,6 +524,7 @@ func TestInvalidFactoryArgs(t *testing.T) {
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
test.hardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
Expand Down
3 changes: 3 additions & 0 deletions plugin/pkg/scheduler/schedulercache/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//plugin/pkg/scheduler/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
Expand All @@ -38,9 +39,11 @@ go_test(
"//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
"//plugin/pkg/scheduler/util:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
],
)

Expand Down
36 changes: 36 additions & 0 deletions plugin/pkg/scheduler/schedulercache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

"github.com/golang/glog"
policy "k8s.io/api/policy/v1beta1"
)

var (
Expand Down Expand Up @@ -55,6 +56,7 @@ type schedulerCache struct {
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*NodeInfo
pdbs map[string]*policy.PodDisruptionBudget
}

type podState struct {
Expand All @@ -74,6 +76,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
nodes: make(map[string]*NodeInfo),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
pdbs: make(map[string]*policy.PodDisruptionBudget),
}
}

Expand Down Expand Up @@ -382,6 +385,39 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
return nil
}

func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error {
cache.mu.Lock()
defer cache.mu.Unlock()

// Unconditionally update cache.
cache.pdbs[pdb.Name] = pdb
return nil
}

func (cache *schedulerCache) UpdatePDB(oldPDB, newPDB *policy.PodDisruptionBudget) error {
return cache.AddPDB(newPDB)
}

func (cache *schedulerCache) RemovePDB(pdb *policy.PodDisruptionBudget) error {
cache.mu.Lock()
defer cache.mu.Unlock()

delete(cache.pdbs, pdb.Name)
return nil
}

func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
var pdbs []*policy.PodDisruptionBudget
for _, pdb := range cache.pdbs {
if selector.Matches(labels.Set(pdb.Labels)) {
pdbs = append(pdbs, pdb)
}
}
return pdbs, nil
}

func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
Expand Down
Loading