add maxConcurrentCanaries flag to limit concurrent progressing canaries #1568

Open
wants to merge 1 commit into main
1 change: 1 addition & 0 deletions charts/flagger/README.md
@@ -186,6 +186,7 @@ The following table lists the configurable parameters of the Flagger chart and
| `podDisruptionBudget.minAvailable` | The minimal number of available replicas that will be set in the PodDisruptionBudget | `1` |
| `noCrossNamespaceRefs` | If `true`, cross namespace references to custom resources will be disabled | `false` |
| `namespace` | When specified, Flagger will restrict itself to watching Canary objects from that namespace | `""` |
| `maxConcurrentCanaries` | Limits the number of canaries that can be processed in parallel. No limit if set to `0` | `0` |

Specify each parameter using the `--set key=value[,key=value]` argument to `helm upgrade`. For example,

3 changes: 3 additions & 0 deletions charts/flagger/templates/deployment.yaml
@@ -148,6 +148,9 @@ spec:
{{- if .Values.noCrossNamespaceRefs }}
- -no-cross-namespace-refs={{ .Values.noCrossNamespaceRefs }}
{{- end }}
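{{- /* a value of 0 is falsy in Go templates, so the flag below is only rendered when a non-zero limit is configured */}}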
{{- if .Values.maxConcurrentCanaries }}
- -max-concurrent-canaries={{ .Values.maxConcurrentCanaries }}
{{- end }}
livenessProbe:
exec:
command:
2 changes: 2 additions & 0 deletions charts/flagger/values.yaml
@@ -199,6 +199,8 @@ podLabels: {}

noCrossNamespaceRefs: false

maxConcurrentCanaries: 0

# Placeholder to supply additional volumes to the flagger pod
additionalVolumes: {}
# - name: tmpfs
3 changes: 3 additions & 0 deletions cmd/flagger/main.go
@@ -86,6 +86,7 @@ var (
kubeconfigServiceMesh string
clusterName string
noCrossNamespaceRefs bool
maxConcurrentCanaries int
)

func init() {
@@ -121,6 +122,7 @@ func init() {
flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.")
flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.")
flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.")
flag.IntVar(&maxConcurrentCanaries, "max-concurrent-canaries", 0, "Limits the number of canaries that can be processed in parallel. Unlimited if set to 0 (default).")
}

func main() {
@@ -253,6 +255,7 @@ func main() {
fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
clusterName,
noCrossNamespaceRefs,
maxConcurrentCanaries,
cfg,
)

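For reference, a minimal standalone sketch of the flag's contract — `overLimit` is a hypothetical helper, not Flagger code — showing that 0 leaves processing unlimited while any positive value caps it:

```go
package main

import (
	"flag"
	"fmt"
)

// overLimit reports whether a new canary must wait, given the number of
// canaries already progressing. A limit of 0 disables the cap entirely.
func overLimit(progressing, limit int) bool {
	return limit > 0 && progressing >= limit
}

func main() {
	limit := flag.Int("max-concurrent-canaries", 0,
		"Limits the number of canaries that can be processed in parallel. Unlimited if set to 0 (default).")
	flag.Parse()

	// Probe a few pending counts against the configured limit.
	for _, progressing := range []int{0, 1, 2} {
		fmt.Printf("progressing=%d overLimit=%v\n", progressing, overLimit(progressing, *limit))
	}
}
```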
87 changes: 45 additions & 42 deletions pkg/controller/controller.go
@@ -51,26 +51,28 @@ const controllerAgentName = "flagger"

// Controller is managing the canary objects and schedules canary deployments
type Controller struct {
kubeConfig *rest.Config
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
flaggerInformers Informers
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
logger *zap.SugaredLogger
canaries *sync.Map
jobs map[string]CanaryJob
recorder metrics.Recorder
notifier notifier.Interface
canaryFactory *canary.Factory
routerFactory *router.Factory
observerFactory *observers.Factory
meshProvider string
eventWebhook string
clusterName string
noCrossNamespaceRefs bool
kubeConfig *rest.Config
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
flaggerInformers Informers
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
logger *zap.SugaredLogger
canaries *sync.Map
jobs map[string]CanaryJob
recorder metrics.Recorder
notifier notifier.Interface
canaryFactory *canary.Factory
routerFactory *router.Factory
observerFactory *observers.Factory
meshProvider string
eventWebhook string
clusterName string
noCrossNamespaceRefs bool
pendingCanaries map[string]bool
maxConcurrentCanaries int
}

type Informers struct {
@@ -94,6 +96,7 @@ func NewController(
eventWebhook string,
clusterName string,
noCrossNamespaceRefs bool,
maxConcurrentCanaries int,
kubeConfig *rest.Config,
) *Controller {
logger.Debug("Creating event broadcaster")
@@ -109,26 +112,28 @@
recorder.SetInfo(version, meshProvider)

ctrl := &Controller{
kubeConfig: kubeConfig,
kubeClient: kubeClient,
flaggerClient: flaggerClient,
flaggerInformers: flaggerInformers,
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
eventRecorder: eventRecorder,
logger: logger,
canaries: new(sync.Map),
jobs: map[string]CanaryJob{},
flaggerWindow: flaggerWindow,
observerFactory: observerFactory,
recorder: recorder,
notifier: notifier,
canaryFactory: canaryFactory,
routerFactory: routerFactory,
meshProvider: meshProvider,
eventWebhook: eventWebhook,
clusterName: clusterName,
noCrossNamespaceRefs: noCrossNamespaceRefs,
kubeConfig: kubeConfig,
kubeClient: kubeClient,
flaggerClient: flaggerClient,
flaggerInformers: flaggerInformers,
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
eventRecorder: eventRecorder,
logger: logger,
canaries: new(sync.Map),
jobs: map[string]CanaryJob{},
flaggerWindow: flaggerWindow,
observerFactory: observerFactory,
recorder: recorder,
notifier: notifier,
canaryFactory: canaryFactory,
routerFactory: routerFactory,
meshProvider: meshProvider,
eventWebhook: eventWebhook,
clusterName: clusterName,
noCrossNamespaceRefs: noCrossNamespaceRefs,
pendingCanaries: map[string]bool{},
maxConcurrentCanaries: maxConcurrentCanaries,
}

flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -237,7 +242,6 @@ func (c *Controller) processNextWorkItem() bool {
c.workqueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
@@ -307,7 +311,6 @@ func (c *Controller) syncHandler(key string) error {
if err := c.addFinalizer(cd); err != nil {
return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
}

}
c.logger.Infof("Synced %s", key)

17 changes: 12 additions & 5 deletions pkg/controller/scheduler.go
@@ -134,6 +134,7 @@ func (c *Controller) scheduleCanaries() {
for job := range c.jobs {
if _, exists := current[job]; !exists {
c.jobs[job].Stop()
delete(c.pendingCanaries, job)
delete(c.jobs, job)
}
}
@@ -283,11 +284,22 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)

if !shouldAdvance {
delete(c.pendingCanaries, key)
c.recorder.SetStatus(cd, cd.Status.Phase)
return
}

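// gate admission: a canary that already holds a slot may continue; otherwise,
// once the limit is reached, it is parked in the Waiting phase until a slot frees up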
if _, exists := c.pendingCanaries[key]; c.maxConcurrentCanaries > 0 && len(c.pendingCanaries) >= c.maxConcurrentCanaries && !exists {
canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseWaiting)
c.recordEventInfof(cd, "canary %v.%v %v is waiting to be processed, the maximum number of concurrent canaries has been reached", cd.Name, cd.Namespace, cd.UID)
return
}

c.pendingCanaries[key] = true

maxWeight := c.maxWeight(cd)

// check primary status
@@ -485,7 +497,6 @@
}
c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
}

}

func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -542,7 +553,6 @@
}

return

}

func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -729,7 +739,6 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can
return
}
}

}

func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {
@@ -853,7 +862,6 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca
}

return newCfg, nil

}

func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool {
@@ -1010,7 +1018,6 @@ func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error {
firstTry = false
return
})

if err != nil {
return fmt.Errorf("failed after retries: %w", err)
}
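To make the admission logic above easy to trace outside the controller, here is a self-contained sketch of the same pattern; the `gate` type and its methods are illustrative names, not part of the Flagger codebase:

```go
package main

import "fmt"

type gate struct {
	pending map[string]bool // keys of canaries currently progressing
	limit   int             // 0 means unlimited
}

// admit reports whether the canary identified by key may progress now.
// Canaries that already hold a slot are always allowed to continue.
func (g *gate) admit(key string) bool {
	if g.pending[key] {
		return true
	}
	if g.limit > 0 && len(g.pending) >= g.limit {
		return false // cap reached: caller parks the canary in Waiting
	}
	g.pending[key] = true
	return true
}

// release frees the slot once the canary succeeds, fails, or is removed.
func (g *gate) release(key string) {
	delete(g.pending, key)
}

func main() {
	g := &gate{pending: map[string]bool{}, limit: 1}
	fmt.Println(g.admit("podinfo.default"))  // true: takes the only slot
	fmt.Println(g.admit("podinfo2.default")) // false: cap reached, would be set to Waiting
	g.release("podinfo.default")
	fmt.Println(g.admit("podinfo2.default")) // true: slot freed
}
```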
1 change: 1 addition & 0 deletions pkg/controller/scheduler_daemonset_fixture_test.go
@@ -120,6 +120,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
recorder: metrics.NewRecorder(controllerAgentName, false),
routerFactory: rf,
notifier: &notifier.NopNotifier{},
pendingCanaries: map[string]bool{},
}
ctrl.flaggerSynced = alwaysReady
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
1 change: 1 addition & 0 deletions pkg/controller/scheduler_deployment_fixture_test.go
@@ -149,6 +149,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
recorder: metrics.NewRecorder(controllerAgentName, false),
routerFactory: rf,
notifier: &notifier.NopNotifier{},
pendingCanaries: map[string]bool{},
}
ctrl.flaggerSynced = alwaysReady
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
77 changes: 77 additions & 0 deletions pkg/controller/scheduler_deployment_test.go
@@ -394,6 +394,83 @@ func TestScheduler_DeploymentPromotion(t *testing.T) {
assert.Equal(t, flaggerv1.CanaryPhaseSucceeded, c.Status.Phase)
}

func TestScheduler_DeploymentMaxConcurrent(t *testing.T) {
mocks := newDeploymentFixture(nil)

secondCanary := newDeploymentTestCanary()
secondCanary.Name = "podinfo2"

mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Create(context.TODO(), secondCanary, metav1.CreateOptions{})
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)

// initializing
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")

// make primary ready
mocks.makePrimaryReady(t)

// initialized
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")

// update
dep2 := newDeploymentTestDeploymentV2()
_, err := mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
require.NoError(t, err)

// detect pod spec changes
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")

// if no maxConcurrentCanaries is set, all canaries should proceed
c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)

c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)

// delete the second canary, set maxConcurrentCanaries, then add it again
delete(mocks.ctrl.pendingCanaries, "podinfo2.default")
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Delete(secondCanary)
mocks.ctrl.maxConcurrentCanaries = 1
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)

mocks.ctrl.advanceCanary("podinfo2", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")
_, err = mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
require.NoError(t, err)

// check if second canary is waiting now
mocks.ctrl.advanceCanary("podinfo2", "default")
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseWaiting, c.Status.Phase)

// advance the first canary until it succeeds
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")

// after it succeeds, it should be removed from pendingCanaries
mocks.ctrl.advanceCanary("podinfo", "default")

// the second canary should start on the next call
mocks.ctrl.advanceCanary("podinfo2", "default")

// check if second canary is starting
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
}

func TestScheduler_DeploymentMirroring(t *testing.T) {
mocks := newDeploymentFixture(newDeploymentTestCanaryMirror())
