diff --git a/control-plane/pkg/reconciler/trigger/controller.go b/control-plane/pkg/reconciler/trigger/controller.go index 4f59f70d05..41138ca061 100644 --- a/control-plane/pkg/reconciler/trigger/controller.go +++ b/control-plane/pkg/reconciler/trigger/controller.go @@ -19,9 +19,13 @@ package trigger import ( "context" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/auth" v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" @@ -44,7 +48,8 @@ import ( triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" - serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" + + serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -65,7 +70,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf brokerInformer := brokerinformer.Get(ctx) triggerInformer := triggerinformer.Get(ctx) triggerLister := triggerInformer.Lister() - serviceaccountInformer := serviceaccountinformer.Get(ctx) + oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector) clientPool := clientpool.Get(ctx) @@ -95,7 +100,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf GetKafkaClient: clientPool.GetClient, GetKafkaClusterAdmin: clientPool.GetClusterAdmin, InitOffsetsFunc: offset.InitOffsets, - ServiceAccountLister: serviceaccountInformer.Lister(), + ServiceAccountLister: oidcServiceaccountInformer.Lister(), } impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options { @@ -153,8 +158,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged)) // Reconciler Trigger when the OIDC service account changes - serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterController(&eventing.Trigger{}), + oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) @@ -182,6 +187,34 @@ func filterTriggers(lister eventinglisters.BrokerLister, brokerClass string, fin } } +// filterOIDCServiceAccounts returns a function that returns true if the resource passed +// is a service account, which is owned by a trigger pointing to a the given broker class. +func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister, brokerClass string, finalizer string) func(interface{}) bool { + return func(obj interface{}) bool { + controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj) + if !controlledByTrigger { + return false + } + + sa, ok := obj.(*corev1.ServiceAccount) + if !ok { + return false + } + + owner := metav1.GetControllerOf(sa) + if owner == nil { + return false + } + + trigger, err := triggerLister.Triggers(sa.Namespace).Get(owner.Name) + if err != nil { + return false + } + + return filterTriggers(brokerLister, brokerClass, finalizer)(trigger) + } +} + func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool { for _, f := range finalizers { if f == finalizerName { diff --git a/control-plane/pkg/reconciler/trigger/controller_test.go b/control-plane/pkg/reconciler/trigger/controller_test.go index a5abc59f24..80ed0165ff 100644 --- a/control-plane/pkg/reconciler/trigger/controller_test.go +++ b/control-plane/pkg/reconciler/trigger/controller_test.go @@ -17,8 +17,15 @@ package trigger import ( + "context" "testing" + triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" + "knative.dev/pkg/ptr" + + "knative.dev/eventing/pkg/auth" + filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,7 +34,8 @@ import ( _ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake" - _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake" + _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" "knative.dev/pkg/configmap" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -42,8 +50,7 @@ import ( ) func TestNewController(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) - + ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector) ctx = clientpool.WithKafkaClientPool(ctx) controller := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{ @@ -60,8 +67,13 @@ func TestNewController(t *testing.T) { } } +func SetUpInformerSelector(ctx context.Context) context.Context { + ctx = filteredFactory.WithSelectors(ctx, auth.OIDCLabelSelector) + return ctx +} + func TestFilterTriggers(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector) tt := []struct { name string @@ -184,3 +196,166 @@ func TestFilterTriggers(t *testing.T) { }) } } + +func TestFilterOIDCServiceAccounts(t *testing.T) { + ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector) + + tt := []struct { + name string + sa *corev1.ServiceAccount + trigger *eventing.Trigger + brokers []*eventing.Broker + pass bool + }{{ + name: "matching owner reference", + sa: &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "sa", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventing.SchemeGroupVersion.String(), + Kind: "Trigger", + Name: "tr", + Controller: ptr.Bool(true), + }, + }, + }, + }, + trigger: &eventing.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "tr", + Finalizers: []string{FinalizerName}, + }, + Spec: eventing.TriggerSpec{ + Broker: "br", + }, + }, + brokers: []*eventing.Broker{{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "br", + Annotations: map[string]string{ + eventing.BrokerClassAnnotationKey: kafka.BrokerClass, + }, + }, + }}, + pass: true, + }, { + name: "references trigger for wrong broker class", + sa: &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "sa", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventing.SchemeGroupVersion.String(), + Kind: "Trigger", + Name: "tr", + Controller: ptr.Bool(true), + }, + }, + }, + }, + trigger: &eventing.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "tr", + }, + Spec: eventing.TriggerSpec{ + Broker: "br", + }, + }, + brokers: []*eventing.Broker{{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "br", + Annotations: map[string]string{ + eventing.BrokerClassAnnotationKey: "another-broker-class", + }, + }, + }}, + pass: false, + }, { + name: "references trigger with correct finalizer", + sa: &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "sa", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: eventing.SchemeGroupVersion.String(), + Kind: "Trigger", + Name: "tr", + Controller: ptr.Bool(true), + }, + }, + }, + }, + trigger: &eventing.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "tr", + Finalizers: []string{FinalizerName}, + }, + Spec: eventing.TriggerSpec{ + Broker: "br", + }, + }, + brokers: []*eventing.Broker{{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "br", + }, + }}, + pass: true, + }, { + name: "no owner reference", + sa: &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "sa", + }, + }, + trigger: &eventing.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "tr", + Finalizers: []string{FinalizerName}, + }, + Spec: eventing.TriggerSpec{ + Broker: "br", + }, + }, + brokers: []*eventing.Broker{{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "br", + Annotations: map[string]string{ + eventing.BrokerClassAnnotationKey: kafka.BrokerClass, + }, + }, + }}, + pass: false, + }} + + for _, tc := range tt { + tc := tc + t.Run(tc.name, func(t *testing.T) { + brokerInformer := brokerinformer.Get(ctx) + for _, obj := range tc.brokers { + err := brokerInformer.Informer().GetStore().Add(obj) + assert.NoError(t, err) + } + + triggerInformer := triggerinformer.Get(ctx) + err := triggerInformer.Informer().GetStore().Add(tc.trigger) + assert.NoError(t, err) + + filter := filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName) + pass := filter(tc.sa) + assert.Equal(t, tc.pass, pass) + }) + } +} diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller.go b/control-plane/pkg/reconciler/trigger/namespaced_controller.go index c8f622c8ab..e021784959 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller.go @@ -24,17 +24,19 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/auth" kubeclient "knative.dev/pkg/client/injection/kube/client" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" - serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" + + serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered" + "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/pkg/resolver" - eventing "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" eventingclient "knative.dev/eventing/pkg/client/injection/client" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" @@ -60,7 +62,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con brokerInformer := brokerinformer.Get(ctx) triggerInformer := triggerinformer.Get(ctx) triggerLister := triggerInformer.Lister() - serviceaccountInformer := serviceaccountinformer.Get(ctx) + oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector) clientPool := clientpool.Get(ctx) @@ -82,7 +84,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con }, BrokerLister: brokerInformer.Lister(), ConfigMapLister: configmapInformer.Lister(), - ServiceAccountLister: serviceaccountInformer.Lister(), + ServiceAccountLister: oidcServiceaccountInformer.Lister(), EventingClient: eventingclient.Get(ctx), Env: configs, GetKafkaClient: clientPool.GetClient, @@ -150,8 +152,8 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged)) // Reconciler Trigger when the OIDC service account changes - serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterController(&eventing.Trigger{}), + oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.NamespacedBrokerClass, FinalizerName), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go b/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go index 7789fa96c0..97adbaacda 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go @@ -29,6 +29,9 @@ import ( "knative.dev/pkg/configmap" reconcilertesting "knative.dev/pkg/reconciler/testing" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake" + _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" @@ -37,7 +40,7 @@ import ( ) func TestNewNamespacedController(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector) ctx = clientpool.WithKafkaClientPool(ctx) diff --git a/control-plane/pkg/reconciler/trigger/v2/controllerv2.go b/control-plane/pkg/reconciler/trigger/v2/controllerv2.go index 75190796c2..a76be1ccdb 100644 --- a/control-plane/pkg/reconciler/trigger/v2/controllerv2.go +++ b/control-plane/pkg/reconciler/trigger/v2/controllerv2.go @@ -19,6 +19,9 @@ package v2 import ( "context" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" @@ -127,7 +130,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf // Reconciler Trigger when the OIDC service account changes oidcServiceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterController(&eventing.Trigger{}), + FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister()), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) @@ -155,6 +158,34 @@ func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool } } +// filterOIDCServiceAccounts returns a function that returns true if the resource passed +// is a service account, which is owned by a trigger pointing to a the given broker class. +func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister) func(interface{}) bool { + return func(obj interface{}) bool { + controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj) + if !controlledByTrigger { + return false + } + + sa, ok := obj.(*corev1.ServiceAccount) + if !ok { + return false + } + + owner := metav1.GetControllerOf(sa) + if owner == nil { + return false + } + + trigger, err := triggerLister.Triggers(sa.Namespace).Get(owner.Name) + if err != nil { + return false + } + + return filterTriggers(brokerLister)(trigger) + } +} + func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool { for _, f := range finalizers { if f == finalizerName { diff --git a/test/e2e_new/trigger_finalizer_test.go b/test/e2e_new/trigger_finalizer_test.go index 512ae79f41..bd5d5bf716 100644 --- a/test/e2e_new/trigger_finalizer_test.go +++ b/test/e2e_new/trigger_finalizer_test.go @@ -36,8 +36,6 @@ import ( "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/knative" "knative.dev/reconciler-test/pkg/resources/service" - - triggerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger" ) func TestTriggerNoFinalizer(t *testing.T) { @@ -101,7 +99,7 @@ func hasNoKafkaBrokerFinalizer() func(ctx context.Context, t feature.T) { time.Sleep(time.Second * 20) // "eventually" tr := triggerfeatures.GetTrigger(ctx, t) for _, f := range tr.Finalizers { - require.NotEqual(t, f, triggerreconciler.FinalizerName, "%+v", tr) + require.NotEqual(t, f, "kafka.triggers.eventing.knative.dev", "%+v", tr) } } }