From 419e32af0a7ba6328205b9fd2b309e1dbc7c2d86 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Fri, 6 Jan 2023 12:49:42 -0800 Subject: [PATCH] Refactor source/handler/predicate packages to remove dep injection Signed-off-by: Vince Prignano --- examples/builtins/main.go | 6 +- hack/test-all.sh | 2 +- pkg/builder/controller.go | 40 +++-- pkg/builder/controller_test.go | 18 ++- pkg/cluster/cluster.go | 6 +- pkg/controller/controller.go | 1 - pkg/controller/controller_integration_test.go | 9 +- pkg/controller/example_test.go | 6 +- pkg/handler/enqueue_owner.go | 85 +++++----- pkg/handler/eventhandler_test.go | 150 +++--------------- pkg/handler/example_test.go | 15 +- pkg/internal/controller/controller.go | 26 --- pkg/internal/controller/controller_test.go | 143 +---------------- .../recorder/recorder_integration_test.go | 2 +- pkg/manager/internal.go | 3 - pkg/manager/manager_test.go | 21 --- pkg/predicate/predicate.go | 23 --- pkg/runtime/inject/inject.go | 15 -- pkg/runtime/inject/inject_test.go | 22 --- pkg/source/example_test.go | 4 +- pkg/source/source.go | 108 +++++-------- pkg/source/source_integration_test.go | 10 +- pkg/source/source_test.go | 62 ++------ 23 files changed, 184 insertions(+), 593 deletions(-) diff --git a/examples/builtins/main.go b/examples/builtins/main.go index ff1f0dfa3b..cddaae24bb 100644 --- a/examples/builtins/main.go +++ b/examples/builtins/main.go @@ -59,14 +59,14 @@ func main() { } // Watch ReplicaSets and enqueue ReplicaSet object key - if err := c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil { entryLog.Error(err, "unable to watch ReplicaSets") os.Exit(1) } // Watch Pods and enqueue owning ReplicaSet key - if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, - &handler.EnqueueRequestForOwner{OwnerType: &appsv1.ReplicaSet{}, IsController: true}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil { entryLog.Error(err, "unable to watch Pods") os.Exit(1) } diff --git a/hack/test-all.sh b/hack/test-all.sh index 202dd89492..50b87ab862 100755 --- a/hack/test-all.sh +++ b/hack/test-all.sh @@ -25,7 +25,7 @@ if [[ -n ${ARTIFACTS:-} ]]; then fi result=0 -go test -race ${P_FLAG} ${MOD_OPT} ./... ${GINKGO_ARGS} || result=$? +go test -v -race ${P_FLAG} ${MOD_OPT} ./... --ginkgo.timeout=2m --ginkgo.fail-fast ${GINKGO_ARGS} || result=$? if [[ -n ${ARTIFACTS:-} ]]; then mkdir -p ${ARTIFACTS} diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 03f9633a74..38cabbbc57 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -22,7 +22,6 @@ import ( "strings" "github.com/go-logr/logr" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" @@ -197,18 +196,16 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro return blder.ctrl, nil } -func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) { +func (blder *Builder) project(obj client.Object, proj objectProjection) (source.KindSource, error) { + src := source.Kind(blder.mgr.GetCache(), obj) switch proj { case projectAsNormal: - return obj, nil + return src, nil case projectAsMetadata: - metaObj := &metav1.PartialObjectMetadata{} - gvk, err := getGvk(obj, blder.mgr.GetScheme()) - if err != nil { - return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err) + if err := src.AsPartialMetadata(blder.mgr.GetScheme()); err != nil { + return nil, err } - metaObj.SetGroupVersionKind(gvk) - return metaObj, nil + return src, nil default: panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj)) } @@ -217,11 +214,10 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client. func (blder *Builder) doWatch() error { // Reconcile type if blder.forInput.object != nil { - typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) + src, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } - src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { @@ -234,15 +230,15 @@ func (blder *Builder) doWatch() error { return errors.New("Owns() can only be used together with For()") } for _, own := range blder.ownsInput { - typeForSrc, err := blder.project(own.object, own.objectProjection) + src, err := blder.project(own.object, own.objectProjection) if err != nil { return err } - src := &source.Kind{Type: typeForSrc} - hdler := &handler.EnqueueRequestForOwner{ - OwnerType: blder.forInput.object, - IsController: true, - } + hdler := handler.EnqueueRequestForOwner( + blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(), + blder.forInput.object, + handler.OnlyControllerOwner(), + ) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { @@ -259,12 +255,12 @@ func (blder *Builder) doWatch() error { allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. - if srckind, ok := w.src.(*source.Kind); ok { - typeForSrc, err := blder.project(srckind.Type, w.objectProjection) - if err != nil { - return err + if srckind, ok := w.src.(source.KindSource); ok { + if w.objectProjection == projectAsMetadata { + if err := srckind.AsPartialMetadata(blder.mgr.GetScheme()); err != nil { + return err + } } - srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 782c20ab16..092bbe46ae 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -118,7 +118,7 @@ var _ = Describe("application", func() { Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). - Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}). + Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}). Build(noop) Expect(err).To(MatchError(ContainSubstring("one of For() or Named() must be called"))) Expect(instance).To(BeNil()) @@ -157,7 +157,7 @@ var _ = Describe("application", func() { instance, err := ControllerManagedBy(m). Named("my_controller"). - Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}). + Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}). Build(noop) Expect(err).NotTo(HaveOccurred()) Expect(instance).NotTo(BeNil()) @@ -369,8 +369,9 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(m). For(&appsv1.Deployment{}). Watches( // Equivalent of Owns - &source.Kind{Type: &appsv1.ReplicaSet{}}, - &handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true}) + source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), + ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -384,10 +385,11 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(m). Named("Deployment"). Watches( // Equivalent of For - &source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}). + source.Kind(m.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}). Watches( // Equivalent of Owns - &source.Kind{Type: &appsv1.ReplicaSet{}}, - &handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true}) + source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), + ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -481,7 +483,7 @@ var _ = Describe("application", func() { bldr := ControllerManagedBy(mgr). For(&appsv1.Deployment{}, OnlyMetadata). Owns(&appsv1.ReplicaSet{}, OnlyMetadata). - Watches(&source.Kind{Type: &appsv1.StatefulSet{}}, + Watches(source.Kind(mgr.GetCache(), &appsv1.StatefulSet{}), handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { defer GinkgoRecover() diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 905296cd35..6f7d2af0a5 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -45,6 +45,9 @@ type Cluster interface { // GetConfig returns an initialized Config GetConfig() *rest.Config + // GetCache returns a cache.Cache + GetCache() cache.Cache + // GetScheme returns an initialized Scheme GetScheme() *runtime.Scheme @@ -57,9 +60,6 @@ type Cluster interface { // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer - // GetCache returns a cache.Cache - GetCache() cache.Cache - // GetEventRecorderFor returns a new EventRecorder for the provided name GetEventRecorderFor(name string) record.EventRecorder diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8e0a9a91de..4aaa56a6f3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -156,7 +156,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, CacheSyncTimeout: options.CacheSyncTimeout, - SetFields: mgr.SetFields, Name: name, LogConstructor: options.LogConstructor, RecoverPanic: options.RecoverPanic, diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 3ddd2ccf60..48facf1e94 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -64,12 +64,13 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) By("Watching Resources") - err = instance.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.Deployment{}, - }) + err = instance.Watch( + source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}), + ) Expect(err).NotTo(HaveOccurred()) - err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) diff --git a/pkg/controller/example_test.go b/pkg/controller/example_test.go index 3d8e399703..d4fa1aef0b 100644 --- a/pkg/controller/example_test.go +++ b/pkg/controller/example_test.go @@ -71,7 +71,7 @@ func ExampleController() { } // Watch for Pod create / update / delete events and call Reconcile - err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}) + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -108,7 +108,7 @@ func ExampleController_unstructured() { Version: "v1", }) // Watch for Pod create / update / delete events and call Reconcile - err = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}) + err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{}) if err != nil { log.Error(err, "unable to watch pods") os.Exit(1) @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() { os.Exit(1) } - if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil { log.Error(err, "unable to watch pods") os.Exit(1) } diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 63699893fc..bfa2ff140e 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -25,15 +25,18 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) -var _ EventHandler = &EnqueueRequestForOwner{} +var _ EventHandler = &enqueueRequestForOwner{} -var log = logf.RuntimeLog.WithName("eventhandler").WithName("EnqueueRequestForOwner") +var log = logf.RuntimeLog.WithName("eventhandler").WithName("enqueueRequestForOwner") + +// OwnerOption modifies an EnqueueRequestForOwner EventHandler. +type OwnerOption func(e *enqueueRequestForOwner) // EnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created // the object that was the source of the Event. @@ -42,13 +45,34 @@ var log = logf.RuntimeLog.WithName("eventhandler").WithName("EnqueueRequestForOw // // - a source.Kind Source with Type of Pod. // -// - a handler.EnqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and IsController set to true. -type EnqueueRequestForOwner struct { - // OwnerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. - OwnerType runtime.Object +// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. +func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, owner client.Object, opts ...OwnerOption) EventHandler { + e := &enqueueRequestForOwner{ + ownerType: owner, + mapper: mapper, + } + if err := e.parseOwnerTypeGroupKind(scheme); err != nil { + panic(err) + } + for _, opt := range opts { + opt(e) + } + return e +} + +// OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true. +func OnlyControllerOwner() OwnerOption { + return func(e *enqueueRequestForOwner) { + e.isController = true + } +} - // IsController if set will only look at the first OwnerReference with Controller: true. - IsController bool +type enqueueRequestForOwner struct { + // ownerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. + ownerType runtime.Object + + // isController if set will only look at the first OwnerReference with Controller: true. + isController bool // groupKind is the cached Group and Kind from OwnerType groupKind schema.GroupKind @@ -58,7 +82,7 @@ type EnqueueRequestForOwner struct { } // Create implements EventHandler. -func (e *EnqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -67,7 +91,7 @@ func (e *EnqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateL } // Update implements EventHandler. -func (e *EnqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.ObjectOld, reqs) e.getOwnerReconcileRequest(evt.ObjectNew, reqs) @@ -77,7 +101,7 @@ func (e *EnqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateL } // Delete implements EventHandler. -func (e *EnqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -86,7 +110,7 @@ func (e *EnqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateL } // Generic implements EventHandler. -func (e *EnqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *enqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { reqs := map[reconcile.Request]empty{} e.getOwnerReconcileRequest(evt.Object, reqs) for req := range reqs { @@ -96,17 +120,17 @@ func (e *EnqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.Rat // parseOwnerTypeGroupKind parses the OwnerType into a Group and Kind and caches the result. Returns false // if the OwnerType could not be parsed using the scheme. -func (e *EnqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error { +func (e *enqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error { // Get the kinds of the type - kinds, _, err := scheme.ObjectKinds(e.OwnerType) + kinds, _, err := scheme.ObjectKinds(e.ownerType) if err != nil { - log.Error(err, "Could not get ObjectKinds for OwnerType", "owner type", fmt.Sprintf("%T", e.OwnerType)) + log.Error(err, "Could not get ObjectKinds for OwnerType", "owner type", fmt.Sprintf("%T", e.ownerType)) return err } // Expect only 1 kind. If there is more than one kind this is probably an edge case such as ListOptions. if len(kinds) != 1 { - err := fmt.Errorf("expected exactly 1 kind for OwnerType %T, but found %s kinds", e.OwnerType, kinds) - log.Error(nil, "expected exactly 1 kind for OwnerType", "owner type", fmt.Sprintf("%T", e.OwnerType), "kinds", kinds) + err := fmt.Errorf("expected exactly 1 kind for OwnerType %T, but found %s kinds", e.ownerType, kinds) + log.Error(nil, "expected exactly 1 kind for OwnerType", "owner type", fmt.Sprintf("%T", e.ownerType), "kinds", kinds) return err } // Cache the Group and Kind for the OwnerType @@ -116,7 +140,7 @@ func (e *EnqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme) // getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile // owners of object that match e.OwnerType. -func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { +func (e *enqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { // Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested // by the user for _, ref := range e.getOwnersReferences(object) { @@ -138,7 +162,7 @@ func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, Name: ref.Name, }} - // if owner is not namespaced then we should set the namespace to the empty + // if owner is not namespaced then we should not set the namespace mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version) if err != nil { log.Error(err, "Could not retrieve rest mapping", "kind", e.groupKind) @@ -153,16 +177,16 @@ func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, } } -// getOwnersReferences returns the OwnerReferences for an object as specified by the EnqueueRequestForOwner +// getOwnersReferences returns the OwnerReferences for an object as specified by the enqueueRequestForOwner // - if IsController is true: only take the Controller OwnerReference (if found) // - if IsController is false: take all OwnerReferences. -func (e *EnqueueRequestForOwner) getOwnersReferences(object metav1.Object) []metav1.OwnerReference { +func (e *enqueueRequestForOwner) getOwnersReferences(object metav1.Object) []metav1.OwnerReference { if object == nil { return nil } // If not filtered as Controller only, then use all the OwnerReferences - if !e.IsController { + if !e.isController { return object.GetOwnerReferences() } // If filtered to a Controller, only take the Controller OwnerReference @@ -172,18 +196,3 @@ func (e *EnqueueRequestForOwner) getOwnersReferences(object metav1.Object) []met // No Controller OwnerReference found return nil } - -var _ inject.Scheme = &EnqueueRequestForOwner{} - -// InjectScheme is called by the Controller to provide a singleton scheme to the EnqueueRequestForOwner. -func (e *EnqueueRequestForOwner) InjectScheme(s *runtime.Scheme) error { - return e.parseOwnerTypeGroupKind(s) -} - -var _ inject.Mapper = &EnqueueRequestForOwner{} - -// InjectMapper is called by the Controller to provide the rest mapper used by the manager. -func (e *EnqueueRequestForOwner) InjectMapper(m meta.RESTMapper) error { - e.mapper = m - return nil -} diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index d8b2211869..ea59805c42 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" @@ -40,7 +41,6 @@ var _ = Describe("Eventhandler", func() { var instance handler.EnqueueRequestForObject var pod *corev1.Pod var mapper meta.RESTMapper - t := true BeforeEach(func() { q = controllertest.Queue{Interface: workqueue.New()} pod = &corev1.Pod{ @@ -303,11 +303,7 @@ var _ = Describe("Eventhandler", func() { Describe("EnqueueRequestForOwner", func() { It("should enqueue a Request with the Owner of the object in the CreateEvent.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -328,11 +324,7 @@ var _ = Describe("Eventhandler", func() { }) It("should enqueue a Request with the Owner of the object in the DeleteEvent.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -357,11 +349,7 @@ var _ = Describe("Eventhandler", func() { newPod.Name = pod.Name + "2" newPod.Namespace = pod.Namespace + "2" - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -398,11 +386,7 @@ var _ = Describe("Eventhandler", func() { newPod := pod.DeepCopy() newPod.Name = pod.Name + "2" - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { @@ -431,12 +415,7 @@ var _ = Describe("Eventhandler", func() { }) It("should enqueue a Request with the Owner of the object in the GenericEvent.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) - + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo-parent", @@ -456,12 +435,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue a Request if there are no owners matching Group and Kind.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) pod.OwnerReferences = []metav1.OwnerReference{ { // Wrong group Name: "foo1-parent", @@ -483,11 +457,7 @@ var _ = Describe("Eventhandler", func() { It("should enqueue a Request if there are owners matching Group "+ "and Kind with a different version.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &autoscalingv1.HorizontalPodAutoscaler{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &autoscalingv1.HorizontalPodAutoscaler{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo-parent", @@ -507,11 +477,7 @@ var _ = Describe("Eventhandler", func() { }) It("should enqueue a Request for a owner that is cluster scoped", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &corev1.Node{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &corev1.Node{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "node-1", @@ -532,11 +498,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue a Request if there are no owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) evt := event.CreateEvent{ Object: pod, } @@ -547,12 +509,7 @@ var _ = Describe("Eventhandler", func() { Context("with the Controller field set to true", func() { It("should enqueue reconcile.Requests for only the first the Controller if there are "+ "multiple Controller owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -563,7 +520,7 @@ var _ = Describe("Eventhandler", func() { Name: "foo2-parent", Kind: "ReplicaSet", APIVersion: "apps/v1", - Controller: &t, + Controller: pointer.Bool(true), }, { Name: "foo3-parent", @@ -574,7 +531,7 @@ var _ = Describe("Eventhandler", func() { Name: "foo4-parent", Kind: "ReplicaSet", APIVersion: "apps/v1", - Controller: &t, + Controller: pointer.Bool(true), }, { Name: "foo5-parent", @@ -593,12 +550,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue reconcile.Requests if there are no Controller owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -624,12 +576,7 @@ var _ = Describe("Eventhandler", func() { }) It("should not enqueue reconcile.Requests if there are no owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - IsController: t, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) evt := event.CreateEvent{ Object: pod, } @@ -640,11 +587,7 @@ var _ = Describe("Eventhandler", func() { Context("with the Controller field set to false", func() { It("should enqueue a reconcile.Requests for all owners.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -684,11 +627,7 @@ var _ = Describe("Eventhandler", func() { Context("with a nil object", func() { It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -704,54 +643,9 @@ var _ = Describe("Eventhandler", func() { }) }) - Context("with a multiple matching kinds", func() { - It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &metav1.ListOptions{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).NotTo(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) - pod.OwnerReferences = []metav1.OwnerReference{ - { - Name: "foo1-parent", - Kind: "ListOptions", - APIVersion: "meta/v1", - }, - } - evt := event.CreateEvent{ - Object: pod, - } - instance.Create(evt, q) - Expect(q.Len()).To(Equal(0)) - }) - }) - Context("with an OwnerType that cannot be resolved", func() { - It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &controllertest.ErrorType{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).NotTo(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) - pod.OwnerReferences = []metav1.OwnerReference{ - { - Name: "foo1-parent", - Kind: "ListOptions", - APIVersion: "meta/v1", - }, - } - evt := event.CreateEvent{ - Object: pod, - } - instance.Create(evt, q) - Expect(q.Len()).To(Equal(0)) - }) - }) - Context("with a nil OwnerType", func() { - It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{} - Expect(instance.InjectScheme(scheme.Scheme)).NotTo(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + It("should panic", func() { + Expect(handler.EnqueueRequestForOwner(nil, nil, nil)).To(Panic()) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", @@ -769,11 +663,7 @@ var _ = Describe("Eventhandler", func() { Context("with an invalid APIVersion in the OwnerReference", func() { It("should do nothing.", func() { - instance := handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.ReplicaSet{}, - } - Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed()) - Expect(instance.InjectMapper(mapper)).To(Succeed()) + instance := handler.EnqueueRequestForOwner(scheme.Scheme, mapper, &appsv1.ReplicaSet{}) pod.OwnerReferences = []metav1.OwnerReference{ { Name: "foo1-parent", diff --git a/pkg/handler/example_test.go b/pkg/handler/example_test.go index dbfab46157..9e1ad0a1d4 100644 --- a/pkg/handler/example_test.go +++ b/pkg/handler/example_test.go @@ -25,10 +25,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) +var mgr manager.Manager var c controller.Controller // This example watches Pods and enqueues Requests with the Name and Namespace of the Pod from @@ -36,7 +38,7 @@ var c controller.Controller func ExampleEnqueueRequestForObject() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &corev1.Pod{}}, + source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}, ) if err != nil { @@ -49,11 +51,8 @@ func ExampleEnqueueRequestForObject() { func ExampleEnqueueRequestForOwner() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &appsv1.ReplicaSet{}}, - &handler.EnqueueRequestForOwner{ - OwnerType: &appsv1.Deployment{}, - IsController: true, - }, + source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()), ) if err != nil { // handle it @@ -65,7 +64,7 @@ func ExampleEnqueueRequestForOwner() { func ExampleEnqueueRequestsFromMapFunc() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &appsv1.Deployment{}}, + source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { return []reconcile.Request{ {NamespacedName: types.NamespacedName{ @@ -88,7 +87,7 @@ func ExampleEnqueueRequestsFromMapFunc() { func ExampleFuncs() { // controller is a controller.controller err := c.Watch( - &source.Kind{Type: &corev1.Pod{}}, + source.Kind(mgr.GetCache(), &corev1.Pod{}), handler.Funcs{ CreateFunc: func(e event.CreateEvent, q workqueue.RateLimitingInterface) { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 1f8f8b7398..969eeeb7d2 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -33,12 +33,9 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" ) -var _ inject.Injector = &Controller{} - // Controller implements controller.Controller. type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. @@ -61,10 +58,6 @@ type Controller struct { // the Queue for processing Queue workqueue.RateLimitingInterface - // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates - // Deprecated: the caller should handle injected fields itself. - SetFields func(i interface{}) error - // mu is used to synchronize Controller setup mu sync.Mutex @@ -130,19 +123,6 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc c.mu.Lock() defer c.mu.Unlock() - // Inject Cache into arguments - if err := c.SetFields(src); err != nil { - return err - } - if err := c.SetFields(evthdler); err != nil { - return err - } - for _, pr := range prct { - if err := c.SetFields(pr); err != nil { - return err - } - } - // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). @@ -362,12 +342,6 @@ func (c *Controller) GetLogger() logr.Logger { return c.LogConstructor(nil) } -// InjectFunc implement SetFields.Injector. -func (c *Controller) InjectFunc(f inject.Func) error { - c.SetFields = f - return nil -} - // updateMetrics updates prometheus metrics within the controller. func (c *Controller) updateMetrics(reconcileTime time.Duration) { ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds()) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 5d3b1c9bea..905b4c4f4d 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -44,7 +44,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -52,7 +51,6 @@ var _ = Describe("controller", func() { var fakeReconcile *fakeReconciler var ctrl *Controller var queue *controllertest.Queue - var informers *informertest.FakeInformers var reconciled chan reconcile.Request var request = reconcile.Request{ NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}, @@ -67,7 +65,6 @@ var _ = Describe("controller", func() { queue = &controllertest.Queue{ Interface: workqueue.New(), } - informers = &informertest.FakeInformers{} ctrl = &Controller{ MaxConcurrentReconciles: 1, Do: fakeReconcile, @@ -76,7 +73,6 @@ var _ = Describe("controller", func() { return log.RuntimeLog.WithName("controller").WithName("test") }, } - Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed()) }) Describe("Reconciler", func() { @@ -131,7 +127,7 @@ var _ = Describe("controller", func() { It("should return an error if there is an error waiting for the informers", func() { f := false ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}), + src: source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}), }} ctrl.Name = "foo" ctx, cancel := context.WithCancel(context.Background()) @@ -149,7 +145,7 @@ var _ = Describe("controller", func() { c = &cacheWithIndefinitelyBlockingGetInformer{c} ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&appsv1.Deployment{}, c), + src: source.Kind(c, &appsv1.Deployment{}), }} ctrl.Name = "testcontroller" @@ -167,7 +163,7 @@ var _ = Describe("controller", func() { c = &cacheWithIndefinitelyBlockingGetInformer{c} ctrl.startWatches = []watchDescription{{ src: &singnallingSourceWrapper{ - SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c), + SyncingSource: source.Kind(c, &appsv1.Deployment{}), cacheSyncDone: sourceSynced, }, }} @@ -195,7 +191,7 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) ctrl.startWatches = []watchDescription{{ src: &singnallingSourceWrapper{ - SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c), + SyncingSource: source.Kind(c, &appsv1.Deployment{}), cacheSyncDone: sourceSynced, }, }} @@ -230,9 +226,8 @@ var _ = Describe("controller", func() { Object: p, } - ins := &source.Channel{Source: ch} + ins := &source.Channel{Source: ch, Stop: ctx.Done()} ins.DestBufferSize = 1 - Expect(inject.StopChannelInto(ctx.Done(), ins)).To(BeTrue()) // send the event to the channel ch <- evt @@ -267,18 +262,16 @@ var _ = Describe("controller", func() { e := ctrl.Start(ctx) Expect(e).NotTo(BeNil()) - Expect(e.Error()).To(ContainSubstring("must call InjectStop on Channel before calling Start")) + Expect(e.Error()).To(ContainSubstring("must specify Channel.Stop before calling Start")) }) It("should error when channel source is not specified", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ins := &source.Channel{} - Expect(inject.StopChannelInto(make(<-chan struct{}), ins)).To(BeTrue()) - + ins := &source.Channel{Stop: make(<-chan struct{})} ctrl.startWatches = []watchDescription{{ - src: &source.Channel{}, + src: ins, }} e := ctrl.Start(ctx) @@ -336,126 +329,6 @@ var _ = Describe("controller", func() { }) - Describe("Watch", func() { - It("should inject dependencies into the Source", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - found := false - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == src { - found = true - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).NotTo(HaveOccurred()) - Expect(found).To(BeTrue(), "Source not injected") - }) - - It("should return an error if there is an error injecting into the Source", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - expected := fmt.Errorf("expect fail source") - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == src { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).To(Equal(expected)) - }) - - It("should inject dependencies into the EventHandler", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - found := false - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == evthdl { - found = true - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).NotTo(HaveOccurred()) - Expect(found).To(BeTrue(), "EventHandler not injected") - }) - - It("should return an error if there is an error injecting into the EventHandler", func() { - src := &source.Kind{Type: &corev1.Pod{}} - evthdl := &handler.EnqueueRequestForObject{} - expected := fmt.Errorf("expect fail eventhandler") - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == evthdl { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl)).To(Equal(expected)) - }) - - PIt("should inject dependencies into the Reconciler", func() { - // TODO(community): Write this - }) - - PIt("should return an error if there is an error injecting into the Reconciler", func() { - // TODO(community): Write this - }) - - It("should inject dependencies into all of the Predicates", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - pr1 := &predicate.Funcs{} - pr2 := &predicate.Funcs{} - found1 := false - found2 := false - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == pr1 { - found1 = true - } - if i == pr2 { - found2 = true - } - return nil - } - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred()) - Expect(found1).To(BeTrue(), "First Predicated not injected") - Expect(found2).To(BeTrue(), "Second Predicated not injected") - }) - - It("should return an error if there is an error injecting into any of the Predicates", func() { - src := &source.Kind{Type: &corev1.Pod{}} - Expect(src.InjectCache(informers)).To(Succeed()) - evthdl := &handler.EnqueueRequestForObject{} - pr1 := &predicate.Funcs{} - pr2 := &predicate.Funcs{} - expected := fmt.Errorf("expect fail predicate") - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == pr1 { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected)) - - ctrl.SetFields = func(i interface{}) error { - defer GinkgoRecover() - if i == pr2 { - return expected - } - return nil - } - Expect(ctrl.Watch(src, evthdl, pr1, pr2)).To(Equal(expected)) - }) - }) - Describe("Processing queue items from a Controller", func() { It("should call Reconciler if an item is enqueued", func() { ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index 30928c390f..130a306053 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -56,7 +56,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Watching Resources") - err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}) + err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) By("Starting the Manager") diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 3e79f50bbd..1100b2c832 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -206,9 +206,6 @@ func (cm *controllerManager) SetFields(i interface{}) error { if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } - if _, err := inject.StopChannelInto(cm.internalProceduresStop, i); err != nil { - return err - } if _, err := inject.LoggerInto(cm.logger, i); err != nil { return err } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index f3b8443a95..030982fe4f 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1547,11 +1547,6 @@ var _ = Describe("manger.Manager", func() { Expect(c).To(Equal(m.GetCache())) return nil }, - stop: func(stop <-chan struct{}) error { - defer GinkgoRecover() - Expect(stop).NotTo(BeNil()) - return nil - }, f: func(f inject.Func) error { defer GinkgoRecover() Expect(f).NotTo(BeNil()) @@ -1602,13 +1597,6 @@ var _ = Describe("manger.Manager", func() { }, }) Expect(err).To(Equal(expected)) - - err = m.SetFields(&injectable{ - stop: func(<-chan struct{}) error { - return expected - }, - }) - Expect(err).To(Equal(expected)) }) }) @@ -1742,7 +1730,6 @@ var _ inject.Cache = &injectable{} var _ inject.Client = &injectable{} var _ inject.Scheme = &injectable{} var _ inject.Config = &injectable{} -var _ inject.Stoppable = &injectable{} var _ inject.Logger = &injectable{} type injectable struct { @@ -1751,7 +1738,6 @@ type injectable struct { config func(config *rest.Config) error cache func(cache.Cache) error f func(inject.Func) error - stop func(<-chan struct{}) error log func(logger logr.Logger) error } @@ -1790,13 +1776,6 @@ func (i *injectable) InjectFunc(f inject.Func) error { return i.f(f) } -func (i *injectable) InjectStopChannel(stop <-chan struct{}) error { - if i.stop == nil { - return nil - } - return i.stop(stop) -} - func (i *injectable) InjectLogger(log logr.Logger) error { if i.log == nil { return nil diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index 8b0f3634e4..314635875e 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -24,7 +24,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ) var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters") @@ -242,15 +241,6 @@ type and struct { predicates []Predicate } -func (a and) InjectFunc(f inject.Func) error { - for _, p := range a.predicates { - if err := f(p); err != nil { - return err - } - } - return nil -} - func (a and) Create(e event.CreateEvent) bool { for _, p := range a.predicates { if !p.Create(e) { @@ -296,15 +286,6 @@ type or struct { predicates []Predicate } -func (o or) InjectFunc(f inject.Func) error { - for _, p := range o.predicates { - if err := f(p); err != nil { - return err - } - } - return nil -} - func (o or) Create(e event.CreateEvent) bool { for _, p := range o.predicates { if p.Create(e) { @@ -350,10 +331,6 @@ type not struct { predicate Predicate } -func (n not) InjectFunc(f inject.Func) error { - return f(n.predicate) -} - func (n not) Create(e event.CreateEvent) bool { return !n.predicate.Create(e) } diff --git a/pkg/runtime/inject/inject.go b/pkg/runtime/inject/inject.go index c8c56ba817..91254b65b1 100644 --- a/pkg/runtime/inject/inject.go +++ b/pkg/runtime/inject/inject.go @@ -102,21 +102,6 @@ func SchemeInto(scheme *runtime.Scheme, i interface{}) (bool, error) { return false, nil } -// Stoppable is used by the ControllerManager to inject stop channel into Sources, -// EventHandlers, Predicates, and Reconciles. -type Stoppable interface { - InjectStopChannel(<-chan struct{}) error -} - -// StopChannelInto will set stop channel on i and return the result if it implements Stoppable. -// Returns false if i does not implement Stoppable. -func StopChannelInto(stop <-chan struct{}, i interface{}) (bool, error) { - if s, ok := i.(Stoppable); ok { - return true, s.InjectStopChannel(stop) - } - return false, nil -} - // Mapper is used to inject the rest mapper to components that may need it. type Mapper interface { InjectMapper(meta.RESTMapper) error diff --git a/pkg/runtime/inject/inject_test.go b/pkg/runtime/inject/inject_test.go index 7818909221..974bae61bc 100644 --- a/pkg/runtime/inject/inject_test.go +++ b/pkg/runtime/inject/inject_test.go @@ -130,28 +130,6 @@ var _ = Describe("runtime inject", func() { Expect(res).To(Equal(true)) }) - It("should set stop channel", func() { - - stop := make(<-chan struct{}) - - By("Validating injecting stop channel") - res, err := StopChannelInto(stop, instance) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(true)) - Expect(stop).To(Equal(instance.GetStop())) - - By("Returning false if the type does not implement inject.Stoppable") - res, err = StopChannelInto(stop, uninjectable) - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal(false)) - Expect(uninjectable.GetStop()).To(BeNil()) - - By("Returning an error if stop channel injection fails") - res, err = StopChannelInto(nil, instance) - Expect(err).To(Equal(errInjectFail)) - Expect(res).To(Equal(true)) - }) - It("should set api reader", func() { apiReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewClientBuilder().Build()}) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/source/example_test.go b/pkg/source/example_test.go index d306eaf583..77857729de 100644 --- a/pkg/source/example_test.go +++ b/pkg/source/example_test.go @@ -21,15 +21,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/source" ) +var mgr manager.Manager var ctrl controller.Controller // This example Watches for Pod Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request // with the Name and Namespace of the Pod. func ExampleKind() { - err := ctrl.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}) + err := ctrl.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}) if err != nil { // handle it } diff --git a/pkg/source/source.go b/pkg/source/source.go index 6b67563924..e7797cc98e 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -24,14 +24,15 @@ import ( "time" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source/internal" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -52,8 +53,7 @@ const ( // // * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls). // -// Users may build their own Source implementations. If their implementations implement any of the inject package -// interfaces, the dependencies will be injected by the Controller when Watch is called. +// Users may build their own Source implementations. type Source interface { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. @@ -67,34 +67,21 @@ type SyncingSource interface { WaitForSync(ctx context.Context) error } -// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used -// and not overwritten. It can be used to watch objects in a different cluster by passing the cache -// from that other cluster. -func NewKindWithCache(object client.Object, cache cache.Cache) SyncingSource { - return &kindWithCache{kind: Kind{Type: object, cache: cache}} +// KindSource is a syncing source that targets a specific object type. +type KindSource interface { + SyncingSource + AsPartialMetadata(*runtime.Scheme) error } -type kindWithCache struct { - kind Kind +// Kind creates a KindSource with the given cache provider. +func Kind(cache cache.Cache, object client.Object) KindSource { + return &kind{obj: object, cache: cache} } -func (ks *kindWithCache) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { - return ks.kind.Start(ctx, handler, queue, prct...) -} - -func (ks *kindWithCache) String() string { - return ks.kind.String() -} - -func (ks *kindWithCache) WaitForSync(ctx context.Context) error { - return ks.kind.WaitForSync(ctx) -} - -// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Kind struct { - // Type is the type of object to watch. e.g. &v1.Pod{} - Type client.Object +// kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type kind struct { + // obj is the type of object to watch. e.g. &v1.Pod{} + obj client.Object // cache used to watch APIs cache cache.Cache @@ -105,22 +92,12 @@ type Kind struct { startCancel func() } -var _ SyncingSource = &Kind{} +var _ KindSource = &kind{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, +func (ks *kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { - // Type should have been specified by the user. - if ks.Type == nil { - return fmt.Errorf("must specify Kind.Type") - } - - // cache should have been injected before Start was called - if ks.cache == nil { - return fmt.Errorf("must call CacheInto on Kind before calling Start") - } - // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // sync that informer (most commonly due to RBAC issues). ctx, ks.startCancel = context.WithCancel(ctx) @@ -135,7 +112,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w // an error or the specified context is cancelled or expired. if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, lastErr = ks.cache.GetInformer(ctx, ks.Type) + i, lastErr = ks.cache.GetInformer(ctx, ks.obj) if lastErr != nil { kindMatchErr := &meta.NoKindMatchError{} switch { @@ -174,16 +151,27 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return nil } -func (ks *Kind) String() string { - if ks.Type != nil { - return fmt.Sprintf("kind source: %T", ks.Type) +func (ks *kind) AsPartialMetadata(scheme *runtime.Scheme) error { + metaObj := &metav1.PartialObjectMetadata{} + gvk, err := apiutil.GVKForObject(ks.obj, scheme) + if err != nil { + return fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", ks.obj, err) + } + metaObj.SetGroupVersionKind(gvk) + ks.obj = metaObj + return nil +} + +func (ks *kind) String() string { + if ks.obj != nil { + return fmt.Sprintf("kind source: %T", ks.obj) } return "kind source: unknown type" } // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. -func (ks *Kind) WaitForSync(ctx context.Context) error { +func (ks *kind) WaitForSync(ctx context.Context) error { select { case err := <-ks.started: return err @@ -196,17 +184,6 @@ func (ks *Kind) WaitForSync(ctx context.Context) error { } } -var _ inject.Cache = &Kind{} - -// InjectCache is internal should be called only by the Controller. InjectCache is used to inject -// the Cache dependency initialized by the ControllerManager. -func (ks *Kind) InjectCache(c cache.Cache) error { - if ks.cache == nil { - ks.cache = c - } - return nil -} - var _ Source = &Channel{} // Channel is used to provide a source of events originating outside the cluster @@ -219,8 +196,8 @@ type Channel struct { // Source is the source channel to fetch GenericEvents Source <-chan event.GenericEvent - // stop is to end ongoing goroutine, and close the channels - stop <-chan struct{} + // Stop is to end ongoing goroutine, and close the channels + Stop <-chan struct{} // dest is the destination channels of the added event handlers dest []chan event.GenericEvent @@ -237,18 +214,6 @@ func (cs *Channel) String() string { return fmt.Sprintf("channel source: %p", cs) } -var _ inject.Stoppable = &Channel{} - -// InjectStopChannel is internal should be called only by the Controller. -// It is used to inject the stop channel initialized by the ControllerManager. -func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error { - if cs.stop == nil { - cs.stop = stop - } - - return nil -} - // Start implements Source and should only be called by the Controller. func (cs *Channel) Start( ctx context.Context, @@ -260,9 +225,8 @@ func (cs *Channel) Start( return fmt.Errorf("must specify Channel.Source") } - // stop should have been injected before Start was called - if cs.stop == nil { - return fmt.Errorf("must call InjectStop on Channel before calling Start") + if cs.Stop == nil { + return fmt.Errorf("must specify Channel.Stop before calling Start") } // use default value if DestBufferSize not specified diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index c7b3da39e2..2527cf0034 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -23,7 +23,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" . "github.com/onsi/ginkgo/v2" @@ -37,7 +36,7 @@ import ( ) var _ = Describe("Source", func() { - var instance1, instance2 *source.Kind + var instance1, instance2 source.Source var obj client.Object var q workqueue.RateLimitingInterface var c1, c2 chan interface{} @@ -59,11 +58,8 @@ var _ = Describe("Source", func() { }) JustBeforeEach(func() { - instance1 = &source.Kind{Type: obj} - Expect(inject.CacheInto(icache, instance1)).To(BeTrue()) - - instance2 = &source.Kind{Type: obj} - Expect(inject.CacheInto(icache, instance2)).To(BeTrue()) + instance1 = source.Kind(icache, obj) + instance2 = source.Kind(icache, obj) }) AfterEach(func() { diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index c2e6904180..8891fb51ff 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -27,7 +27,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" "sigs.k8s.io/controller-runtime/pkg/source" corev1 "k8s.io/api/core/v1" @@ -65,10 +64,7 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(inject.CacheInto(ic, instance)).To(BeTrue()) + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -105,10 +101,7 @@ var _ = Describe("Source", func() { ic := &informertest.FakeInformers{} q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(instance.InjectCache(ic)).To(Succeed()) + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -153,10 +146,7 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(inject.CacheInto(ic, instance)).To(BeTrue()) + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -189,24 +179,22 @@ var _ = Describe("Source", func() { }) It("should return an error from Start if informers were not injected", func() { - instance := source.Kind{Type: &corev1.Pod{}} + instance := source.Kind(ic, &corev1.Pod{}) err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("must call CacheInto on Kind before calling Start")) }) It("should return an error from Start if a type was not provided", func() { - instance := source.Kind{} - Expect(instance.InjectCache(&informertest.FakeInformers{})).To(Succeed()) + instance := source.Kind(ic, nil) err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("must specify Kind.Type")) }) It("should return an error if syncing fails", func() { - instance := source.Kind{Type: &corev1.Pod{}} f := false - Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed()) + instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}) Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) @@ -222,28 +210,16 @@ var _ = Describe("Source", func() { ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - instance := &source.Kind{ - Type: &corev1.Pod{}, - } - Expect(instance.InjectCache(ic)).To(Succeed()) + instance := source.Kind(ic, nil) err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).NotTo(HaveOccurred()) Eventually(instance.WaitForSync(context.Background())).Should(HaveOccurred()) }) }) - }) - - Describe("KindWithCache", func() { - It("should not allow injecting a cache", func() { - instance := source.NewKindWithCache(nil, nil) - injected, err := inject.CacheInto(&informertest.FakeInformers{}, instance) - Expect(err).To(BeNil()) - Expect(injected).To(BeFalse()) - }) It("should return an error if syncing fails", func() { f := false - instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}) + instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}) Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) @@ -312,8 +288,7 @@ var _ = Describe("Source", func() { } q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) + instance := &source.Channel{Source: ch, Stop: ctx.Done()} err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -351,9 +326,8 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{Source: ch} + instance := &source.Channel{Source: ch, Stop: ctx.Done()} instance.DestBufferSize = 1 - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -408,9 +382,8 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") // Add a handler to get distribution blocked - instance := &source.Channel{Source: ch} + instance := &source.Channel{Source: ch, Stop: ctx.Done()} instance.DestBufferSize = 1 - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { @@ -448,8 +421,7 @@ var _ = Describe("Source", func() { close(ch) By("feeding that channel to a channel source") - src := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(ctx.Done(), src)).To(BeTrue()) + src := &source.Channel{Source: ch, Stop: ctx.Done()} processed := make(chan struct{}) defer close(processed) @@ -481,16 +453,15 @@ var _ = Describe("Source", func() { }) It("should get error if no source specified", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{ /*no source specified*/ } - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) + instance := &source.Channel{ /*no source specified*/ Stop: ctx.Done()} err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source"))) }) - It("should get error if no stop channel injected", func() { + It("should get error if no stop channel provided", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} err := instance.Start(ctx, handler.Funcs{}, q) - Expect(err).To(Equal(fmt.Errorf("must call InjectStop on Channel before calling Start"))) + Expect(err).To(Equal(fmt.Errorf("must specify Channel.Stop before calling Start"))) }) }) Context("for multi sources (handlers)", func() { @@ -508,8 +479,7 @@ var _ = Describe("Source", func() { c2 := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) + instance := &source.Channel{Source: ch, Stop: ctx.Done()} err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover()