
Commit

Merge branch 'master' into master
wilkermichael authored Feb 5, 2024
2 parents c847499 + 6efb3cc commit d2c7780
Showing 6 changed files with 229 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
@@ -90,7 +90,7 @@ jobs:
path: coverage.out

- name: Upload code coverage information to codecov.io
uses: codecov/codecov-action@v3.1.5
uses: codecov/codecov-action@v4.0.1
with:
file: coverage.out

3 changes: 3 additions & 0 deletions README.md
@@ -101,3 +101,6 @@ You can reach the Argo Rollouts community and developers via the following channels
* [How Scalable is Argo-Rollouts: A Cloud Operator’s Perspective](https://www.youtube.com/watch?v=rCEhxJ2NSTI)
* [Minimize Impact in Kubernetes Using Argo Rollouts](https://medium.com/@arielsimhon/minimize-impact-in-kubernetes-using-argo-rollouts-992fb9519969)
* [Progressive Application Delivery with GitOps on Red Hat OpenShift](https://www.youtube.com/watch?v=DfeL7cdTx4c)
* [Progressive delivery for Kubernetes Config Maps using Argo Rollouts](https://codefresh.io/blog/progressive-delivery-for-kubernetes-config-maps-using-argo-rollouts/)
* [Multi-Service Progressive Delivery with Argo Rollouts](https://codefresh.io/blog/multi-service-progressive-delivery-with-argo-rollouts/)
* [Progressive Delivery for Stateful Services Using Argo Rollouts](https://codefresh.io/blog/progressive-delivery-for-stateful-services-using-argo-rollouts/)
113 changes: 78 additions & 35 deletions cmd/rollouts-controller/main.go
@@ -7,7 +7,6 @@ import (
"time"

"github.com/argoproj/argo-rollouts/utils/record"

"github.com/argoproj/pkg/kubeclientmetrics"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
@@ -42,8 +41,12 @@ const (
cliName = "argo-rollouts"
jsonFormat = "json"
textFormat = "text"

controllerAnalysis = "analysis"
)

var supportedControllers = map[string]bool{controllerAnalysis: true}

func newCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
@@ -72,6 +75,7 @@ func newCommand() *cobra.Command {
namespaced bool
printVersion bool
selfServiceNotificationEnabled bool
controllersEnabled []string
)
electOpts := controller.NewLeaderElectionOptions()
var command = cobra.Command{
@@ -185,41 +189,67 @@ func newCommand() *cobra.Command {
ingressWrapper, err := ingressutil.NewIngressWrapper(mode, kubeClient, kubeInformerFactory)
checkError(err)

cm := controller.NewManager(
namespace,
kubeClient,
argoprojClient,
dynamicClient,
smiClient,
discoveryClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
ingressWrapper,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
istioPrimaryDynamicClient,
istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
notificationConfigMapInformerFactory,
notificationSecretInformerFactory,
resyncDuration,
instanceID,
metricsPort,
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
var cm *controller.Manager

enabledControllers, err := getEnabledControllers(controllersEnabled)
checkError(err)

// currently only supports running analysis controller independently
if enabledControllers[controllerAnalysis] {
log.Info("Running only analysis controller")
cm = controller.NewAnalysisManager(
namespace,
kubeClient,
argoprojClient,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
resyncDuration,
metricsPort,
healthzPort,
k8sRequestProvider,
dynamicInformerFactory,
clusterDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
} else {
cm = controller.NewManager(
namespace,
kubeClient,
argoprojClient,
dynamicClient,
smiClient,
discoveryClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
ingressWrapper,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
istioPrimaryDynamicClient,
istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
notificationConfigMapInformerFactory,
notificationSecretInformerFactory,
resyncDuration,
instanceID,
metricsPort,
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
}
if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
}
@@ -262,6 +292,7 @@ func newCommand() *cobra.Command {
command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.")
command.Flags().BoolVar(&selfServiceNotificationEnabled, "self-service-notification-enabled", false, "Allows rollouts controller to pull notification config from the namespace that the rollout resource is in. This is useful for self-service notification.")
command.Flags().StringSliceVar(&controllersEnabled, "controllers", nil, "Explicitly specify the list of controllers to run, currently only supports 'analysis', e.g. --controllers=analysis. Default: all controllers are enabled")
return &command
}

@@ -315,3 +346,15 @@ func checkError(err error) {
log.Fatal(err)
}
}

func getEnabledControllers(controllersEnabled []string) (map[string]bool, error) {
enabledControllers := make(map[string]bool)
for _, controller := range controllersEnabled {
if supportedControllers[controller] {
enabledControllers[controller] = true
} else {
return nil, fmt.Errorf("unsupported controller: %s", controller)
}
}
return enabledControllers, nil
}
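
For context, here is a minimal standalone sketch (not part of the commit) of how the new flag behaves. It mirrors the getEnabledControllers helper above; the flag name comes from the StringSliceVar registration, and the toy main function is purely illustrative.

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"
)

// supportedControllers mirrors the map added in main.go: only "analysis" can run on its own.
var supportedControllers = map[string]bool{"analysis": true}

// getEnabledControllers reproduces the helper above: every requested controller
// must be supported, otherwise the whole request is rejected.
func getEnabledControllers(requested []string) (map[string]bool, error) {
	enabled := make(map[string]bool)
	for _, c := range requested {
		if !supportedControllers[c] {
			return nil, fmt.Errorf("unsupported controller: %s", c)
		}
		enabled[c] = true
	}
	return enabled, nil
}

func main() {
	// Same flag shape as the commit: a string slice, empty by default (all controllers).
	controllers := pflag.StringSlice("controllers", nil, "controllers to run, e.g. --controllers=analysis")
	pflag.Parse()

	enabled, err := getEnabledControllers(*controllers)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	if enabled["analysis"] {
		fmt.Println("would run only the analysis controller (NewAnalysisManager path)")
		return
	}
	fmt.Println("would run all controllers (NewManager path)")
}
```

With this, --controllers=analysis selects the reduced analysis-only manager, while an unrecognized value such as --controllers=rollout fails fast with "unsupported controller: rollout", matching the error path in getEnabledControllers.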
160 changes: 129 additions & 31 deletions controller/controller.go
@@ -163,6 +163,87 @@ type Manager struct {
notificationSecretInformerFactory kubeinformers.SharedInformerFactory
jobInformerFactory kubeinformers.SharedInformerFactory
istioPrimaryDynamicClient dynamic.Interface

onlyAnalysisMode bool
}

func NewAnalysisManager(
namespace string,
kubeclientset kubernetes.Interface,
argoprojclientset clientset.Interface,
jobInformer batchinformers.JobInformer,
analysisRunInformer informers.AnalysisRunInformer,
analysisTemplateInformer informers.AnalysisTemplateInformer,
clusterAnalysisTemplateInformer informers.ClusterAnalysisTemplateInformer,
resyncPeriod time.Duration,
metricsPort int,
healthzPort int,
k8sRequestProvider *metrics.K8sRequestsCountProvider,
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
clusterDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
namespaced bool,
kubeInformerFactory kubeinformers.SharedInformerFactory,
jobInformerFactory kubeinformers.SharedInformerFactory,
) *Manager {
runtime.Must(rolloutscheme.AddToScheme(scheme.Scheme))
log.Info("Creating event broadcaster")

metricsAddr := fmt.Sprintf(listenAddr, metricsPort)
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: metricsAddr,
RolloutLister: nil,
AnalysisRunLister: analysisRunInformer.Lister(),
AnalysisTemplateLister: analysisTemplateInformer.Lister(),
ClusterAnalysisTemplateLister: clusterAnalysisTemplateInformer.Lister(),
ExperimentLister: nil,
K8SRequestProvider: k8sRequestProvider,
})

healthzServer := NewHealthzServer(fmt.Sprintf(listenAddr, healthzPort))
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")
recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, nil)
analysisController := analysis.NewController(analysis.ControllerConfig{
KubeClientSet: kubeclientset,
ArgoProjClientset: argoprojclientset,
AnalysisRunInformer: analysisRunInformer,
JobInformer: jobInformer,
ResyncPeriod: resyncPeriod,
AnalysisRunWorkQueue: analysisRunWorkqueue,
MetricsServer: metricsServer,
Recorder: recorder,
})

cm := &Manager{
wg: &sync.WaitGroup{},
metricsServer: metricsServer,
healthzServer: healthzServer,
jobSynced: jobInformer.Informer().HasSynced,
analysisRunSynced: analysisRunInformer.Informer().HasSynced,
analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced,
clusterAnalysisTemplateSynced: clusterAnalysisTemplateInformer.Informer().HasSynced,
analysisRunWorkqueue: analysisRunWorkqueue,
analysisController: analysisController,
namespace: namespace,
kubeClientSet: kubeclientset,
dynamicInformerFactory: dynamicInformerFactory,
clusterDynamicInformerFactory: clusterDynamicInformerFactory,
namespaced: namespaced,
kubeInformerFactory: kubeInformerFactory,
jobInformerFactory: jobInformerFactory,
onlyAnalysisMode: true,
}

_, err := rolloutsConfig.InitializeConfig(kubeclientset, defaults.DefaultRolloutsConfigMapName)
if err != nil {
log.Fatalf("Failed to init config: %v", err)

Check warning on line 238 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L236-L238

Added lines #L236 - L238 were not covered by tests
}

err = plugin.DownloadPlugins(plugin.FileDownloaderImpl{})
if err != nil {
log.Fatalf("Failed to download plugins: %v", err)

Check warning on line 243 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L241-L243

Added lines #L241 - L243 were not covered by tests
}

return cm
}

// NewManager returns a new manager to manage all the controllers
@@ -441,11 +522,13 @@ func (c *Manager) Run(ctx context.Context, rolloutThreadiness, serviceThreadines
log.Info("Shutting down workers")
goPlugin.CleanupClients()

c.serviceWorkqueue.ShutDownWithDrain()
c.ingressWorkqueue.ShutDownWithDrain()
c.rolloutWorkqueue.ShutDownWithDrain()
c.experimentWorkqueue.ShutDownWithDrain()
c.analysisRunWorkqueue.ShutDownWithDrain()
if !c.onlyAnalysisMode {
c.serviceWorkqueue.ShutDownWithDrain()
c.ingressWorkqueue.ShutDownWithDrain()
c.rolloutWorkqueue.ShutDownWithDrain()
c.experimentWorkqueue.ShutDownWithDrain()
}

c.analysisRunWorkqueue.ShutDownWithDrain()

ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) // give max of 5 seconds for http servers to shut down
@@ -463,12 +546,6 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT
// Start the informer factories to begin populating the informer caches
log.Info("Starting Controllers")

c.notificationConfigMapInformerFactory.Start(ctx.Done())
c.notificationSecretInformerFactory.Start(ctx.Done())
if ok := cache.WaitForCacheSync(ctx.Done(), c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for configmap/secret caches to sync, exiting")
}

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
c.dynamicInformerFactory.Start(ctx.Done())
@@ -479,29 +556,50 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT

c.jobInformerFactory.Start(ctx.Done())

// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) {
c.istioDynamicInformerFactory.Start(ctx.Done())
}
if c.onlyAnalysisMode {
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.analysisRunSynced, c.analysisTemplateSynced, c.jobSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")

Check warning on line 562 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L560-L562

Added lines #L560 - L562 were not covered by tests
}
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")

Check warning on line 567 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L565-L567

Added lines #L565 - L567 were not covered by tests
}
}
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())
} else {

// Wait for the caches to be synced before starting workers
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")
}
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")
c.notificationConfigMapInformerFactory.Start(ctx.Done())
c.notificationSecretInformerFactory.Start(ctx.Done())
if ok := cache.WaitForCacheSync(ctx.Done(), c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for configmap/secret caches to sync, exiting")

Check warning on line 576 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L576

Added line #L576 was not covered by tests
}
}

go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done())
// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) {
c.istioDynamicInformerFactory.Start(ctx.Done())
}

// Wait for the caches to be synced before starting workers
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")

Check warning on line 587 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L587

Added line #L587 was not covered by tests
}
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")

Check warning on line 592 in controller/controller.go

View check run for this annotation

Codecov / codecov/patch

controller/controller.go#L592

Added line #L592 was not covered by tests
}
}

go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done())

}
log.Info("Started controller")
}
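
Because the hunks above interleave removed and added lines, the worker-startup idiom shared by both branches is easy to miss: each controller's Run method is wrapped in wait.Until with a shared WaitGroup. Below is a minimal, self-contained sketch of that idiom only; the run closure is a stand-in for a controller, not the project's actual code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// A context standing in for the controller manager's run context.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	wg := &sync.WaitGroup{}

	// run stands in for a controller's Run method: it blocks until the context is done.
	run := func(ctx context.Context) {
		fmt.Println("worker started")
		<-ctx.Done()
		fmt.Println("worker stopped")
	}

	// Same idiom as the diff: wait.Until re-invokes the closure every second if it
	// returns early, and the WaitGroup lets shutdown wait for in-flight workers.
	go wait.Until(func() { wg.Add(1); run(ctx); wg.Done() }, time.Second, ctx.Done())

	<-ctx.Done()
	wg.Wait()
	fmt.Println("all workers drained")
}
```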
8 changes: 6 additions & 2 deletions controller/metrics/metrics.go
@@ -55,9 +55,13 @@ func NewMetricsServer(cfg ServerConfig) *MetricsServer {

reg := prometheus.NewRegistry()

reg.MustRegister(NewRolloutCollector(cfg.RolloutLister))
if cfg.RolloutLister != nil {
reg.MustRegister(NewRolloutCollector(cfg.RolloutLister))
}
if cfg.ExperimentLister != nil {
reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister))
}
reg.MustRegister(NewAnalysisRunCollector(cfg.AnalysisRunLister, cfg.AnalysisTemplateLister, cfg.ClusterAnalysisTemplateLister))
reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister))
cfg.K8SRequestProvider.MustRegister(reg)
reg.MustRegister(MetricRolloutReconcile)
reg.MustRegister(MetricRolloutReconcileError)
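
The metrics change guards collector registration so that a server config built with nil Rollout and Experiment listers (as NewAnalysisManager does above) skips the collectors it cannot serve. The following small, self-contained sketch shows the same nil-guard pattern with prometheus/client_golang; the collector names here are made up for illustration and are not the project's own.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Stand-in for cfg.RolloutLister being nil in analysis-only mode: no rollout collector exists.
	var rolloutCollector prometheus.Collector

	// Stand-in for the AnalysisRun collector, which is always available.
	analysisRunCollector := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_analysis_runs_total",
		Help: "toy stand-in for the AnalysisRun collector",
	})

	// Register only the collectors whose backing data exists, mirroring the nil checks in the diff.
	if rolloutCollector != nil {
		reg.MustRegister(rolloutCollector)
	}
	reg.MustRegister(analysisRunCollector)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println("registered metric families:", len(families)) // 1: only the analysis-run stand-in
}
```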
