Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use filtered informer to watch OIDC service accounts #3719

Merged
merged 22 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/auth"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
Expand All @@ -44,7 +48,8 @@
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"

serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
Expand All @@ -65,7 +70,7 @@
brokerInformer := brokerinformer.Get(ctx)
triggerInformer := triggerinformer.Get(ctx)
triggerLister := triggerInformer.Lister()
serviceaccountInformer := serviceaccountinformer.Get(ctx)
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

Expand Down Expand Up @@ -95,7 +100,7 @@
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
ServiceAccountLister: serviceaccountInformer.Lister(),
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
Expand Down Expand Up @@ -153,8 +158,8 @@
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))

// Reconciler Trigger when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down Expand Up @@ -182,6 +187,34 @@
}
}

// filterOIDCServiceAccounts returns a function that returns true if the resource passed
// is a service account, which is owned by a trigger pointing to a the given broker class.
func filterOIDCServiceAccounts(triggerLister eventinglisters.TriggerLister, brokerLister eventinglisters.BrokerLister, brokerClass string, finalizer string) func(interface{}) bool {
return func(obj interface{}) bool {
controlledByTrigger := controller.FilterController(&eventing.Trigger{})(obj)
if !controlledByTrigger {
return false
}

sa, ok := obj.(*corev1.ServiceAccount)
if !ok {
return false

Check warning on line 201 in control-plane/pkg/reconciler/trigger/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/trigger/controller.go#L201

Added line #L201 was not covered by tests
}

owner := metav1.GetControllerOf(sa)
if owner == nil {
return false

Check warning on line 206 in control-plane/pkg/reconciler/trigger/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/trigger/controller.go#L206

Added line #L206 was not covered by tests
}

trigger, err := triggerLister.Triggers(sa.Namespace).Get(owner.Name)
if err != nil {
return false

Check warning on line 211 in control-plane/pkg/reconciler/trigger/controller.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/trigger/controller.go#L211

Added line #L211 was not covered by tests
}

return filterTriggers(brokerLister, brokerClass, finalizer)(trigger)
}
}

func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool {
for _, f := range finalizers {
if f == finalizerName {
Expand Down
183 changes: 179 additions & 4 deletions control-plane/pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
package trigger

import (
"context"
"testing"

triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
"knative.dev/pkg/ptr"

"knative.dev/eventing/pkg/auth"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -27,7 +34,8 @@ import (
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

Expand All @@ -42,8 +50,7 @@ import (
)

func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)
ctx = clientpool.WithKafkaClientPool(ctx)

controller := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
Expand All @@ -60,8 +67,13 @@ func TestNewController(t *testing.T) {
}
}

func SetUpInformerSelector(ctx context.Context) context.Context {
ctx = filteredFactory.WithSelectors(ctx, auth.OIDCLabelSelector)
return ctx
}

func TestFilterTriggers(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

tt := []struct {
name string
Expand Down Expand Up @@ -184,3 +196,166 @@ func TestFilterTriggers(t *testing.T) {
})
}
}

func TestFilterOIDCServiceAccounts(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

tt := []struct {
name string
sa *corev1.ServiceAccount
trigger *eventing.Trigger
brokers []*eventing.Broker
pass bool
}{{
name: "matching owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: true,
}, {
name: "references trigger for wrong broker class",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: "another-broker-class",
},
},
}},
pass: false,
}, {
name: "references trigger with correct finalizer",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Trigger",
Name: "tr",
Controller: ptr.Bool(true),
},
},
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
},
}},
pass: true,
}, {
name: "no owner reference",
sa: &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "sa",
},
},
trigger: &eventing.Trigger{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "tr",
Finalizers: []string{FinalizerName},
},
Spec: eventing.TriggerSpec{
Broker: "br",
},
},
brokers: []*eventing.Broker{{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "br",
Annotations: map[string]string{
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
},
},
}},
pass: false,
}}

for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
brokerInformer := brokerinformer.Get(ctx)
for _, obj := range tc.brokers {
err := brokerInformer.Informer().GetStore().Add(obj)
assert.NoError(t, err)
}

triggerInformer := triggerinformer.Get(ctx)
err := triggerInformer.Informer().GetStore().Add(tc.trigger)
assert.NoError(t, err)

filter := filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.BrokerClass, FinalizerName)
pass := filter(tc.sa)
assert.Equal(t, tc.pass, pass)
})
}
}
14 changes: 8 additions & 6 deletions control-plane/pkg/reconciler/trigger/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/auth"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"

serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"

eventing "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
Expand All @@ -60,7 +62,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
brokerInformer := brokerinformer.Get(ctx)
triggerInformer := triggerinformer.Get(ctx)
triggerLister := triggerInformer.Lister()
serviceaccountInformer := serviceaccountinformer.Get(ctx)
oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector)

clientPool := clientpool.Get(ctx)

Expand All @@ -82,7 +84,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
},
BrokerLister: brokerInformer.Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountInformer.Lister(),
ServiceAccountLister: oidcServiceaccountInformer.Lister(),
EventingClient: eventingclient.Get(ctx),
Env: configs,
GetKafkaClient: clientPool.GetClient,
Expand Down Expand Up @@ -150,8 +152,8 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))

// Reconciler Trigger when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventing.Trigger{}),
oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: filterOIDCServiceAccounts(triggerInformer.Lister(), brokerInformer.Lister(), kafka.NamespacedBrokerClass, FinalizerName),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"knative.dev/pkg/configmap"
reconcilertesting "knative.dev/pkg/reconciler/testing"

_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"

Expand All @@ -37,7 +40,7 @@ import (
)

func TestNewNamespacedController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)

ctx = clientpool.WithKafkaClientPool(ctx)

Expand Down
Loading
Loading