diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index d445cc095cf..1e946b8846b 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -69,7 +69,13 @@ func NewController( configmapInformer := configmapinformer.Get(ctx) secretInformer := secretinformer.Get(ctx) - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + var globalResync func(obj interface{}) + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) featureStore.WatchConfigs(cmw) var err error @@ -112,7 +118,7 @@ func NewController( // When the endpoints in our multi-tenant filter/ingress change, do a global resync. // During installation, we might reconcile Brokers before our shared filter/ingress is // ready, so when these endpoints change perform a global resync. - grCb := func(obj interface{}) { + globalResync = func(obj interface{}) { // Since changes in the Filter/Ingress Service endpoints affect all the Broker objects, // do a global resync. logger.Info("Doing a global resync due to endpoint changes in shared broker component") @@ -123,18 +129,18 @@ func NewController( FilterFunc: pkgreconciler.ChainFilterFuncs( pkgreconciler.NamespaceFilterFunc(system.Namespace()), pkgreconciler.NameFilterFunc(names.BrokerFilterName)), - Handler: controller.HandleAll(grCb), + Handler: controller.HandleAll(globalResync), }) // Resync for the ingress. endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: pkgreconciler.ChainFilterFuncs( pkgreconciler.NamespaceFilterFunc(system.Namespace()), pkgreconciler.NameFilterFunc(names.BrokerIngressName)), - Handler: controller.HandleAll(grCb), + Handler: controller.HandleAll(globalResync), }) secretInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(ingressServerTLSSecretName), - Handler: controller.HandleAll(grCb), + Handler: controller.HandleAll(globalResync), }) return impl diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index 16af702db1e..e60a5414e7b 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -28,11 +28,13 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/system" + "knative.dev/pkg/resolver" + + "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel" inmemorychannelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel" "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/reconciler/inmemorychannel/controller/config" - "knative.dev/pkg/resolver" "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" @@ -84,9 +86,22 @@ func NewController( logger.Panic("unable to process in-memory channel's required environment variables (missing DISPATCHER_IMAGE)") } + var globalResync func(obj interface{}) + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) + featureStore.WatchConfigs(cmw) + r.dispatcherImage = env.Image - impl := inmemorychannelreconciler.NewImpl(ctx, r) + impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) r.uriResolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) inmemorychannelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) @@ -96,33 +111,33 @@ func NewController( // a global Resync for all the channels to take stock of their health when these change. // Call GlobalResync on inmemorychannels. - grCh := func(obj interface{}) { + globalResync = func(interface{}) { impl.GlobalResync(inmemorychannelInformer.Informer()) } deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(dispatcherName), - Handler: controller.HandleAll(grCh), + Handler: controller.HandleAll(globalResync), }) serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(dispatcherName), - Handler: controller.HandleAll(grCh), + Handler: controller.HandleAll(globalResync), }) endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(dispatcherName), - Handler: controller.HandleAll(grCh), + Handler: controller.HandleAll(globalResync), }) serviceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(dispatcherName), - Handler: controller.HandleAll(grCh), + Handler: controller.HandleAll(globalResync), }) roleBindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(dispatcherName), - Handler: controller.HandleAll(grCh), + Handler: controller.HandleAll(globalResync), }) secretInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(eventingtls.IMCDispatcherServerTLSSecretName), - Handler: controller.HandleAll(grCh), + Handler: controller.HandleAll(globalResync), }) // Setup the watch on the config map of dispatcher config diff --git a/pkg/reconciler/inmemorychannel/controller/controller_test.go b/pkg/reconciler/inmemorychannel/controller/controller_test.go index 39783287d76..f6c60bda7fa 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller_test.go +++ b/pkg/reconciler/inmemorychannel/controller/controller_test.go @@ -48,12 +48,20 @@ func TestNew(t *testing.T) { ctx = v1addr.WithDuck(ctx) os.Setenv("DISPATCHER_IMAGE", "animage") - cmw := configmap.NewStaticWatcher(&corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: config.EventDispatcherConfigMap, - Namespace: "knative-eventing", + cmw := configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: config.EventDispatcherConfigMap, + Namespace: "knative-eventing", + }, }, - }) + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + Namespace: "knative-eventing", + }, + }, + ) secret := types.NamespacedName{ Namespace: system.Namespace(),