Skip to content

Commit

Permalink
[STRMCMP-1658] Enable Deploys to Fallback without State (#287)
Browse files Browse the repository at this point in the history
## overview
Add CRD parameter `fallbackWithoutState` to be enabled to allow deploys
to occur without a savepoint or checkpoint in the case that the
savepoint fails and there are no usable externalized checkpoints.

In the case that the savepoint and checkpoints are failing, this has a
similar effect as savepointDisabled. The benefit of this configuration
option is that it only exhibits this behavior when there is no other
path forward and thus acts as a fallback without potential intervention.
This is disabled by default.

The state graph does not change, but rather, the recovering state that
attempts to find an externalized checkpoint forwards to submittingJob in
the case that this is enabled and there is no externalized checkpoint
  • Loading branch information
sethsaperstein-lyft authored May 5, 2023
1 parent f57e2cd commit db5a5de
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 57 deletions.
2 changes: 2 additions & 0 deletions deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ spec:
maxCheckpointRestoreAgeSeconds:
type: integer
minimum: 1
fallbackWithoutState:
type: boolean
jobManagerConfig:
type: object
properties:
Expand Down
4 changes: 4 additions & 0 deletions docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ Below is the list of fields in the custom resource and their description:
* **tearDownVersionHash** `type:string`
Used **only** with the BlueGreen deployment mode. This is set typically once a FlinkApplication successfully transitions to the `DualRunning` phase.
Once set, the application version corresponding to the hash is torn down. On successful teardown, the FlinkApplication transitions to a `Running` phase.

* **fallbackWithoutState** `type:bool`
Can be set to true to attempt to continue to submit a job without a savepoint in the case where
a savepoint cannot be taken and there are no external checkpoints to recover from.
10 changes: 1 addition & 9 deletions integ/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,7 @@ By default the tests create, use, and clean up the namespace
`flinkoperatortest`.

These tests use a sample Flink job [operator-test-app](/integ/operator-test-app/). The
tests currently use two images built from here:

* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1`
* `lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2`

Those images are available on our private Dockerhub registry, and you
will either need to pull them locally or give Kubernetes access to the
registry.
tests currently use two images built before the integration test is run.

### Setup

Expand Down Expand Up @@ -123,4 +116,3 @@ Helpers:
`kubectl patch FlinkApplication invalidcanceljob -p '{"metadata":{"finalizers":[]}}' --type=merge`
- Set default namespace
`kubectl config set-context --current --namespace=flinkoperatortest`

1 change: 0 additions & 1 deletion integ/blue_green_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), phase v1beta1.FlinkApplicationPhase, failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {

// update with new image.
app, err := s.Util.Update(name, updateFn)
c.Assert(err, IsNil)
Expand Down
117 changes: 104 additions & 13 deletions integ/checkpoint_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
"github.com/prometheus/common/log"
. "gopkg.in/check.v1"
coreV1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -19,12 +20,12 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {

config.ObjectMeta.Labels["integTest"] = testName

c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
Commentf("Failed to create flink application"))

// Cause it to fail
causeFailure()

c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
Commentf("Failed to create flink application"))

c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

// wait a bit for it to start failing
Expand Down Expand Up @@ -63,25 +64,115 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
log.Info("All pods torn down")
}

// Tests that we correctly handle updating a job with a checkpoint timeout
func (s *IntegSuite) TestCheckpointTimeout(c *C) {
log.Info("Starting test TestCheckpointTimeout")

failingJobTest(s, c, "checkpointtimeout", func() {
// cause checkpoints to take 120 seconds
err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
c.Assert(err, IsNil)
})
log.Info("Completed test TestCheckpointTimeout")
}

func appUpdate(app *v1beta1.FlinkApplication) *v1beta1.FlinkApplication {
app.Spec.Image = NewImage
skipFailureEnvVar := coreV1.EnvVar{Name: "SKIP_INDUCED_FAILURE", Value: "true"}
app.Spec.JobManagerConfig.EnvConfig.Env = append(app.Spec.JobManagerConfig.EnvConfig.Env, skipFailureEnvVar)
app.Spec.TaskManagerConfig.EnvConfig.Env = append(app.Spec.TaskManagerConfig.EnvConfig.Env, skipFailureEnvVar)
var maxCheckpointRestoreAgeSeconds int32 = 1
app.Spec.MaxCheckpointRestoreAgeSeconds = &maxCheckpointRestoreAgeSeconds
return app
}

func failingTaskTest(s *IntegSuite, c *C, testName string, fallbackWithoutState bool, deployShouldFail bool, causeFailure func()) {
config, err := s.Util.ReadFlinkApplication("test_app.yaml")
c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))
config.Name = testName + "job"
config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
config.Spec.FallbackWithoutState = fallbackWithoutState
config.ObjectMeta.Labels["integTest"] = testName

// Avoid external checkpoints to be used in recovery stage during update
err = s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
c.Assert(err, IsNil)

c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
Commentf("Failed to create flink application"))

c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

// Cause it to fail
causeFailure()

// wait a bit for it to start failing
time.Sleep(5 * time.Second)

// get app details
app, err := s.Util.GetFlinkApplication(config.Name)
c.Assert(err, IsNil)

if deployShouldFail {
// Try to update it
app, err := s.Util.GetFlinkApplication(config.Name)
c.Assert(err, IsNil)
app = appUpdate(app)
_, err = s.Util.FlinkApps().Update(app)
c.Assert(err, IsNil)

// because the checkpoint will fail, the app should move to deploy failed
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDeployFailed), IsNil)

// And the job should not have been updated
newApp, err := s.Util.GetFlinkApplication(config.Name)
c.Assert(err, IsNil)
c.Assert(newApp.Status.JobStatus.JobID, Equals, app.Status.JobStatus.JobID)
} else {
// Try to update it with app that does not fail on checkpoint
newApp := WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
appUpdate(app)
}, v1beta1.FlinkApplicationDeployFailed)

// Check job updated and started without savepointPath
c.Assert(newApp.Status.JobStatus.JobID, Not(Equals), app.Status.JobStatus.JobID)
c.Assert(newApp.Spec.SavepointPath, Equals, "")

// Check new app has no failures
endpoint := fmt.Sprintf("jobs/%s", newApp.Status.JobStatus.JobID)
_, err = s.Util.FlinkAPIGet(newApp, endpoint)
c.Assert(err, IsNil)
}

// delete the application and ensure everything is cleaned up successfully
c.Assert(s.Util.FlinkApps().Delete(app.Name, &v1.DeleteOptions{}), IsNil)

for {
pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
if len(pods.Items) == 0 {
break
}
}
log.Info("All pods torn down")
}

// Tests that we correctly handle updating a job with task failures
func (s *IntegSuite) TestJobWithTaskFailures(c *C) {
log.Info("Starting test TestJobWithTaskFailures")

failingJobTest(s, c, "taskfailure", func() {
failingTaskTest(s, c, "taskfailure", false, true, func() {
err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
c.Assert(err, IsNil)
})
log.Info("Completed test TestJobWithTaskFailures")
}

// Tests that we correctly handle updating a job with a checkpoint timeout
func (s *IntegSuite) TestCheckpointTimeout(c *C) {
log.Info("Starting test TestCheckpointTimeout")

failingJobTest(s, c, "checkpointtimeout", func() {
// cause checkpoints to take 120 seconds
err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
func (s *IntegSuite) TestSavepointCheckpointFailureFallback(c *C) {
log.Info("Starting test TestSavepointCheckpointFailureFallback")
failingTaskTest(s, c, "recoveryfallback", true, false, func() {
err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
c.Assert(err, IsNil)
})
log.Info("Completed test TestCheckpointTimeout")
log.Info("Completed test TestSavepointCheckpointFailureFallback")
}
13 changes: 7 additions & 6 deletions integ/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ func (s *IntegSuite) SetUpSuite(c *C) {

if runDirect {
config := controllerConfig.Config{
LimitNamespace: namespace,
UseProxy: true,
ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second},
MaxErrDuration: flyteConfig.Duration{Duration: 60 * time.Second},
MetricsPrefix: "flinkk8soperator",
ProxyPort: flyteConfig.Port{Port: 8001},
LimitNamespace: namespace,
UseProxy: true,
ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second},
MaxErrDuration: flyteConfig.Duration{Duration: 6000 * time.Second},
FlinkJobVertexTimeout: flyteConfig.Duration{Duration: 3 * time.Minute},
MetricsPrefix: "flinkk8soperator",
ProxyPort: flyteConfig.Port{Port: 8001},
}

log.Info("Running operator directly")
Expand Down
2 changes: 1 addition & 1 deletion integ/operator-test-app/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ENV PATH=$FLINK_HOME/bin:$HADOOP_HOME/bin:$MAVEN_HOME/bin:$PATH
COPY . /code

# Configure Flink version
ENV FLINK_VERSION=1.11.6 \
ENV FLINK_VERSION=1.8.1 \
HADOOP_SCALA_VARIANT=scala_2.12

# Install dependencies
Expand Down
17 changes: 15 additions & 2 deletions integ/operator-test-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,25 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.6</version>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.11.6</version>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.8.1</version>
<scope>provided</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static class MaybeFail implements MapFunction<Long, Long> {

@Override
public Long map(Long x) throws Exception {
if (new File("/checkpoints/fail").exists()) {
if (new File("/checkpoints/fail").exists() && !Settings.skipInducedFailure()) {
throw new RuntimeException("FAILED!!!");
}

Expand Down
9 changes: 9 additions & 0 deletions integ/operator-test-app/src/main/java/com/lyft/Settings.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.lyft;

public class Settings {
private static final String SKIP_INDUCED_FAILURE = "SKIP_INDUCED_FAILURE";

public static boolean skipInducedFailure() {
return System.getenv(SKIP_INDUCED_FAILURE) != null && System.getenv(SKIP_INDUCED_FAILURE).equals("true");
}
}
24 changes: 9 additions & 15 deletions integ/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@
# Test App Setup

# TODO: upgrade flink test app from 1.8
#cd integ/operator-test-app
#export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD)
#docker build -t $TEST_APP_IMAGE .
#docker tag $TEST_APP_IMAGE flink-test-app:local.1
#docker tag $TEST_APP_IMAGE flink-test-app:local.2
#minikube image load flink-test-app:local.1
#minikube image load flink-test-app:local.2
#
#cd ../../

docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
docker pull lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2
minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
minikube image load lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2

cd integ/operator-test-app
export TEST_APP_IMAGE=operator-test-app:$(git rev-parse HEAD)
docker build -t $TEST_APP_IMAGE .
docker tag $TEST_APP_IMAGE operator-test-app:local.1
docker tag $TEST_APP_IMAGE operator-test-app:local.2
minikube image load operator-test-app:local.1
minikube image load operator-test-app:local.2

cd ../../

# Operator Setup

Expand Down
2 changes: 1 addition & 1 deletion integ/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2"
const NewImage = "operator-test-app:local.2"

func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {
app, err := s.Util.Update(name, updateFn)
Expand Down
2 changes: 1 addition & 1 deletion integ/test_app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
labels:
environment: development
spec:
image: lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.1
image: operator-test-app:local.1
imagePullPolicy: IfNotPresent
imagePullSecrets:
- name: dockerhub
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/app/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type FlinkApplicationSpec struct {
ForceRollback bool `json:"forceRollback"`
MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
TearDownVersionHash string `json:"tearDownVersionHash,omitempty"`
FallbackWithoutState bool `json:"fallbackWithoutState"`
}

type FlinkConfig map[string]interface{}
Expand Down
26 changes: 19 additions & 7 deletions pkg/controller/flinkapplication/flink_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
updateApplication, appErr = s.handleApplicationDeleting(ctx, application)
case v1beta1.FlinkApplicationDualRunning:
updateApplication, appErr = s.handleDualRunning(ctx, application)

}

if !v1beta1.IsRunningPhase(appPhase) {
Expand Down Expand Up @@ -587,17 +586,30 @@ func (s *FlinkStateMachine) handleApplicationRecovering(ctx context.Context, app
// (but if the JM is unavailable, our options there might be limited)

// try to find an externalized checkpoint
failDeploy := false
path, err := s.flinkController.FindExternalizedCheckpoint(ctx, app, app.Status.DeployHash)
if err != nil {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
"Failed to get externalized checkpoint config, could not recover. "+
"Manual intervention is needed.")
return s.deployFailed(app)
"Failed to get externalized checkpoint config.")
failDeploy = true
} else if path == "" {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
"No externalized checkpoint found, could not recover. Make sure that "+
"externalized checkpoints are enabled in your job's checkpoint configuration. Manual intervention "+
"is needed to recover.")
"No externalized checkpoint found. Make sure that "+
"externalized checkpoints are enabled in your job's checkpoint configuration.")
failDeploy = true
}
// try to continue without state if configured else fail
if failDeploy && app.Spec.FallbackWithoutState {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RestoringWithoutExternalizedCheckpoint",
"FallbackWithoutState enabled. Proceeding without a checkpoint or savepoint.")
s.flinkController.UpdateLatestJobID(ctx, app, "")
s.updateApplicationPhase(app, v1beta1.FlinkApplicationSubmittingJob)
return statusChanged, nil
}

if failDeploy {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RecoveryFailed",
"Could not recover. Manual intervention is needed to recover.")
return s.deployFailed(app)
}

Expand Down
Loading

0 comments on commit db5a5de

Please sign in to comment.