diff --git a/docs/crd.md b/docs/crd.md index 8d4c3d57..934a6e74 100644 --- a/docs/crd.md +++ b/docs/crd.md @@ -105,3 +105,8 @@ Below is the list of fields in the custom resource and their description * **VolumeMounts** `type:[]v1.VolumeMount` Describes a mounting of a Volume within a container. + + * **ForceRollback** `type:bool` + Can be set to true to force rollback a deploy/update. The rollback is **not** performed when the application is in a **RUNNING** phase. + If an application is successfully rolled back, it is moved to a *DeployFailed* phase. Un-setting or setting `ForceRollback` to `False` will allow updates to progress normally. + \ No newline at end of file diff --git a/integ/simple_test.go b/integ/simple_test.go index 66be328c..9cdae5fb 100644 --- a/integ/simple_test.go +++ b/integ/simple_test.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/api/resource" + "os" "time" @@ -170,6 +172,59 @@ func (s *IntegSuite) TestSimple(c *C) { }, "") } + // Test force rollback of an active deploy + + { + newApp, err := s.Util.GetFlinkApplication(config.Name) + c.Assert(err, IsNil) + // User sets large (bad) value for cluster update + var TaskManagerDefaultResources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("5Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("5Gi"), + }, + } + newApp.Spec.TaskManagerConfig.Resources = &TaskManagerDefaultResources + + _, _ = s.Util.FlinkApps().Update(newApp) + c.Assert(s.Util.WaitForPhase(newApp.Name, v1alpha1.FlinkApplicationClusterStarting, ""), IsNil) + + // User realizes error and cancels the deploy + log.Infof("Cancelling deploy...") + newApp.Spec.ForceRollback = true + _, _ = s.Util.FlinkApps().Update(newApp) + + // we should end up in the DeployFailed phase + c.Assert(s.Util.WaitForPhase(newApp.Name, 
v1alpha1.FlinkApplicationDeployFailed, ""), IsNil) + c.Assert(newApp.Spec.ForceRollback, Equals, true) + log.Info("User cancelled deploy. Job is in deploy failed, waiting for tasks to start") + + // but the job should still be running + c.Assert(newApp.Status.JobStatus.State, Equals, v1alpha1.Running) + log.Info("Attempting to roll forward with fix") + + // Fixing update + var TaskManagerFixedResources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("0.2"), + corev1.ResourceMemory: resource.MustParse("200Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("0.2"), + corev1.ResourceMemory: resource.MustParse("200Mi"), + }, + } + // and we should be able to roll forward by resubmitting with a fixed config + updateAndValidate(c, s, config.Name, func(app *v1alpha1.FlinkApplication) { + app.Spec.TaskManagerConfig.Resources = &TaskManagerFixedResources + app.Spec.ForceRollback = false + }, "") + } + // delete the application and ensure everything is cleaned up successfully c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil) diff --git a/integ/test.sh b/integ/test.sh index 6250af09..21e7bb3a 100755 --- a/integ/test.sh +++ b/integ/test.sh @@ -9,5 +9,5 @@ export OPERATOR_IMAGE=127.0.0.1:32000/flinkk8soperator:local umask 000 cd $(dirname "$0") -go test -timeout 20m -check.vv IntegSuite +go test -timeout 22m -check.vv IntegSuite diff --git a/pkg/apis/app/v1alpha1/types.go b/pkg/apis/app/v1alpha1/types.go index 4e0e9fea..c4d951dd 100644 --- a/pkg/apis/app/v1alpha1/types.go +++ b/pkg/apis/app/v1alpha1/types.go @@ -53,6 +53,7 @@ type FlinkApplicationSpec struct { RestartNonce string `json:"restartNonce"` DeleteMode DeleteMode `json:"deleteMode,omitempty"` AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` + ForceRollback bool `json:"forceRollback"` } type FlinkConfig map[string]interface{} @@ -162,6 +163,7 @@ type FlinkApplicationStatus struct { 
ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` JobStatus FlinkJobStatus `json:"jobStatus"` FailedDeployHash string `json:"failedDeployHash,omitEmpty"` + RollbackHash string `json:"rollbackHash,omitempty"` DeployHash string `json:"deployHash"` RetryCount int32 `json:"retryCount,omitEmpty"` LastSeenError *client.FlinkApplicationError `json:"lastSeenError,omitEmpty"` diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 86acd05c..1585e8d9 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -78,6 +78,9 @@ func (s *FlinkStateMachine) updateApplicationPhase(application *v1alpha1.FlinkAp } func (s *FlinkStateMachine) shouldRollback(ctx context.Context, application *v1alpha1.FlinkApplication) bool { + if application.Spec.ForceRollback && application.Status.Phase != v1alpha1.FlinkApplicationRollingBackJob { + return true + } if application.Status.DeployHash == "" { // TODO: we may want some more sophisticated way of handling this case // there's no previous deploy for this application, so nothing to roll back to @@ -228,7 +231,8 @@ func (s *FlinkStateMachine) deployFailed(ctx context.Context, app *v1alpha1.Flin s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RolledBackDeploy", fmt.Sprintf("Successfull rolled back deploy %s", hash)) app.Status.FailedDeployHash = hash - + // set rollbackHash to deployHash + app.Status.RollbackHash = app.Status.DeployHash // Reset error and retry count app.Status.LastSeenError = nil app.Status.RetryCount = 0 @@ -484,6 +488,8 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1alpha1 app.Status.JobStatus.EntryClass, app.Status.JobStatus.ProgramArgs, app.Status.JobStatus.AllowNonRestoredState) + // set rollbackHash + app.Status.RollbackHash = app.Status.DeployHash if err != nil { return applicationUnchanged, err } diff --git 
a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index ea308836..d94e6be6 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -1128,3 +1128,71 @@ func TestErrorHandlingInRunningPhase(t *testing.T) { assert.Nil(t, app.Status.LastSeenError) } + +func TestForceRollback(t *testing.T) { + oldHash := "old-hash-force-rollback" + app := v1alpha1.FlinkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "flink", + }, + Spec: v1alpha1.FlinkApplicationSpec{ + JarName: "job.jar", + Parallelism: 5, + EntryClass: "com.my.Class", + ProgramArgs: "--test", + ForceRollback: true, + }, + Status: v1alpha1.FlinkApplicationStatus{ + Phase: v1alpha1.FlinkApplicationSubmittingJob, + DeployHash: oldHash, + }, + } + + stateMachineForTest := getTestStateMachine() + stateMachineForTest.clock.(*clock.FakeClock).SetTime(time.Now()) + + mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler) + mockRetryHandler.WaitOnErrorFunc = func(clock clock.Clock, lastUpdatedTime time.Time) (duration time.Duration, b bool) { + return time.Millisecond, true + } + + mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) + + getServiceCount := 0 + mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string) (*v1.Service, error) { + hash := oldHash + if getServiceCount > 0 { + hash = oldHash + } + + getServiceCount++ + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "flink", + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "flink-app-hash": hash, + }, + }, + }, nil + } + + mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) + mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error) { + return true, nil + } + + 
err := stateMachineForTest.Handle(context.Background(), &app) + assert.Nil(t, err) + // force rollback during SubmittingJob should move the app to RollingBackJob + assert.Equal(t, v1alpha1.FlinkApplicationRollingBackJob, app.Status.Phase) + assert.True(t, app.Spec.ForceRollback) + + err = stateMachineForTest.Handle(context.Background(), &app) + // Check if rollback hash is set + assert.Nil(t, err) + assert.Equal(t, oldHash, app.Status.RollbackHash) +}