Skip to content

Commit

Permalink
Watch config-features in IMC controller (#7125)
Browse files Browse the repository at this point in the history
This will allow updating IMC addresses based on transport-encryption
feature flag

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Aug 1, 2023
1 parent 63fd638 commit 5e76ff2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
33 changes: 24 additions & 9 deletions pkg/reconciler/inmemorychannel/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/system"

"knative.dev/pkg/resolver"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel"
inmemorychannelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/reconciler/inmemorychannel/controller/config"
"knative.dev/pkg/resolver"

"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
Expand Down Expand Up @@ -84,9 +86,22 @@ func NewController(
logger.Panic("unable to process in-memory channel's required environment variables (missing DISPATCHER_IMAGE)")
}

var globalResync func(obj interface{})

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(cmw)

r.dispatcherImage = env.Image

impl := inmemorychannelreconciler.NewImpl(ctx, r)
impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})
r.uriResolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)

inmemorychannelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand All @@ -96,33 +111,33 @@ func NewController(
// a global Resync for all the channels to take stock of their health when these change.

// Call GlobalResync on inmemorychannels.
grCh := func(obj interface{}) {
globalResync = func(interface{}) {
impl.GlobalResync(inmemorychannelInformer.Informer())
}

deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(dispatcherName),
Handler: controller.HandleAll(grCh),
Handler: controller.HandleAll(globalResync),
})
serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(dispatcherName),
Handler: controller.HandleAll(grCh),
Handler: controller.HandleAll(globalResync),
})
endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(dispatcherName),
Handler: controller.HandleAll(grCh),
Handler: controller.HandleAll(globalResync),
})
serviceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(dispatcherName),
Handler: controller.HandleAll(grCh),
Handler: controller.HandleAll(globalResync),
})
roleBindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(dispatcherName),
Handler: controller.HandleAll(grCh),
Handler: controller.HandleAll(globalResync),
})
secretInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(eventingtls.IMCDispatcherServerTLSSecretName),
Handler: controller.HandleAll(grCh),
Handler: controller.HandleAll(globalResync),
})

// Setup the watch on the config map of dispatcher config
Expand Down
18 changes: 13 additions & 5 deletions pkg/reconciler/inmemorychannel/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,20 @@ func TestNew(t *testing.T) {
ctx = v1addr.WithDuck(ctx)

os.Setenv("DISPATCHER_IMAGE", "animage")
cmw := configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.EventDispatcherConfigMap,
Namespace: "knative-eventing",
cmw := configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.EventDispatcherConfigMap,
Namespace: "knative-eventing",
},
},
})
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
Namespace: "knative-eventing",
},
},
)

secret := types.NamespacedName{
Namespace: system.Namespace(),
Expand Down

0 comments on commit 5e76ff2

Please sign in to comment.