diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 4be01739..2d05d90d 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -85,7 +85,7 @@ spec: type: boolean deploymentMode: type: string - enum: [Dual] + enum: [Dual, BlueGreen] rpcPort: type: integer minimum: 1 diff --git a/docs/blue_green_state_machine.mmd b/docs/blue_green_state_machine.mmd new file mode 100644 index 00000000..47e1ae6d --- /dev/null +++ b/docs/blue_green_state_machine.mmd @@ -0,0 +1,46 @@ +%% This file can be compiled into blue_green_state_machine.png by installing mermaidjs (https://mermaidjs.github.io/) and running +%% mmdc -i blue_green_state_machine.mmd -o blue_green_state_machine.png -w 1732 -b transparent + +graph LR +New --> ClusterStarting + +subgraph Running +Running +DeployFailed +end + +subgraph Updating +Running --> Updating +Updating --> ClusterStarting +DeployFailed --> Updating + +ClusterStarting -- savepoint disabled --> SubmittingJob +ClusterStarting -- savepoint enabled --> Savepointing +ClusterStarting -- Create fails --> DeployFailed + +Savepointing --> SubmittingJob +Savepointing -- Savepoint fails --> Recovering + +Recovering --> SubmittingJob +Recovering -- No externalized checkpoint --> RollingBackJob + +SubmittingJob -- first deploy --> Running +SubmittingJob -- updating existing application --> DualRunning +SubmittingJob -- job start fails --> RollingBackJob +RollingBackJob --> DeployFailed + +DualRunning -- tearDownVersionHash set --> Running +DualRunning -- tear down fails --> DeployFailed +end + +linkStyle 4 stroke:#303030 +linkStyle 5 stroke:#303030 +linkStyle 6 stroke:#FF0000 +linkStyle 8 stroke:#FF0000 +linkStyle 10 stroke:#FF0000 +linkStyle 11 stroke:#303030 +linkStyle 12 stroke:#303030 +linkStyle 13 stroke:#FF0000 +linkStyle 14 stroke:#FF0000 +linkStyle 15 stroke:#303030 +linkStyle 16 stroke:#FF0000 \ No newline at end of file diff --git a/docs/blue_green_state_machine.png b/docs/blue_green_state_machine.png new file mode 100644 index 00000000..b9d4cb1a Binary 
files /dev/null and b/docs/blue_green_state_machine.png differ diff --git a/docs/crd.md b/docs/crd.md index 71db6720..e71a8852 100644 --- a/docs/crd.md +++ b/docs/crd.md @@ -103,10 +103,12 @@ Below is the list of fields in the custom resource and their description Optional map of flink configuration, which passed on to the deployment as environment variable with `OPERATOR_FLINK_CONFIG` * **deploymentMode** `type:DeploymentMode` - Indicates the type of deployment that operator should perform if the custom resource is updated. Currently only Dual is supported. + Indicates the type of deployment that operator should perform if the custom resource is updated. Currently two deployment modes, Dual and BlueGreen are supported. `Dual` This deployment mode is intended for applications where downtime during deployment needs to be as minimal as possible. In this deployment mode, the operator brings up a second Flink cluster with the new image, while the original Flink cluster is still active. Once the pods and containers in the new flink cluster are ready, the Operator cancels the job in the first Cluster with savepoint, deletes the cluster and starts the job in the second cluster. (More information in the state machine section below). This mode is suitable for real time processing applications. - + `BlueGreen` This deployment mode is intended for applications where downtime during deployment needs to be zero. In this mode, the operator brings up a whole new flink job/cluster along side the original Flink job. The two versions of Flink jobs are differentiated by a color: blue/green. Once the new Flink application version is created, the application transitions to a `DualRunning` phase. To transition back from the `DualRunning` phase to a single application version, users must + set a `tearDownVersionHash` that enables the operator to teardown the version corresponding to the hash specified. 
+ * **deleteMode** `type:DeleteMode` Indicates how Flink jobs are torn down when the FlinkApplication resource is deleted @@ -134,3 +136,8 @@ Below is the list of fields in the custom resource and their description is used during the operator update workflow. This default exists only to protect one from accidentally restarting the application using a very old checkpoint (which might put your application under huge load). **Note:** this doesn't affect the flink application's checkpointing mechanism in anyway. + + * **tearDownVersionHash** `type:string` + Used **only** with the BlueGreen deployment mode. This is set typically once a FlinkApplication successfully transitions to the `DualRunning` phase. + Once set, the application version corresponding to the hash is torn down. On successful teardown, the FlinkApplication transitions to a `Running` phase. + diff --git a/docs/state_machine.mmd b/docs/dual_state_machine.mmd similarity index 92% rename from docs/state_machine.mmd rename to docs/dual_state_machine.mmd index fd042a46..e856f8fe 100644 --- a/docs/state_machine.mmd +++ b/docs/dual_state_machine.mmd @@ -1,5 +1,5 @@ %% This file can be compiled into state_machine.png by installing mermaidjs (https://mermaidjs.github.io/) and running -%% mmdc -i state_machine.mmd -o state_machine.png -w 1732 -b transparent +%% mmdc -i state_machine.mmd -o dual_state_machine.png -w 1732 -b transparent graph LR New --> ClusterStarting diff --git a/docs/state_machine.md b/docs/state_machine.md index ca92e269..4d6c9c65 100644 --- a/docs/state_machine.md +++ b/docs/state_machine.md @@ -6,9 +6,10 @@ with the underlying Kubernetes resources, it takes the necessary actions to upda Typically this will involve traversing the state machine. The final desired state is `Running`, which indicates that a healthy Flink cluster has been started and the Flink job has been successfully submitted. 
-The full state machine looks like this: -![Flink operator state machine](state_machine.png) - +The state machine for a `Dual` deployment mode (default) looks like this: +![Flink operator state machine for Dual deployment mode](dual_state_machine.png) +The state machine for a `BlueGreen` deployment mode looks like this: +![Flink operator state machine for BlueGreen deployment mode](blue_green_state_machine.png) # States ### New / Updating @@ -17,52 +18,73 @@ The full state machine looks like this: created, and we transition to the ClusterStarting phase to monitor. The deployment objects created by the operator are labelled and annotated as indicated in the custom resource. The operator also sets the corresponding environment variables and arguments for the containers to start up the Flink application from the image. - +#### BlueGreen deployment mode +Along with the annotations and labels in the custom resources, the deployment objects are suffixed with the application +version name, that is either `blue` or `green`. The version name is also injected into the container environment. +Additionally, the external URLs for each of the versions is also suffixed with the color. ### ClusterStarting In this state, the operator monitors the Flink cluster created in the New state. Once it successfully starts, we check if the spec has `savepointDisabled` field set to true. If yes, we transition to `Cancelling` state else to `Savepointing`. If we are unable to start the cluster for some reason (an invalid image, bad configuration, not enough Kubernetes resources, etc.), we transition to the `DeployFailed` state. - +#### BlueGreen deployment mode +In this mode, once the new cluster is started, we transition into the `Savepointing`/`SubmittingJob` mode based on the `savepointDisabled` +flag. There is no job cancellation involved in the update process during a BlueGreen deployment. 
### Cancelling In this state, the operator attempts to cancel the running job (if existing) and transition to `SubmittingJob` state. If it fails, we transition to `RollingBack`. - +#### BlueGreen deployment mode +This state is not reached during a BlueGreen deployment ### Savepointing In the `Savepointing` state, the operator attempts to cancel the existing job with a [savepoint](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html) (if this is the first deploy for the FlinkApplication and there is no existing job, we transition straight to `SubmittingJob`). The operator monitors the savepoint process until it succeeds or fails. If savepointing succeeds, we move to the `SubmittingJob` phase. If it fails, we move to the `Recovering` phase to attempt to recover from an externalized checkpoint. - +#### BlueGreen deployment mode +In this state, during a BlueGreen deployment, the currently running Flink job is savepointed (without cancellation). ### Recovering If savepointing fails, the operator will look for an [externalized checkpoint](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint) and attempt to use that for recovery. If one is not availble, the application transitions to the `DeployFailed` state. Otherwise, it transitions to the `SubmittingJob` state. - +#### BlueGreen deployment mode +There is no change in behavior for this state during a BlueGreen deployment. ### SubmittingJob In this state, the operator waits until the JobManager is ready, then attempts to submit the Flink job to the cluster. If we are updating an existing job or the user has specified a savepoint to restore from, that will be used. Once the job is successfully running the application transitions to the `Running` state. If the job submission fails we transition to the `RollingBack` state. 
- +#### BlueGreen deployment mode +During a BlueGreen deployment, the operator submits a job to the newly created cluster (with a version that's different from the +originally running Flink application version). ### RollingBack This state is reached when, in the middle of a deploy, the old job has been canceled but the new job did not come up successfully. In that case we will attempt to roll back by resubmitting the old job on the old cluster, after which we transition to the `DeployFailed` state. - +#### BlueGreen deployment mode +In the BlueGreen deployment mode, the operator does not attempt to resubmit the old job (as we never cancel it in the first place). +We transition directly to the `DeployFailed` state. ### Running The `Running` state indicates that the FlinkApplication custom resource has reached the desired state, and the job is running in the Flink cluster. In this state the operator continuously checks if the resource has been modified and monitors the health of the Flink cluster and job. - +#### BlueGreen deployment mode +There is no change in behavior for this state during a BlueGreen deployment. ### DeployFailed The `DeployFailed` state operates exactly like the `Running` state. It exists to inform the user that an attempted update has failed, i.e., that the FlinkApplication status does not currently match the desired spec. In this state, the user should look at the Flink logs and Kubernetes events to determine what went wrong. The user can then perform a new deploy by updating the FlinkApplication. - +#### BlueGreen deployment mode +There is no change in behavior for this state during a BlueGreen deployment. ### Deleting This state indicates that the FlinkApplication resource has been deleted. The operator will clean up the job according to the DeleteMode configured. Once all clean up steps have been performed the FlinkApplication will be deleted. 
+#### BlueGreen deployment mode +In this mode, if there are two application versions running, both versions are deleted (as per the `DeleteMode` configuration). +### DualRunning +This state is only ever reached when the FlinkApplication is deployed with the BlueGreen deployment mode. In this state, +there are two application versions running — `blue` and `green`. Once a user is ready to tear down one of the versions, they +set a `tearDownVersionHash`. If this is set, the operator then tears down the application version corresponding to +the `tearDownVersionHash`. Once the teardown is complete, we transition back to the `Running` state. diff --git a/integ/blue_green_deployment_test.go b/integ/blue_green_deployment_test.go new file mode 100644 index 00000000..414b5606 --- /dev/null +++ b/integ/blue_green_deployment_test.go @@ -0,0 +1,147 @@ +package integ + +import ( + "time" + + "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" + "github.com/prometheus/common/log" + . "gopkg.in/check.v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), phase v1beta1.FlinkApplicationPhase, failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication { + + // update with new image. 
+ app, err := s.Util.Update(name, updateFn) + c.Assert(err, IsNil) + + for { + // keep trying until the new job is launched + newApp, err := s.Util.GetFlinkApplication(name) + c.Assert(err, IsNil) + if newApp.Status.VersionStatuses[s.Util.GetCurrentStatusIndex(app)].JobStatus.JobID != "" { + break + } + time.Sleep(100 * time.Millisecond) + } + + c.Assert(s.Util.WaitForPhase(name, phase, failurePhase), IsNil) + c.Assert(s.Util.WaitForAllTasksRunning(name), IsNil) + + newApp, _ := s.Util.GetFlinkApplication(name) + return newApp +} + +func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) { + + testName := "bluegreenupdate" + const finalizer = "bluegreen.finalizers.test.com" + + // start a simple app + config, err := s.Util.ReadFlinkApplication("test_app.yaml") + c.Assert(err, IsNil, Commentf("Failed to read test app yaml")) + + config.Name = testName + "job" + config.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + config.ObjectMeta.Labels["integTest"] = testName + config.Finalizers = append(config.Finalizers, finalizer) + + c.Assert(s.Util.CreateFlinkApplication(config), IsNil, + Commentf("Failed to create flink application")) + + c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) + c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil) + + pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). 
+ List(v1.ListOptions{LabelSelector: "integTest=" + testName}) + c.Assert(err, IsNil) + c.Assert(len(pods.Items), Equals, 3) + for _, pod := range pods.Items { + c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image) + } + + // test updating the app with a new image + newApp := WaitForUpdate(c, s, config.Name, func(app *v1beta1.FlinkApplication) { + app.Spec.Image = NewImage + }, v1beta1.FlinkApplicationDualRunning, v1beta1.FlinkApplicationDeployFailed) + + c.Assert(newApp.Spec.Image, Equals, NewImage) + c.Assert(newApp.Status.SavepointPath, NotNil) + + pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). + List(v1.ListOptions{LabelSelector: "integTest=" + testName}) + c.Assert(err, IsNil) + // We have 2 applications running + c.Assert(len(pods.Items), Equals, 6) + c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDualRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) + c.Assert(s.Util.GetJobID(newApp), NotNil) + c.Assert(newApp.Status.UpdatingVersion, Equals, v1beta1.BlueFlinkApplication) + c.Assert(newApp.Status.DeployVersion, Equals, v1beta1.GreenFlinkApplication) + + // TearDownVersionHash + teardownVersion := newApp.Status.DeployVersion + hashToTeardown := newApp.Status.DeployHash + oldHash := newApp.Status.DeployHash + log.Infof("Tearing down version %s", teardownVersion) + newApp = WaitForUpdate(c, s, config.Name, func(app *v1beta1.FlinkApplication) { + app.Spec.TearDownVersionHash = hashToTeardown + }, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed) + + // wait for the old cluster to be cleaned up + for { + pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). 
+ List(v1.ListOptions{LabelSelector: "flink-app-hash=" + oldHash}) + c.Assert(err, IsNil) + if len(pods.Items) == 0 { + break + } + time.Sleep(100 * time.Millisecond) + } + + c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) + c.Assert(newApp.Status.TeardownHash, NotNil) + c.Assert(newApp.Status.DeployVersion, Equals, v1beta1.BlueFlinkApplication) + c.Assert(newApp.Status.VersionStatuses[0].JobStatus.JobID, NotNil) + c.Assert(newApp.Status.VersionStatuses[1].JobStatus, Equals, v1beta1.FlinkJobStatus{}) + + pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). + List(v1.ListOptions{LabelSelector: "flink-app-hash=" + oldHash}) + for _, pod := range pods.Items { + log.Infof("Pod name %s", pod.Name) + c.Assert(pod.Labels["flink-application-version"], Not(Equals), teardownVersion) + } + + c.Assert(err, IsNil) + c.Assert(len(pods.Items), Equals, 0) + + // cleanup + c.Assert(s.Util.FlinkApps().Delete(newApp.Name, &v1.DeleteOptions{}), IsNil) + var app *v1beta1.FlinkApplication + for { + app, err = s.Util.GetFlinkApplication(config.Name) + c.Assert(err, IsNil) + if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer { + break + } + time.Sleep(100 * time.Millisecond) + } + + job := s.Util.GetJobOverview(app) + c.Assert(job["status"], Equals, "CANCELED") + c.Assert(app.Status.SavepointPath, NotNil) + + // delete our finalizer + app.Finalizers = []string{} + _, err = s.Util.FlinkApps().Update(app) + c.Assert(err, IsNil) + + for { + pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). 
+ List(v1.ListOptions{LabelSelector: "integTest=" + testName}) + c.Assert(err, IsNil) + if len(pods.Items) == 0 { + break + } + } + log.Info("All pods torn down") +} diff --git a/integ/utils/utils.go b/integ/utils/utils.go index fb632179..d32a5674 100644 --- a/integ/utils/utils.go +++ b/integ/utils/utils.go @@ -386,7 +386,10 @@ func (f *TestUtil) FlinkAPIGet(app *flinkapp.FlinkApplication, endpoint string) url := fmt.Sprintf("http://localhost:8001/api/v1/namespaces/%s/"+ "services/%s:8081/proxy/%s", f.Namespace.Name, app.Name, endpoint) + if flinkapp.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + url = f.getURLForBlueGreenDeployment(app, endpoint) + } resp, err := resty.SetRedirectPolicy(resty.FlexibleRedirectPolicy(5)).R().Get(url) if err != nil { return nil, err @@ -405,6 +408,17 @@ func (f *TestUtil) FlinkAPIGet(app *flinkapp.FlinkApplication, endpoint string) return result, nil } +func (f *TestUtil) getURLForBlueGreenDeployment(app *flinkapp.FlinkApplication, endpoint string) string { + versionSuffix := string(app.Status.UpdatingVersion) + if versionSuffix == "" { + versionSuffix = string(app.Status.DeployVersion) + } + return fmt.Sprintf("http://localhost:8001/api/v1/namespaces/%s/"+ + "services/%s-%s:8081/proxy/%s", + f.Namespace.Name, app.Name, versionSuffix, endpoint) + +} + func (f *TestUtil) FlinkAPIPatch(app *flinkapp.FlinkApplication, endpoint string) (interface{}, error) { url := fmt.Sprintf("http://localhost:8001/api/v1/namespaces/%s/"+ @@ -453,7 +467,7 @@ func (f *TestUtil) WaitForAllTasksRunning(name string) error { return err } - endpoint := fmt.Sprintf("jobs/%s", flinkApp.Status.JobStatus.JobID) + endpoint := fmt.Sprintf("jobs/%s", f.GetJobID(flinkApp)) for { res, err := f.FlinkAPIGet(flinkApp, endpoint) if err != nil { @@ -514,9 +528,42 @@ func (f *TestUtil) GetJobOverview(app *flinkapp.FlinkApplication) map[string]int jobList := jobMap["jobs"].([]interface{}) for _, j := range jobList { job := j.(map[string]interface{}) - if job["id"] == 
app.Status.JobStatus.JobID { + if job["id"] == f.GetJobID(app) { return job } } return nil } + +func (f *TestUtil) Min(x, y int32) int32 { + if x < y { + return x + } + return y +} + +func (f *TestUtil) GetJobID(app *flinkapp.FlinkApplication) string { + if flinkapp.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + return app.Status.VersionStatuses[f.GetCurrentStatusIndex(app)].JobStatus.JobID + } + + return app.Status.JobStatus.JobID +} + +func (f *TestUtil) GetCurrentStatusIndex(app *flinkapp.FlinkApplication) int32 { + if flinkapp.IsRunningPhase(app.Status.Phase) || app.Status.DeployHash == "" || + app.Status.Phase == flinkapp.FlinkApplicationSavepointing || app.Status.Phase == flinkapp.FlinkApplicationDeleting { + return 0 + } + + if app.Status.Phase == flinkapp.FlinkApplicationDualRunning { + return 1 + } + + // activeJobs and maxRunningJobs would be different once a TearDownVersionHash has happened and + // the app has moved back to a Running state. + activeJobs := int32(len(app.Status.VersionStatuses)) + maxRunningJobs := flinkapp.GetMaxRunningJobs(app.Spec.DeploymentMode) + index := f.Min(activeJobs, maxRunningJobs) - 1 + return index +} diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index 14e47f37..8b646b1b 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -58,6 +58,7 @@ type FlinkApplicationSpec struct { AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` ForceRollback bool `json:"forceRollback"` MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"` + TearDownVersionHash string `json:"tearDownVersionHash,omitempty"` } type FlinkConfig map[string]interface{} @@ -168,30 +169,34 @@ type FlinkJobStatus struct { } type FlinkApplicationStatus struct { - Phase FlinkApplicationPhase `json:"phase"` - StartedAt *metav1.Time `json:"startedAt,omitempty"` - LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` - Reason string 
`json:"reason,omitempty"` - DeployVersion string `json:"deployVersion,omitempty"` - UpdatingVersion string `json:"updatingVersion,omitempty"` - // To ensure backward compatibility, repeat ClusterStatus and JobStatus + Phase FlinkApplicationPhase `json:"phase"` + StartedAt *metav1.Time `json:"startedAt,omitempty"` + LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` + Reason string `json:"reason,omitempty"` + DeployVersion FlinkApplicationVersion `json:"deployVersion,omitempty"` + UpdatingVersion FlinkApplicationVersion `json:"updatingVersion,omitempty"` ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` JobStatus FlinkJobStatus `json:"jobStatus,omitempty"` VersionStatuses []FlinkApplicationVersionStatus `json:"versionStatuses,omitempty"` FailedDeployHash string `json:"failedDeployHash,omitempty"` RollbackHash string `json:"rollbackHash,omitempty"` DeployHash string `json:"deployHash"` + UpdatingHash string `json:"updatingHash,omitempty"` + TeardownHash string `json:"teardownHash,omitempty"` SavepointTriggerID string `json:"savepointTriggerId,omitempty"` SavepointPath string `json:"savepointPath,omitempty"` RetryCount int32 `json:"retryCount,omitempty"` LastSeenError *FlinkApplicationError `json:"lastSeenError,omitempty"` + // We store deployment mode in the status to prevent incompatible migrations from + // Dual --> BlueGreen and BlueGreen --> Dual + DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"` } type FlinkApplicationVersion string const ( - BlueFlinkApplication FlinkApplicationVersion = "Blue" - GreenFlinkApplication FlinkApplicationVersion = "Green" + BlueFlinkApplication FlinkApplicationVersion = "blue" + GreenFlinkApplication FlinkApplicationVersion = "green" ) type FlinkApplicationVersionStatus struct { @@ -245,7 +250,6 @@ const ( FlinkApplicationRollingBackJob FlinkApplicationPhase = "RollingBackJob" FlinkApplicationDeployFailed FlinkApplicationPhase = "DeployFailed" FlinkApplicationDualRunning FlinkApplicationPhase 
= "DualRunning" - FlinkApplicationTeardown FlinkApplicationPhase = "Teardown" ) var FlinkApplicationPhases = []FlinkApplicationPhase{ @@ -261,7 +265,6 @@ var FlinkApplicationPhases = []FlinkApplicationPhase{ FlinkApplicationDeployFailed, FlinkApplicationRollingBackJob, FlinkApplicationDualRunning, - FlinkApplicationTeardown, } func IsRunningPhase(phase FlinkApplicationPhase) bool { @@ -351,4 +354,5 @@ const ( GetTaskManagers FlinkMethod = "GetTaskManagers" GetCheckpointCounts FlinkMethod = "GetCheckpointCounts" GetJobOverview FlinkMethod = "GetJobOverview" + SavepointJob FlinkMethod = "SavepointJob" ) diff --git a/pkg/controller/flink/client/api.go b/pkg/controller/flink/client/api.go index 76048d0b..5084f768 100644 --- a/pkg/controller/flink/client/api.go +++ b/pkg/controller/flink/client/api.go @@ -48,6 +48,7 @@ const jobSubmissionException = "org.apache.flink.runtime.client.JobSubmissionExc type FlinkAPIInterface interface { CancelJobWithSavepoint(ctx context.Context, url string, jobID string) (string, error) + SavepointJob(ctx context.Context, url string, jobID string) (string, error) ForceCancelJob(ctx context.Context, url string, jobID string) error SubmitJob(ctx context.Context, url string, jarID string, submitJobRequest SubmitJobRequest) (*SubmitJobResponse, error) CheckSavepointStatus(ctx context.Context, url string, jobID, triggerID string) (*SavepointResponse, error) @@ -82,6 +83,8 @@ type flinkJobManagerClientMetrics struct { getClusterFailureCounter labeled.Counter getCheckpointsSuccessCounter labeled.Counter getCheckpointsFailureCounter labeled.Counter + savepointJobSuccessCounter labeled.Counter + savepointJobFailureCounter labeled.Counter } func newFlinkJobManagerClientMetrics(scope promutils.Scope) *flinkJobManagerClientMetrics { @@ -104,6 +107,8 @@ func newFlinkJobManagerClientMetrics(scope promutils.Scope) *flinkJobManagerClie getClusterFailureCounter: labeled.NewCounter("get_cluster_failure", "Get cluster overview failed", flinkJmClientScope), 
getCheckpointsSuccessCounter: labeled.NewCounter("get_checkpoints_success", "Get checkpoint request succeeded", flinkJmClientScope), getCheckpointsFailureCounter: labeled.NewCounter("get_checkpoints_failed", "Get checkpoint request failed", flinkJmClientScope), + savepointJobSuccessCounter: labeled.NewCounter("savepoint_job_success", "Savepoint job request succeeded", flinkJmClientScope), + savepointJobFailureCounter: labeled.NewCounter("savepoint_job_failed", "Savepoint job request failed", flinkJmClientScope), } } @@ -181,7 +186,7 @@ func (c *FlinkJobManagerClient) CancelJobWithSavepoint(ctx context.Context, url path := fmt.Sprintf(savepointURL, jobID) url = url + path - cancelJobRequest := CancelJobRequest{ + cancelJobRequest := SavepointJobRequest{ CancelJob: true, } response, err := c.executeRequest(ctx, httpPost, url, cancelJobRequest) @@ -194,7 +199,7 @@ func (c *FlinkJobManagerClient) CancelJobWithSavepoint(ctx context.Context, url logger.Errorf(ctx, fmt.Sprintf("Cancel job failed with response %v", response)) return "", GetRetryableError(err, v1beta1.CancelJobWithSavepoint, response.Status(), 5) } - var cancelJobResponse CancelJobResponse + var cancelJobResponse SavepointJobResponse if err = json.Unmarshal(response.Body(), &cancelJobResponse); err != nil { logger.Errorf(ctx, "Unable to Unmarshal cancelJobResponse %v, err: %v", response, err) return "", GetRetryableError(err, v1beta1.CancelJobWithSavepoint, JSONUnmarshalError, 5) @@ -227,7 +232,6 @@ func (c *FlinkJobManagerClient) ForceCancelJob(ctx context.Context, url string, func (c *FlinkJobManagerClient) SubmitJob(ctx context.Context, url string, jarID string, submitJobRequest SubmitJobRequest) (*SubmitJobResponse, error) { path := fmt.Sprintf(submitJobURL, jarID) url = url + path - response, err := c.executeRequest(ctx, httpPost, url, submitJobRequest) if err != nil { c.metrics.submitJobFailureCounter.Inc(ctx) @@ -385,6 +389,32 @@ func (c *FlinkJobManagerClient) GetJobOverview(ctx context.Context, url 
string, return &jobOverviewResponse, nil } +func (c *FlinkJobManagerClient) SavepointJob(ctx context.Context, url string, jobID string) (string, error) { + path := fmt.Sprintf(savepointURL, jobID) + + url = url + path + savepointJobRequest := SavepointJobRequest{ + CancelJob: false, + } + response, err := c.executeRequest(ctx, httpPost, url, savepointJobRequest) + if err != nil { + c.metrics.savepointJobFailureCounter.Inc(ctx) + return "", GetRetryableError(err, v1beta1.CancelJobWithSavepoint, GlobalFailure, 5) + } + if response != nil && !response.IsSuccess() { + c.metrics.cancelJobFailureCounter.Inc(ctx) + logger.Errorf(ctx, fmt.Sprintf("Savepointing job failed with response %v", response)) + return "", GetRetryableError(err, v1beta1.SavepointJob, response.Status(), 5) + } + var savepointJobResponse SavepointJobResponse + if err = json.Unmarshal(response.Body(), &savepointJobResponse); err != nil { + logger.Errorf(ctx, "Unable to Unmarshal savepointJobResponse %v, err: %v", response, err) + return "", GetRetryableError(err, v1beta1.SavepointJob, JSONUnmarshalError, 5) + } + c.metrics.savepointJobSuccessCounter.Inc(ctx) + return savepointJobResponse.TriggerID, nil +} + func NewFlinkJobManagerClient(config config.RuntimeConfig) FlinkAPIInterface { metrics := newFlinkJobManagerClientMetrics(config.MetricsScope) return &FlinkJobManagerClient{ diff --git a/pkg/controller/flink/client/api_test.go b/pkg/controller/flink/client/api_test.go index f2fd582e..f7e13930 100644 --- a/pkg/controller/flink/client/api_test.go +++ b/pkg/controller/flink/client/api_test.go @@ -405,7 +405,7 @@ func TestCancelJobHappyCase(t *testing.T) { httpmock.Activate() defer httpmock.DeactivateAndReset() ctx := context.Background() - response := CancelJobResponse{ + response := SavepointJobResponse{ TriggerID: "133", } responder, _ := httpmock.NewJsonResponder(203, response) diff --git a/pkg/controller/flink/client/entities.go b/pkg/controller/flink/client/entities.go index afcb87fa..ffbdb8f3 
100644 --- a/pkg/controller/flink/client/entities.go +++ b/pkg/controller/flink/client/entities.go @@ -31,7 +31,7 @@ const ( Reconciling JobState = "RECONCILING" ) -type CancelJobRequest struct { +type SavepointJobRequest struct { CancelJob bool `json:"cancel-job"` TargetDirectory string `json:"target-directory,omitempty"` } @@ -63,7 +63,7 @@ type FailureCause struct { StackTrace string `json:"stack-trace"` } -type CancelJobResponse struct { +type SavepointJobResponse struct { TriggerID string `json:"request-id"` } diff --git a/pkg/controller/flink/client/mock/mock_api.go b/pkg/controller/flink/client/mock/mock_api.go index 67b7b773..873d96a8 100644 --- a/pkg/controller/flink/client/mock/mock_api.go +++ b/pkg/controller/flink/client/mock/mock_api.go @@ -17,7 +17,7 @@ type GetJobConfigFunc func(ctx context.Context, url string, jobID string) (*clie type GetTaskManagersFunc func(ctx context.Context, url string) (*client.TaskManagersResponse, error) type GetCheckpointCountsFunc func(ctx context.Context, url string, jobID string) (*client.CheckpointResponse, error) type GetJobOverviewFunc func(ctx context.Context, url string, jobID string) (*client.FlinkJobOverview, error) - +type SavepointJobFunc func(ctx context.Context, url string, jobID string) (string, error) type JobManagerClient struct { CancelJobWithSavepointFunc CancelJobWithSavepointFunc ForceCancelJobFunc ForceCancelJobFunc @@ -30,6 +30,7 @@ type JobManagerClient struct { GetTaskManagersFunc GetTaskManagersFunc GetCheckpointCountsFunc GetCheckpointCountsFunc GetJobOverviewFunc GetJobOverviewFunc + SavepointJobFunc SavepointJobFunc } func (m *JobManagerClient) SubmitJob(ctx context.Context, url string, jarID string, submitJobRequest client.SubmitJobRequest) (*client.SubmitJobResponse, error) { @@ -108,3 +109,11 @@ func (m *JobManagerClient) GetJobOverview(ctx context.Context, url string, jobID } return nil, nil } + +func (m *JobManagerClient) SavepointJob(ctx context.Context, url string, jobID string) (string, 
error) { + if m.SavepointJobFunc != nil { + return m.SavepointJobFunc(ctx, url, jobID) + } + + return "", nil +} diff --git a/pkg/controller/flink/container_utils.go b/pkg/controller/flink/container_utils.go index e760b6f9..64b22051 100644 --- a/pkg/controller/flink/container_utils.go +++ b/pkg/controller/flink/container_utils.go @@ -47,7 +47,11 @@ func getFlinkContainerName(containerName string) string { } func getCommonAppLabels(app *v1beta1.FlinkApplication) map[string]string { - return k8.GetAppLabel(app.Name) + labels := common.DuplicateMap(k8.GetAppLabel(app.Name)) + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { + labels[FlinkApplicationVersion] = string(app.Status.UpdatingVersion) + } + return labels } func getCommonAnnotations(app *v1beta1.FlinkApplication) map[string]string { @@ -58,8 +62,8 @@ func getCommonAnnotations(app *v1beta1.FlinkApplication) map[string]string { if app.Spec.RestartNonce != "" { annotations[RestartNonce] = app.Spec.RestartNonce } - if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { - annotations[FlinkApplicationVersion] = app.Status.UpdatingVersion + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { + annotations[FlinkApplicationVersion] = string(app.Status.UpdatingVersion) } return annotations } @@ -227,14 +231,14 @@ func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1beta1. 
// Injects labels and environment variables required for blue green deploys func GetDeploySpecificEnv(app *v1beta1.FlinkApplication) []v1.EnvVar { - if !v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + if !v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { return []v1.EnvVar{} } return []v1.EnvVar{ { Name: FlinkApplicationVersionEnv, - Value: app.Status.UpdatingVersion, + Value: string(app.Status.UpdatingVersion), }, } diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index e8f36ed6..9bb8418f 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -26,6 +26,8 @@ import ( ) const proxyURL = "http://localhost:%d/api/v1/namespaces/%s/services/%s:8081/proxy" +const proxyVersionURL = "http://localhost:%d/api/v1/namespaces/%s/services/%s-%s:8081/proxy" +const externalVersionURL = "%s-%s" const port = 8081 const indexOffset = 1 @@ -46,10 +48,10 @@ type ControllerInterface interface { CreateCluster(ctx context.Context, application *v1beta1.FlinkApplication) error // Cancels the running/active jobs in the Cluster for the Application after savepoint is created - CancelWithSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) + Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error) // Force cancels the running/active job without taking a savepoint - ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error + ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error // Starts the Job in the Flink Cluster StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, @@ -58,7 +60,7 @@ type ControllerInterface interface { // Savepoint creation is asynchronous. 
// Polls the status of the Savepoint, using the triggerID - GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) + GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) // Check if the Flink Kubernetes Cluster is Ready. // Checks if all the pods of task and job managers are ready. @@ -111,6 +113,20 @@ type ControllerInterface interface { // Update clusterStatus on the latest VersionStatuses UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkClusterStatus) + + // Update Version and Hash for application + UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string) + + // Delete Resources with Hash + DeleteResourcesForAppWithHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error + // Delete status for torn down cluster/job + DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string) + // Get job given hash + GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) + // Get the version and jobID for the given version hash + GetVersionAndJobIDForHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error) + // Get version and hash after teardown is complete + GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string) } func NewController(k8sCluster k8.ClusterInterface, eventRecorder record.EventRecorder, config controllerConfig.RuntimeConfig) ControllerInterface { @@ -160,28 +176,34 @@ func (f *Controller) getURLFromApp(application *v1beta1.FlinkApplication, hash s return fmt.Sprintf("http://%s.%s:%d", service, application.Namespace, port) } -func (f *Controller)
getClusterOverviewURL(app *v1beta1.FlinkApplication) string { - externalURL := f.getExternalURLFromApp(app) +func (f *Controller) getClusterOverviewURL(app *v1beta1.FlinkApplication, version string) string { + externalURL := f.getExternalURLFromApp(app, version) if externalURL != "" { return fmt.Sprintf(externalURL + client.WebUIAnchor + client.GetClusterOverviewURL) } return "" } -func (f *Controller) getJobOverviewURL(ctx context.Context, app *v1beta1.FlinkApplication) string { - externalURL := f.getExternalURLFromApp(app) +func (f *Controller) getJobOverviewURL(app *v1beta1.FlinkApplication, version string, jobID string) string { + externalURL := f.getExternalURLFromApp(app, version) if externalURL != "" { - return fmt.Sprintf(externalURL+client.WebUIAnchor+client.GetJobsOverviewURL, f.GetLatestJobID(ctx, app)) + return fmt.Sprintf(externalURL+client.WebUIAnchor+client.GetJobsOverviewURL, jobID) } return "" } -func (f *Controller) getExternalURLFromApp(application *v1beta1.FlinkApplication) string { +func (f *Controller) getExternalURLFromApp(application *v1beta1.FlinkApplication, version string) string { cfg := controllerConfig.GetConfig() // Local environment if cfg.UseProxy { + if version != "" { + return fmt.Sprintf(proxyVersionURL, cfg.ProxyPort.Port, application.Namespace, application.Name, version) + } return fmt.Sprintf(proxyURL, cfg.ProxyPort.Port, application.Namespace, application.Name) } + if version != "" { + return GetFlinkUIIngressURL(fmt.Sprintf(externalVersionURL, application.Name, version)) + } return GetFlinkUIIngressURL(application.Name) } @@ -235,29 +257,14 @@ func (f *Controller) GetJobForApplication(ctx context.Context, application *v1be return jobResponse, nil } -// The operator for now assumes and is intended to run single application per Flink Cluster. 
-// Once we move to run multiple applications, this has to be removed/updated -func (f *Controller) getJobIDForApplication(ctx context.Context, application *v1beta1.FlinkApplication) (string, error) { - if f.GetLatestJobID(ctx, application) != "" { - return f.GetLatestJobID(ctx, application), nil - } - - return "", errors.New("active job id not available") -} - -func (f *Controller) CancelWithSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { - jobID, err := f.getJobIDForApplication(ctx, application) - if err != nil { - return "", err +func (f *Controller) Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error) { + if isCancel { + return f.flinkClient.CancelJobWithSavepoint(ctx, f.getURLFromApp(application, hash), jobID) } - return f.flinkClient.CancelJobWithSavepoint(ctx, f.getURLFromApp(application, hash), jobID) + return f.flinkClient.SavepointJob(ctx, f.getURLFromApp(application, hash), jobID) } -func (f *Controller) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { - jobID, err := f.getJobIDForApplication(ctx, application) - if err != nil { - return err - } +func (f *Controller) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error { return f.flinkClient.ForceCancelJob(ctx, f.getURLFromApp(application, hash), jobID) } @@ -311,11 +318,7 @@ func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.Fli return response.JobID, nil } -func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { - jobID, err := f.getJobIDForApplication(ctx, application) - if err != nil { - return nil, err - } +func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, 
error) { return f.flinkClient.CheckSavepointStatus(ctx, f.getURLFromApp(application, hash), jobID, application.Status.SavepointTriggerID) } @@ -380,9 +383,10 @@ func listToFlinkDeployment(ds []v1.Deployment, hash string) *common.FlinkDeploym func getCurrentHash(app *v1beta1.FlinkApplication) string { appHash := HashForApplication(app) - if appHash == app.Status.FailedDeployHash { + if appHash == app.Status.FailedDeployHash || appHash == app.Status.TeardownHash { return app.Status.DeployHash } + return appHash } @@ -400,7 +404,7 @@ func (f *Controller) GetCurrentDeploymentsForApp(ctx context.Context, applicatio } cur := listToFlinkDeployment(deployments.Items, curHash) - if cur != nil && application.Status.FailedDeployHash == "" && + if cur != nil && application.Status.FailedDeployHash == "" && application.Status.TeardownHash == "" && (!f.deploymentMatches(ctx, cur.Jobmanager, application, curHash) || !f.deploymentMatches(ctx, cur.Taskmanager, application, curHash)) { // we had a hash collision (i.e., the previous application has the same hash as the new one) // this is *very* unlikely to occur (1/2^32) @@ -499,14 +503,20 @@ func isCheckpointOldToRecover(checkpointTime int64, maxCheckpointRecoveryAgeSec } func (f *Controller) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string) { + // Augment message with version for blue-green deployments + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { + version := app.Status.UpdatingVersion + message = fmt.Sprintf("%s for version %s", message, version) + } + f.eventRecorder.Event(app, eventType, reason, message) logger.Infof(ctx, "Logged %s event: %s: %s", eventType, reason, message) } // Gets and updates the cluster status func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) { - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { - return 
f.compareAndUpdateBlueGreenClusterStatus(ctx, application, hash) + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + return f.compareAndUpdateBlueGreenClusterStatus(ctx, application) } // Error retrieving cluster / taskmanagers overview (after startup/readiness) --> Red // If there is an error this loop will return with Health set to Red @@ -518,7 +528,7 @@ func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, applicat return false, err } - application.Status.ClusterStatus.ClusterOverviewURL = f.getClusterOverviewURL(application) + application.Status.ClusterStatus.ClusterOverviewURL = f.getClusterOverviewURL(application, "") application.Status.ClusterStatus.NumberOfTaskManagers = deployment.Taskmanager.Status.AvailableReplicas // Get Cluster overview response, err := f.flinkClient.GetClusterOverview(ctx, f.getURLFromApp(application, hash)) @@ -562,8 +572,8 @@ func getHealthyTaskManagerCount(response *client.TaskManagersResponse) int32 { } func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error) { - if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { - return f.compareAndUpdateBlueGreenJobStatus(ctx, app, hash) + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { + return f.compareAndUpdateBlueGreenJobStatus(ctx, app) } if app.Status.JobStatus.LastFailingTime == nil { initTime := metav1.NewTime(time.Time{}) @@ -582,7 +592,7 @@ func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1 } // Job status - app.Status.JobStatus.JobOverviewURL = f.getJobOverviewURL(ctx, app) + app.Status.JobStatus.JobOverviewURL = f.getJobOverviewURL(app, "", app.Status.JobStatus.JobID) app.Status.JobStatus.State = v1beta1.JobState(jobResponse.State) jobStartTime := metav1.NewTime(time.Unix(jobResponse.StartTime/1000, 0)) app.Status.JobStatus.StartTime = &jobStartTime @@ -666,7 +676,7 @@ func getCurrentStatusIndex(app 
*v1beta1.FlinkApplication) int32 { return 1 } - // activeJobs and maxRunningJobs would be different once a Teardown has happened and + // activeJobs and maxRunningJobs would be different once a TearDownVersionHash has happened and // the app has moved back to a Running state. activeJobs := int32(len(app.Status.VersionStatuses)) maxRunningJobs := v1beta1.GetMaxRunningJobs(app.Spec.DeploymentMode) @@ -682,14 +692,14 @@ func Min(x, y int32) int32 { } func (f *Controller) GetLatestClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus { - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { return application.Status.VersionStatuses[getCurrentStatusIndex(application)].ClusterStatus } return application.Status.ClusterStatus } func (f *Controller) GetLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus { - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus } return application.Status.JobStatus @@ -697,7 +707,7 @@ func (f *Controller) GetLatestJobStatus(ctx context.Context, application *v1beta } func (f *Controller) UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus) { - if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { app.Status.VersionStatuses[getCurrentStatusIndex(app)].JobStatus = jobStatus return } @@ -705,7 +715,7 @@ func (f *Controller) UpdateLatestJobStatus(ctx context.Context, app *v1beta1.Fli } func (f *Controller) UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus) { - if 
v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { app.Status.VersionStatuses[getCurrentStatusIndex(app)].ClusterStatus = clusterStatus return } @@ -713,26 +723,26 @@ func (f *Controller) UpdateLatestClusterStatus(ctx context.Context, app *v1beta1 } func (f *Controller) GetLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication) string { - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus.JobID } return application.Status.JobStatus.JobID } func (f *Controller) UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string) { - if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { app.Status.VersionStatuses[getCurrentStatusIndex(app)].JobStatus.JobID = jobID } app.Status.JobStatus.JobID = jobID } -func (f *Controller) compareAndUpdateBlueGreenClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) { +func (f *Controller) compareAndUpdateBlueGreenClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { isEqual := false for currIndex := range application.Status.VersionStatuses { if application.Status.VersionStatuses[currIndex].VersionHash == "" { continue } - + hash := application.Status.VersionStatuses[currIndex].VersionHash oldClusterStatus := application.Status.VersionStatuses[currIndex].ClusterStatus application.Status.VersionStatuses[currIndex].ClusterStatus.Health = v1beta1.Red @@ -741,7 +751,8 @@ func (f *Controller) compareAndUpdateBlueGreenClusterStatus(ctx context.Context, return false, err } - application.Status.VersionStatuses[currIndex].ClusterStatus.ClusterOverviewURL = f.getClusterOverviewURL(application) + 
version := string(application.Status.VersionStatuses[currIndex].Version) + application.Status.VersionStatuses[currIndex].ClusterStatus.ClusterOverviewURL = f.getClusterOverviewURL(application, version) application.Status.VersionStatuses[currIndex].ClusterStatus.NumberOfTaskManagers = deployment.Taskmanager.Status.AvailableReplicas // Get Cluster overview response, err := f.flinkClient.GetClusterOverview(ctx, f.getURLFromApp(application, hash)) @@ -773,14 +784,14 @@ func (f *Controller) compareAndUpdateBlueGreenClusterStatus(ctx context.Context, return isEqual, nil } -func (f *Controller) compareAndUpdateBlueGreenJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error) { +func (f *Controller) compareAndUpdateBlueGreenJobStatus(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) { isEqual := false var err error for statusIndex := range app.Status.VersionStatuses { if app.Status.VersionStatuses[statusIndex].JobStatus.JobID == "" { continue } - + hash := app.Status.VersionStatuses[statusIndex].VersionHash if app.Status.VersionStatuses[statusIndex].JobStatus.LastFailingTime == nil { initTime := metav1.NewTime(time.Time{}) app.Status.VersionStatuses[statusIndex].JobStatus.LastFailingTime = &initTime @@ -797,7 +808,8 @@ func (f *Controller) compareAndUpdateBlueGreenJobStatus(ctx context.Context, app } // Job status - app.Status.VersionStatuses[statusIndex].JobStatus.JobOverviewURL = f.getJobOverviewURL(ctx, app) + version := string(app.Status.VersionStatuses[statusIndex].Version) + app.Status.VersionStatuses[statusIndex].JobStatus.JobOverviewURL = f.getJobOverviewURL(app, version, app.Status.VersionStatuses[statusIndex].JobStatus.JobID) app.Status.VersionStatuses[statusIndex].JobStatus.State = v1beta1.JobState(jobResponse.State) jobStartTime := metav1.NewTime(time.Unix(jobResponse.StartTime/1000, 0)) app.Status.VersionStatuses[statusIndex].JobStatus.StartTime = &jobStartTime @@ -866,3 +878,110 @@ func (f *Controller) 
compareAndUpdateBlueGreenJobStatus(ctx context.Context, app } return isEqual, err } + +func (f *Controller) UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string) { + currIndex := getCurrentStatusIndex(application) + application.Status.VersionStatuses[currIndex].Version = version + application.Status.VersionStatuses[currIndex].VersionHash = hash + application.Status.UpdatingHash = hash +} + +func (f *Controller) DeleteResourcesForAppWithHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) error { + appLabel := k8.GetAppLabel(app.Name) + deployments, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, app.Namespace, appLabel) + if err != nil { + return err + } + + oldObjects := make([]metav1.Object, 0) + + for _, d := range deployments.Items { + if d.Labels[FlinkAppHash] == hash && + // verify that this deployment matches the jobmanager or taskmanager naming format + (d.Name == fmt.Sprintf(JobManagerVersionNameFormat, app.Name, d.Labels[FlinkAppHash], d.Labels[FlinkApplicationVersion]) || + d.Name == fmt.Sprintf(TaskManagerVersionNameFormat, app.Name, d.Labels[FlinkAppHash], d.Labels[FlinkApplicationVersion])) { + oldObjects = append(oldObjects, d.DeepCopy()) + } + } + + services, err := f.k8Cluster.GetServicesWithLabel(ctx, app.Namespace, appLabel) + + if err != nil { + return err + } + + for _, d := range services.Items { + if d.Labels[FlinkAppHash] == hash || d.Spec.Selector[FlinkAppHash] == hash { + oldObjects = append(oldObjects, d.DeepCopy()) + } + } + + deletedHashes := make(map[string]bool) + + for _, resource := range oldObjects { + err := f.k8Cluster.DeleteK8Object(ctx, resource.(runtime.Object)) + if err != nil { + f.metrics.deleteResourceFailedCounter.Inc(ctx) + return err + } + f.metrics.deleteResourceSuccessCounter.Inc(ctx) + deletedHashes[resource.GetLabels()[FlinkAppHash]] = true + } + + for k := range deletedHashes { + f.LogEvent(ctx, app, corev1.EventTypeNormal, 
"ToreDownCluster", + fmt.Sprintf("Deleted old cluster with hash %s", k)) + } + return nil +} + +func (f *Controller) DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string) { + var indexToDelete int + for index, status := range application.Status.VersionStatuses { + if status.VersionHash == hash { + indexToDelete = index + } + } + application.Status.VersionStatuses[0] = application.Status.VersionStatuses[indexOffset-indexToDelete] + application.Status.VersionStatuses[1] = v1beta1.FlinkApplicationVersionStatus{} +} + +func (f *Controller) GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) { + jobID := "" + for _, status := range app.Status.VersionStatuses { + if status.VersionHash == hash { + jobID = status.JobStatus.JobID + } + } + if jobID == "" { + return nil, nil + } + + jobResponse, err := f.flinkClient.GetJobOverview(ctx, f.getURLFromApp(app, hash), jobID) + if err != nil { + return nil, err + } + + return jobResponse, nil +} + +func (f *Controller) GetVersionAndJobIDForHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (string, string, error) { + version := "" + jobID := "" + for _, status := range app.Status.VersionStatuses { + if status.VersionHash == hash { + version = string(status.Version) + jobID = status.JobStatus.JobID + } + } + if hash == "" || jobID == "" { + return "", "", errors.New("could not find jobID and hash for application") + } + + return version, jobID, nil +} + +func (f *Controller) GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string) { + versionStatus := application.Status.VersionStatuses[0] + return versionStatus.Version, versionStatus.VersionHash +} diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index 57e4c575..09b1dfc7 100644 --- a/pkg/controller/flink/flink_test.go +++ 
b/pkg/controller/flink/flink_test.go @@ -250,7 +250,7 @@ func TestFlinkGetSavepointStatus(t *testing.T) { }, }, nil } - status, err := flinkControllerForTest.GetSavepointStatus(context.Background(), &flinkApp, "hash") + status, err := flinkControllerForTest.GetSavepointStatus(context.Background(), &flinkApp, "hash", testJobID) assert.Nil(t, err) assert.NotNil(t, status) @@ -267,7 +267,7 @@ func TestFlinkGetSavepointStatusErr(t *testing.T) { assert.Equal(t, jobID, testJobID) return nil, errors.New("Savepoint error") } - status, err := flinkControllerForTest.GetSavepointStatus(context.Background(), &flinkApp, "hash") + status, err := flinkControllerForTest.GetSavepointStatus(context.Background(), &flinkApp, "hash", testJobID) assert.Nil(t, status) assert.NotNil(t, err) @@ -518,12 +518,12 @@ func TestCancelWithSavepoint(t *testing.T) { assert.Equal(t, jobID, testJobID) return "t1", nil } - triggerID, err := flinkControllerForTest.CancelWithSavepoint(context.Background(), &flinkApp, "hash") + triggerID, err := flinkControllerForTest.Savepoint(context.Background(), &flinkApp, "hash", true, testJobID) assert.Nil(t, err) assert.Equal(t, triggerID, "t1") } -func TestCancelWithSavepointErr(t *testing.T) { +func TestSavepointErr(t *testing.T) { flinkControllerForTest := getTestFlinkController() flinkApp := getFlinkTestApp() @@ -531,7 +531,7 @@ func TestCancelWithSavepointErr(t *testing.T) { mockJmClient.CancelJobWithSavepointFunc = func(ctx context.Context, url string, jobID string) (string, error) { return "", errors.New("cancel error") } - triggerID, err := flinkControllerForTest.CancelWithSavepoint(context.Background(), &flinkApp, "hash") + triggerID, err := flinkControllerForTest.Savepoint(context.Background(), &flinkApp, "hash", true, testJobID) assert.EqualError(t, err, "cancel error") assert.Empty(t, triggerID) } @@ -949,3 +949,166 @@ func TestMaxCheckpointRestoreAge(t *testing.T) { // Test valid checkpoint that can be recovered. 
Recovery age is 10 minutes assert.False(t, isCheckpointOldToRecover(time.Now().Unix()-100, 600)) } + +func TestGetCurrentStatusIndex(t *testing.T) { + app := getFlinkTestApp() + // Dual deployment should always return 0 + app.Status.Phase = v1beta1.FlinkApplicationRunning + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationUpdating + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationClusterStarting + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationSavepointing + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationRecovering + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationRollingBackJob + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationSubmittingJob + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + + // Tests for bluegreen deployment mode + app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + + // First deploy should always return 0 + app.Status.Phase = v1beta1.FlinkApplicationRunning + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationUpdating + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationClusterStarting + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationSavepointing + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationRecovering + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationRollingBackJob + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationSubmittingJob + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + + // Subsequent deploys return 
0 when in Running or Savepointing phase + app.Status.DeployHash = "hash" + statuses := make([]v1beta1.FlinkApplicationVersionStatus, 2) + app.Status.VersionStatuses = statuses + app.Status.Phase = v1beta1.FlinkApplicationSavepointing + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationRunning + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + // Else return 1 + app.Status.Phase = v1beta1.FlinkApplicationUpdating + assert.Equal(t, int32(1), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationClusterStarting + assert.Equal(t, int32(1), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationRecovering + assert.Equal(t, int32(1), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationRollingBackJob + assert.Equal(t, int32(1), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationSubmittingJob + assert.Equal(t, int32(1), getCurrentStatusIndex(&app)) + app.Status.Phase = v1beta1.FlinkApplicationDualRunning + assert.Equal(t, int32(1), getCurrentStatusIndex(&app)) + + // Once teardown has happened and the one of the two apps have been deleted + app.Status.VersionStatuses = make([]v1beta1.FlinkApplicationVersionStatus, 1) + app.Status.Phase = v1beta1.FlinkApplicationRunning + assert.Equal(t, int32(0), getCurrentStatusIndex(&app)) + +} + +func TestDeleteStatusPostTeardown(t *testing.T) { + controller := getTestFlinkController() + app := getFlinkTestApp() + app.Status.VersionStatuses = []v1beta1.FlinkApplicationVersionStatus{ + { + Version: v1beta1.BlueFlinkApplication, + VersionHash: "blue-hash", + ClusterStatus: v1beta1.FlinkClusterStatus{ + ClusterOverviewURL: "blue-overview", + }, + JobStatus: v1beta1.FlinkJobStatus{ + JobID: "blue-job-id", + }, + }, + { + Version: v1beta1.GreenFlinkApplication, + VersionHash: "green-hash", + ClusterStatus: v1beta1.FlinkClusterStatus{ + ClusterOverviewURL: "green-overview", + }, + JobStatus: 
v1beta1.FlinkJobStatus{ + JobID: "green-job-id", + }, + }, + } + expectedStatus := app.Status.VersionStatuses[1] + controller.DeleteStatusPostTeardown(context.Background(), &app, "blue-hash") + assert.Equal(t, expectedStatus, app.Status.VersionStatuses[0]) + assert.Empty(t, app.Status.VersionStatuses[1]) +} + +func TestDeleteResourcesForAppWithHash(t *testing.T) { + flinkControllerForTest := getTestFlinkController() + app := getFlinkTestApp() + app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Status.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Status.UpdatingVersion = testVersion + jmDeployment := FetchTaskMangerDeploymentCreateObj(&app, "oldhash") + tmDeployment := FetchJobMangerDeploymentCreateObj(&app, "oldhash") + service := FetchJobManagerServiceCreateObj(&app, "oldhash") + service.Labels[FlinkAppHash] = "oldhash" + service.Name = VersionedJobManagerServiceName(&app, "oldhash") + genericService := FetchJobManagerServiceCreateObj(&app, "oldhash") + genericService.Name = app.Name + + mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) + + mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { + curJobmanager := FetchJobMangerDeploymentCreateObj(&app, testAppHash) + curTaskmanager := FetchTaskMangerDeploymentCreateObj(&app, testAppHash) + return &v1.DeploymentList{ + Items: []v1.Deployment{ + *jmDeployment, + *tmDeployment, + *curJobmanager, + *curTaskmanager, + }, + }, nil + } + + mockK8Cluster.GetServicesWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*corev1.ServiceList, error) { + curService := FetchJobManagerServiceCreateObj(&app, testAppHash) + curService.Labels[FlinkAppHash] = testAppHash + curService.Name = VersionedJobManagerServiceName(&app, testAppHash) + + generic := FetchJobManagerServiceCreateObj(&app, testAppHash) + return &corev1.ServiceList{ + Items: []corev1.Service{ + 
*service, + *curService, + *generic, + }, + }, nil + } + + ctr := 0 + mockK8Cluster.DeleteK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + ctr++ + switch ctr { + case 1: + assert.Equal(t, jmDeployment, object) + case 2: + assert.Equal(t, tmDeployment, object) + case 3: + assert.Equal(t, service, object) + case 4: + assert.Equal(t, genericService, object) + + } + return nil + } + + err := flinkControllerForTest.DeleteResourcesForAppWithHash(context.Background(), &app, "oldhash") + assert.Equal(t, 3, ctr) + assert.Nil(t, err) +} diff --git a/pkg/controller/flink/ingress.go b/pkg/controller/flink/ingress.go index e45e4614..ccb5c0c1 100644 --- a/pkg/controller/flink/ingress.go +++ b/pkg/controller/flink/ingress.go @@ -1,6 +1,7 @@ package flink import ( + "fmt" "regexp" flinkapp "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" @@ -12,6 +13,8 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) +const AppIngressName = "%s-%s" + var inputRegex = regexp.MustCompile(`{{[$]jobCluster}}`) func ReplaceJobURL(value string, input string) string { @@ -27,7 +30,7 @@ func FetchJobManagerIngressCreateObj(app *flinkapp.FlinkApplication) *v1beta1.In podLabels = common.CopyMap(podLabels, k8.GetAppLabel(app.Name)) ingressMeta := v1.ObjectMeta{ - Name: app.Name, + Name: getJobManagerServiceName(app), Labels: podLabels, Namespace: app.Namespace, OwnerReferences: []v1.OwnerReference{ @@ -36,7 +39,7 @@ func FetchJobManagerIngressCreateObj(app *flinkapp.FlinkApplication) *v1beta1.In } backend := v1beta1.IngressBackend{ - ServiceName: app.Name, + ServiceName: getJobManagerServiceName(app), ServicePort: intstr.IntOrString{ Type: intstr.Int, IntVal: getUIPort(app), @@ -45,7 +48,7 @@ func FetchJobManagerIngressCreateObj(app *flinkapp.FlinkApplication) *v1beta1.In ingressSpec := v1beta1.IngressSpec{ Rules: []v1beta1.IngressRule{{ - Host: GetFlinkUIIngressURL(app.Name), + Host: GetFlinkUIIngressURL(getIngressName(app)), IngressRuleValue: v1beta1.IngressRuleValue{ 
HTTP: &v1beta1.HTTPIngressRuleValue{ Paths: []v1beta1.HTTPIngressPath{{ @@ -65,3 +68,10 @@ func FetchJobManagerIngressCreateObj(app *flinkapp.FlinkApplication) *v1beta1.In } } + +func getIngressName(app *flinkapp.FlinkApplication) string { + if flinkapp.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + return fmt.Sprintf(AppIngressName, app.Name, string(app.Status.UpdatingVersion)) + } + return app.Name +} diff --git a/pkg/controller/flink/ingress_test.go b/pkg/controller/flink/ingress_test.go index 3272dfcc..95f1c42f 100644 --- a/pkg/controller/flink/ingress_test.go +++ b/pkg/controller/flink/ingress_test.go @@ -3,6 +3,8 @@ package flink import ( "testing" + "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" + config2 "github.com/lyft/flinkk8soperator/pkg/controller/config" "github.com/stretchr/testify/assert" ) @@ -25,3 +27,16 @@ func TestGetFlinkUIIngressURL(t *testing.T) { "ABC.lyft.xyz", GetFlinkUIIngressURL("ABC")) } + +func TestGetFlinkUIIngressURLBlueGreenDeployment(t *testing.T) { + err := initTestConfigForIngress() + assert.Nil(t, err) + app := v1beta1.FlinkApplication{} + app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Name = "ABC" + app.Status.UpdatingVersion = v1beta1.GreenFlinkApplication + assert.Equal(t, "ABC-green", getIngressName(&app)) + assert.Equal(t, + "ABC-green.lyft.xyz", + GetFlinkUIIngressURL(getIngressName(&app))) +} diff --git a/pkg/controller/flink/job_manager_controller.go b/pkg/controller/flink/job_manager_controller.go index 279c0a44..9f41a935 100644 --- a/pkg/controller/flink/job_manager_controller.go +++ b/pkg/controller/flink/job_manager_controller.go @@ -21,8 +21,10 @@ import ( const ( JobManagerNameFormat = "%s-%s-jm" + JobManagerVersionNameFormat = "%s-%s-%s-jm" JobManagerPodNameFormat = "%s-%s-jm-pod" - JobManagerVersionPodNameFormat = "%s-%s-jm-%s-pod" + JobManagerServiceName = "%s" + JobManagerVersionServiceName = "%s-%s" JobManagerContainerName = "jobmanager" JobManagerArg = "jobmanager" 
JobManagerReadinessPath = "/overview" @@ -170,20 +172,20 @@ var JobManagerDefaultResources = coreV1.ResourceRequirements{ func getJobManagerPodName(application *v1beta1.FlinkApplication, hash string) string { applicationName := application.Name - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { - applicationVersion := application.Status.UpdatingVersion - return fmt.Sprintf(JobManagerVersionPodNameFormat, applicationName, hash, applicationVersion) - } return fmt.Sprintf(JobManagerPodNameFormat, applicationName, hash) } func getJobManagerName(application *v1beta1.FlinkApplication, hash string) string { applicationName := application.Name + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + applicationVersion := application.Status.UpdatingVersion + return fmt.Sprintf(JobManagerVersionNameFormat, applicationName, hash, applicationVersion) + } return fmt.Sprintf(JobManagerNameFormat, applicationName, hash) } func FetchJobManagerServiceCreateObj(app *v1beta1.FlinkApplication, hash string) *coreV1.Service { - jmServiceName := app.Name + jmServiceName := getJobManagerServiceName(app) serviceLabels := getCommonAppLabels(app) serviceLabels[FlinkAppHash] = hash serviceLabels[FlinkDeploymentType] = FlinkDeploymentTypeJobmanager @@ -208,6 +210,15 @@ func FetchJobManagerServiceCreateObj(app *v1beta1.FlinkApplication, hash string) } } +func getJobManagerServiceName(app *v1beta1.FlinkApplication) string { + serviceName := app.Name + versionName := app.Status.UpdatingVersion + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { + return fmt.Sprintf(JobManagerVersionServiceName, serviceName, versionName) + } + return serviceName +} + func getJobManagerServicePorts(app *v1beta1.FlinkApplication) []coreV1.ServicePort { ports := getJobManagerPorts(app) servicePorts := make([]coreV1.ServicePort, 0, len(ports)) diff --git a/pkg/controller/flink/job_manager_controller_test.go 
b/pkg/controller/flink/job_manager_controller_test.go index fe5376ae..c33b9ad5 100644 --- a/pkg/controller/flink/job_manager_controller_test.go +++ b/pkg/controller/flink/job_manager_controller_test.go @@ -3,7 +3,7 @@ package flink import ( "testing" - flinkapp "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" + v1beta12 "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" "github.com/lyft/flinkk8soperator/pkg/controller/config" @@ -44,11 +44,12 @@ func TestGetJobManagerPodName(t *testing.T) { assert.Equal(t, "app-name-"+testAppHash+"-jm-pod", getJobManagerPodName(&app, testAppHash)) } -func TestGetJobManagerPodNameWithVersion(t *testing.T) { +func TestGetJobManagerNameWithVersion(t *testing.T) { app := getFlinkTestApp() - app.Spec.DeploymentMode = flinkapp.DeploymentModeBlueGreen + app.Spec.DeploymentMode = v1beta12.DeploymentModeBlueGreen + app.Status.DeploymentMode = v1beta12.DeploymentModeBlueGreen app.Status.UpdatingVersion = testVersion - assert.Equal(t, "app-name-"+testAppHash+"-jm-"+testVersion+"-pod", getJobManagerPodName(&app, testAppHash)) + assert.Equal(t, "app-name-"+testAppHash+"-"+testVersion+"-jm", getJobManagerName(&app, testAppHash)) } func TestJobManagerCreateSuccess(t *testing.T) { @@ -313,7 +314,8 @@ func TestJobManagerCreateSuccessWithVersion(t *testing.T) { app.Spec.JarName = testJarName app.Spec.EntryClass = testEntryClass app.Spec.ProgramArgs = testProgramArgs - app.Spec.DeploymentMode = flinkapp.DeploymentModeBlueGreen + app.Spec.DeploymentMode = v1beta12.DeploymentModeBlueGreen + app.Status.DeploymentMode = v1beta12.DeploymentModeBlueGreen app.Status.UpdatingVersion = testVersion annotations := map[string]string{ "key": "annotation", @@ -321,11 +323,12 @@ func TestJobManagerCreateSuccessWithVersion(t *testing.T) { "flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"", } app.Annotations = annotations - hash := "f0bd1679" + hash := 
"5cb5943e" expectedLabels := map[string]string{ - "flink-app": "app-name", - "flink-app-hash": hash, - "flink-deployment-type": "jobmanager", + "flink-app": "app-name", + "flink-app-hash": hash, + "flink-deployment-type": "jobmanager", + "flink-application-version": testVersion, } ctr := 0 mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster) @@ -358,21 +361,21 @@ func TestJobManagerCreateSuccessWithVersion(t *testing.T) { "FLINK_APPLICATION_VERSION").Value) case 2: service := object.(*coreV1.Service) - assert.Equal(t, app.Name, service.Name) + assert.Equal(t, app.Name+"-"+testVersion, service.Name) assert.Equal(t, app.Namespace, service.Namespace) - assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-deployment-type": "jobmanager"}, service.Spec.Selector) + assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-deployment-type": "jobmanager", "flink-application-version": testVersion}, service.Spec.Selector) case 3: service := object.(*coreV1.Service) assert.Equal(t, app.Name+"-"+hash, service.Name) assert.Equal(t, "app-name", service.OwnerReferences[0].Name) assert.Equal(t, app.Namespace, service.Namespace) - assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-deployment-type": "jobmanager"}, service.Spec.Selector) + assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-application-version": testVersion, "flink-deployment-type": "jobmanager"}, service.Spec.Selector) case 4: labels := map[string]string{ "flink-app": "app-name", } ingress := object.(*v1beta1.Ingress) - assert.Equal(t, app.Name, ingress.Name) + assert.Equal(t, app.Name+"-"+testVersion, ingress.Name) assert.Equal(t, app.Namespace, ingress.Namespace) assert.Equal(t, labels, ingress.Labels) } diff --git a/pkg/controller/flink/mock/mock_flink.go b/pkg/controller/flink/mock/mock_flink.go index 15b64c91..830bcfa2 100644 --- 
a/pkg/controller/flink/mock/mock_flink.go +++ b/pkg/controller/flink/mock/mock_flink.go @@ -11,11 +11,11 @@ import ( type CreateClusterFunc func(ctx context.Context, application *v1beta1.FlinkApplication) error type DeleteOldResourcesForApp func(ctx context.Context, application *v1beta1.FlinkApplication) error -type CancelWithSavepointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) -type ForceCancelFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error +type SavepointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error) +type ForceCancelFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error type StartFlinkJobFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) -type GetSavepointStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) +type GetSavepointStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) type IsClusterReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) type IsServiceReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) type GetJobsForApplicationFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) @@ -30,11 +30,16 @@ type GetLatestJobIDFunc func(ctx context.Context, app *v1beta1.FlinkApplication) type UpdateLatestJobIDFunc func(ctx context.Context, app *v1beta1.FlinkApplication, jobID string) type UpdateLatestJobStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus 
v1beta1.FlinkJobStatus) type UpdateLatestClusterStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus) - +type UpdateLatestVersionAndHashFunc func(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string) +type DeleteResourcesForAppWithHashFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error +type DeleteStatusPostTeardownFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) +type GetJobToDeleteForApplicationFunc func(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) +type GetVersionAndJobIDForHashFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error) +type GetVersionAndHashPostTeardownFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string) type FlinkController struct { CreateClusterFunc CreateClusterFunc DeleteOldResourcesForAppFunc DeleteOldResourcesForApp - CancelWithSavepointFunc CancelWithSavepointFunc + SavepointFunc SavepointFunc ForceCancelFunc ForceCancelFunc StartFlinkJobFunc StartFlinkJobFunc GetSavepointStatusFunc GetSavepointStatusFunc @@ -53,6 +58,12 @@ type FlinkController struct { UpdateLatestJobIDFunc UpdateLatestJobIDFunc UpdateLatestJobStatusFunc UpdateLatestJobStatusFunc UpdateLatestClusterStatusFunc UpdateLatestClusterStatusFunc + UpdateLatestVersionAndHashFunc UpdateLatestVersionAndHashFunc + DeleteResourcesForAppWithHashFunc DeleteResourcesForAppWithHashFunc + DeleteStatusPostTeardownFunc DeleteStatusPostTeardownFunc + GetJobToDeleteForApplicationFunc GetJobToDeleteForApplicationFunc + GetVersionAndJobIDForHashFunc GetVersionAndJobIDForHashFunc + GetVersionAndHashPostTeardownFunc GetVersionAndHashPostTeardownFunc } func (m *FlinkController) GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) 
(*common.FlinkDeployment, error) { @@ -76,16 +87,16 @@ func (m *FlinkController) CreateCluster(ctx context.Context, application *v1beta return nil } -func (m *FlinkController) CancelWithSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { - if m.CancelWithSavepointFunc != nil { - return m.CancelWithSavepointFunc(ctx, application, hash) +func (m *FlinkController) Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error) { + if m.SavepointFunc != nil { + return m.SavepointFunc(ctx, application, hash, isCancel, jobID) } return "", nil } -func (m *FlinkController) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { +func (m *FlinkController) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error { if m.ForceCancelFunc != nil { - return m.ForceCancelFunc(ctx, application, hash) + return m.ForceCancelFunc(ctx, application, hash, jobID) } return nil } @@ -98,9 +109,9 @@ func (m *FlinkController) StartFlinkJob(ctx context.Context, application *v1beta return "", nil } -func (m *FlinkController) GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { +func (m *FlinkController) GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) { if m.GetSavepointStatusFunc != nil { - return m.GetSavepointStatusFunc(ctx, application, hash) + return m.GetSavepointStatusFunc(ctx, application, hash, jobID) } return nil, nil } @@ -173,27 +184,27 @@ func (m *FlinkController) GetLatestClusterStatus(ctx context.Context, applicatio if m.GetLatestClusterStatusFunc != nil { return m.GetLatestClusterStatusFunc(ctx, application) } - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if 
v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { return application.Status.VersionStatuses[getCurrentStatusIndex(application)].ClusterStatus } return application.Status.ClusterStatus } func (m *FlinkController) GetLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus { - if m.GetLatestClusterStatusFunc != nil { + if m.GetLatestJobStatusFunc != nil { return m.GetLatestJobStatusFunc(ctx, application) } - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus } return application.Status.JobStatus } func (m *FlinkController) GetLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication) string { - if m.GetLatestClusterStatusFunc != nil { + if m.GetLatestJobIDFunc != nil { return m.GetLatestJobIDFunc(ctx, application) } - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus.JobID } return application.Status.JobStatus.JobID @@ -203,7 +214,7 @@ func (m *FlinkController) UpdateLatestJobID(ctx context.Context, application *v1 if m.UpdateLatestJobIDFunc != nil { m.UpdateLatestJobIDFunc(ctx, application, jobID) } - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus.JobID = jobID return } @@ -214,7 +225,7 @@ func (m *FlinkController) UpdateLatestJobStatus(ctx context.Context, application if m.UpdateLatestJobStatusFunc != nil { m.UpdateLatestJobStatusFunc(ctx, application, jobStatus) } - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if 
v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus = jobStatus return } @@ -225,13 +236,60 @@ func (m *FlinkController) UpdateLatestClusterStatus(ctx context.Context, applica if m.UpdateLatestClusterStatusFunc != nil { m.UpdateLatestClusterStatusFunc(ctx, application, clusterStatus) } - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { application.Status.VersionStatuses[getCurrentStatusIndex(application)].ClusterStatus = clusterStatus return } application.Status.ClusterStatus = clusterStatus } +func (m *FlinkController) UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string) { + if m.UpdateLatestVersionAndHashFunc != nil { + m.UpdateLatestVersionAndHashFunc(application, version, hash) + } + currIndex := getCurrentStatusIndex(application) + application.Status.VersionStatuses[currIndex].Version = version + application.Status.VersionStatuses[currIndex].VersionHash = hash + application.Status.UpdatingHash = hash + +} + +func (m *FlinkController) DeleteResourcesForAppWithHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { + if m.DeleteResourcesForAppWithHashFunc != nil { + return m.DeleteResourcesForAppWithHashFunc(ctx, application, hash) + } + return nil +} + +func (m *FlinkController) DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string) { + if m.DeleteStatusPostTeardownFunc != nil { + m.DeleteStatusPostTeardownFunc(ctx, application, hash) + } + application.Status.VersionStatuses[0] = application.Status.VersionStatuses[1] + application.Status.VersionStatuses[1] = v1beta1.FlinkApplicationVersionStatus{} +} + +func (m *FlinkController) GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) 
(*client.FlinkJobOverview, error) { + if m.GetJobToDeleteForApplicationFunc != nil { + return m.GetJobToDeleteForApplicationFunc(ctx, app, hash) + } + return nil, nil +} + +func (m *FlinkController) GetVersionAndJobIDForHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error) { + if m.GetVersionAndJobIDForHashFunc != nil { + return m.GetVersionAndJobIDForHashFunc(ctx, application, hash) + } + return "", "", nil +} + +func (m *FlinkController) GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string) { + if m.GetVersionAndHashPostTeardownFunc != nil { + return m.GetVersionAndHashPostTeardownFunc(ctx, application) + } + return application.Status.VersionStatuses[0].Version, application.Status.VersionStatuses[0].VersionHash +} + func getCurrentStatusIndex(app *v1beta1.FlinkApplication) int32 { desiredCount := v1beta1.GetMaxRunningJobs(app.Spec.DeploymentMode) if v1beta1.IsRunningPhase(app.Status.Phase) { diff --git a/pkg/controller/flink/task_manager_controller.go b/pkg/controller/flink/task_manager_controller.go index 3b0d9d30..36f2085f 100644 --- a/pkg/controller/flink/task_manager_controller.go +++ b/pkg/controller/flink/task_manager_controller.go @@ -20,12 +20,12 @@ import ( ) const ( - TaskManagerNameFormat = "%s-%s-tm" - TaskManagerPodNameFormat = "%s-%s-tm-pod" - TaskManagerVersionPodNameFormat = "%s-%s-tm-%s-pod" - TaskManagerContainerName = "taskmanager" - TaskManagerArg = "taskmanager" - TaskManagerHostnameEnvVar = "TASKMANAGER_HOSTNAME" + TaskManagerNameFormat = "%s-%s-tm" + TaskManagerVersionNameFormat = "%s-%s-%s-tm" + TaskManagerPodNameFormat = "%s-%s-tm-pod" + TaskManagerContainerName = "taskmanager" + TaskManagerArg = "taskmanager" + TaskManagerHostnameEnvVar = "TASKMANAGER_HOSTNAME" ) type TaskManagerControllerInterface interface { @@ -143,15 +143,15 @@ func FetchTaskManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1 
func getTaskManagerPodName(application *v1beta1.FlinkApplication, hash string) string { applicationName := application.Name - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { - applicationVersion := application.Status.UpdatingVersion - return fmt.Sprintf(TaskManagerVersionPodNameFormat, applicationName, hash, applicationVersion) - } return fmt.Sprintf(TaskManagerPodNameFormat, applicationName, hash) } func getTaskManagerName(application *v1beta1.FlinkApplication, hash string) string { applicationName := application.Name + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + applicationVersion := application.Status.UpdatingVersion + return fmt.Sprintf(TaskManagerVersionNameFormat, applicationName, hash, applicationVersion) + } return fmt.Sprintf(TaskManagerNameFormat, applicationName, hash) } diff --git a/pkg/controller/flink/task_manager_controller_test.go b/pkg/controller/flink/task_manager_controller_test.go index 9f3edca6..4200383d 100644 --- a/pkg/controller/flink/task_manager_controller_test.go +++ b/pkg/controller/flink/task_manager_controller_test.go @@ -53,8 +53,9 @@ func TestGetTaskManagerPodName(t *testing.T) { func TestGetTaskManagerPodNameWithVersion(t *testing.T) { app := getFlinkTestApp() app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Status.DeploymentMode = v1beta1.DeploymentModeBlueGreen app.Status.UpdatingVersion = testVersion - assert.Equal(t, "app-name-"+testAppHash+"-tm-"+testVersion+"-pod", getTaskManagerPodName(&app, testAppHash)) + assert.Equal(t, "app-name-"+testAppHash+"-"+testVersion+"-tm", getTaskManagerName(&app, testAppHash)) } func TestTaskManagerCreateSuccess(t *testing.T) { @@ -238,6 +239,7 @@ func TestTaskManagerCreateSuccessWithVersion(t *testing.T) { app.Spec.EntryClass = testEntryClass app.Spec.ProgramArgs = testProgramArgs app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Status.DeploymentMode = v1beta1.DeploymentModeBlueGreen app.Status.UpdatingVersion 
= testVersion annotations := map[string]string{ "key": "annotation", @@ -245,13 +247,14 @@ func TestTaskManagerCreateSuccessWithVersion(t *testing.T) { "flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"", } - hash := "f0bd1679" + hash := "5cb5943e" app.Annotations = annotations expectedLabels := map[string]string{ - "flink-app": "app-name", - "flink-app-hash": hash, - "flink-deployment-type": "taskmanager", + "flink-app": "app-name", + "flink-app-hash": hash, + "flink-deployment-type": "taskmanager", + "flink-application-version": testVersion, } mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster) mockK8Cluster.CreateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 8ecd201d..2a7d79eb 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -154,7 +154,7 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli updateLastSeenError := false appPhase := application.Status.Phase // initialize application status array if it's not yet been initialized - s.initializeAppStatusIfEmpty(ctx, application) + s.initializeAppStatusIfEmpty(application) if !application.ObjectMeta.DeletionTimestamp.IsZero() && appPhase != v1beta1.FlinkApplicationDeleting { s.updateApplicationPhase(application, v1beta1.FlinkApplicationDeleting) @@ -186,6 +186,9 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli updateApplication, appErr = s.handleRollingBack(ctx, application) case v1beta1.FlinkApplicationDeleting: updateApplication, appErr = s.handleApplicationDeleting(ctx, application) + case v1beta1.FlinkApplicationDualRunning: + updateApplication, appErr = s.handleDualRunning(ctx, application) + } if !v1beta1.IsRunningPhase(appPhase) { @@ 
-237,7 +240,16 @@ func (s *FlinkStateMachine) handleNewOrUpdating(ctx context.Context, application fmt.Sprintf("Failed to create Flink Cluster: %s", reason)) return s.deployFailed(application) } - + // Update version if blue/green deploy + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + application.Status.UpdatingVersion = getUpdatingVersion(application) + // First deploy both versions are the same + if application.Status.DeployHash == "" { + application.Status.DeployVersion = application.Status.UpdatingVersion + } + // Reset teardown hash if set + application.Status.TeardownHash = "" + } // Create the Flink cluster err := s.flinkController.CreateCluster(ctx, application) if err != nil { @@ -285,36 +297,35 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati return statusUnchanged, nil } + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + // Update hashes + s.flinkController.UpdateLatestVersionAndHash(application, application.Status.UpdatingVersion, flink.HashForApplication(application)) + + } + logger.Infof(ctx, "Flink cluster has started successfully") // TODO: in single mode move to submitting job - if application.Spec.SavepointDisabled { + if application.Spec.SavepointDisabled && !v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { s.updateApplicationPhase(application, v1beta1.FlinkApplicationCancelling) + } else if application.Spec.SavepointDisabled && v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + // Blue Green deployment and no savepoint required implies, we directly transition to submitting job + s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) } else { s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing) } return statusChanged, nil } -func (s *FlinkStateMachine) initializeAppStatusIfEmpty(ctx context.Context, application *v1beta1.FlinkApplication) { - // initialize the app 
status array to include 2 status elements in case of blue green deploys - // else use a one element array - if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { - application.Status.VersionStatuses = make([]v1beta1.FlinkApplicationVersionStatus, v1beta1.GetMaxRunningJobs(application.Spec.DeploymentMode)) - - // If an application is moving from a Dual to BlueGreen deployment mode, - // We pre-populate the version statuses array with the current Job and Cluster Status - // And reset top-level ClusterStatus and JobStatus to empty structs - // as they'll no longer get updated - if application.Status.JobStatus != (v1beta1.FlinkJobStatus{}) { - s.flinkController.UpdateLatestJobStatus(ctx, application, application.Status.JobStatus) - application.Status.JobStatus = v1beta1.FlinkJobStatus{} - } - - if application.Status.ClusterStatus != (v1beta1.FlinkClusterStatus{}) { - s.flinkController.UpdateLatestClusterStatus(ctx, application, application.Status.ClusterStatus) - application.Status.ClusterStatus = v1beta1.FlinkClusterStatus{} +func (s *FlinkStateMachine) initializeAppStatusIfEmpty(application *v1beta1.FlinkApplication) { + if v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + if len(application.Status.VersionStatuses) == 0 { + application.Status.VersionStatuses = make([]v1beta1.FlinkApplicationVersionStatus, v1beta1.GetMaxRunningJobs(application.Spec.DeploymentMode)) } } + // Set the deployment mode if it's never been set + if application.Status.DeploymentMode == "" { + application.Status.DeploymentMode = application.Spec.DeploymentMode + } } func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { @@ -331,24 +342,29 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a s.updateApplicationPhase(application, v1beta1.FlinkApplicationRecovering) return statusChanged, nil } - + cancelFlag := getCancelFlag(application) // we 
haven't started savepointing yet; do so now // TODO: figure out the idempotence of this if application.Status.SavepointTriggerID == "" { - triggerID, err := s.flinkController.CancelWithSavepoint(ctx, application, application.Status.DeployHash) + triggerID, err := s.flinkController.Savepoint(ctx, application, application.Status.DeployHash, cancelFlag, s.flinkController.GetLatestJobID(ctx, application)) if err != nil { return statusUnchanged, err } + if cancelFlag { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob", + fmt.Sprintf("Cancelling job %s with a final savepoint", s.flinkController.GetLatestJobID(ctx, application))) + } else { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "SavepointingJob", + fmt.Sprintf("Savepointing job %s", s.flinkController.GetLatestJobID(ctx, application))) - s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob", - fmt.Sprintf("Cancelling job %s with a final savepoint", s.flinkController.GetLatestJobID(ctx, application))) + } application.Status.SavepointTriggerID = triggerID return statusChanged, nil } // check the savepoints in progress - savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash) + savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash, s.flinkController.GetLatestJobID(ctx, application)) if err != nil { return statusUnchanged, err } @@ -364,11 +380,21 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a s.updateApplicationPhase(application, v1beta1.FlinkApplicationRecovering) return statusChanged, nil } else if savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted { - s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CanceledJob", - fmt.Sprintf("Canceled job with savepoint %s", -
savepointStatusResponse.Operation.Location)) + if cancelFlag { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CanceledJob", + fmt.Sprintf("Canceled job with savepoint %s", + savepointStatusResponse.Operation.Location)) + } else { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "SavepointCompleted", + fmt.Sprintf("Completed savepoint at %s", + savepointStatusResponse.Operation.Location)) + } + application.Status.SavepointPath = savepointStatusResponse.Operation.Location - s.flinkController.UpdateLatestJobID(ctx, application, "") + // We haven't cancelled the job in this case, so don't reset job ID + if !v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + s.flinkController.UpdateLatestJobID(ctx, application, "") + } s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) return statusChanged, nil } @@ -400,7 +426,7 @@ func (s *FlinkStateMachine) handleApplicationCancelling(ctx context.Context, app if job != nil && job.State != client.Canceled && job.State != client.Failed { - err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash) + err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash, s.flinkController.GetLatestJobID(ctx, application)) if err != nil { return statusUnchanged, err } @@ -499,7 +525,7 @@ func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1beta1. 
} func (s *FlinkStateMachine) updateGenericService(ctx context.Context, app *v1beta1.FlinkApplication, newHash string) error { - service, err := s.k8Cluster.GetService(ctx, app.Namespace, app.Name) + service, err := s.k8Cluster.GetService(ctx, app.Namespace, app.Name, string(app.Status.UpdatingVersion)) if err != nil { return err } @@ -591,10 +617,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta } if job.State == client.Running && allVerticesStarted { - // Update the application status with the running job info - app.Status.DeployHash = hash - app.Status.SavepointPath = "" - app.Status.SavepointTriggerID = "" + // Update job status jobStatus := s.flinkController.GetLatestJobStatus(ctx, app) jobStatus.JarName = app.Spec.JarName jobStatus.Parallelism = app.Spec.Parallelism @@ -602,6 +625,14 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta jobStatus.ProgramArgs = app.Spec.ProgramArgs jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus) + // Update the application status with the running job info + app.Status.SavepointPath = "" + app.Status.SavepointTriggerID = "" + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" { + s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning) + return statusChanged, nil + } + app.Status.DeployHash = hash s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) return statusChanged, nil } @@ -623,6 +654,11 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1beta1. 
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "DeployFailed", fmt.Sprintf("Deployment %s failed, rolling back", flink.HashForApplication(app))) + // In the case of blue green deploys, we don't try to submit a new job + // and instead transition to a deploy failed state + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" { + return s.deployFailed(app) + } // TODO: handle single mode // TODO: it's possible that a job is successfully running in the new cluster at this point -- should cancel it @@ -679,6 +715,11 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic // If the application has changed (i.e., there are no current deployments), and we haven't already failed trying to // do the update, move to the cluster starting phase to create the new cluster if cur == nil { + if s.isIncompatibleDeploymentModeChange(application) { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "UnsupportedChange", + fmt.Sprintf("Changing deployment mode from %s to %s is unsupported", application.Status.DeploymentMode, application.Spec.DeploymentMode)) + return s.deployFailed(application) + } logger.Infof(ctx, "Application resource has changed. 
Moving to Updating") // TODO: handle single mode s.updateApplicationPhase(application, v1beta1.FlinkApplicationUpdating) @@ -697,8 +738,16 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic logger.Debugf(ctx, "Application running with job %v", job.JobID) } - // If there are old resources left-over from a previous version, clean them up - err = s.flinkController.DeleteOldResourcesForApp(ctx, application) + // For blue-green deploys, specify the hash to be deleted + if application.Status.FailedDeployHash != "" && v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + err = s.flinkController.DeleteResourcesForAppWithHash(ctx, application, application.Status.FailedDeployHash) + // Delete status object for the failed hash + s.flinkController.DeleteStatusPostTeardown(ctx, application, application.Status.FailedDeployHash) + } else if !v1beta1.IsBlueGreenDeploymentMode(application.Status.DeploymentMode) { + // If there are old resources left-over from a previous version, clean them up + err = s.flinkController.DeleteOldResourcesForApp(ctx, application) + } + if err != nil { logger.Warn(ctx, "Failed to clean up old resources: %v", err) } @@ -769,7 +818,9 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * if app.Spec.DeleteMode == v1beta1.DeleteModeNone || app.Status.DeployHash == "" { return s.clearFinalizers(ctx, app) } - + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) { + return s.deleteBlueGreenApplication(ctx, app) + } job, err := s.flinkController.GetJobForApplication(ctx, app, app.Status.DeployHash) if err != nil { return statusUnchanged, err @@ -790,7 +841,7 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * } logger.Infof(ctx, "Force-cancelling job without a savepoint") - return statusUnchanged, s.flinkController.ForceCancel(ctx, app, app.Status.DeployHash) + return statusUnchanged, s.flinkController.ForceCancel(ctx, app, 
app.Status.DeployHash, s.flinkController.GetLatestJobID(ctx, app)) case v1beta1.DeleteModeSavepoint, "": if app.Status.SavepointPath != "" { // we've already created the savepoint, now just waiting for the job to be cancelled @@ -803,7 +854,7 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * if app.Status.SavepointTriggerID == "" { // delete with savepoint - triggerID, err := s.flinkController.CancelWithSavepoint(ctx, app, app.Status.DeployHash) + triggerID, err := s.flinkController.Savepoint(ctx, app, app.Status.DeployHash, getCancelFlag(app), s.flinkController.GetLatestJobID(ctx, app)) if err != nil { return statusUnchanged, err } @@ -812,7 +863,7 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * app.Status.SavepointTriggerID = triggerID } else { // we've already started savepointing; check the status - status, err := s.flinkController.GetSavepointStatus(ctx, app, app.Status.DeployHash) + status, err := s.flinkController.GetSavepointStatus(ctx, app, app.Status.DeployHash, s.flinkController.GetLatestJobID(ctx, app)) if err != nil { return statusUnchanged, err } @@ -867,6 +918,204 @@ func (s *FlinkStateMachine) compareAndUpdateError(application *v1beta1.FlinkAppl } +func getUpdatingVersion(application *v1beta1.FlinkApplication) v1beta1.FlinkApplicationVersion { + if getDeployedVersion(application) == v1beta1.BlueFlinkApplication { + return v1beta1.GreenFlinkApplication + } + + return v1beta1.BlueFlinkApplication +} + +func getDeployedVersion(application *v1beta1.FlinkApplication) v1beta1.FlinkApplicationVersion { + // First deploy, set the version to Blue + if application.Status.DeployVersion == "" { + application.Status.DeployVersion = v1beta1.BlueFlinkApplication + } + return application.Status.DeployVersion +} + +func getCancelFlag(app *v1beta1.FlinkApplication) bool { + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.Phase != v1beta1.FlinkApplicationDeleting { + 
return false + } + return true +} + +// Two applications are running in this phase. This phase is only ever reached when the +// DeploymentMode is set to BlueGreen +func (s *FlinkStateMachine) handleDualRunning(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { + if application.Spec.TearDownVersionHash != "" { + return s.teardownApplicationVersion(ctx, application) + } + + // Update status of the cluster + hasClusterStatusChanged, clusterErr := s.flinkController.CompareAndUpdateClusterStatus(ctx, application, application.Status.DeployHash) + if clusterErr != nil { + logger.Errorf(ctx, "Updating cluster status failed with %v", clusterErr) + } + + // Update status of jobs on the cluster + hasJobStatusChanged, jobsErr := s.flinkController.CompareAndUpdateJobStatus(ctx, application, application.Status.DeployHash) + if jobsErr != nil { + logger.Errorf(ctx, "Updating jobs status failed with %v", jobsErr) + } + + // Update k8s object if either job or cluster status has changed + if hasJobStatusChanged || hasClusterStatusChanged { + return statusChanged, nil + } + return statusUnchanged, nil +} + +func (s *FlinkStateMachine) teardownApplicationVersion(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { + versionHashToTeardown := application.Spec.TearDownVersionHash + versionToTeardown, jobID, err := s.flinkController.GetVersionAndJobIDForHash(ctx, application, versionHashToTeardown) + + if err != nil { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "TeardownFailed", + fmt.Sprintf("Failed to find application version %v", + versionHashToTeardown)) + return statusUnchanged, nil + } + + s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "TeardownInitated", + fmt.Sprintf("Tearing down application with hash %s and version %v", versionHashToTeardown, + versionToTeardown)) + // Force-cancel job first + s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "ForceCanceling", + 
fmt.Sprintf("Force-canceling application with version %v and hash %s", + versionToTeardown, versionHashToTeardown)) + + err = s.flinkController.ForceCancel(ctx, application, versionHashToTeardown, jobID) + if err != nil { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "TeardownFailed", + fmt.Sprintf("Failed to force-cancel application version %v and hash %s; will attempt to tear down cluster immediately: %s", + versionToTeardown, versionHashToTeardown, err)) + return s.deployFailed(application) + } + + // Delete all resources associated with the teardown version + err = s.flinkController.DeleteResourcesForAppWithHash(ctx, application, versionHashToTeardown) + if err != nil { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "TeardownFailed", + fmt.Sprintf("Failed to teardown application with hash %s and version %v, manual intervention needed: %s", versionHashToTeardown, + versionToTeardown, err)) + return s.deployFailed(application) + } + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "TeardownCompleted", + fmt.Sprintf("Tore down application with hash %s and version %v", versionHashToTeardown, + versionToTeardown)) + + s.flinkController.DeleteStatusPostTeardown(ctx, application, versionHashToTeardown) + versionPostTeardown, versionHashPostTeardown := s.flinkController.GetVersionAndHashPostTeardown(ctx, application) + application.Status.DeployVersion = versionPostTeardown + application.Status.UpdatingVersion = "" + application.Status.DeployHash = versionHashPostTeardown + application.Status.UpdatingHash = "" + application.Status.TeardownHash = flink.HashForApplication(application) + s.updateApplicationPhase(application, v1beta1.FlinkApplicationRunning) + return statusChanged, nil +} + +func (s *FlinkStateMachine) deleteBlueGreenApplication(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) { + // Cancel deployed job + deployedJob, err := 
s.flinkController.GetJobToDeleteForApplication(ctx, app, app.Status.DeployHash) + if err != nil { + return statusUnchanged, nil + } + if !jobFinished(deployedJob) { + isFinished, err := s.cancelAndDeleteJob(ctx, app, deployedJob, app.Status.DeployHash) + if err != nil { + return statusUnchanged, nil + } + return isFinished, nil + } + + deploySavepointPath := app.Status.SavepointPath + // Cancel Updating job + updatingJob, err := s.flinkController.GetJobToDeleteForApplication(ctx, app, app.Status.UpdatingHash) + if err != nil { + return statusUnchanged, nil + } + if !jobFinished(updatingJob) { + if app.Status.SavepointPath == deploySavepointPath { + app.Status.SavepointPath = "" + } + isFinished, err := s.cancelAndDeleteJob(ctx, app, updatingJob, app.Status.UpdatingHash) + if err != nil { + return statusUnchanged, nil + } + return isFinished, nil + } + + if jobFinished(deployedJob) && jobFinished(updatingJob) { + return s.clearFinalizers(ctx, app) + } + return statusUnchanged, nil +} + +func (s *FlinkStateMachine) cancelAndDeleteJob(ctx context.Context, app *v1beta1.FlinkApplication, job *client.FlinkJobOverview, hash string) (bool, error) { + switch app.Spec.DeleteMode { + case v1beta1.DeleteModeForceCancel: + if job.State == client.Cancelling { + // we've already cancelled the job, waiting for it to finish + return statusUnchanged, nil + } else if jobFinished(job) { + return statusUnchanged, nil + } + + logger.Infof(ctx, "Force-cancelling job without a savepoint") + return statusUnchanged, s.flinkController.ForceCancel(ctx, app, hash, s.flinkController.GetLatestJobID(ctx, app)) + case v1beta1.DeleteModeSavepoint, "": + if app.Status.SavepointPath != "" { + return statusChanged, nil + } + + if app.Status.SavepointTriggerID == "" { + // delete with savepoint + triggerID, err := s.flinkController.Savepoint(ctx, app, hash, getCancelFlag(app), job.JobID) + if err != nil { + return statusUnchanged, err + } + s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, 
"CancellingJob", + fmt.Sprintf("Cancelling job with savepoint %v", triggerID)) + app.Status.SavepointTriggerID = triggerID + } else { + // we've already started savepointing; check the status + status, err := s.flinkController.GetSavepointStatus(ctx, app, hash, job.JobID) + if err != nil { + return statusUnchanged, err + } + + if status.Operation.Location == "" && status.SavepointStatus.Status != client.SavePointInProgress { + // savepointing failed + s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "SavepointFailed", + fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause)) + // clear the trigger id so that we can try again + app.Status.SavepointTriggerID = "" + return true, client.GetRetryableError(errors.New("failed to take savepoint"), + v1beta1.CancelJobWithSavepoint, "500", math.MaxInt32) + } else if status.SavepointStatus.Status == client.SavePointCompleted { + // we're done, clean up + s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CanceledJob", + fmt.Sprintf("Cancelled job with savepoint '%s'", status.Operation.Location)) + app.Status.SavepointPath = status.Operation.Location + app.Status.SavepointTriggerID = "" + } + } + + return statusChanged, nil + default: + logger.Errorf(ctx, "Unsupported DeleteMode %s", app.Spec.DeleteMode) + } + + return statusUnchanged, nil +} + +func (s *FlinkStateMachine) isIncompatibleDeploymentModeChange(application *v1beta1.FlinkApplication) bool { + return application.Spec.DeploymentMode != application.Status.DeploymentMode +} + func createRetryHandler() client.RetryHandlerInterface { return client.NewRetryHandler(config.GetConfig().BaseBackoffDuration.Duration, config.GetConfig().MaxErrDuration.Duration, config.GetConfig().MaxBackoffDuration.Duration) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index fc15c62c..99389ff1 100644 --- 
a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -149,7 +149,7 @@ func TestHandleApplicationCancel(t *testing.T) { }, nil } - mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (e error) { + mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (e error) { assert.Equal(t, "old-hash", hash) cancelInvoked = true @@ -187,7 +187,7 @@ func TestHandleApplicationCancelFailedWithMaxRetries(t *testing.T) { updateInvoked := false stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { + mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error { // given we maxed out on retries, we should never have come here assert.False(t, true) return nil @@ -256,7 +256,7 @@ func TestHandleApplicationSavepointingInitialDeploy(t *testing.T) { stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.CancelWithSavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (s string, e error) { + mockFlinkController.SavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (s string, e error) { // should not be called assert.False(t, true) return "", nil @@ -291,14 +291,14 @@ func TestHandleApplicationSavepointingDual(t *testing.T) { stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.CancelWithSavepointFunc = func(ctx 
context.Context, application *v1beta1.FlinkApplication, hash string) (s string, e error) { + mockFlinkController.SavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (s string, e error) { assert.Equal(t, "old-hash", hash) cancelInvoked = true return "trigger", nil } - mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { + mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) { assert.Equal(t, "old-hash", hash) return &client.SavepointResponse{ SavepointStatus: client.SavepointStatusResponse{ @@ -340,7 +340,7 @@ func TestHandleApplicationSavepointingFailed(t *testing.T) { updateInvoked := false stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { + mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) { return &client.SavepointResponse{ SavepointStatus: client.SavepointStatusResponse{ Status: client.SavePointCompleted, @@ -470,7 +470,7 @@ func TestSubmittingToRunning(t *testing.T) { mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) getServiceCount := 0 - mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string) (*v1.Service, error) { + mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) { assert.Equal(t, "flink", namespace) assert.Equal(t, "test-app", name) @@ -655,7 +655,7 @@ func TestRollingBack(t *testing.T) 
{ mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) getServiceCount := 0 - mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string) (*v1.Service, error) { + mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) { assert.Equal(t, "flink", namespace) assert.Equal(t, "test-app", name) @@ -797,7 +797,7 @@ func TestDeleteWithSavepoint(t *testing.T) { savepointPath := "s3:///path/to/savepoint" mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.CancelWithSavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { + mockFlinkController.SavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error) { return triggerID, nil } @@ -837,7 +837,7 @@ func TestDeleteWithSavepoint(t *testing.T) { assert.NoError(t, err) savepointStatusCount := 0 - mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { + mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) { savepointStatusCount++ if savepointStatusCount == 1 { @@ -972,7 +972,7 @@ func TestDeleteWithForceCancel(t *testing.T) { } cancelled := false - mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { + mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error { cancelled = true return nil } @@ -1038,7 +1038,7 @@ func TestDeleteModeNone(t *testing.T) { } cancelled := false - mockFlinkController.ForceCancelFunc = func(ctx context.Context, application 
*v1beta1.FlinkApplication, hash string) error { + mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error { cancelled = true return nil } @@ -1097,7 +1097,7 @@ func TestRollbackWithRetryableError(t *testing.T) { retryableErr := client.GetRetryableError(errors.New("blah"), "GetClusterOverview", "FAILED", 3) stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.CancelWithSavepointFunc = func(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (savepoint string, err error) { + mockFlinkController.SavepointFunc = func(ctx context.Context, app *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (savepoint string, err error) { return "", retryableErr } @@ -1198,7 +1198,7 @@ func TestRollbackWithFailFastError(t *testing.T) { mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) appHash := flink.HashForApplication(&app) getServiceCount := 0 - mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string) (*v1.Service, error) { + mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) { hash := "old-hash-retry-err" if getServiceCount > 0 { hash = appHash @@ -1351,7 +1351,7 @@ func TestForceRollback(t *testing.T) { mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) getServiceCount := 0 - mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string) (*v1.Service, error) { + mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) { hash := oldHash if getServiceCount > 0 { hash = oldHash @@ -1452,7 +1452,7 @@ func TestCheckSavepointStatusFailing(t *testing.T) { stateMachineForTest := getTestStateMachine() mockFlinkController := 
stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { + mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) { return nil, retryableErr.(*v1beta1.FlinkApplicationError) } @@ -1506,10 +1506,10 @@ func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { + mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) { return nil, retryableErr.(*v1beta1.FlinkApplicationError) } - mockFlinkController.CancelWithSavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (s string, e error) { + mockFlinkController.SavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (s string, e error) { return "triggerId", nil } mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler) @@ -1536,7 +1536,7 @@ func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { }, nil } - mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { + mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error { return nil } err = stateMachineForTest.Handle(context.Background(), &app) @@ -1546,3 +1546,511 @@ func TestDeleteWhenCheckSavepointStatusFailing(t 
// TestRunningToDualRunning walks a blue-green application through the full
// update path -- Running -> Updating -> ClusterStarting -> Savepointing ->
// SubmittingJob -> DualRunning -- asserting the version statuses, hashes and
// savepoint bookkeeping maintained at every transition.
func TestRunningToDualRunning(t *testing.T) {
	deployHash := "appHash"
	// Hash the state machine computes for the updated spec below.
	updatingHash := "2845d780"
	triggerID := "trigger"
	savepointPath := "savepointPath"
	app := v1beta1.FlinkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-app",
			Namespace: "flink",
		},
		Spec: v1beta1.FlinkApplicationSpec{
			JarName:        "job.jar",
			Parallelism:    5,
			EntryClass:     "com.my.Class",
			ProgramArgs:    "--test",
			DeploymentMode: v1beta1.DeploymentModeBlueGreen,
		},
		Status: v1beta1.FlinkApplicationStatus{
			Phase:         v1beta1.FlinkApplicationRunning,
			DeployHash:    deployHash,
			DeployVersion: v1beta1.GreenFlinkApplication,
			// Slot 0 holds the deployed (Green) version; slot 1 is reserved
			// for the updating version and starts empty.
			VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{
				{
					JobStatus: v1beta1.FlinkJobStatus{
						JobID: "jobId",
						State: v1beta1.Running,
					},
					VersionHash: deployHash,
					Version:     v1beta1.GreenFlinkApplication,
				},
				{},
			},
		},
	}

	stateMachineForTest := getTestStateMachine()
	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)

	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
		return &client.FlinkJobOverview{
			JobID: "jobID2",
			State: client.Running,
		}, nil

	}

	mockFlinkController.IsClusterReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (b bool, err error) {
		return true, nil
	}

	mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (b bool, err error) {
		return true, nil
	}

	// Handle Running and move to Updating
	err := stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, v1beta1.FlinkApplicationUpdating, app.Status.Phase)
	assert.Equal(t, 2, len(app.Status.VersionStatuses))
	assert.Equal(t, "", app.Status.UpdatingHash)

	// Handle Updating and move to ClusterStarting
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Equal(t, v1beta1.FlinkApplicationClusterStarting, app.Status.Phase)
	assert.Equal(t, v1beta1.BlueFlinkApplication, app.Status.UpdatingVersion)
	assert.Nil(t, err)

	// Handle ClusterStarting and move to Savepointing
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase)
	assert.Equal(t, updatingHash, app.Status.VersionStatuses[1].VersionHash)
	assert.Equal(t, v1beta1.BlueFlinkApplication, app.Status.VersionStatuses[1].Version)
	assert.Equal(t, updatingHash, app.Status.UpdatingHash)
	assert.Nil(t, err)

	// Handle Savepointing and move to SubmittingJob
	mockFlinkController.SavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (s string, err error) {
		// Blue-green savepoints must NOT cancel the running (old) job.
		assert.False(t, isCancel)
		return triggerID, nil
	}
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (response *client.SavepointResponse, err error) {
		return &client.SavepointResponse{
			SavepointStatus: client.SavepointStatusResponse{
				Status: client.SavePointCompleted,
			},
			Operation: client.SavepointOperationResponse{
				Location:     savepointPath,
				FailureCause: client.FailureCause{},
			},
		}, nil
	}
	assert.Equal(t, app.Status.SavepointTriggerID, triggerID)
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, app.Status.SavepointPath, savepointPath)
	assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, app.Status.Phase)

	// Handle SubmittingJob and move to DualRunning
	mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)

	mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) {

		return &v1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "test-app",
				Namespace: "flink",
			},
			Spec: v1.ServiceSpec{
				// Service already points at the updating cluster's pods.
				Selector: map[string]string{
					"flink-app-hash": updatingHash,
				},
			},
		}, nil
	}

	mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (s string, err error) {
		return "jobID2", nil
	}
	// First pass submits the job; second pass observes it running and
	// transitions to DualRunning, copying the spec into the job status.
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, "jobID2", app.Status.VersionStatuses[1].JobStatus.JobID)
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, "jobID2", app.Status.VersionStatuses[1].JobStatus.JobID)
	assert.Equal(t, app.Spec.JarName, app.Status.VersionStatuses[1].JobStatus.JarName)
	assert.Equal(t, app.Spec.Parallelism, app.Status.VersionStatuses[1].JobStatus.Parallelism)
	assert.Equal(t, app.Spec.EntryClass, app.Status.VersionStatuses[1].JobStatus.EntryClass)
	assert.Equal(t, app.Spec.ProgramArgs, app.Status.VersionStatuses[1].JobStatus.ProgramArgs)

	assert.Equal(t, v1beta1.FlinkApplicationDualRunning, app.Status.Phase)
	assert.Equal(t, deployHash, app.Status.DeployHash)
	assert.Equal(t, updatingHash, app.Status.UpdatingHash)
	assert.Equal(t, v1beta1.GreenFlinkApplication, app.Status.DeployVersion)
	assert.Equal(t, v1beta1.BlueFlinkApplication, app.Status.UpdatingVersion)

}

// TestDualRunningToRunning exercises the teardown path: with
// Spec.TearDownVersionHash set to the old (Green) deploy, DualRunning should
// force-cancel that job, delete its resources and status entry, promote the
// Blue version to deployed, and return to Running.
func TestDualRunningToRunning(t *testing.T) {
	deployHash := "appHash"
	updatingHash := "2845d780"
	// Hash computed for the app after teardown completes.
	teardownHash := "9dc7d91b"

	app := v1beta1.FlinkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-app",
			Namespace: "flink",
		},
		Spec: v1beta1.FlinkApplicationSpec{
			JarName:        "job.jar",
			Parallelism:    5,
			EntryClass:     "com.my.Class",
			ProgramArgs:    "--test",
			DeploymentMode: v1beta1.DeploymentModeBlueGreen,
		},
		Status: v1beta1.FlinkApplicationStatus{
			Phase:           v1beta1.FlinkApplicationDualRunning,
			DeployHash:      deployHash,
			UpdatingHash:    updatingHash,
			DeployVersion:   v1beta1.GreenFlinkApplication,
			UpdatingVersion: v1beta1.BlueFlinkApplication,
			VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{
				{
					JobStatus: v1beta1.FlinkJobStatus{
						JobID: "jobId",
						State: v1beta1.Running,
					},
					VersionHash: deployHash,
					Version:     v1beta1.GreenFlinkApplication,
				},
				{
					JobStatus: v1beta1.FlinkJobStatus{
						JobID: "jobId2",
						State: v1beta1.Running,
					},
					VersionHash: updatingHash,
					Version:     v1beta1.BlueFlinkApplication,
				},
			},
		},
	}

	stateMachineForTest := getTestStateMachine()
	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
	// All teardown operations must target the old (Green) deploy hash/job.
	mockFlinkController.DeleteResourcesForAppWithHashFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error {
		assert.Equal(t, deployHash, hash)
		return nil
	}
	mockFlinkController.GetVersionAndJobIDForHashFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error) {
		assert.Equal(t, deployHash, hash)
		return string(v1beta1.GreenFlinkApplication), "jobId", nil
	}
	mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (e error) {
		assert.Equal(t, "jobId", jobID)
		return nil
	}
	app.Spec.TearDownVersionHash = deployHash
	// The surviving (Blue) status entry should shift into slot 0.
	expectedVersionStatus := app.Status.VersionStatuses[1]
	// Handle DualRunning and move to Running
	err := stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, v1beta1.FlinkApplicationRunning, app.Status.Phase)
	assert.Empty(t, app.Status.VersionStatuses[1])
	assert.Equal(t, teardownHash, app.Status.TeardownHash)
	assert.Equal(t, expectedVersionStatus, app.Status.VersionStatuses[0])
	assert.Equal(t, "", app.Status.UpdatingHash)
	assert.Equal(t, updatingHash, app.Status.DeployHash)
	assert.Equal(t, "", string(app.Status.UpdatingVersion))
	assert.Equal(t, v1beta1.BlueFlinkApplication, app.Status.DeployVersion)
}

// TestBlueGreenUpdateWithError verifies that when a blue-green job submission
// fails permanently, the application rolls back to DeployFailed without ever
// touching the already-running (old) job.
func TestBlueGreenUpdateWithError(t *testing.T) {
	deployHash := "deployHash"
	updatingHash := "updateHash"

	app := v1beta1.FlinkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-app",
			Namespace: "flink",
		},
		Spec: v1beta1.FlinkApplicationSpec{
			JarName:        "job.jar",
			Parallelism:    5,
			EntryClass:     "com.my.Class",
			ProgramArgs:    "--test",
			DeploymentMode: v1beta1.DeploymentModeBlueGreen,
		},
		Status: v1beta1.FlinkApplicationStatus{
			Phase:         v1beta1.FlinkApplicationSubmittingJob,
			DeployHash:    deployHash,
			DeployVersion: v1beta1.GreenFlinkApplication,
			VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{
				{
					JobStatus: v1beta1.FlinkJobStatus{
						JobID: "jobId",
						State: v1beta1.Running,
					},
					VersionHash: deployHash,
					Version:     v1beta1.GreenFlinkApplication,
				},
				{},
			},
		},
	}

	stateMachineForTest := getTestStateMachine()
	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
	// Non-retryable submission failure drives the rollback.
	mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (s string, err error) {
		return "", client.GetNonRetryableError(errors.New("bad submission"), "SubmitJob", "500")

	}
	mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
	mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) {

		return &v1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "test-app",
				Namespace: "flink",
			},
			Spec: v1.ServiceSpec{
				Selector: map[string]string{
					"flink-app-hash": updatingHash,
				},
			},
		}, nil
	}

	// First pass records the error but stays in SubmittingJob.
	err := stateMachineForTest.Handle(context.Background(), &app)
	assert.NotNil(t, err)
	assert.NotNil(t, app.Status.LastSeenError)
	assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, app.Status.Phase)

	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, app.Status.Phase)
	assert.Equal(t, "", app.Status.VersionStatuses[1].JobStatus.JobID)

	// We should have moved to DeployFailed without affecting the existing job
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, app.Status.Phase)
	assert.Equal(t, "jobId", app.Status.VersionStatuses[0].JobStatus.JobID)

}

// TestBlueGreenDeployWithSavepointDisabled checks that with SavepointDisabled
// a blue-green update skips Savepointing and goes straight from
// ClusterStarting to SubmittingJob.
func TestBlueGreenDeployWithSavepointDisabled(t *testing.T) {
	deployHash := "appHashTest"
	app := v1beta1.FlinkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-app",
			Namespace: "flink",
		},
		Spec: v1beta1.FlinkApplicationSpec{
			JarName:           "job.jar",
			Parallelism:       5,
			EntryClass:        "com.my.Class",
			ProgramArgs:       "--test",
			DeploymentMode:    v1beta1.DeploymentModeBlueGreen,
			SavepointDisabled: true,
		},
		Status: v1beta1.FlinkApplicationStatus{
			Phase:         v1beta1.FlinkApplicationClusterStarting,
			DeployHash:    deployHash,
			DeployVersion: v1beta1.GreenFlinkApplication,
			VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{
				{
					JobStatus: v1beta1.FlinkJobStatus{
						JobID: "jobId",
						State: v1beta1.Running,
					},
					VersionHash: deployHash,
					Version:     v1beta1.GreenFlinkApplication,
				},
				{},
			},
		},
	}
	stateMachineForTest := getTestStateMachine()
	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)

	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
		return &client.FlinkJobOverview{
			JobID: "jobID2",
			State: client.Running,
		}, nil

	}

	mockFlinkController.IsClusterReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (b bool, err error) {
		return true, nil
	}

	mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (b bool, err error) {
		return true, nil
	}
	err := stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, app.Status.Phase)
}

// TestDeleteBlueGreenDeployment drives the Deleting phase for a blue-green
// app with both versions running: the deployed (green) job is savepointed and
// cancelled first, then the updating (blue) job, and finally the finalizers
// are cleared. Counter-based mocks simulate each job taking two reconcile
// passes to reach Canceled.
func TestDeleteBlueGreenDeployment(t *testing.T) {
	deployHash := "deployHashDelete"
	updateHash := "updateHashDelete"
	app := v1beta1.FlinkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-app",
			Namespace: "flink",
		},
		Spec: v1beta1.FlinkApplicationSpec{
			JarName:           "job.jar",
			Parallelism:       5,
			EntryClass:        "com.my.Class",
			ProgramArgs:       "--test",
			DeploymentMode:    v1beta1.DeploymentModeBlueGreen,
			SavepointDisabled: true,
		},
		Status: v1beta1.FlinkApplicationStatus{
			Phase:           v1beta1.FlinkApplicationDeleting,
			DeployHash:      deployHash,
			DeployVersion:   v1beta1.GreenFlinkApplication,
			UpdatingHash:    updateHash,
			UpdatingVersion: v1beta1.BlueFlinkApplication,
			VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{
				{
					JobStatus: v1beta1.FlinkJobStatus{
						JobID: "greenId",
						State: v1beta1.Running,
					},
					VersionHash: deployHash,
					Version:     v1beta1.GreenFlinkApplication,
				},
				{
					JobStatus: v1beta1.FlinkJobStatus{
						JobID: "blueId",
						State: v1beta1.Running,
					},
					// NOTE(review): this is deployHash while the blue entry's
					// hash elsewhere is updateHash -- looks like a fixture
					// typo (should probably be updateHash); confirm.
					VersionHash: deployHash,
					Version:     v1beta1.BlueFlinkApplication,
				},
			},
		},
	}

	stateMachineForTest := getTestStateMachine()
	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)

	// Each job reports Running for the first two lookups, then Canceled.
	jobCtr1 := 0
	jobCtr2 := 0
	mockFlinkController.GetJobToDeleteForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
		if hash == deployHash {
			jobCtr1++
			if jobCtr1 <= 2 {
				return &client.FlinkJobOverview{
					JobID: "greenId",
					State: client.Running,
				}, nil
			}
			return &client.FlinkJobOverview{
				JobID: "greenId",
				State: client.Canceled,
			}, nil
		}

		jobCtr2++
		if jobCtr2 <= 2 {
			return &client.FlinkJobOverview{
				JobID: "blueId",
				State: client.Running,
			}, nil
		}

		return &client.FlinkJobOverview{
			JobID: "blueId",
			State: client.Canceled,
		}, nil

	}
	triggerID := "t1"
	savepointCtr := 0
	mockFlinkController.SavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error) {
		return triggerID, nil
	}

	mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) {
		// First savepoint belongs to the deployed (green) job, second to the
		// updating (blue) job.
		if savepointCtr == 0 {
			assert.Equal(t, deployHash, hash)
			assert.Equal(t, "greenId", jobID)
		} else {
			assert.Equal(t, updateHash, hash)
			assert.Equal(t, "blueId", jobID)
		}
		savepointCtr++
		return &client.SavepointResponse{
			SavepointStatus: client.SavepointStatusResponse{
				Status: client.SavePointCompleted,
			},
			Operation: client.SavepointOperationResponse{
				Location: testSavepointLocation + hash,
			},
		}, nil
	}
	// Go through deletes until both applications are deleted
	err := stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	err = stateMachineForTest.Handle(context.Background(), &app)
	assert.Nil(t, err)
	assert.Equal(t, 2, savepointCtr)
	assert.Empty(t, app.Finalizers)
	assert.Equal(t, testSavepointLocation+updateHash, app.Status.SavepointPath)
}
"flink", + }, + Spec: v1beta1.FlinkApplicationSpec{ + JarName: "job.jar", + Parallelism: 5, + EntryClass: "com.my.Class", + ProgramArgs: "--test", + DeploymentMode: v1beta1.DeploymentModeDual, + }, + } + + app.Status.Phase = v1beta1.FlinkApplicationRunning + stateMachineForTest := getTestStateMachine() + mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) + mockFlinkController.GetCurrentDeploymentsForAppFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) { + return nil, nil + } + + err := stateMachineForTest.Handle(context.Background(), &app) + assert.Nil(t, err) + assert.Equal(t, v1beta1.DeploymentModeDual, app.Status.DeploymentMode) + + // Try to switch from Dual to BlueGreen + app.Status.Phase = v1beta1.FlinkApplicationRunning + app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + err = stateMachineForTest.Handle(context.Background(), &app) + assert.Nil(t, err) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, app.Status.Phase) + + app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Status.Phase = v1beta1.FlinkApplicationRunning + app.Status.DeploymentMode = "" + err = stateMachineForTest.Handle(context.Background(), &app) + assert.Nil(t, err) + assert.Equal(t, v1beta1.DeploymentModeBlueGreen, app.Status.DeploymentMode) + + // Try to switch from BlueGreen to Dual + app.Status.Phase = v1beta1.FlinkApplicationRunning + app.Spec.DeploymentMode = v1beta1.DeploymentModeDual + err = stateMachineForTest.Handle(context.Background(), &app) + assert.Nil(t, err) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, app.Status.Phase) +} diff --git a/pkg/controller/k8/cluster.go b/pkg/controller/k8/cluster.go index 1da90a1c..3f4fe249 100644 --- a/pkg/controller/k8/cluster.go +++ b/pkg/controller/k8/cluster.go @@ -32,7 +32,7 @@ type ClusterInterface interface { GetDeploymentsWithLabel(ctx context.Context, namespace string, labelMap map[string]string) 
(*v1.DeploymentList, error) // Tries to fetch the value from the controller runtime manager cache, if it does not exist, call API server - GetService(ctx context.Context, namespace string, name string) (*coreV1.Service, error) + GetService(ctx context.Context, namespace string, name string, version string) (*coreV1.Service, error) GetServicesWithLabel(ctx context.Context, namespace string, labelMap map[string]string) (*coreV1.ServiceList, error) CreateK8Object(ctx context.Context, object runtime.Object) error @@ -90,7 +90,11 @@ type k8ClusterMetrics struct { getDeploymentFailure labeled.Counter } -func (k *Cluster) GetService(ctx context.Context, namespace string, name string) (*coreV1.Service, error) { +func (k *Cluster) GetService(ctx context.Context, namespace string, name string, version string) (*coreV1.Service, error) { + serviceName := name + if version != "" { + serviceName = name + "-" + version + } service := &coreV1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: coreV1.SchemeGroupVersion.String(), @@ -98,7 +102,7 @@ func (k *Cluster) GetService(ctx context.Context, namespace string, name string) }, } key := types.NamespacedName{ - Name: name, + Name: serviceName, Namespace: namespace, } err := k.cache.Get(ctx, key, service) diff --git a/pkg/controller/k8/mock/mock_k8.go b/pkg/controller/k8/mock/mock_k8.go index 9e3c79c7..de383800 100644 --- a/pkg/controller/k8/mock/mock_k8.go +++ b/pkg/controller/k8/mock/mock_k8.go @@ -10,7 +10,7 @@ import ( type GetDeploymentsWithLabelFunc func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) type CreateK8ObjectFunc func(ctx context.Context, object runtime.Object) error -type GetServiceFunc func(ctx context.Context, namespace string, name string) (*corev1.Service, error) +type GetServiceFunc func(ctx context.Context, namespace string, name string, version string) (*corev1.Service, error) type GetServiceWithLabelFunc func(ctx context.Context, namespace string, labelMap 
map[string]string) (*corev1.ServiceList, error) type UpdateK8ObjectFunc func(ctx context.Context, object runtime.Object) error type UpdateStatusFunc func(ctx context.Context, object runtime.Object) error @@ -40,9 +40,9 @@ func (m *K8Cluster) GetServicesWithLabel(ctx context.Context, namespace string, return nil, nil } -func (m *K8Cluster) GetService(ctx context.Context, namespace string, name string) (*corev1.Service, error) { +func (m *K8Cluster) GetService(ctx context.Context, namespace string, name string, version string) (*corev1.Service, error) { if m.GetServiceFunc != nil { - return m.GetServiceFunc(ctx, namespace, name) + return m.GetServiceFunc(ctx, namespace, name, version) } return nil, nil }