diff --git a/charts/flagger/README.md b/charts/flagger/README.md
index 1528121b3..de1a99eb8 100644
--- a/charts/flagger/README.md
+++ b/charts/flagger/README.md
@@ -185,6 +185,7 @@ The following tables lists the configurable parameters of the Flagger chart and
 | `podDisruptionBudget.minAvailable` | The minimal number of available replicas that will be set in the PodDisruptionBudget | `1` |
 | `noCrossNamespaceRefs` | If `true`, cross namespace references to custom resources will be disabled | `false` |
 | `namespace` | When specified, Flagger will restrict itself to watching Canary objects from that namespace | `""` |
+| `maxConcurrentCanaries` | Limits the number of canaries that can be processed in parallel. No limit when set to `0` | `0` |
 
 Specify each parameter using the `--set key=value[,key=value]` argument to `helm upgrade`. For example,
 
diff --git a/charts/flagger/templates/deployment.yaml b/charts/flagger/templates/deployment.yaml
index a41b5f401..c33983322 100644
--- a/charts/flagger/templates/deployment.yaml
+++ b/charts/flagger/templates/deployment.yaml
@@ -148,6 +148,9 @@ spec:
           {{- if .Values.noCrossNamespaceRefs }}
           - -no-cross-namespace-refs={{ .Values.noCrossNamespaceRefs }}
           {{- end }}
+          {{- if .Values.maxConcurrentCanaries }}
+          - -max-concurrent-canaries={{ .Values.maxConcurrentCanaries }}
+          {{- end }}
          livenessProbe:
            exec:
              command:
diff --git a/charts/flagger/values.yaml b/charts/flagger/values.yaml
index b9fbeaedc..ae3c58880 100644
--- a/charts/flagger/values.yaml
+++ b/charts/flagger/values.yaml
@@ -198,6 +198,8 @@ podLabels: {}
 
 noCrossNamespaceRefs: false
 
+maxConcurrentCanaries: 0
+
 #Placeholder to supply additional volumes to the flagger pod
 additionalVolumes: {}
 # - name: tmpfs
diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go
index 66e5c8e8a..93bc031ea 100644
--- a/cmd/flagger/main.go
+++ b/cmd/flagger/main.go
@@ -86,6 +86,7 @@ var (
     kubeconfigServiceMesh   string
     clusterName             string
     noCrossNamespaceRefs    bool
+    maxConcurrentCanaries   int
 )
 
 func init() {
@@ -121,6 +122,7 @@ func init() {
     flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.")
     flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.")
     flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.")
+    flag.IntVar(&maxConcurrentCanaries, "max-concurrent-canaries", 0, "Limits the number of canaries that are processed in parallel. Unlimited if set to 0 (default).")
 }
 
 func main() {
@@ -253,6 +255,7 @@ func main() {
         fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
         clusterName,
         noCrossNamespaceRefs,
+        maxConcurrentCanaries,
     )
 
     // leader election context
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 50fb63e28..959bec75f 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -49,25 +49,27 @@ const controllerAgentName = "flagger"
 
 // Controller is managing the canary objects and schedules canary deployments
 type Controller struct {
-    kubeClient           kubernetes.Interface
-    flaggerClient        clientset.Interface
-    flaggerInformers     Informers
-    flaggerSynced        cache.InformerSynced
-    flaggerWindow        time.Duration
-    workqueue            workqueue.RateLimitingInterface
-    eventRecorder        record.EventRecorder
-    logger               *zap.SugaredLogger
-    canaries             *sync.Map
-    jobs                 map[string]CanaryJob
-    recorder             metrics.Recorder
-    notifier             notifier.Interface
-    canaryFactory        *canary.Factory
-    routerFactory        *router.Factory
-    observerFactory      *observers.Factory
-    meshProvider         string
-    eventWebhook         string
-    clusterName          string
-    noCrossNamespaceRefs bool
+    kubeClient            kubernetes.Interface
+    flaggerClient         clientset.Interface
+    flaggerInformers      Informers
+    flaggerSynced         cache.InformerSynced
+    flaggerWindow         time.Duration
+    workqueue             workqueue.RateLimitingInterface
+    eventRecorder         record.EventRecorder
+    logger                *zap.SugaredLogger
+    canaries              *sync.Map
+    jobs                  map[string]CanaryJob
+    recorder              metrics.Recorder
+    notifier              notifier.Interface
+    canaryFactory         *canary.Factory
+    routerFactory         *router.Factory
+    observerFactory       *observers.Factory
+    meshProvider          string
+    eventWebhook          string
+    clusterName           string
+    noCrossNamespaceRefs  bool
+    pendingCanaries       map[string]bool
+    maxConcurrentCanaries int
 }
 
 type Informers struct {
@@ -91,6 +93,7 @@ func NewController(
     eventWebhook string,
     clusterName string,
     noCrossNamespaceRefs bool,
+    maxConcurrentCanaries int,
 ) *Controller {
     logger.Debug("Creating event broadcaster")
     flaggerscheme.AddToScheme(scheme.Scheme)
@@ -105,25 +108,27 @@ func NewController(
     recorder.SetInfo(version, meshProvider)
 
     ctrl := &Controller{
-        kubeClient:           kubeClient,
-        flaggerClient:        flaggerClient,
-        flaggerInformers:     flaggerInformers,
-        flaggerSynced:        flaggerInformers.CanaryInformer.Informer().HasSynced,
-        workqueue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
-        eventRecorder:        eventRecorder,
-        logger:               logger,
-        canaries:             new(sync.Map),
-        jobs:                 map[string]CanaryJob{},
-        flaggerWindow:        flaggerWindow,
-        observerFactory:      observerFactory,
-        recorder:             recorder,
-        notifier:             notifier,
-        canaryFactory:        canaryFactory,
-        routerFactory:        routerFactory,
-        meshProvider:         meshProvider,
-        eventWebhook:         eventWebhook,
-        clusterName:          clusterName,
-        noCrossNamespaceRefs: noCrossNamespaceRefs,
+        kubeClient:            kubeClient,
+        flaggerClient:         flaggerClient,
+        flaggerInformers:      flaggerInformers,
+        flaggerSynced:         flaggerInformers.CanaryInformer.Informer().HasSynced,
+        workqueue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
+        eventRecorder:         eventRecorder,
+        logger:                logger,
+        canaries:              new(sync.Map),
+        jobs:                  map[string]CanaryJob{},
+        flaggerWindow:         flaggerWindow,
+        observerFactory:       observerFactory,
+        recorder:              recorder,
+        notifier:              notifier,
+        canaryFactory:         canaryFactory,
+        routerFactory:         routerFactory,
+        meshProvider:          meshProvider,
+        eventWebhook:          eventWebhook,
+        clusterName:           clusterName,
+        noCrossNamespaceRefs:  noCrossNamespaceRefs,
+        pendingCanaries:       map[string]bool{},
+        maxConcurrentCanaries: maxConcurrentCanaries,
     }
 
     flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -232,7 +237,6 @@ func (c *Controller) processNextWorkItem() bool {
         c.workqueue.Forget(obj)
         return nil
     }(obj)
-
     if err != nil {
         utilruntime.HandleError(err)
         return true
@@ -302,7 +306,6 @@ func (c *Controller) syncHandler(key string) error {
         if err := c.addFinalizer(cd); err != nil {
             return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
         }
-
     }
 
     c.logger.Infof("Synced %s", key)
diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go
index 6168b24a6..978856ace 100644
--- a/pkg/controller/scheduler.go
+++ b/pkg/controller/scheduler.go
@@ -134,6 +134,7 @@ func (c *Controller) scheduleCanaries() {
     for job := range c.jobs {
         if _, exists := current[job]; !exists {
             c.jobs[job].Stop()
+            delete(c.pendingCanaries, job)
             delete(c.jobs, job)
         }
     }
@@ -280,11 +281,22 @@ func (c *Controller) advanceCanary(name string, namespace string) {
         return
     }
 
+    key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)
+
     if !shouldAdvance {
+        delete(c.pendingCanaries, key)
         c.recorder.SetStatus(cd, cd.Status.Phase)
         return
     }
 
+    if _, exists := c.pendingCanaries[key]; c.maxConcurrentCanaries > 0 && len(c.pendingCanaries) >= c.maxConcurrentCanaries && !exists {
+        canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseWaiting)
+        c.recordEventInfof(cd, "canary %v.%v %v is waiting to be processed, the maximum number of concurrent canaries has been reached", cd.Name, cd.Namespace, cd.UID)
+        return
+    }
+
+    c.pendingCanaries[key] = true
+
     maxWeight := c.maxWeight(cd)
 
     // check primary status
@@ -476,7 +488,6 @@ func (c *Controller) advanceCanary(name string, namespace string) {
         }
         c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
     }
-
 }
 
 func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -533,7 +544,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo
     }
 
     return
-
 }
 
 func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -720,7 +730,6 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can
             return
         }
     }
-
 }
 
 func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {
@@ -844,7 +853,6 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca
     }
 
     return newCfg, nil
-
 }
 
 func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool {
@@ -1001,7 +1009,6 @@ func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error {
         firstTry = false
         return
     })
-
     if err != nil {
         return fmt.Errorf("failed after retries: %w", err)
     }
diff --git a/pkg/controller/scheduler_daemonset_fixture_test.go b/pkg/controller/scheduler_daemonset_fixture_test.go
index 354874f53..2ccbceeb0 100644
--- a/pkg/controller/scheduler_daemonset_fixture_test.go
+++ b/pkg/controller/scheduler_daemonset_fixture_test.go
@@ -120,6 +120,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
         recorder:        metrics.NewRecorder(controllerAgentName, false),
         routerFactory:   rf,
         notifier:        &notifier.NopNotifier{},
+        pendingCanaries: map[string]bool{},
     }
     ctrl.flaggerSynced = alwaysReady
     ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
diff --git a/pkg/controller/scheduler_deployment_fixture_test.go b/pkg/controller/scheduler_deployment_fixture_test.go
index 4ad0ad6e4..2b5674dd1 100644
--- a/pkg/controller/scheduler_deployment_fixture_test.go
+++ b/pkg/controller/scheduler_deployment_fixture_test.go
@@ -149,6 +149,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
         recorder:        metrics.NewRecorder(controllerAgentName, false),
         routerFactory:   rf,
         notifier:        &notifier.NopNotifier{},
+        pendingCanaries: map[string]bool{},
     }
     ctrl.flaggerSynced = alwaysReady
     ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
diff --git a/pkg/controller/scheduler_deployment_test.go b/pkg/controller/scheduler_deployment_test.go
index 51161ec44..59a644839 100644
--- a/pkg/controller/scheduler_deployment_test.go
+++ b/pkg/controller/scheduler_deployment_test.go
@@ -394,6 +394,83 @@ func TestScheduler_DeploymentPromotion(t *testing.T) {
     assert.Equal(t, flaggerv1.CanaryPhaseSucceeded, c.Status.Phase)
 }
 
+func TestScheduler_DeploymentMaxConcurrent(t *testing.T) {
+    mocks := newDeploymentFixture(nil)
+
+    secondCanary := newDeploymentTestCanary()
+    secondCanary.Name = "podinfo2"
+
+    mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Create(context.TODO(), secondCanary, metav1.CreateOptions{})
+    mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)
+
+    // initializing
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // make primary ready
+    mocks.makePrimaryReady(t)
+
+    // initialized
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // update
+    dep2 := newDeploymentTestDeploymentV2()
+    _, err := mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
+    require.NoError(t, err)
+
+    // detect pod spec changes
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // if no maxConcurrentCanaries is set, all canaries should proceed
+    c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
+
+    c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
+
+    // delete the second canary, set maxConcurrentCanaries to 1, then add it again
+    delete(mocks.ctrl.pendingCanaries, "podinfo2.default")
+    mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Delete(secondCanary)
+    mocks.ctrl.maxConcurrentCanaries = 1
+    mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)
+
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+    _, err = mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
+    require.NoError(t, err)
+
+    // check that the second canary is waiting now
+    c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseWaiting, c.Status.Phase)
+
+    // advance the first canary until it succeeds
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+
+    // once succeeded, it should be removed from pendingCanaries
+    mocks.ctrl.advanceCanary("podinfo", "default")
+
+    // the second canary should start on the next call
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // check that the second canary is progressing
+    c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
+}
+
 func TestScheduler_DeploymentMirroring(t *testing.T) {
     mocks := newDeploymentFixture(newDeploymentTestCanaryMirror())