Skip to content

Commit

Permalink
Reconcile trigger on OIDC service account changes only, if SA referen…
Browse files Browse the repository at this point in the history
…ces a trigger for correct broker class
  • Loading branch information
creydr committed May 2, 2024
1 parent 5b5b633 commit 3e9129d
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 3 deletions.
33 changes: 31 additions & 2 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package trigger

import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -156,9 +158,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))

// Reconciler Trigger when the OIDC service account changes
// serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down Expand Up @@ -186,6 +187,34 @@ func filterTriggers(lister eventinglisters.BrokerLister, brokerClass string, fin
}
}

// filterOIDCServiceAccounts returns a function that returns true if the resource passed
// is a service account, which is owned by a trigger pointing to a the given broker class.
func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister, brokerClass string, finalizer string) func(interface{}) bool {
return func(obj interface{}) bool {
controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj)
if !controlledByTrigger {
return false
}

sa, ok := obj.(*corev1.ServiceAccount)
if !ok {
return false
}

owner := metav1.GetControllerOf(sa)
if owner == nil {
return false
}

trigger, err := triggerLister.Triggers(sa.Namespace).Get(owner.Name)
if err != nil {
return false
}

return filterTriggers(brokerLister, brokerClass, finalizer)(trigger)
}
}

func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool {
for _, f := range finalizers {
if f == finalizerName {
Expand Down
165 changes: 165 additions & 0 deletions control-plane/pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package trigger

import (
"context"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
"knative.dev/pkg/ptr"
"testing"

"knative.dev/eventing/pkg/auth"
Expand Down Expand Up @@ -194,3 +196,166 @@ func TestFilterTriggers(t *testing.T) {
})
}
}

func TestFilterOIDCServiceAccounts(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

tt := []struct {
name string
sa *corev1.ServiceAccount
trigger *eventing.Trigger
brokers []*eventing.Broker
pass bool
}{{
name: "matching owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: true,
}, {
name: "references trigger for wrong broker class",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: "another-broker-class",
},
},
}},
pass: false,
}, {
name: "references trigger with correct finalizer",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
},
}},
pass: true,
}, {
name: "no owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: false,
}}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
brokerInformer := brokerinformer.Get(ctx)
for _, obj := range tc.brokers {
err := brokerInformer.Informer().GetStore().Add(obj)
assert.NoError(t, err)
}

triggerInformer := triggerinformer.Get(ctx)
err := triggerInformer.Informer().GetStore().Add(tc.trigger)
assert.NoError(t, err)

filter := filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName)
pass := filter(tc.sa)
assert.Equal(t, tc.pass, pass)
})
}
}
32 changes: 31 additions & 1 deletion control-plane/pkg/reconciler/trigger/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package v2

import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -138,7 +140,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

// Reconciler Trigger when the OIDC service account changes
oidcServiceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister()),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down Expand Up @@ -166,6 +168,34 @@ func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool
}
}

// filterOIDCServiceAccounts returns a function that returns true if the resource passed
// is a service account, which is owned by a trigger pointing to a the given broker class.
func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister) func(interface{}) bool {
return func(obj interface{}) bool {
controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj)
if !controlledByTrigger {
return false
}

sa, ok := obj.(*corev1.ServiceAccount)
if !ok {
return false
}

owner := metav1.GetControllerOf(sa)
if owner == nil {
return false
}

trigger, err := triggerLister.Triggers(sa.Namespace).Get(owner.Name)
if err != nil {
return false
}

return filterTriggers(brokerLister)(trigger)
}
}

func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool {
for _, f := range finalizers {
if f == finalizerName {
Expand Down

0 comments on commit 3e9129d

Please sign in to comment.