From eedb8e8fcb720cdddf3797929c65b6bdaf9f2cde Mon Sep 17 00:00:00 2001
From: Louis Halbritter
Date: Mon, 11 Dec 2023 16:53:57 +0100
Subject: [PATCH] add maxConcurrentCanaries flag to limit concurrent progressing canaries

This adds a flag to limit the number of concurrently progressing canaries,
to avoid requesting large amounts of resources at once.
The flag has no effect when set to "0", which is the default.

Closes #1069

Signed-off-by: Louis Halbritter

chore: update Helm default values and README

Signed-off-by: Louis Halbritter
---
 charts/flagger/README.md                    |  1 +
 charts/flagger/templates/deployment.yaml    |  3 +
 charts/flagger/values.yaml                  |  2 +
 cmd/flagger/main.go                         |  3 +
 pkg/controller/controller.go                | 87 ++++++++++---------
 pkg/controller/scheduler.go                 | 17 ++--
 .../scheduler_daemonset_fixture_test.go     |  1 +
 .../scheduler_deployment_fixture_test.go    |  1 +
 pkg/controller/scheduler_deployment_test.go | 77 ++++++++++++++++
 9 files changed, 145 insertions(+), 47 deletions(-)

diff --git a/charts/flagger/README.md b/charts/flagger/README.md
index ba74e4837..8a5dc24af 100644
--- a/charts/flagger/README.md
+++ b/charts/flagger/README.md
@@ -186,6 +186,7 @@ The following tables lists the configurable parameters of the Flagger chart and
 | `podDisruptionBudget.minAvailable` | The minimal number of available replicas that will be set in the PodDisruptionBudget | `1` |
 | `noCrossNamespaceRefs` | If `true`, cross namespace references to custom resources will be disabled | `false` |
 | `namespace` | When specified, Flagger will restrict itself to watching Canary objects from that namespace | `""` |
+| `maxConcurrentCanaries` | Limits how many canaries can progress in parallel. No limit if set to `0` | `0` |
 
 Specify each parameter using the `--set key=value[,key=value]` argument to `helm upgrade`. For example,
 
diff --git a/charts/flagger/templates/deployment.yaml b/charts/flagger/templates/deployment.yaml
index 0180ec8bf..a0f923394 100644
--- a/charts/flagger/templates/deployment.yaml
+++ b/charts/flagger/templates/deployment.yaml
@@ -148,6 +148,9 @@ spec:
             {{- if .Values.noCrossNamespaceRefs }}
             - -no-cross-namespace-refs={{ .Values.noCrossNamespaceRefs }}
             {{- end }}
+            {{- if .Values.maxConcurrentCanaries }}
+            - -max-concurrent-canaries={{ .Values.maxConcurrentCanaries }}
+            {{- end }}
           livenessProbe:
             exec:
               command:
diff --git a/charts/flagger/values.yaml b/charts/flagger/values.yaml
index 52d335097..8d9ba47ab 100644
--- a/charts/flagger/values.yaml
+++ b/charts/flagger/values.yaml
@@ -199,6 +199,8 @@ podLabels: {}
 
 noCrossNamespaceRefs: false
 
+maxConcurrentCanaries: 0
+
 #Placeholder to supply additional volumes to the flagger pod
 additionalVolumes: {}
 # - name: tmpfs
diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go
index 6438187fd..5dd479cb9 100644
--- a/cmd/flagger/main.go
+++ b/cmd/flagger/main.go
@@ -86,6 +86,7 @@ var (
     kubeconfigServiceMesh string
     clusterName           string
     noCrossNamespaceRefs  bool
+    maxConcurrentCanaries int
 )
 
 func init() {
@@ -121,6 +122,7 @@ func init() {
     flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.")
     flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.")
     flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.")
+    flag.IntVar(&maxConcurrentCanaries, "max-concurrent-canaries", 0, "Limit the number of canaries processed in parallel. Unlimited if set to 0, which is the default.")
 }
 
 func main() {
@@ -253,6 +255,7 @@ func main() {
         fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
         clusterName,
         noCrossNamespaceRefs,
+        maxConcurrentCanaries,
         cfg,
     )
 
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index cabed052a..cbfff562e 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -51,26 +51,28 @@ const controllerAgentName = "flagger"
 
 // Controller is managing the canary objects and schedules canary deployments
 type Controller struct {
-    kubeConfig           *rest.Config
-    kubeClient           kubernetes.Interface
-    flaggerClient        clientset.Interface
-    flaggerInformers     Informers
-    flaggerSynced        cache.InformerSynced
-    flaggerWindow        time.Duration
-    workqueue            workqueue.RateLimitingInterface
-    eventRecorder        record.EventRecorder
-    logger               *zap.SugaredLogger
-    canaries             *sync.Map
-    jobs                 map[string]CanaryJob
-    recorder             metrics.Recorder
-    notifier             notifier.Interface
-    canaryFactory        *canary.Factory
-    routerFactory        *router.Factory
-    observerFactory      *observers.Factory
-    meshProvider         string
-    eventWebhook         string
-    clusterName          string
-    noCrossNamespaceRefs bool
+    kubeConfig            *rest.Config
+    kubeClient            kubernetes.Interface
+    flaggerClient         clientset.Interface
+    flaggerInformers      Informers
+    flaggerSynced         cache.InformerSynced
+    flaggerWindow         time.Duration
+    workqueue             workqueue.RateLimitingInterface
+    eventRecorder         record.EventRecorder
+    logger                *zap.SugaredLogger
+    canaries              *sync.Map
+    jobs                  map[string]CanaryJob
+    recorder              metrics.Recorder
+    notifier              notifier.Interface
+    canaryFactory         *canary.Factory
+    routerFactory         *router.Factory
+    observerFactory       *observers.Factory
+    meshProvider          string
+    eventWebhook          string
+    clusterName           string
+    noCrossNamespaceRefs  bool
+    pendingCanaries       map[string]bool
+    maxConcurrentCanaries int
 }
 
 type Informers struct {
@@ -94,6 +96,7 @@ func NewController(
     eventWebhook string,
     clusterName string,
     noCrossNamespaceRefs bool,
+    maxConcurrentCanaries int,
     kubeConfig *rest.Config,
 ) *Controller {
     logger.Debug("Creating event broadcaster")
@@ -109,26 +112,28 @@ func NewController(
     recorder.SetInfo(version, meshProvider)
 
     ctrl := &Controller{
-        kubeConfig:           kubeConfig,
-        kubeClient:           kubeClient,
-        flaggerClient:        flaggerClient,
-        flaggerInformers:     flaggerInformers,
-        flaggerSynced:        flaggerInformers.CanaryInformer.Informer().HasSynced,
-        workqueue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
-        eventRecorder:        eventRecorder,
-        logger:               logger,
-        canaries:             new(sync.Map),
-        jobs:                 map[string]CanaryJob{},
-        flaggerWindow:        flaggerWindow,
-        observerFactory:      observerFactory,
-        recorder:             recorder,
-        notifier:             notifier,
-        canaryFactory:        canaryFactory,
-        routerFactory:        routerFactory,
-        meshProvider:         meshProvider,
-        eventWebhook:         eventWebhook,
-        clusterName:          clusterName,
-        noCrossNamespaceRefs: noCrossNamespaceRefs,
+        kubeConfig:            kubeConfig,
+        kubeClient:            kubeClient,
+        flaggerClient:         flaggerClient,
+        flaggerInformers:      flaggerInformers,
+        flaggerSynced:         flaggerInformers.CanaryInformer.Informer().HasSynced,
+        workqueue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
+        eventRecorder:         eventRecorder,
+        logger:                logger,
+        canaries:              new(sync.Map),
+        jobs:                  map[string]CanaryJob{},
+        flaggerWindow:         flaggerWindow,
+        observerFactory:       observerFactory,
+        recorder:              recorder,
+        notifier:              notifier,
+        canaryFactory:         canaryFactory,
+        routerFactory:         routerFactory,
+        meshProvider:          meshProvider,
+        eventWebhook:          eventWebhook,
+        clusterName:           clusterName,
+        noCrossNamespaceRefs:  noCrossNamespaceRefs,
+        pendingCanaries:       map[string]bool{},
+        maxConcurrentCanaries: maxConcurrentCanaries,
     }
 
     flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -237,7 +242,6 @@ func (c *Controller) processNextWorkItem() bool {
         c.workqueue.Forget(obj)
         return nil
     }(obj)
-
     if err != nil {
         utilruntime.HandleError(err)
         return true
@@ -307,7 +311,6 @@ func (c *Controller) syncHandler(key string) error {
         if err := c.addFinalizer(cd); err != nil {
             return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
         }
-
     }
     c.logger.Infof("Synced %s", key)
diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go
index 6a9a2664a..e4bef05e9 100644
--- a/pkg/controller/scheduler.go
+++ b/pkg/controller/scheduler.go
@@ -134,6 +134,7 @@ func (c *Controller) scheduleCanaries() {
     for job := range c.jobs {
         if _, exists := current[job]; !exists {
             c.jobs[job].Stop()
+            delete(c.pendingCanaries, job)
             delete(c.jobs, job)
         }
     }
@@ -283,11 +284,22 @@ func (c *Controller) advanceCanary(name string, namespace string) {
         return
     }
 
+    key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)
+
     if !shouldAdvance {
+        delete(c.pendingCanaries, key)
         c.recorder.SetStatus(cd, cd.Status.Phase)
         return
     }
 
+    if _, exists := c.pendingCanaries[key]; c.maxConcurrentCanaries > 0 && len(c.pendingCanaries) >= c.maxConcurrentCanaries && !exists {
+        canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseWaiting)
+        c.recordEventInfof(cd, "waiting with canary %v.%v %v, because the maximum number of concurrent canaries has been reached", cd.Name, cd.Namespace, cd.UID)
+        return
+    }
+
+    c.pendingCanaries[key] = true
+
     maxWeight := c.maxWeight(cd)
 
     // check primary status
@@ -485,7 +497,6 @@ func (c *Controller) advanceCanary(name string, namespace string) {
         }
         c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
     }
-
 }
 
 func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -542,7 +553,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo
     }
 
     return
-
 }
 
 func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -729,7 +739,6 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can
             return
         }
     }
-
 }
 
 func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {
@@ -853,7 +862,6 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca
     }
 
     return newCfg, nil
-
 }
 
 func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool {
@@ -1010,7 +1018,6 @@ func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error {
         firstTry = false
         return
     })
-
     if err != nil {
         return fmt.Errorf("failed after retries: %w", err)
     }
diff --git a/pkg/controller/scheduler_daemonset_fixture_test.go b/pkg/controller/scheduler_daemonset_fixture_test.go
index af7e39655..f896f1227 100644
--- a/pkg/controller/scheduler_daemonset_fixture_test.go
+++ b/pkg/controller/scheduler_daemonset_fixture_test.go
@@ -120,6 +120,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
         recorder:        metrics.NewRecorder(controllerAgentName, false),
         routerFactory:   rf,
         notifier:        &notifier.NopNotifier{},
+        pendingCanaries: map[string]bool{},
     }
     ctrl.flaggerSynced = alwaysReady
     ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
diff --git a/pkg/controller/scheduler_deployment_fixture_test.go b/pkg/controller/scheduler_deployment_fixture_test.go
index 57357567b..dbb5d6fa5 100644
--- a/pkg/controller/scheduler_deployment_fixture_test.go
+++ b/pkg/controller/scheduler_deployment_fixture_test.go
@@ -149,6 +149,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
         recorder:        metrics.NewRecorder(controllerAgentName, false),
         routerFactory:   rf,
         notifier:        &notifier.NopNotifier{},
+        pendingCanaries: map[string]bool{},
     }
     ctrl.flaggerSynced = alwaysReady
     ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
diff --git a/pkg/controller/scheduler_deployment_test.go b/pkg/controller/scheduler_deployment_test.go
index 51161ec44..59a644839 100644
--- a/pkg/controller/scheduler_deployment_test.go
+++ b/pkg/controller/scheduler_deployment_test.go
@@ -394,6 +394,83 @@ func TestScheduler_DeploymentPromotion(t *testing.T) {
     assert.Equal(t, flaggerv1.CanaryPhaseSucceeded, c.Status.Phase)
 }
 
+func TestScheduler_DeploymentMaxConcurrent(t *testing.T) {
+    mocks := newDeploymentFixture(nil)
+
+    secondCanary := newDeploymentTestCanary()
+    secondCanary.Name = "podinfo2"
+
+    mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Create(context.TODO(), secondCanary, metav1.CreateOptions{})
+    mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)
+
+    // initializing
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // make primary ready
+    mocks.makePrimaryReady(t)
+
+    // initialized
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // update
+    dep2 := newDeploymentTestDeploymentV2()
+    _, err := mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
+    require.NoError(t, err)
+
+    // detect pod spec changes
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // if no maxConcurrentCanaries is set, all canaries should proceed
+    c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
+
+    c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
+
+    // delete the second canary and set maxConcurrentCanaries, then add it again
+    delete(mocks.ctrl.pendingCanaries, "podinfo2.default")
+    mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Delete(secondCanary)
+    mocks.ctrl.maxConcurrentCanaries = 1
+    mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)
+
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+    _, err = mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
+    require.NoError(t, err)
+
+    // check if the second canary is waiting now
+    c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseWaiting, c.Status.Phase)
+
+    // make the first deployment succeed
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+    mocks.ctrl.advanceCanary("podinfo", "default")
+
+    // after it succeeds, it should be removed from pendingCanaries
+    mocks.ctrl.advanceCanary("podinfo", "default")
+
+    // the second canary should start with the next call
+    mocks.ctrl.advanceCanary("podinfo2", "default")
+
+    // check if the second canary is starting
+    c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
+    require.NoError(t, err)
+    assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
+}
+
 func TestScheduler_DeploymentMirroring(t *testing.T) {
     mocks := newDeploymentFixture(newDeploymentTestCanaryMirror())