Implement DualRunning state to enable blue-green deploys (#186)
* Working version 1

* Create setup for blue green deploys

* [WIP] Setup status sub-resource for blue green deploys

* Updates

* fix bug

* Fixes

* Make running jobs calculation idempotent

* Fix bugs

* Reset running jobs in recovering phase

* Make status index calculation simpler

* Add container env and annotations

* Update CRD to v1beta2

* Update CRD to v1beta2

* Fix CRD update issues

* Fix lint

* Merge master and restore v1beta1 to original version

* Upgrade integ test to v1beta2

* Backward compatibility changes

* Work around status subresource bug

* Rename status array to VersionStatuses and add comment on k8s bug

* Remove DesiredApplicationCount

* Minor updates

* Minor updates

* Initialize counter

* Handle edge case for jobId

* Debug

* Debug

* fixes

* Fix edge case

* Fix unit tests

* Debug logs

* Fix overwriting of versionstatuses

* Remove debug logs

* Implement DualRunning state

* Happy path: Add DualRunning and Teardown states

* Add unit tests and make Teardown a bool

* Minor fixes

* Merge master

* Revert CRD upgrade

* Keep Status.ClusterStatus and Status.JobStatus unchanged for Dual mode

* Remove unwarranted changes

* Add version to deployment names

* Account for SavepointDisabled

* Add integration test and fix delete

* Fix build

* Handle delete when there are two running jobs

* Handle delete for blue green deploys correctly

* Fix ingress

* Fix ingress URL and event name

* Allow teardown to be a version instead of bool and force-cancel job in teardown

* Redesign teardown feature

* Disallow switching between deployment modes

* Disallow switching between deployment modes

* Disallow deployment mode change

* Fixes and address all review comments

* Add docs and update state machine diagram

* Update links to state machine pngs
lrao100 committed Apr 11, 2020
1 parent 4c377a3 commit 9fe7fc1
Showing 27 changed files with 1,684 additions and 225 deletions.
2 changes: 1 addition & 1 deletion deploy/crd.yaml
@@ -85,7 +85,7 @@ spec:
    type: boolean
  deploymentMode:
    type: string
    enum: [Dual]
    enum: [Dual, BlueGreen]
  rpcPort:
    type: integer
    minimum: 1
46 changes: 46 additions & 0 deletions docs/blue_green_state_machine.mmd
@@ -0,0 +1,46 @@
%% This file can be compiled into blue_green_state_machine.png by installing mermaidjs (https://mermaidjs.github.io/) and running
%% mmdc -i blue_green_state_machine.mmd -o blue_green_state_machine.png -w 1732 -b transparent

graph LR
    New --> ClusterStarting

    subgraph Running
        Running
        DeployFailed
    end

    subgraph Updating
        Running --> Updating
        Updating --> ClusterStarting
        DeployFailed --> Updating

        ClusterStarting -- savepoint disabled --> SubmittingJob
        ClusterStarting -- savepoint enabled --> Savepointing
        ClusterStarting -- Create fails --> DeployFailed

        Savepointing --> SubmittingJob
        Savepointing -- Savepoint fails --> Recovering

        Recovering --> SubmittingJob
        Recovering -- No externalized checkpoint --> RollingBackJob

        SubmittingJob -- first deploy --> Running
        SubmittingJob -- updating existing application --> DualRunning
        SubmittingJob -- job start fails --> RollingBackJob
        RollingBackJob --> DeployFailed

        DualRunning -- tearDownVersionHash set --> Running
        DualRunning -- tear down fails --> DeployFailed
    end

    linkStyle 4 stroke:#303030
    linkStyle 5 stroke:#303030
    linkStyle 6 stroke:#FF0000
    linkStyle 8 stroke:#FF0000
    linkStyle 10 stroke:#FF0000
    linkStyle 11 stroke:#303030
    linkStyle 12 stroke:#303030
    linkStyle 13 stroke:#FF0000
    linkStyle 14 stroke:#FF0000
    linkStyle 15 stroke:#303030
    linkStyle 16 stroke:#FF0000
Binary file added docs/blue_green_state_machine.png
11 changes: 9 additions & 2 deletions docs/crd.md
@@ -103,10 +103,12 @@ Below is the list of fields in the custom resource and their description
Optional map of Flink configuration, which is passed on to the deployment as an environment variable with `OPERATOR_FLINK_CONFIG`

* **deploymentMode** `type:DeploymentMode`
Indicates the type of deployment that operator should perform if the custom resource is updated. Currently only Dual is supported.
Indicates the type of deployment that the operator should perform if the custom resource is updated. Two deployment modes, Dual and BlueGreen, are currently supported.

`Dual` This deployment mode is intended for applications where downtime during deployment needs to be as minimal as possible. In this mode, the operator brings up a second Flink cluster with the new image while the original Flink cluster is still active. Once the pods and containers in the new Flink cluster are ready, the operator cancels the job in the first cluster with a savepoint, deletes the cluster, and starts the job in the second cluster (more information in the state machine section below). This mode is suitable for real-time processing applications.

`BlueGreen` This deployment mode is intended for applications where downtime during deployment needs to be zero. In this mode, the operator brings up an entirely new Flink job/cluster alongside the original Flink job. The two versions of the Flink job are differentiated by a color, blue or green. Once the new Flink application version is created, the application transitions to the `DualRunning` phase. To transition back from the `DualRunning` phase to a single application version, users must
set a `tearDownVersionHash`, which enables the operator to tear down the version corresponding to the specified hash.
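
As a concrete illustration, a minimal sketch of a FlinkApplication spec opting into this mode might look like the block below. Only `deploymentMode: BlueGreen` and the `image` field come from this change; the `apiVersion`, `kind`, and the example names and values are assumptions for illustration.

```yaml
# Hypothetical sketch of a FlinkApplication using BlueGreen deploys.
# deploymentMode and image are real spec fields; apiVersion/kind and
# the example values are assumptions.
apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
  name: wordcount-app
spec:
  image: docker.io/example/wordcount:v2   # updating this triggers a deploy
  deploymentMode: BlueGreen
```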

* **deleteMode** `type:DeleteMode`
Indicates how Flink jobs are torn down when the FlinkApplication resource is deleted

@@ -134,3 +136,8 @@ Below is the list of fields in the custom resource and their description
is used during the operator update workflow. This default exists only
to protect one from accidentally restarting the application using a very old checkpoint (which might put your application
under huge load). **Note:** this doesn't affect the Flink application's checkpointing mechanism in any way.

* **tearDownVersionHash** `type:string`
Used **only** with the BlueGreen deployment mode. This is typically set once a FlinkApplication successfully transitions to the `DualRunning` phase.
Once set, the application version corresponding to the hash is torn down. On successful teardown, the FlinkApplication transitions to the `Running` phase.
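
For illustration, a minimal sketch of the teardown step is shown below. The hash value and the serialized status field casing are assumptions; the integration test in this commit reads the hash from the application's `Status.DeployHash`.

```yaml
# Hypothetical teardown request in BlueGreen mode: copy the hash of the
# version to retire (read from the application's status) into the spec.
spec:
  deploymentMode: BlueGreen
  tearDownVersionHash: "f5e6d7c8"   # illustrative value of the deploy hash
```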

2 changes: 1 addition & 1 deletion docs/state_machine.mmd → docs/dual_state_machine.mmd
@@ -1,5 +1,5 @@
%% This file can be compiled into state_machine.png by installing mermaidjs (https://mermaidjs.github.io/) and running
%% mmdc -i state_machine.mmd -o state_machine.png -w 1732 -b transparent
%% mmdc -i state_machine.mmd -o dual_state_machine.png -w 1732 -b transparent

graph LR
New --> ClusterStarting
46 changes: 34 additions & 12 deletions docs/state_machine.md
@@ -6,9 +6,10 @@ with the underlying Kubernetes resources, it takes the necessary actions to upda
Typically this will involve traversing the state machine. The final desired state is `Running`, which indicates that a
healthy Flink cluster has been started and the Flink job has been successfully submitted.

The full state machine looks like this:
![Flink operator state machine](state_machine.png)

The state machine for the `Dual` deployment mode (the default) looks like this:
![Flink operator state machine for Dual deployment mode](dual_state_machine.png)

The state machine for the `BlueGreen` deployment mode looks like this:
![Flink operator state machine for BlueGreen deployment mode](blue_green_state_machine.png)

# States

### New / Updating
@@ -17,52 +18,73 @@ The full state machine looks like this:
created, and we transition to the ClusterStarting phase to monitor. The deployment objects created by the operator are
labelled and annotated as indicated in the custom resource. The operator also sets the corresponding environment
variables and arguments for the containers to start up the Flink application from the image.

#### BlueGreen deployment mode
Along with the annotations and labels in the custom resources, the deployment objects are suffixed with the application
version name, that is either `blue` or `green`. The version name is also injected into the container environment.
Additionally, the external URLs for each of the versions is also suffixed with the color.
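
As a hedged sketch, the metadata of a version-suffixed deployment object might look roughly like the block below. The label keys `flink-app-hash` and `flink-application-version` appear in the integration test in this commit; the name pattern and values are assumptions.

```yaml
# Illustrative (assumed) metadata for a version-suffixed deployment object.
metadata:
  name: wordcount-app-blue-tm   # deployment name suffixed with the version
  labels:
    flink-application-version: blue
    flink-app-hash: f5e6d7c8
```
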
### ClusterStarting
In this state, the operator monitors the Flink cluster created in the New state. Once it successfully starts, we check
whether the spec has the `savepointDisabled` field set to true. If so, we transition to the `Cancelling` state; otherwise, to `Savepointing`.
If we are unable to start the cluster for some reason (an invalid
image, bad configuration, not enough Kubernetes resources, etc.), we transition to the `DeployFailed` state.

#### BlueGreen deployment mode
In this mode, once the new cluster is started, we transition into the `Savepointing`/`SubmittingJob` state based on the `savepointDisabled`
flag. There is no job cancellation involved in the update process during a BlueGreen deployment.
### Cancelling
In this state, the operator attempts to cancel the running job (if one exists) and transition to the `SubmittingJob` state.
If that fails, we transition to `RollingBack`.

#### BlueGreen deployment mode
This state is not reached during a BlueGreen deployment.
### Savepointing
In the `Savepointing` state, the operator attempts to cancel the existing job with a
[savepoint](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html) (if this is the first
deploy for the FlinkApplication and there is no existing job, we transition straight to `SubmittingJob`). The operator
monitors the savepoint process until it succeeds or fails. If savepointing succeeds, we move to the `SubmittingJob`
phase. If it fails, we move to the `Recovering` phase to attempt to recover from an externalized checkpoint.

#### BlueGreen deployment mode
In this state, during a BlueGreen deployment, the currently running Flink job is savepointed (without cancellation).
### Recovering
If savepointing fails, the operator will look for an
[externalized checkpoint](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint)
and attempt to use that for recovery. If one is not available, the application transitions to the `DeployFailed` state.
Otherwise, it transitions to the `SubmittingJob` state.

#### BlueGreen deployment mode
There is no change in behavior for this state during a BlueGreen deployment.
### SubmittingJob
In this state, the operator waits until the JobManager is ready, then attempts to submit the Flink job to the cluster.
If we are updating an existing job or the user has specified a savepoint to restore from, that will be used. Once the
job is successfully running the application transitions to the `Running` state. If the job submission fails we
transition to the `RollingBack` state.

#### BlueGreen deployment mode
During a BlueGreen deployment, the operator submits a job to the newly created cluster (with a version that's different from the
originally running Flink application version).
### RollingBack
This state is reached when, in the middle of a deploy, the old job has been canceled but the new job did not come up
successfully. In that case we will attempt to roll back by resubmitting the old job on the old cluster, after which
we transition to the `DeployFailed` state.

#### BlueGreen deployment mode
In the BlueGreen deployment mode, the operator does not attempt to resubmit the old job (as we never cancel it in the first place).
We transition directly to the `DeployFailed` state.
### Running
The `Running` state indicates that the FlinkApplication custom resource has reached the desired state, and the job is
running in the Flink cluster. In this state the operator continuously checks if the resource has been modified and
monitors the health of the Flink cluster and job.

#### BlueGreen deployment mode
There is no change in behavior for this state during a BlueGreen deployment.
### DeployFailed
The `DeployFailed` state operates exactly like the `Running` state. It exists to inform the user that an attempted
update has failed, i.e., that the FlinkApplication status does not currently match the desired spec. In this state,
the user should look at the Flink logs and Kubernetes events to determine what went wrong. The user can then perform
a new deploy by updating the FlinkApplication.

#### BlueGreen deployment mode
There is no change in behavior for this state during a BlueGreen deployment.
### Deleting
This state indicates that the FlinkApplication resource has been deleted. The operator will clean up the job according
to the DeleteMode configured. Once all clean up steps have been performed the FlinkApplication will be deleted.
#### BlueGreen deployment mode
In this mode, if there are two application versions running, both versions are deleted (as per the `DeleteMode` configuration).
### DualRunning
This state is only ever reached when the FlinkApplication is deployed with the BlueGreen deployment mode. In this state,
there are two application versions running: `blue` and `green`. Once a user is ready to tear down one of the versions, they
set a `tearDownVersionHash`. The operator then tears down the application version corresponding to
that hash. Once the teardown is complete, we transition back to the `Running` state.
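
To make the two-version state concrete, below is a hedged sketch of roughly what the resource status might report while in `DualRunning`. The field names mirror `Status.UpdatingVersion`, `Status.DeployVersion`, and `Status.VersionStatuses` exercised by the integration test in this commit; the serialized casing and the values are assumptions.

```yaml
# Illustrative (assumed) status of a FlinkApplication in DualRunning,
# after updating an app whose original version was green.
status:
  phase: DualRunning
  deployVersion: green       # originally running version
  updatingVersion: blue      # newly deployed version
  versionStatuses:
    - version: green
      jobStatus:
        jobID: 2f1c8e3a9b7d6c5e4f3a2b1c0d9e8f7a   # assumed field casing
    - version: blue
      jobStatus:
        jobID: 7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d
```
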
147 changes: 147 additions & 0 deletions integ/blue_green_deployment_test.go
@@ -0,0 +1,147 @@
package integ

import (
	"time"

	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
	"github.com/prometheus/common/log"
	. "gopkg.in/check.v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), phase v1beta1.FlinkApplicationPhase, failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {
	// apply the caller-supplied update (e.g. a new image)
	app, err := s.Util.Update(name, updateFn)
	c.Assert(err, IsNil)

	for {
		// keep trying until the new job is launched
		newApp, err := s.Util.GetFlinkApplication(name)
		c.Assert(err, IsNil)
		if newApp.Status.VersionStatuses[s.Util.GetCurrentStatusIndex(app)].JobStatus.JobID != "" {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	c.Assert(s.Util.WaitForPhase(name, phase, failurePhase), IsNil)
	c.Assert(s.Util.WaitForAllTasksRunning(name), IsNil)

	newApp, _ := s.Util.GetFlinkApplication(name)
	return newApp
}

func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
	testName := "bluegreenupdate"
	const finalizer = "bluegreen.finalizers.test.com"

	// start a simple app
	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))

	config.Name = testName + "job"
	config.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen
	config.ObjectMeta.Labels["integTest"] = testName
	config.Finalizers = append(config.Finalizers, finalizer)

	c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
		Commentf("Failed to create flink application"))

	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)

	pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
	c.Assert(err, IsNil)
	c.Assert(len(pods.Items), Equals, 3)
	for _, pod := range pods.Items {
		c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image)
	}

	// test updating the app with a new image
	newApp := WaitForUpdate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
		app.Spec.Image = NewImage
	}, v1beta1.FlinkApplicationDualRunning, v1beta1.FlinkApplicationDeployFailed)

	c.Assert(newApp.Spec.Image, Equals, NewImage)
	c.Assert(newApp.Status.SavepointPath, NotNil)

	pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
	c.Assert(err, IsNil)
	// we have two application versions running, so twice the pods
	c.Assert(len(pods.Items), Equals, 6)
	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDualRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
	c.Assert(s.Util.GetJobID(newApp), NotNil)
	c.Assert(newApp.Status.UpdatingVersion, Equals, v1beta1.BlueFlinkApplication)
	c.Assert(newApp.Status.DeployVersion, Equals, v1beta1.GreenFlinkApplication)

	// tear down the original version via TearDownVersionHash
	teardownVersion := newApp.Status.DeployVersion
	hashToTeardown := newApp.Status.DeployHash
	oldHash := newApp.Status.DeployHash
	log.Infof("Tearing down version %s", teardownVersion)
	newApp = WaitForUpdate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
		app.Spec.TearDownVersionHash = hashToTeardown
	}, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed)

	// wait for the old cluster to be cleaned up
	for {
		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
			List(v1.ListOptions{LabelSelector: "flink-app-hash=" + oldHash})
		c.Assert(err, IsNil)
		if len(pods.Items) == 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
	c.Assert(newApp.Status.TeardownHash, NotNil)
	c.Assert(newApp.Status.DeployVersion, Equals, v1beta1.BlueFlinkApplication)
	c.Assert(newApp.Status.VersionStatuses[0].JobStatus.JobID, NotNil)
	c.Assert(newApp.Status.VersionStatuses[1].JobStatus, Equals, v1beta1.FlinkJobStatus{})

	pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
		List(v1.ListOptions{LabelSelector: "flink-app-hash=" + oldHash})
	c.Assert(err, IsNil)
	// no pods from the torn-down version should remain
	c.Assert(len(pods.Items), Equals, 0)
	for _, pod := range pods.Items {
		log.Infof("Pod name %s", pod.Name)
		c.Assert(pod.Labels["flink-application-version"], Not(Equals), teardownVersion)
	}

	// cleanup
	c.Assert(s.Util.FlinkApps().Delete(newApp.Name, &v1.DeleteOptions{}), IsNil)
	var app *v1beta1.FlinkApplication
	for {
		app, err = s.Util.GetFlinkApplication(config.Name)
		c.Assert(err, IsNil)
		if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	job := s.Util.GetJobOverview(app)
	c.Assert(job["status"], Equals, "CANCELED")
	c.Assert(app.Status.SavepointPath, NotNil)

	// delete our finalizer
	app.Finalizers = []string{}
	_, err = s.Util.FlinkApps().Update(app)
	c.Assert(err, IsNil)

	// wait for all pods to be cleaned up, sleeping between polls to avoid
	// hammering the API server
	for {
		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
			List(v1.ListOptions{LabelSelector: "integTest=" + testName})
		c.Assert(err, IsNil)
		if len(pods.Items) == 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	log.Info("All pods torn down")
}