Skip to content

Commit

Permalink
Refactor all controllers to generics
Browse files Browse the repository at this point in the history
- Point to fork CR, subject to change

Signed-off-by: Danil Grigorev <[email protected]>
  • Loading branch information
Danil-Grigorev committed Apr 17, 2024
1 parent 7506b7f commit 4942537
Show file tree
Hide file tree
Showing 44 changed files with 449 additions and 693 deletions.
55 changes: 20 additions & 35 deletions bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,30 +108,31 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
}

b := ctrl.NewControllerManagedBy(mgr).
For(&bootstrapv1.KubeadmConfig{}).
Add(builder.For(mgr,
&bootstrapv1.KubeadmConfig{},
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &bootstrapv1.KubeadmConfig{}),
)).
WithOptions(options).
Watches(
Add(builder.Watches(mgr,
&clusterv1.Machine{},
handler.EnqueueRequestsFromMapFunc(r.MachineToBootstrapMapFunc),
).WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue))
handler.EnqueueRequestsFromObjectMap(r.MachineToBootstrapMapFunc),
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}),
))

if feature.Gates.Enabled(feature.MachinePool) {
b = b.Watches(
b = b.Add(builder.Watches(mgr,
&expv1.MachinePool{},
handler.EnqueueRequestsFromMapFunc(r.MachinePoolToBootstrapMapFunc),
)
handler.EnqueueRequestsFromObjectMap(r.MachinePoolToBootstrapMapFunc),
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}),
))
}

b = b.Watches(
b = b.Add(builder.Watches(mgr,
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmConfigs),
builder.WithPredicates(
predicates.All(ctrl.LoggerFrom(ctx),
predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue),
),
),
)
handler.EnqueueRequestsFromObjectMap(r.ClusterToKubeadmConfigs),
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
))

if err := b.Complete(r); err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
Expand Down Expand Up @@ -875,14 +876,8 @@ func (r *KubeadmConfigReconciler) tokenCheckRefreshOrRotationInterval() time.Dur

// ClusterToKubeadmConfigs is a handler.ToRequestsFunc to be used to enqueue
// requests for reconciliation of KubeadmConfigs.
func (r *KubeadmConfigReconciler) ClusterToKubeadmConfigs(ctx context.Context, o client.Object) []ctrl.Request {
func (r *KubeadmConfigReconciler) ClusterToKubeadmConfigs(ctx context.Context, c *clusterv1.Cluster) []ctrl.Request {
result := []ctrl.Request{}

c, ok := o.(*clusterv1.Cluster)
if !ok {
panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
}

selectors := []client.ListOption{
client.InNamespace(c.Namespace),
client.MatchingLabels{
Expand Down Expand Up @@ -923,12 +918,7 @@ func (r *KubeadmConfigReconciler) ClusterToKubeadmConfigs(ctx context.Context, o

// MachineToBootstrapMapFunc is a handler.ToRequestsFunc to be used to enqueue
// request for reconciliation of KubeadmConfig.
func (r *KubeadmConfigReconciler) MachineToBootstrapMapFunc(_ context.Context, o client.Object) []ctrl.Request {
m, ok := o.(*clusterv1.Machine)
if !ok {
panic(fmt.Sprintf("Expected a Machine but got a %T", o))
}

func (r *KubeadmConfigReconciler) MachineToBootstrapMapFunc(_ context.Context, m *clusterv1.Machine) []ctrl.Request {
result := []ctrl.Request{}
if m.Spec.Bootstrap.ConfigRef != nil && m.Spec.Bootstrap.ConfigRef.GroupVersionKind() == bootstrapv1.GroupVersion.WithKind("KubeadmConfig") {
name := client.ObjectKey{Namespace: m.Namespace, Name: m.Spec.Bootstrap.ConfigRef.Name}
Expand All @@ -939,12 +929,7 @@ func (r *KubeadmConfigReconciler) MachineToBootstrapMapFunc(_ context.Context, o

// MachinePoolToBootstrapMapFunc is a handler.ToRequestsFunc to be used to enqueue
// request for reconciliation of KubeadmConfig.
func (r *KubeadmConfigReconciler) MachinePoolToBootstrapMapFunc(_ context.Context, o client.Object) []ctrl.Request {
m, ok := o.(*expv1.MachinePool)
if !ok {
panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
}

func (r *KubeadmConfigReconciler) MachinePoolToBootstrapMapFunc(_ context.Context, m *expv1.MachinePool) []ctrl.Request {
result := []ctrl.Request{}
configRef := m.Spec.Template.Spec.Bootstrap.ConfigRef
if configRef != nil && configRef.GroupVersionKind().GroupKind() == bootstrapv1.GroupVersion.WithKind("KubeadmConfig").GroupKind() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestKubeadmConfigReconciler_MachineToBootstrapMapFuncReturn(t *testing.T) {
g := NewWithT(t)
cluster := builder.Cluster("my-cluster", metav1.NamespaceDefault).Build()
objs := []client.Object{cluster}
machineObjs := []client.Object{}
machineObjs := []*clusterv1.Machine{}
var expectedConfigName string
for i := 0; i < 3; i++ {
configName := fmt.Sprintf("my-config-%d", i)
Expand Down
8 changes: 3 additions & 5 deletions controllers/external/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -42,7 +42,7 @@ type ObjectTracker struct {
}

// Watch uses the controller to issue a Watch only if the object hasn't been seen before.
func (o *ObjectTracker) Watch(log logr.Logger, obj runtime.Object, handler handler.EventHandler, p ...predicate.Predicate) error {
func (o *ObjectTracker) Watch(log logr.Logger, obj client.Object, handler handler.EventHandler, p ...predicate.Predicate) error {
// Consider this a no-op if the controller isn't present.
if o.Controller == nil {
return nil
Expand All @@ -59,9 +59,7 @@ func (o *ObjectTracker) Watch(log logr.Logger, obj runtime.Object, handler handl

log.Info(fmt.Sprintf("Adding watch on external object %q", gvk.String()))
err := o.Controller.Watch(
source.Kind(o.Cache, u),
handler,
append(p, predicates.ResourceNotPaused(log))...,
source.Kind(o.Cache, u).Prepare(handler, append(p, predicates.ResourceNotPaused(log, obj))...),
)
if err != nil {
o.m.Delete(key)
Expand Down
4 changes: 1 addition & 3 deletions controllers/external/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
Expand Down Expand Up @@ -55,7 +53,7 @@ func newWatchCountController(raiseError bool) *watchCountController {
}
}

func (c *watchCountController) Watch(_ source.Source, _ handler.EventHandler, _ ...predicate.Predicate) error {
func (c *watchCountController) Watch(_ source.Source) error {
c.count++
if c.raiseError {
return errors.New("injected failure")
Expand Down
6 changes: 4 additions & 2 deletions controllers/remote/cluster_cache_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -43,9 +44,10 @@ type ClusterCacheReconciler struct {
func (r *ClusterCacheReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
err := ctrl.NewControllerManagedBy(mgr).
Named("remote/clustercache").
For(&clusterv1.Cluster{}).
Add(builder.For(mgr, &clusterv1.Cluster{},
predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
)).
WithOptions(options).
WithEventFilter(predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Complete(r)

if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion controllers/remote/cluster_cache_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
Expand Down Expand Up @@ -84,8 +85,10 @@ func TestClusterCacheTracker(t *testing.T) {
c = &testController{
ch: make(chan string),
}
w, err = ctrl.NewControllerManagedBy(mgr).For(&clusterv1.MachineDeployment{}).Build(c)

watch, err := ctrl.NewControllerManagedBy(mgr).For(&clusterv1.MachineDeployment{}).Build(c)
g.Expect(err).ToNot(HaveOccurred())
w = &controller.ControllerAdapter{Controller: watch}

mgrContext, mgrCancel = context.WithCancel(ctx)
t.Log("Starting the manager")
Expand Down
33 changes: 15 additions & 18 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,22 @@ type KubeadmControlPlaneReconciler struct {

func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
c, err := ctrl.NewControllerManagedBy(mgr).
For(&controlplanev1.KubeadmControlPlane{}).
Owns(&clusterv1.Machine{}).
Add(builder.For(mgr,
&controlplanev1.KubeadmControlPlane{},
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &controlplanev1.KubeadmControlPlane{}),
)).
Add(builder.Owns(mgr,
&controlplanev1.KubeadmControlPlane{},
&clusterv1.Machine{},
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Machine{}),
)).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Watches(
Add(builder.Watches(mgr,
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(r.ClusterToKubeadmControlPlane),
builder.WithPredicates(
predicates.All(ctrl.LoggerFrom(ctx),
predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue),
predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
),
),
).Build(r)
handler.EnqueueRequestsFromObjectMap(r.ClusterToKubeadmControlPlane),
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
)).Build(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}
Expand Down Expand Up @@ -574,12 +576,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, con

// ClusterToKubeadmControlPlane is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation
// for KubeadmControlPlane based on updates to a Cluster.
func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(_ context.Context, o client.Object) []ctrl.Request {
c, ok := o.(*clusterv1.Cluster)
if !ok {
panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
}

func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(_ context.Context, c *clusterv1.Cluster) []ctrl.Request {
controlPlaneRef := c.Spec.ControlPlaneRef
if controlPlaneRef != nil && controlPlaneRef.Kind == kubeadmControlPlaneKind {
return []ctrl.Request{{NamespacedName: client.ObjectKey{Namespace: controlPlaneRef.Namespace, Name: controlPlaneRef.Name}}}
Expand Down
2 changes: 1 addition & 1 deletion controlplane/kubeadm/internal/webhooks/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (v *ScaleValidator) SetupWebhookWithManager(mgr ctrl.Manager) error {
// ScaleValidator validates KCP for replicas.
type ScaleValidator struct {
Client client.Reader
decoder *admission.Decoder
decoder admission.Decoder
}

// Handle will validate for number of replicas.
Expand Down
21 changes: 8 additions & 13 deletions exp/addons/internal/controllers/clusterresourceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -67,11 +66,14 @@ type ClusterResourceSetReconciler struct {

func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&addonsv1.ClusterResourceSet{}).
Watches(
Add(builder.For(mgr, &addonsv1.ClusterResourceSet{},
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &addonsv1.ClusterResourceSet{}),
)).
Add(builder.Watches(mgr,
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(r.clusterToClusterResourceSet),
).
handler.EnqueueRequestsFromObjectMap(r.clusterToClusterResourceSet),
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
)).
WatchesMetadata(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(r.resourceToClusterResourceSet),
Expand All @@ -87,7 +89,6 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr
),
).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Complete(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
Expand Down Expand Up @@ -434,14 +435,8 @@ func (r *ClusterResourceSetReconciler) ensureResourceOwnerRef(ctx context.Contex
}

// clusterToClusterResourceSet is mapper function that maps clusters to ClusterResourceSet.
func (r *ClusterResourceSetReconciler) clusterToClusterResourceSet(ctx context.Context, o client.Object) []ctrl.Request {
func (r *ClusterResourceSetReconciler) clusterToClusterResourceSet(ctx context.Context, cluster *clusterv1.Cluster) []ctrl.Request {
result := []ctrl.Request{}

cluster, ok := o.(*clusterv1.Cluster)
if !ok {
panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
}

resourceList := &addonsv1.ClusterResourceSetList{}
if err := r.Client.List(ctx, resourceList, client.InNamespace(cluster.Namespace)); err != nil {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -50,12 +51,12 @@ type ClusterResourceSetBindingReconciler struct {
func (r *ClusterResourceSetBindingReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&addonsv1.ClusterResourceSetBinding{}).
Watches(
Add(builder.Watches(mgr,
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(r.clusterToClusterResourceSetBinding),
).
handler.EnqueueRequestsFromObjectMap(r.clusterToClusterResourceSetBinding),
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
)).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Complete(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
Expand Down Expand Up @@ -106,7 +107,7 @@ func (r *ClusterResourceSetBindingReconciler) Reconcile(ctx context.Context, req
}

// clusterToClusterResourceSetBinding is mapper function that maps clusters to ClusterResourceSetBinding.
func (r *ClusterResourceSetBindingReconciler) clusterToClusterResourceSetBinding(_ context.Context, o client.Object) []ctrl.Request {
func (r *ClusterResourceSetBindingReconciler) clusterToClusterResourceSetBinding(_ context.Context, o *clusterv1.Cluster) []ctrl.Request {
return []reconcile.Request{
{
NamespacedName: client.ObjectKey{
Expand Down
31 changes: 11 additions & 20 deletions exp/internal/controllers/machinepool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -88,20 +87,17 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M
}

c, err := ctrl.NewControllerManagedBy(mgr).
For(&expv1.MachinePool{}).
Add(builder.For(mgr,
&expv1.MachinePool{},
predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &expv1.MachinePool{}))).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Watches(
Add(builder.Watches(mgr,
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(clusterToMachinePools),
handler.EnqueueRequestsFromObjectMap(clusterToMachinePools),
// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
builder.WithPredicates(
predicates.All(ctrl.LoggerFrom(ctx),
predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)),
predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue),
),
),
).
predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)),
predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue, &clusterv1.Cluster{}),
)).
Build(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
Expand Down Expand Up @@ -335,18 +331,13 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *
return r.Tracker.Watch(ctx, remote.WatchInput{
Name: "machinepool-watchNodes",
Cluster: util.ObjectKey(cluster),
Watcher: r.controller,
Watcher: &controller.ControllerAdapter{Controller: r.controller},
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
EventHandler: handler.EnqueueRequestsFromObjectMapFunc(r.nodeToMachinePool),
})
}

func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, o client.Object) []reconcile.Request {
node, ok := o.(*corev1.Node)
if !ok {
panic(fmt.Sprintf("Expected a Node but got a %T", o))
}

func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, node *corev1.Node) []reconcile.Request {
var filters []client.ListOption
// Match by clusterName when the node has the annotation.
if clusterName, ok := node.GetAnnotations()[clusterv1.ClusterNameAnnotation]; ok {
Expand Down
Loading

0 comments on commit 4942537

Please sign in to comment.