Skip to content

Commit

Permalink
scheduler: fail canary according to progress deadline
Browse files Browse the repository at this point in the history
Modify `canary.IsPrimaryReady()` and `canary.Initialize()` to return a
boolean indicating if the error is retriable. Modify the scheduler to
rollback the analysis and mark the Canary object as failed if the above
two functions or `canary.IsCanaryRead()` returns false along with an
error.

Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Mar 4, 2024
1 parent 1a27295 commit 757d901
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pkg/canary/config_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestConfigTracker_ConfigMaps(t *testing.T) {
configMap := newDaemonSetControllerTestConfigMap()
configMapProjected := newDaemonSetControllerTestConfigProjected()

err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

daePrimary, err := mocks.kubeClient.AppsV1().DaemonSets("default").Get(context.TODO(), "podinfo-primary", metav1.GetOptions{})
Expand Down
4 changes: 2 additions & 2 deletions pkg/canary/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
)

type Controller interface {
IsPrimaryReady(canary *flaggerv1.Canary) error
IsPrimaryReady(canary *flaggerv1.Canary) (bool, error)
IsCanaryReady(canary *flaggerv1.Canary) (bool, error)
GetMetadata(canary *flaggerv1.Canary) (string, string, map[string]int32, error)
SyncStatus(canary *flaggerv1.Canary, status flaggerv1.CanaryStatus) error
SetStatusFailedChecks(canary *flaggerv1.Canary, val int) error
SetStatusWeight(canary *flaggerv1.Canary, val int) error
SetStatusIterations(canary *flaggerv1.Canary, val int) error
SetStatusPhase(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error
Initialize(canary *flaggerv1.Canary) error
Initialize(canary *flaggerv1.Canary) (bool, error)
Promote(canary *flaggerv1.Canary) error
HasTargetChanged(canary *flaggerv1.Canary) (bool, error)
HaveDependenciesChanged(canary *flaggerv1.Canary) (bool, error)
Expand Down
12 changes: 6 additions & 6 deletions pkg/canary/daemonset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,21 @@ func (c *DaemonSetController) ScaleFromZero(cd *flaggerv1.Canary) error {
}

// Initialize creates the primary DaemonSet if it does not exist.
func (c *DaemonSetController) Initialize(cd *flaggerv1.Canary) (err error) {
err = c.createPrimaryDaemonSet(cd, c.includeLabelPrefix)
func (c *DaemonSetController) Initialize(cd *flaggerv1.Canary) (bool, error) {
err := c.createPrimaryDaemonSet(cd, c.includeLabelPrefix)
if err != nil {
return fmt.Errorf("createPrimaryDaemonSet failed: %w", err)
return true, fmt.Errorf("createPrimaryDaemonSet failed: %w", err)
}

if cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing {
if !cd.SkipAnalysis() {
if err := c.IsPrimaryReady(cd); err != nil {
return fmt.Errorf("%w", err)
if retriable, err := c.IsPrimaryReady(cd); err != nil {
return retriable, fmt.Errorf("%w", err)
}
}
}

return nil
return true, nil
}

// Promote copies the pod spec, secrets and config maps from canary to primary
Expand Down
16 changes: 8 additions & 8 deletions pkg/canary/daemonset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func TestDaemonSetController_Sync_ConsistentNaming(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

daePrimary, err := mocks.kubeClient.AppsV1().DaemonSets("default").Get(context.TODO(), fmt.Sprintf("%s-primary", dc.name), metav1.GetOptions{})
Expand All @@ -56,7 +56,7 @@ func TestDaemonSetController_Sync_ConsistentNaming(t *testing.T) {
func TestDaemonSetController_Sync_InconsistentNaming(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo-service", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

daePrimary, err := mocks.kubeClient.AppsV1().DaemonSets("default").Get(context.TODO(), fmt.Sprintf("%s-primary", dc.name), metav1.GetOptions{})
Expand All @@ -75,7 +75,7 @@ func TestDaemonSetController_Sync_InconsistentNaming(t *testing.T) {
func TestDaemonSetController_Promote(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

dae2 := newDaemonSetControllerTestPodInfoV2()
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestDaemonSetController_NoConfigTracking(t *testing.T) {
mocks := newDaemonSetFixture(dc)
mocks.controller.configTracker = &NopTracker{}

err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

daePrimary, err := mocks.kubeClient.AppsV1().DaemonSets("default").Get(context.TODO(), "podinfo-primary", metav1.GetOptions{})
Expand All @@ -132,7 +132,7 @@ func TestDaemonSetController_NoConfigTracking(t *testing.T) {
func TestDaemonSetController_HasTargetChanged(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

// save last applied hash
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestDaemonSetController_Scale(t *testing.T) {
t.Run("ScaleToZero", func(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

err = mocks.controller.ScaleToZero(mocks.canary)
Expand All @@ -238,7 +238,7 @@ func TestDaemonSetController_Scale(t *testing.T) {
t.Run("ScaleFromZeo", func(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

err = mocks.controller.ScaleFromZero(mocks.canary)
Expand All @@ -257,7 +257,7 @@ func TestDaemonSetController_Scale(t *testing.T) {
func TestDaemonSetController_Finalize(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

err = mocks.controller.Finalize(mocks.canary)
Expand Down
12 changes: 6 additions & 6 deletions pkg/canary/daemonset_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ import (

// IsPrimaryReady checks the primary daemonset status and returns an error if
// the daemonset is in the middle of a rolling update
func (c *DaemonSetController) IsPrimaryReady(cd *flaggerv1.Canary) error {
func (c *DaemonSetController) IsPrimaryReady(cd *flaggerv1.Canary) (bool, error) {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
primary, err := c.kubeClient.AppsV1().DaemonSets(cd.Namespace).Get(context.TODO(), primaryName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("daemonset %s.%s get query error: %w", primaryName, cd.Namespace, err)
return true, fmt.Errorf("daemonset %s.%s get query error: %w", primaryName, cd.Namespace, err)
}

_, err = c.isDaemonSetReady(cd, primary, cd.GetAnalysisPrimaryReadyThreshold())
retriable, err := c.isDaemonSetReady(cd, primary, cd.GetAnalysisPrimaryReadyThreshold())
if err != nil {
return fmt.Errorf("primary daemonset %s.%s not ready: %w", primaryName, cd.Namespace, err)
return retriable, fmt.Errorf("primary daemonset %s.%s not ready: %w", primaryName, cd.Namespace, err)
}
return nil
return true, nil
}

// IsCanaryReady checks the primary daemonset and returns an error if
// IsCanaryReady checks the canary daemonset and returns an error if
// the daemonset is in the middle of a rolling update
func (c *DaemonSetController) IsCanaryReady(cd *flaggerv1.Canary) (bool, error) {
targetName := cd.Spec.TargetRef.Name
Expand Down
4 changes: 2 additions & 2 deletions pkg/canary/daemonset_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
func TestDaemonSetController_IsReady(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

err = mocks.controller.IsPrimaryReady(mocks.canary)
_, err = mocks.controller.IsPrimaryReady(mocks.canary)
require.NoError(t, err)

_, err = mocks.controller.IsCanaryReady(mocks.canary)
Expand Down
6 changes: 3 additions & 3 deletions pkg/canary/daemonset_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func TestDaemonSetController_SyncStatus(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

status := flaggerv1.CanaryStatus{
Expand All @@ -55,7 +55,7 @@ func TestDaemonSetController_SyncStatus(t *testing.T) {
func TestDaemonSetController_SetFailedChecks(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

err = mocks.controller.SetStatusFailedChecks(mocks.canary, 1)
Expand All @@ -69,7 +69,7 @@ func TestDaemonSetController_SetFailedChecks(t *testing.T) {
func TestDaemonSetController_SetState(t *testing.T) {
dc := daemonsetConfigs{name: "podinfo", label: "name", labelValue: "podinfo"}
mocks := newDaemonSetFixture(dc)
err := mocks.controller.Initialize(mocks.canary)
_, err := mocks.controller.Initialize(mocks.canary)
require.NoError(t, err)

err = mocks.controller.SetStatusPhase(mocks.canary, flaggerv1.CanaryPhaseProgressing)
Expand Down
10 changes: 5 additions & 5 deletions pkg/canary/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ type DeploymentController struct {
}

// Initialize creates the primary deployment if it does not exist.
func (c *DeploymentController) Initialize(cd *flaggerv1.Canary) (err error) {
func (c *DeploymentController) Initialize(cd *flaggerv1.Canary) (bool, error) {
if err := c.createPrimaryDeployment(cd, c.includeLabelPrefix); err != nil {
return fmt.Errorf("createPrimaryDeployment failed: %w", err)
return true, fmt.Errorf("createPrimaryDeployment failed: %w", err)
}

if cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing {
if !cd.SkipAnalysis() {
if err := c.IsPrimaryReady(cd); err != nil {
return fmt.Errorf("%w", err)
if retriable, err := c.IsPrimaryReady(cd); err != nil {
return retriable, fmt.Errorf("%w", err)
}
}
}

return nil
return true, nil
}

// Promote copies the pod spec, secrets and config maps from canary to primary
Expand Down
5 changes: 3 additions & 2 deletions pkg/canary/deployment_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type deploymentConfigs struct {
}

func (d deploymentControllerFixture) initializeCanary(t *testing.T) {
err := d.controller.Initialize(d.canary)
_, err := d.controller.Initialize(d.canary)
require.Error(t, err) // not ready yet

primaryName := fmt.Sprintf("%s-primary", d.canary.Spec.TargetRef.Name)
Expand All @@ -73,7 +73,8 @@ func (d deploymentControllerFixture) initializeCanary(t *testing.T) {
_, err = d.controller.kubeClient.AppsV1().Deployments(d.canary.Namespace).Update(context.TODO(), p, metav1.UpdateOptions{})
require.NoError(t, err)

require.NoError(t, d.controller.Initialize(d.canary))
_, err = d.controller.Initialize(d.canary)
require.NoError(t, err)
}

func newDeploymentFixture(dc deploymentConfigs) deploymentControllerFixture {
Expand Down
12 changes: 6 additions & 6 deletions pkg/canary/deployment_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,23 @@ import (
// IsPrimaryReady checks the primary deployment status and returns an error if
// the deployment is in the middle of a rolling update or if the pods are unhealthy
// it will return a non retryable error if the rolling update is stuck
func (c *DeploymentController) IsPrimaryReady(cd *flaggerv1.Canary) error {
func (c *DeploymentController) IsPrimaryReady(cd *flaggerv1.Canary) (bool, error) {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
primary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(context.TODO(), primaryName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("deployment %s.%s get query error: %w", primaryName, cd.Namespace, err)
return true, fmt.Errorf("deployment %s.%s get query error: %w", primaryName, cd.Namespace, err)
}

_, err = c.isDeploymentReady(primary, cd.GetProgressDeadlineSeconds(), cd.GetAnalysisPrimaryReadyThreshold())
retriable, err := c.isDeploymentReady(primary, cd.GetProgressDeadlineSeconds(), cd.GetAnalysisPrimaryReadyThreshold())
if err != nil {
return fmt.Errorf("%s.%s not ready: %w", primaryName, cd.Namespace, err)
return retriable, fmt.Errorf("%s.%s not ready: %w", primaryName, cd.Namespace, err)
}

if primary.Spec.Replicas == int32p(0) {
return fmt.Errorf("halt %s.%s advancement: primary deployment is scaled to zero",
return false, fmt.Errorf("halt %s.%s advancement: primary deployment is scaled to zero",
cd.Name, cd.Namespace)
}
return nil
return true, nil
}

// IsCanaryReady checks the canary deployment status and returns an error if
Expand Down
2 changes: 1 addition & 1 deletion pkg/canary/deployment_ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestDeploymentController_IsReady(t *testing.T) {
mocks := newDeploymentFixture(dc)
mocks.controller.Initialize(mocks.canary)

err := mocks.controller.IsPrimaryReady(mocks.canary)
_, err := mocks.controller.IsPrimaryReady(mocks.canary)
require.Error(t, err)

_, err = mocks.controller.IsCanaryReady(mocks.canary)
Expand Down
14 changes: 7 additions & 7 deletions pkg/canary/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,25 @@ func (c *ServiceController) GetMetadata(_ *flaggerv1.Canary) (string, string, ma
}

// Initialize creates or updates the primary and canary services to prepare for the canary release process targeted on the K8s service
func (c *ServiceController) Initialize(cd *flaggerv1.Canary) (err error) {
func (c *ServiceController) Initialize(cd *flaggerv1.Canary) (bool, error) {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)
canaryName := fmt.Sprintf("%s-canary", targetName)

svc, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(context.TODO(), targetName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("service %s.%s get query error: %w", primaryName, cd.Namespace, err)
return true, fmt.Errorf("service %s.%s get query error: %w", primaryName, cd.Namespace, err)
}

if err = c.reconcileCanaryService(cd, canaryName, svc); err != nil {
return fmt.Errorf("reconcileCanaryService failed: %w", err)
return true, fmt.Errorf("reconcileCanaryService failed: %w", err)
}

if err = c.reconcilePrimaryService(cd, primaryName, svc); err != nil {
return fmt.Errorf("reconcilePrimaryService failed: %w", err)
return true, fmt.Errorf("reconcilePrimaryService failed: %w", err)
}

return nil
return true, nil
}

func (c *ServiceController) reconcileCanaryService(canary *flaggerv1.Canary, name string, src *corev1.Service) error {
Expand Down Expand Up @@ -249,8 +249,8 @@ func (c *ServiceController) HaveDependenciesChanged(_ *flaggerv1.Canary) (bool,
return false, nil
}

func (c *ServiceController) IsPrimaryReady(_ *flaggerv1.Canary) error {
return nil
func (c *ServiceController) IsPrimaryReady(_ *flaggerv1.Canary) (bool, error) {
return true, nil
}

func (c *ServiceController) IsCanaryReady(_ *flaggerv1.Canary) (bool, error) {
Expand Down
17 changes: 13 additions & 4 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,12 @@ func (c *Controller) advanceCanary(name string, namespace string) {
}

// create primary workload
err = canaryController.Initialize(cd)
retriable, err := canaryController.Initialize(cd)
if err != nil {
c.recordEventWarningf(cd, "%v", err)
if !retriable {
c.rollback(cd, canaryController, meshRouter, scalerReconciler)
}
return
}

Expand Down Expand Up @@ -289,8 +292,12 @@ func (c *Controller) advanceCanary(name string, namespace string) {

// check primary status
if !cd.SkipAnalysis() {
if err := canaryController.IsPrimaryReady(cd); err != nil {
retriable, err := canaryController.IsPrimaryReady(cd)
if err != nil {
c.recordEventWarningf(cd, "%v", err)
if !retriable {
c.rollback(cd, canaryController, meshRouter, scalerReconciler)
}
return
}
}
Expand Down Expand Up @@ -336,10 +343,12 @@ func (c *Controller) advanceCanary(name string, namespace string) {
}

// check canary status
var retriable = true
retriable, err = canaryController.IsCanaryReady(cd)
if err != nil && retriable {
if err != nil {
c.recordEventWarningf(cd, "%v", err)
if !retriable {
c.rollback(cd, canaryController, meshRouter, scalerReconciler)
}
return
}

Expand Down

0 comments on commit 757d901

Please sign in to comment.