Skip to content

Commit

Permalink
Merge pull request #2120 from vincepri/rework-source-predicate-handlers
Browse files Browse the repository at this point in the history
⚠️ Refactor source/handler/predicate packages to remove dep injection
  • Loading branch information
k8s-ci-robot committed Jan 18, 2023
2 parents 16f7965 + ea1fcf3 commit 9241bce
Show file tree
Hide file tree
Showing 35 changed files with 289 additions and 1,414 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ issues:
- Subprocess launch(ed with variable|ing should be audited)
- (G204|G104|G307)
- "ST1000: at least one file in a package should have a package comment"
- "SA1019: \"sigs.k8s.io/controller-runtime/pkg/runtime/inject\""
- "SA1019: inject.*"
exclude-rules:
- linters:
- gosec
Expand Down
6 changes: 3 additions & 3 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.ReplicaSet{}, IsController: true}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
2 changes: 1 addition & 1 deletion hack/test-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ if [[ -n ${ARTIFACTS:-} ]]; then
fi

result=0
go test -race ${P_FLAG} ${MOD_OPT} ./... ${GINKGO_ARGS} || result=$?
go test -v -race ${P_FLAG} ${MOD_OPT} ./... --ginkgo.fail-fast ${GINKGO_ARGS} || result=$?

if [[ -n ${ARTIFACTS:-} ]]; then
mkdir -p ${ARTIFACTS}
Expand Down
22 changes: 12 additions & 10 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -217,11 +218,11 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
func (blder *Builder) doWatch() error {
// Reconcile type
if blder.forInput.object != nil {
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
Expand All @@ -234,15 +235,16 @@ func (blder *Builder) doWatch() error {
return errors.New("Owns() can only be used together with For()")
}
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
obj, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
blder.forInput.object,
handler.OnlyControllerOwner(),
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
Expand All @@ -258,8 +260,8 @@ func (blder *Builder) doWatch() error {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)

// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
// If the source of this watch is of type Kind, project it.
if srckind, ok := w.src.(*internalsource.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
Expand Down
18 changes: 10 additions & 8 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("application", func() {
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}).
Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}).
Build(noop)
Expect(err).To(MatchError(ContainSubstring("one of For() or Named() must be called")))
Expect(instance).To(BeNil())
Expand Down Expand Up @@ -157,7 +157,7 @@ var _ = Describe("application", func() {

instance, err := ControllerManagedBy(m).
Named("my_controller").
Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}).
Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}).
Build(noop)
Expect(err).NotTo(HaveOccurred())
Expect(instance).NotTo(BeNil())
Expand Down Expand Up @@ -369,8 +369,9 @@ var _ = Describe("application", func() {
bldr := ControllerManagedBy(m).
For(&appsv1.Deployment{}).
Watches( // Equivalent of Owns
&source.Kind{Type: &appsv1.ReplicaSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
source.Kind(m.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -384,10 +385,11 @@ var _ = Describe("application", func() {
bldr := ControllerManagedBy(m).
Named("Deployment").
Watches( // Equivalent of For
&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}).
source.Kind(m.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}).
Watches( // Equivalent of Owns
&source.Kind{Type: &appsv1.ReplicaSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
source.Kind(m.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -481,7 +483,7 @@ var _ = Describe("application", func() {
bldr := ControllerManagedBy(mgr).
For(&appsv1.Deployment{}, OnlyMetadata).
Owns(&appsv1.ReplicaSet{}, OnlyMetadata).
Watches(&source.Kind{Type: &appsv1.StatefulSet{}},
Watches(source.Kind(mgr.GetCache(), &appsv1.StatefulSet{}),
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
defer GinkgoRecover()

Expand Down
11 changes: 3 additions & 8 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ import (

// Cluster provides various methods to interact with a cluster.
type Cluster interface {
// SetFields will set any dependencies on an object for which the object has implemented the inject
// interface - e.g. inject.Client.
// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
SetFields(interface{}) error

// GetConfig returns an initialized Config
GetConfig() *rest.Config

// GetCache returns a cache.Cache
GetCache() cache.Cache

// GetScheme returns an initialized Scheme
GetScheme() *runtime.Scheme

Expand All @@ -57,9 +55,6 @@ type Cluster interface {
// GetFieldIndexer returns a client.FieldIndexer configured with the client
GetFieldIndexer() client.FieldIndexer

// GetCache returns a cache.Cache
GetCache() cache.Cache

// GetEventRecorderFor returns a new EventRecorder for the provided name
GetEventRecorderFor(name string) record.EventRecorder

Expand Down
128 changes: 0 additions & 128 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)

var _ = Describe("cluster.Cluster", func() {
Expand Down Expand Up @@ -111,78 +108,6 @@ var _ = Describe("cluster.Cluster", func() {
})
})

Describe("SetFields", func() {
It("should inject field values", func() {
c, err := New(cfg, func(o *Options) {
o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return &informertest.FakeInformers{}, nil
}
})
Expect(err).NotTo(HaveOccurred())

By("Injecting the dependencies")
err = c.SetFields(&injectable{
scheme: func(scheme *runtime.Scheme) error {
defer GinkgoRecover()
Expect(scheme).To(Equal(c.GetScheme()))
return nil
},
config: func(config *rest.Config) error {
defer GinkgoRecover()
Expect(config).To(Equal(c.GetConfig()))
return nil
},
client: func(client client.Client) error {
defer GinkgoRecover()
Expect(client).To(Equal(c.GetClient()))
return nil
},
cache: func(cache cache.Cache) error {
defer GinkgoRecover()
Expect(cache).To(Equal(c.GetCache()))
return nil
},
log: func(logger logr.Logger) error {
defer GinkgoRecover()
Expect(logger).To(Equal(logf.RuntimeLog.WithName("cluster")))
return nil
},
})
Expect(err).NotTo(HaveOccurred())

By("Returning an error if dependency injection fails")

expected := fmt.Errorf("expected error")
err = c.SetFields(&injectable{
client: func(client client.Client) error {
return expected
},
})
Expect(err).To(Equal(expected))

err = c.SetFields(&injectable{
scheme: func(scheme *runtime.Scheme) error {
return expected
},
})
Expect(err).To(Equal(expected))

err = c.SetFields(&injectable{
config: func(config *rest.Config) error {
return expected
},
})
Expect(err).To(Equal(expected))

err = c.SetFields(&injectable{
cache: func(c cache.Cache) error {
return expected
},
})
Expect(err).To(Equal(expected))
})
})

It("should not leak goroutines when stopped", func() {
currentGRs := goleak.IgnoreCurrent()

Expand Down Expand Up @@ -242,56 +167,3 @@ var _ = Describe("cluster.Cluster", func() {
Expect(c.GetAPIReader()).NotTo(BeNil())
})
})

var _ inject.Cache = &injectable{}
var _ inject.Client = &injectable{}
var _ inject.Scheme = &injectable{}
var _ inject.Config = &injectable{}
var _ inject.Logger = &injectable{}

type injectable struct {
scheme func(scheme *runtime.Scheme) error
client func(client.Client) error
config func(config *rest.Config) error
cache func(cache.Cache) error
log func(logger logr.Logger) error
}

func (i *injectable) InjectCache(c cache.Cache) error {
if i.cache == nil {
return nil
}
return i.cache(c)
}

func (i *injectable) InjectConfig(config *rest.Config) error {
if i.config == nil {
return nil
}
return i.config(config)
}

func (i *injectable) InjectClient(c client.Client) error {
if i.client == nil {
return nil
}
return i.client(c)
}

func (i *injectable) InjectScheme(scheme *runtime.Scheme) error {
if i.scheme == nil {
return nil
}
return i.scheme(scheme)
}

func (i *injectable) InjectLogger(log logr.Logger) error {
if i.log == nil {
return nil
}
return i.log(log)
}

func (i *injectable) Start(<-chan struct{}) error {
return nil
}
23 changes: 0 additions & 23 deletions pkg/cluster/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)

type cluster struct {
Expand Down Expand Up @@ -64,28 +63,6 @@ type cluster struct {
logger logr.Logger
}

func (c *cluster) SetFields(i interface{}) error {
if _, err := inject.ConfigInto(c.config, i); err != nil {
return err
}
if _, err := inject.ClientInto(c.client, i); err != nil {
return err
}
if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
return err
}
if _, err := inject.SchemeInto(c.scheme, i); err != nil {
return err
}
if _, err := inject.CacheInto(c.cache, i); err != nil {
return err
}
if _, err := inject.MapperInto(c.mapper, i); err != nil {
return err
}
return nil
}

func (c *cluster) GetConfig() *rest.Config {
return c.config
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}

// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
}

if options.RecoverPanic == nil {
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
}
Expand All @@ -156,7 +151,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ var _ = Describe("controller", func() {
Expect(err).NotTo(HaveOccurred())

By("Watching Resources")
err = instance.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{
OwnerType: &appsv1.Deployment{},
})
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{})
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
Loading

0 comments on commit 9241bce

Please sign in to comment.