diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d1fb61d8aa0..f01940d228e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -124,6 +124,44 @@ func (gc *GenericController) WatchTransformationOf(obj metav1.Object, mapFn even eventhandlers.MapAndEnqueue{Map: mapFn, Predicates: p}) } +// WatchTransformationsOf watches objects matching obj's type and enqueues the keys returned by mapFn. +func (gc *GenericController) WatchTransformationsOf(obj metav1.Object, mapFn eventhandlers.ObjToKeys, + p ...predicates.Predicate) error { + gc.once.Do(gc.init) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{MultiMap: func(i interface{}) []types.ReconcileKey { + result := []types.ReconcileKey{} + for _, k := range mapFn(i) { + if namespace, name, err := cache.SplitMetaNamespaceKey(k); err == nil { + result = append(result, types.ReconcileKey{namespace, name}) + } + } + return result + }, Predicates: p}) +} + +// WatchTransformationKeyOf watches objects matching obj's type and enqueues the key returned by mapFn. +func (gc *GenericController) WatchTransformationKeyOf(obj metav1.Object, mapFn eventhandlers.ObjToReconcileKey, + p ...predicates.Predicate) error { + gc.once.Do(gc.init) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{MultiMap: func(i interface{}) []types.ReconcileKey { + if k := mapFn(i); len(k.Name) > 0 { + return []types.ReconcileKey{k} + } else { + return []types.ReconcileKey{} + } + }, Predicates: p}) +} + +// WatchTransformationKeysOf watches objects matching obj's type and enqueues the keys returned by mapFn. +func (gc *GenericController) WatchTransformationKeysOf(obj metav1.Object, mapFn eventhandlers.ObjToReconcileKeys, + p ...predicates.Predicate) error { + gc.once.Do(gc.init) + return gc.queue.addEventHandler(obj, + eventhandlers.MapAndEnqueue{MultiMap: mapFn, Predicates: p}) +} + // WatchEvents watches objects matching obj's type and uses the functions from provider to handle events. func (gc *GenericController) WatchEvents(obj metav1.Object, provider types.HandleFnProvider) error { gc.once.Do(gc.init) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 3b30d51a713..dd71a472e6b 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -20,6 +20,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "time" + "github.com/kubernetes-sigs/kubebuilder/pkg/controller/eventhandlers" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/test" "github.com/kubernetes-sigs/kubebuilder/pkg/controller/types" @@ -29,7 +31,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "time" ) var _ = Describe("GenericController", func() { @@ -218,7 +219,7 @@ var _ = Describe("GenericController", func() { Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) - It("should use the map function to reconcile a different key", func() { + It("should use the transformation function to reconcile a different key", func() { // Listen for Pod changes Expect(instance.WatchTransformationOf(&corev1.Pod{}, func(obj interface{}) string { p := obj.(*corev1.Pod) @@ -233,6 +234,60 @@ var _ = Describe("GenericController", func() { Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) }) + It("should use the transformationkey function to reconcile a different key", func() { + // Listen for Pod changes + Expect(instance.WatchTransformationKeyOf(&corev1.Pod{}, func(obj interface{}) types.ReconcileKey { + p := obj.(*corev1.Pod) + return types.ReconcileKey{p.Namespace + "-namespace", p.Name + "-name"} + })).Should(Succeed()) + + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + + It("should use the transformationsof function to reconcile multiple different keys", func() { + // Listen for Pod changes + Expect(instance.WatchTransformationsOf(&corev1.Pod{}, func(obj interface{}) []string { + p := obj.(*corev1.Pod) + return []string{ + p.Namespace + "-namespace/" + p.Name + "-name-1", + p.Namespace + "-namespace/" + p.Name + "-name-2"} + })).Should(Succeed()) + + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name-1")) + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name-2")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + + It("should use the transformationkeysof function to reconcile multiple different keys", func() { + // Listen for Pod changes + Expect(instance.WatchTransformationKeysOf(&corev1.Pod{}, func(obj interface{}) []types.ReconcileKey { + p := obj.(*corev1.Pod) + return []types.ReconcileKey{ + {p.Namespace + "-namespace", p.Name + "-name-1"}, + {p.Namespace + "-namespace", p.Name + "-name-2"}, + } + })).Should(Succeed()) + + fakePodInformer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}) + + val := ChannelResult{} + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name-1")) + Eventually(result).Should(Receive(&val.result)) + Expect(val.result).Should(Equal("default-namespace/test-pod-name-2")) + Expect(instance.GetMetrics().QueueLength).Should(Equal(0)) + }) + It("should call the event handling add function", func() { // Listen for Pod changes Expect(instance.WatchEvents(&corev1.Pod{}, diff --git a/pkg/controller/eventhandlers/eventhandlers.go b/pkg/controller/eventhandlers/eventhandlers.go index c2f990bbeca..3fcd8772e79 100644 --- a/pkg/controller/eventhandlers/eventhandlers.go +++ b/pkg/controller/eventhandlers/eventhandlers.go @@ -39,6 +39,8 @@ type MapAndEnqueue struct { Predicates []predicates.Predicate // Map maps an object to a key that can be enqueued Map func(interface{}) string + + MultiMap func(interface{}) []types.ReconcileKey } // Get returns ResourceEventHandlerFuncs that Map an object to a Key and enqueue the key if it is non-empty @@ -74,9 +76,15 @@ func (mp MapAndEnqueue) Get(r workqueue.RateLimitingInterface) cache.ResourceEve // addRateLimited maps the obj to a string. If the string is non-empty, it is enqueued. func (mp MapAndEnqueue) addRateLimited(r workqueue.RateLimitingInterface, obj interface{}) { - k := mp.Map(obj) - if len(k) > 0 { - r.AddRateLimited(k) + if mp.Map != nil { + if k := mp.Map(obj); len(k) > 0 { + r.AddRateLimited(k) + } + } + if mp.MultiMap != nil { + for _, k := range mp.MultiMap(obj) { + r.AddRateLimited(k.Namespace + "/" + k.Name) + } } } @@ -141,6 +149,12 @@ func (m MapToController) Map(obj interface{}) string { // ObjToKey returns a string namespace/name key for an object type ObjToKey func(interface{}) string +type ObjToKeys func(interface{}) []string + +type ObjToReconcileKey func(interface{}) types.ReconcileKey + +type ObjToReconcileKeys func(interface{}) []types.ReconcileKey + // MapToSelf returns the namespace/name key of obj func MapToSelf(obj interface{}) string { if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil { diff --git a/pkg/controller/example_watchandmap_test.go b/pkg/controller/example_watchandmap_test.go index 60c5aee8192..f8de998413b 100644 --- a/pkg/controller/example_watchandmap_test.go +++ b/pkg/controller/example_watchandmap_test.go @@ -71,3 +71,135 @@ func ExampleGenericController_WatchTransformationOf() { // One time for program controller.RunInformersAndControllers(run.CreateRunArguments()) } + +func ExampleGenericController_WatchTransformationsOf() { + // One time setup for program + flag.Parse() + informerFactory := config.GetKubernetesInformersOrDie() + if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + + // Per-controller setup + c := &controller.GenericController{ + Reconcile: func(key types.ReconcileKey) error { + fmt.Printf("Reconciling Pod %s\n", key) + return nil + }, + } + err := c.Watch(&appsv1.ReplicaSet{}) + if err != nil { + log.Fatalf("%v", err) + } + err = c.WatchTransformationsOf(&corev1.Pod{}, + func(i interface{}) []string { + p, ok := i.(*corev1.Pod) + if !ok { + return []string{} + } + + // Find multiple parents based off the name + return []string{ + p.Namespace + "/" + strings.Split(p.Name, "-")[0] + "-parent-1", + p.Namespace + "/" + strings.Split(p.Name, "-")[0] + "-parent-2", + } + }, + ) + if err != nil { + log.Fatalf("%v", err) + } + controller.AddController(c) + + // One time for program + controller.RunInformersAndControllers(run.CreateRunArguments()) +} + +func ExampleGenericController_WatchTransformationKeyOf() { + // One time setup for program + flag.Parse() + informerFactory := config.GetKubernetesInformersOrDie() + if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + + // Per-controller setup + c := &controller.GenericController{ + Reconcile: func(key types.ReconcileKey) error { + fmt.Printf("Reconciling Pod %s\n", key) + return nil + }, + } + err := c.Watch(&appsv1.ReplicaSet{}) + if err != nil { + log.Fatalf("%v", err) + } + err = c.WatchTransformationKeyOf(&corev1.Pod{}, + func(i interface{}) types.ReconcileKey { + p, ok := i.(*corev1.Pod) + if !ok { + return types.ReconcileKey{} + } + + // Find multiple parents based off the name + return types.ReconcileKey{p.Namespace, strings.Split(p.Name, "-")[0]} + }, + ) + if err != nil { + log.Fatalf("%v", err) + } + controller.AddController(c) + + // One time for program + controller.RunInformersAndControllers(run.CreateRunArguments()) +} + +func ExampleGenericController_WatchTransformationKeysOf() { + // One time setup for program + flag.Parse() + informerFactory := config.GetKubernetesInformersOrDie() + if err := controller.AddInformerProvider(&corev1.Pod{}, informerFactory.Core().V1().Pods()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + if err := controller.AddInformerProvider(&appsv1.ReplicaSet{}, informerFactory.Apps().V1().ReplicaSets()); err != nil { + log.Fatalf("Could not set informer %v", err) + } + + // Per-controller setup + c := &controller.GenericController{ + Reconcile: func(key types.ReconcileKey) error { + fmt.Printf("Reconciling Pod %s\n", key) + return nil + }, + } + err := c.Watch(&appsv1.ReplicaSet{}) + if err != nil { + log.Fatalf("%v", err) + } + err = c.WatchTransformationKeysOf(&corev1.Pod{}, + func(i interface{}) []types.ReconcileKey { + p, ok := i.(*corev1.Pod) + if !ok { + return []types.ReconcileKey{} + } + + // Find multiple parents based off the name + return []types.ReconcileKey{ + {p.Namespace, strings.Split(p.Name, "-")[0] + "-parent-1"}, + {p.Namespace, strings.Split(p.Name, "-")[0] + "-parent-2"}, + } + }, + ) + if err != nil { + log.Fatalf("%v", err) + } + controller.AddController(c) + + // One time for program + controller.RunInformersAndControllers(run.CreateRunArguments()) +}