feat: enable self service notification support #2930

Merged · 20 commits · Aug 8, 2023

Changes from 18 commits

86 changes: 54 additions & 32 deletions cmd/rollouts-controller/main.go
@@ -6,6 +6,8 @@ import (
     "strings"
     "time"

+    "github.com/argoproj/argo-rollouts/utils/record"
+
     "github.com/argoproj/pkg/kubeclientmetrics"
     smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
     log "github.com/sirupsen/logrus"
@@ -44,31 +46,32 @@ const (

 func newCommand() *cobra.Command {
     var (
-        clientConfig         clientcmd.ClientConfig
-        rolloutResyncPeriod  int64
-        logLevel             string
-        logFormat            string
-        klogLevel            int
-        metricsPort          int
-        healthzPort          int
-        instanceID           string
-        qps                  float32
-        burst                int
-        rolloutThreads       int
-        experimentThreads    int
-        analysisThreads      int
-        serviceThreads       int
-        ingressThreads       int
-        istioVersion         string
-        trafficSplitVersion  string
-        ambassadorVersion    string
-        ingressVersion       string
-        appmeshCRDVersion    string
-        albIngressClasses    []string
-        nginxIngressClasses  []string
-        awsVerifyTargetGroup bool
-        namespaced           bool
-        printVersion         bool
+        clientConfig                   clientcmd.ClientConfig
+        rolloutResyncPeriod            int64
+        logLevel                       string
+        logFormat                      string
+        klogLevel                      int
+        metricsPort                    int
+        healthzPort                    int
+        instanceID                     string
+        qps                            float32
+        burst                          int
+        rolloutThreads                 int
+        experimentThreads              int
+        analysisThreads                int
+        serviceThreads                 int
+        ingressThreads                 int
+        istioVersion                   string
+        trafficSplitVersion            string
+        ambassadorVersion              string
+        ingressVersion                 string
+        appmeshCRDVersion              string
+        albIngressClasses              []string
+        nginxIngressClasses            []string
+        awsVerifyTargetGroup           bool
+        namespaced                     bool
+        printVersion                   bool
+        selfServiceNotificationEnabled bool
     )
     electOpts := controller.NewLeaderElectionOptions()
     var command = cobra.Command{
@@ -151,12 +154,31 @@ func newCommand() *cobra.Command {
             }
             istioDynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(istioPrimaryDynamicClient, resyncDuration, namespace, nil)

-            controllerNamespaceInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
+            var notificationConfigNamespace string
+            if selfServiceNotificationEnabled {
+                notificationConfigNamespace = metav1.NamespaceAll
+            } else {
+                notificationConfigNamespace = defaults.Namespace()
+            }
+            notificationSecretInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
                 kubeClient,
                 resyncDuration,
-                kubeinformers.WithNamespace(defaults.Namespace()))
-            configMapInformer := controllerNamespaceInformerFactory.Core().V1().ConfigMaps()
-            secretInformer := controllerNamespaceInformerFactory.Core().V1().Secrets()
+                kubeinformers.WithNamespace(notificationConfigNamespace),
+                kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
+                    options.Kind = "Secret"
+                    options.FieldSelector = fmt.Sprintf("metadata.name=%s", record.NotificationSecret)
+                }),
+            )
+
+            notificationConfigMapInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
+                kubeClient,
+                resyncDuration,
+                kubeinformers.WithNamespace(notificationConfigNamespace),
+                kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
+                    options.Kind = "ConfigMap"
+                    options.FieldSelector = fmt.Sprintf("metadata.name=%s", record.NotificationConfigMap)
+                }),
+            )

             mode, err := ingressutil.DetermineIngressMode(ingressVersion, kubeClient.DiscoveryClient)
             checkError(err)
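
The two factories above follow client-go's standard scoping pattern: `WithNamespace` restricts the watch to a single namespace (or to all of them when the flag resolves to `metav1.NamespaceAll`), and `WithTweakListOptions` attaches a field selector so the shared cache only ever holds the one named notification ConfigMap/Secret rather than every object in scope. A minimal self-contained sketch of the same pattern against a fake clientset — the namespace and secret name below are placeholders, not values from this PR:

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset()

	// Cache exactly one Secret by name in one namespace. Passing
	// metav1.NamespaceAll ("") instead would widen the watch to every
	// namespace, which is what the new flag toggles in the diff above.
	factory := informers.NewSharedInformerFactoryWithOptions(
		client,
		5*time.Minute, // resync period
		informers.WithNamespace("argo-rollouts"),
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fmt.Sprintf("metadata.name=%s", "my-notification-secret")
		}),
	)

	secretInformer := factory.Core().V1().Secrets().Informer()

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	fmt.Println("secret cache synced:", secretInformer.HasSynced())
}
```

The field selector is applied server-side, so the controller never lists or watches unrelated objects — which matters here, since the all-namespaces mode would otherwise cache every Secret in the cluster.
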
@@ -182,8 +204,8 @@
                istioPrimaryDynamicClient,
                istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
                istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
-               configMapInformer,
-               secretInformer,
+               notificationConfigMapInformerFactory,
+               notificationSecretInformerFactory,
                resyncDuration,
                instanceID,
                metricsPort,
@@ -196,7 +218,6 @@
                istioDynamicInformerFactory,
                namespaced,
                kubeInformerFactory,
-               controllerNamespaceInformerFactory,
                jobInformerFactory)

            if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
@@ -240,6 +261,7 @@ func newCommand() *cobra.Command {
    command.Flags().DurationVar(&electOpts.LeaderElectionLeaseDuration, "leader-election-lease-duration", controller.DefaultLeaderElectionLeaseDuration, "The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.")
    command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.")
    command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.")
+   command.Flags().BoolVar(&selfServiceNotificationEnabled, "self-service-notification-enabled", false, "Allows rollouts controller to pull notification config from the namespace that the rollout resource is in. This is useful for self-service notification.")
    return &command
 }

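For illustration, the new flag boils down to choosing a watch scope. A standalone sketch of that decision — `notificationNamespace` is a hypothetical helper invented for this example, not a function in the PR:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// notificationNamespace picks where notification config is watched:
// the controller's own namespace by default, or every namespace when
// self-service mode is enabled.
func notificationNamespace(selfService bool, controllerNS string) string {
	if selfService {
		return metav1.NamespaceAll // the empty string: list/watch cluster-wide
	}
	return controllerNS
}

func main() {
	var selfService bool
	cmd := &cobra.Command{
		Use: "example",
		Run: func(c *cobra.Command, args []string) {
			fmt.Printf("watching notification config in %q\n",
				notificationNamespace(selfService, "argo-rollouts"))
		},
	}
	cmd.Flags().BoolVar(&selfService, "self-service-notification-enabled", false,
		"pull notification config from the rollout's own namespace")
	_ = cmd.Execute()
}
```
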
105 changes: 54 additions & 51 deletions controller/controller.go
@@ -154,14 +154,15 @@ type Manager struct {

    namespace string

-   dynamicInformerFactory             dynamicinformer.DynamicSharedInformerFactory
-   clusterDynamicInformerFactory      dynamicinformer.DynamicSharedInformerFactory
-   istioDynamicInformerFactory        dynamicinformer.DynamicSharedInformerFactory
-   namespaced                         bool
-   kubeInformerFactory                kubeinformers.SharedInformerFactory
-   controllerNamespaceInformerFactory kubeinformers.SharedInformerFactory
-   jobInformerFactory                 kubeinformers.SharedInformerFactory
-   istioPrimaryDynamicClient          dynamic.Interface
+   dynamicInformerFactory               dynamicinformer.DynamicSharedInformerFactory
+   clusterDynamicInformerFactory        dynamicinformer.DynamicSharedInformerFactory
+   istioDynamicInformerFactory          dynamicinformer.DynamicSharedInformerFactory
+   namespaced                           bool
+   kubeInformerFactory                  kubeinformers.SharedInformerFactory
+   notificationConfigMapInformerFactory kubeinformers.SharedInformerFactory
+   notificationSecretInformerFactory    kubeinformers.SharedInformerFactory
+   jobInformerFactory                   kubeinformers.SharedInformerFactory
+   istioPrimaryDynamicClient            dynamic.Interface
 }

 // NewManager returns a new manager to manage all the controllers
@@ -184,8 +185,8 @@ func NewManager(
    istioPrimaryDynamicClient dynamic.Interface,
    istioVirtualServiceInformer cache.SharedIndexInformer,
    istioDestinationRuleInformer cache.SharedIndexInformer,
-   configMapInformer coreinformers.ConfigMapInformer,
-   secretInformer coreinformers.SecretInformer,
+   notificationConfigMapInformerFactory kubeinformers.SharedInformerFactory,
+   notificationSecretInformerFactory kubeinformers.SharedInformerFactory,
    resyncPeriod time.Duration,
    instanceID string,
    metricsPort int,
@@ -198,10 +199,8 @@
    istioDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
    namespaced bool,
    kubeInformerFactory kubeinformers.SharedInformerFactory,
-   controllerNamespaceInformerFactory kubeinformers.SharedInformerFactory,
    jobInformerFactory kubeinformers.SharedInformerFactory,
 ) *Manager {
-
    runtime.Must(rolloutscheme.AddToScheme(scheme.Scheme))
    log.Info("Creating event broadcaster")

@@ -224,9 +223,9 @@
    ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

    refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutsInformer.Informer())
-   apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), secretInformer.Informer(), configMapInformer.Informer())
+   apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), notificationSecretInformerFactory.Core().V1().Secrets().Informer(), notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer())
    recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, apiFactory)
-   notificationsController := notificationcontroller.NewController(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory,
+   notificationsController := notificationcontroller.NewControllerWithNamespaceSupport(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory,
        notificationcontroller.WithToUnstructured(func(obj metav1.Object) (*unstructured.Unstructured, error) {
            data, err := json.Marshal(obj)
            if err != nil {
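
The `WithToUnstructured` hook has to turn a typed object into an `*unstructured.Unstructured`; a round-trip through JSON is sufficient because `Unstructured` implements `json.Unmarshaler`. A standalone sketch of that conversion — the Deployment literal is only a stand-in object, not code from this PR:

```go
package main

import (
	"encoding/json"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// toUnstructured converts any typed API object via a JSON round-trip.
func toUnstructured(obj metav1.Object) (*unstructured.Unstructured, error) {
	data, err := json.Marshal(obj)
	if err != nil {
		return nil, err
	}
	var u unstructured.Unstructured
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, err
	}
	return &u, nil
}

func main() {
	d := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	u, err := toUnstructured(d)
	if err != nil {
		panic(err)
	}
	fmt.Println(u.GetName(), u.GetNamespace())
}
```

`runtime.DefaultUnstructuredConverter.ToUnstructured` would avoid the intermediate byte slice; either approach yields the same map-based representation.
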
@@ -320,42 +319,43 @@
    })

    cm := &Manager{
-       wg:                                 &sync.WaitGroup{},
-       metricsServer:                      metricsServer,
-       healthzServer:                      healthzServer,
-       rolloutSynced:                      rolloutsInformer.Informer().HasSynced,
-       serviceSynced:                      servicesInformer.Informer().HasSynced,
-       ingressSynced:                      ingressWrap.HasSynced,
-       jobSynced:                          jobInformer.Informer().HasSynced,
-       experimentSynced:                   experimentsInformer.Informer().HasSynced,
-       analysisRunSynced:                  analysisRunInformer.Informer().HasSynced,
-       analysisTemplateSynced:             analysisTemplateInformer.Informer().HasSynced,
-       clusterAnalysisTemplateSynced:      clusterAnalysisTemplateInformer.Informer().HasSynced,
-       replicasSetSynced:                  replicaSetInformer.Informer().HasSynced,
-       configMapSynced:                    configMapInformer.Informer().HasSynced,
-       secretSynced:                       secretInformer.Informer().HasSynced,
-       rolloutWorkqueue:                   rolloutWorkqueue,
-       experimentWorkqueue:                experimentWorkqueue,
-       analysisRunWorkqueue:               analysisRunWorkqueue,
-       serviceWorkqueue:                   serviceWorkqueue,
-       ingressWorkqueue:                   ingressWorkqueue,
-       rolloutController:                  rolloutController,
-       serviceController:                  serviceController,
-       ingressController:                  ingressController,
-       experimentController:               experimentController,
-       analysisController:                 analysisController,
-       notificationsController:            notificationsController,
-       refResolver:                        refResolver,
-       namespace:                          namespace,
-       kubeClientSet:                      kubeclientset,
-       dynamicInformerFactory:             dynamicInformerFactory,
-       clusterDynamicInformerFactory:      clusterDynamicInformerFactory,
-       istioDynamicInformerFactory:        istioDynamicInformerFactory,
-       namespaced:                         namespaced,
-       kubeInformerFactory:                kubeInformerFactory,
-       controllerNamespaceInformerFactory: controllerNamespaceInformerFactory,
-       jobInformerFactory:                 jobInformerFactory,
-       istioPrimaryDynamicClient:          istioPrimaryDynamicClient,
+       wg:                                   &sync.WaitGroup{},
+       metricsServer:                        metricsServer,
+       healthzServer:                        healthzServer,
+       rolloutSynced:                        rolloutsInformer.Informer().HasSynced,
+       serviceSynced:                        servicesInformer.Informer().HasSynced,
+       ingressSynced:                        ingressWrap.HasSynced,
+       jobSynced:                            jobInformer.Informer().HasSynced,
+       experimentSynced:                     experimentsInformer.Informer().HasSynced,
+       analysisRunSynced:                    analysisRunInformer.Informer().HasSynced,
+       analysisTemplateSynced:               analysisTemplateInformer.Informer().HasSynced,
+       clusterAnalysisTemplateSynced:        clusterAnalysisTemplateInformer.Informer().HasSynced,
+       replicasSetSynced:                    replicaSetInformer.Informer().HasSynced,
+       configMapSynced:                      notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer().HasSynced,
+       secretSynced:                         notificationSecretInformerFactory.Core().V1().Secrets().Informer().HasSynced,
+       rolloutWorkqueue:                     rolloutWorkqueue,
+       experimentWorkqueue:                  experimentWorkqueue,
+       analysisRunWorkqueue:                 analysisRunWorkqueue,
+       serviceWorkqueue:                     serviceWorkqueue,
+       ingressWorkqueue:                     ingressWorkqueue,
+       rolloutController:                    rolloutController,
+       serviceController:                    serviceController,
+       ingressController:                    ingressController,
+       experimentController:                 experimentController,
+       analysisController:                   analysisController,
+       notificationsController:              notificationsController,
+       refResolver:                          refResolver,
+       namespace:                            namespace,
+       kubeClientSet:                        kubeclientset,
+       dynamicInformerFactory:               dynamicInformerFactory,
+       clusterDynamicInformerFactory:        clusterDynamicInformerFactory,
+       istioDynamicInformerFactory:          istioDynamicInformerFactory,
+       namespaced:                           namespaced,
+       kubeInformerFactory:                  kubeInformerFactory,
+       jobInformerFactory:                   jobInformerFactory,
+       istioPrimaryDynamicClient:            istioPrimaryDynamicClient,
+       notificationConfigMapInformerFactory: notificationConfigMapInformerFactory,
+       notificationSecretInformerFactory:    notificationSecretInformerFactory,
    }

    _, err := rolloutsConfig.InitializeConfig(kubeclientset, defaults.DefaultRolloutsConfigMapName)
@@ -470,7 +470,10 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT
        c.clusterDynamicInformerFactory.Start(ctx.Done())
    }
    c.kubeInformerFactory.Start(ctx.Done())
-   c.controllerNamespaceInformerFactory.Start(ctx.Done())
+
+   c.notificationConfigMapInformerFactory.Start(ctx.Done())
+   c.notificationSecretInformerFactory.Start(ctx.Done())
+
    c.jobInformerFactory.Start(ctx.Done())

    // Check if Istio installed on cluster before starting dynamicInformerFactory
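
`Start` on a `SharedInformerFactory` only launches the list/watch goroutines and returns immediately; it is the Manager's `*Synced` functions (fed from `HasSynced` above) that gate work until the caches are warm. A minimal sketch of that start-then-sync pattern, with illustrative names:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactoryWithOptions(client, time.Minute)

	// Grab HasSynced funcs before starting, mirroring the Manager fields.
	cmSynced := factory.Core().V1().ConfigMaps().Informer().HasSynced
	secretSynced := factory.Core().V1().Secrets().Informer().HasSynced

	stop := make(chan struct{})
	defer close(stop)

	factory.Start(stop) // non-blocking: spawns list/watch goroutines

	if ok := cache.WaitForCacheSync(stop, cmSynced, secretSynced); !ok {
		panic("caches failed to sync")
	}
	fmt.Println("caches synced; safe to start workers")
}
```

Starting the two notification factories separately, as the diff does, is harmless: a factory's `Start` only launches informers that have not yet been started, and each factory here carries only its single-object informer.
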
51 changes: 26 additions & 25 deletions controller/controller_test.go
@@ -67,27 +67,29 @@ func (f *fixture) newManager(t *testing.T) *Manager {
    analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")

    cm := &Manager{
-       wg:                            &sync.WaitGroup{},
-       healthzServer:                 NewHealthzServer(fmt.Sprintf(listenAddr, 8080)),
-       rolloutSynced:                 alwaysReady,
-       experimentSynced:              alwaysReady,
-       analysisRunSynced:             alwaysReady,
-       analysisTemplateSynced:        alwaysReady,
-       clusterAnalysisTemplateSynced: alwaysReady,
-       serviceSynced:                 alwaysReady,
-       ingressSynced:                 alwaysReady,
-       jobSynced:                     alwaysReady,
-       replicasSetSynced:             alwaysReady,
-       configMapSynced:               alwaysReady,
-       secretSynced:                  alwaysReady,
-       rolloutWorkqueue:              rolloutWorkqueue,
-       serviceWorkqueue:              serviceWorkqueue,
-       ingressWorkqueue:              ingressWorkqueue,
-       experimentWorkqueue:           experimentWorkqueue,
-       analysisRunWorkqueue:          analysisRunWorkqueue,
-       kubeClientSet:                 f.kubeclient,
-       namespace:                     "",
-       namespaced:                    false,
+       wg:                                   &sync.WaitGroup{},
+       healthzServer:                        NewHealthzServer(fmt.Sprintf(listenAddr, 8080)),
+       rolloutSynced:                        alwaysReady,
+       experimentSynced:                     alwaysReady,
+       analysisRunSynced:                    alwaysReady,
+       analysisTemplateSynced:               alwaysReady,
+       clusterAnalysisTemplateSynced:        alwaysReady,
+       serviceSynced:                        alwaysReady,
+       ingressSynced:                        alwaysReady,
+       jobSynced:                            alwaysReady,
+       replicasSetSynced:                    alwaysReady,
+       configMapSynced:                      alwaysReady,
+       secretSynced:                         alwaysReady,
+       rolloutWorkqueue:                     rolloutWorkqueue,
+       serviceWorkqueue:                     serviceWorkqueue,
+       ingressWorkqueue:                     ingressWorkqueue,
+       experimentWorkqueue:                  experimentWorkqueue,
+       analysisRunWorkqueue:                 analysisRunWorkqueue,
+       kubeClientSet:                        f.kubeclient,
+       namespace:                            "",
+       namespaced:                           false,
+       notificationSecretInformerFactory:    kubeinformers.NewSharedInformerFactoryWithOptions(f.kubeclient, noResyncPeriodFunc()),
+       notificationConfigMapInformerFactory: kubeinformers.NewSharedInformerFactoryWithOptions(f.kubeclient, noResyncPeriodFunc()),
    }

    metricsAddr := fmt.Sprintf(listenAddr, 8090)
@@ -120,7 +122,7 @@ func (f *fixture) newManager(t *testing.T) *Manager {
    cm.dynamicInformerFactory = dynamicInformerFactory
    cm.clusterDynamicInformerFactory = dynamicInformerFactory
    cm.kubeInformerFactory = k8sI
-   cm.controllerNamespaceInformerFactory = k8sI
+   //cm.controllerClusterInformerFactory = k8sI
    cm.jobInformerFactory = k8sI
    cm.istioPrimaryDynamicClient = dynamicClient
    cm.istioDynamicInformerFactory = dynamicInformerFactory
@@ -253,8 +255,8 @@ func TestNewManager(t *testing.T) {
        dynamicClient,
        istioVirtualServiceInformer,
        istioDestinationRuleInformer,
-       k8sI.Core().V1().ConfigMaps(),
-       k8sI.Core().V1().Secrets(),
+       k8sI,
+       k8sI,
        noResyncPeriodFunc(),
        "test",
        8090,
@@ -268,7 +270,6 @@
        false,
        nil,
        nil,
-       nil,
    )

    assert.NotNil(t, cm)