Skip to content

Commit

Permalink
[STRMCMP-1639] Integration Test Fix (#277)
Browse files Browse the repository at this point in the history
## overview
Integrations tests working in CI again

## Additional Info
- Change to minikube from microk8s. Largely due to being able to
reproduce the integration env locally. Iterating bugs that occurred only
in CI on microk8s due to memory was inefficient.
- Upgrade k8s to 1.20 from 1.13. Changes for v1 CRD's were introduced to
support 1.22 k8s however k8s client, on 1.14, for integration tests only
support v1beta1 which was deprecated in 1.22. Upgrading the client is
non-trivial and will be done separately. Using older k8s versions met
issues with docker and local system cgroup. 1.20 became the decided upon
version.
- Create service account, role, and role binding in CI since 1.20 has
RBAC by default. Disabling rbac is hacky and resulted in issues with the
kube proxy
- Remove base mount of tmp directory due to permission issues. Flink
application writes 0755. Integration test downgrades 0777 to 0755. Using
minikube ssh directly to create and delete the necessary files. This
creates a strong dependency on minikube which is acceptable for now.
- Comment out test for rescale given that default github runner only has
8gb memory and 2 cpu. Rescale needs 2 flink clusters up with above
average size which busts limits. Separate ticket created to address
using larger runners (in beta). Flink test apps were also scaled down to
parallelism 2 using 1 TM to fit on the github runner
- Changes made to build the test app image locally as opposed to pull
from dockerhub. Not in use currently as the tests are remaining on 1.8
due to simpler memory configuration until larger github runners are
used. Additional changes made to upgrade flink app from 1.8 to 1.11.
- Upgrade ubuntu for CI as ubuntu version is deprecated on april 1st
2023 by github
  • Loading branch information
sethsaperstein-lyft committed Mar 31, 2023
1 parent bd15d93 commit d3a6fe4
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 90 deletions.
51 changes: 25 additions & 26 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
branches: [ master ]
jobs:
unit-tests:
runs-on: ubuntu-18.04
runs-on: ubuntu-22.04
defaults:
run:
working-directory: go/src/github.com/lyft/flinkk8soperator
Expand All @@ -27,7 +27,7 @@ jobs:
- name: test
run: make test_unit
lint:
runs-on: ubuntu-18.04
runs-on: ubuntu-22.04
defaults:
run:
working-directory: go/src/github.com/lyft/flinkk8soperator
Expand All @@ -47,27 +47,26 @@ jobs:
run: make install
- name: test
run: make lint
# TODO: restore this test
# integration-tests:
# runs-on: ubuntu-18.04
# defaults:
# run:
# working-directory: go/src/github.com/lyft/flinkk8soperator
# env:
# GOPATH: "/home/runner/work/flinkk8soperator/flinkk8soperator/go/"
# steps:
# - name: checkout
# uses: actions/checkout@v2
# with:
# fetch-depth: 1
# path: go/src/github.com/lyft/flinkk8soperator
# - name: install go
# uses: actions/setup-go@v2
# with:
# go-version: 1.12
# - name: install
# run: integ/install.sh
# - name: setup
# run: integ/setup.sh
# - name: test
# run: sudo "PATH=$PATH" "GOPATH=$GOPATH" integ/test.sh
integration-tests:
runs-on: ubuntu-22.04
defaults:
run:
working-directory: go/src/github.com/lyft/flinkk8soperator
env:
GOPATH: "/home/runner/work/flinkk8soperator/flinkk8soperator/go/"
steps:
- name: checkout
uses: actions/checkout@v2
with:
fetch-depth: 1
path: go/src/github.com/lyft/flinkk8soperator
- name: install go
uses: actions/setup-go@v2
with:
go-version: 1.12
- name: install
run: integ/install.sh
- name: setup
run: integ/setup.sh
- name: test
run: PATH=$PATH GOPATH=$GOPATH integ/test.sh
14 changes: 10 additions & 4 deletions docs/local_dev.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ to develop their applications locally.

## Run the operator

### Install [Docker for Mac](https://docs.docker.com/docker-for-mac/install/)
### Install [Minikube](https://minikube.sigs.k8s.io/docs/start/#what-youll-need)

You will want to start minikube on <=1.20, for example:
`minikube start --kubernetes-version=v1.20.15`

Once installed and running, enabled Kuberenetes in settings (from the
docker icon in the menu bar, click Preferences -> Kubernetes -> Enable
Kubernetes).

### (Optional) Setup kubernetes dashboard

Expand Down Expand Up @@ -46,6 +46,12 @@ $ cd flinkk8soperator
$ kubectl create -f deploy/crd.yaml
```

### Install permissions
``` bash
$ kubectl create -f deploy/role.yaml
$ kubectl create -f deploy/role-binding.yaml
```

### Start the operator

#### Option 1: run outside the kubernetes cluster
Expand Down
45 changes: 45 additions & 0 deletions integ/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,48 @@ variables. Supported options include:
You can also pass [gocheck](http://labix.org/gocheck) options to the
test runner. Particularly useful is `-check.vv` which will output logs
from the operator and Flink pods to help debugging test failures.

### Minikube Setup

Ideally we'd use k8s 1.16 to match the deployed k8s version, however, this
is non-trivial due to cgroup configurations. Instead, we will use a version
that is compatible with v1beta1 CRD's which corresponds to <1.22. CRD's v1
is only available with client >=1.16, however, the client used here is 1.14
and the upgrade is non-trivial.
TODO: https://jira.lyft.net/browse/STRMCMP-1659

Ran on:
- Go 1.12
- Docker desktop 4.5.0
- Minikube v1.29.0 (running 1.20.15)
- i9 Ventura 13.2.1
- GoLand 2021.3.3


1. Install Dependencies
Run `dep ensure -vendor-only`

3. Start minikube
`minikube start --kubernetes-version=v1.20.15`

4. Proxy minikube
`kubectl proxy --port 8001 &`

5. Set up test app images and operator image
`integ/setup.sh`

8. Set the following for the Go test:
Package path: `github.com/lyft/flinkk8soperator/integ`
Env: `INTEGRATION=true;OPERATOR_IMAGE=flinkk8soperator:local;RUN_DIRECT=true`
Program Args: `-timeout 40m -check.vv IntegTest`


Helpers:
- Kill kube proxy
`ps -ef | grep "kubectl proxy"`
`kill -9 <process_id>`
- Kill stuck flink app
`kubectl patch FlinkApplication invalidcanceljob -p '{"metadata":{"finalizers":[]}}' --type=merge`
- Set default namespace
`kubectl config set-context --current --namespace=flinkoperatortest`

6 changes: 4 additions & 2 deletions integ/blue_green_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.
}

func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
log.Info("Starting test TestUpdateWithBlueGreenDeploymentMode")

testName := "bluegreenupdate"
const finalizer = "bluegreen.finalizers.test.com"
Expand All @@ -55,7 +56,7 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
c.Assert(len(pods.Items), Equals, 3)
c.Assert(len(pods.Items), Equals, 2)
for _, pod := range pods.Items {
c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image)
}
Expand All @@ -72,7 +73,7 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
// We have 2 applications running
c.Assert(len(pods.Items), Equals, 6)
c.Assert(len(pods.Items), Equals, 4)
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDualRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.GetJobID(newApp), NotNil)
c.Assert(newApp.Status.UpdatingVersion, Equals, v1beta1.BlueFlinkApplication)
Expand Down Expand Up @@ -153,4 +154,5 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
}
}
log.Info("All pods torn down")
log.Info("Completed test TestUpdateWithBlueGreenDeploymentMode")
}
14 changes: 9 additions & 5 deletions integ/checkpoint_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package integ

import (
"fmt"
"io/ioutil"
"os"
"time"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
Expand Down Expand Up @@ -67,18 +65,24 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {

// Tests that we correctly handle updating a job with task failures
func (s *IntegSuite) TestJobWithTaskFailures(c *C) {
log.Info("Starting test TestJobWithTaskFailures")

failingJobTest(s, c, "taskfailure", func() {
f, err := os.OpenFile(s.Util.CheckpointDir+"/fail", os.O_RDONLY|os.O_CREATE, 0666)
err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
c.Assert(err, IsNil)
c.Assert(f.Close(), IsNil)
})
log.Info("Completed test TestJobWithTaskFailures")
}

// Tests that we correctly handle updating a job with a checkpoint timeout
func (s *IntegSuite) TestCheckpointTimeout(c *C) {
log.Info("Starting test TestCheckpointTimeout")

failingJobTest(s, c, "checkpointtimeout", func() {
// cause checkpoints to take 120 seconds
err := ioutil.WriteFile(s.Util.CheckpointDir+"/checkpoint_delay", []byte("120000"), 0644)
err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
c.Assert(err, IsNil)
})
log.Info("Completed test TestCheckpointTimeout")

}
9 changes: 5 additions & 4 deletions integ/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

set -e

sudo snap install microk8s --classic --channel=1.13/stable
microk8s.status --wait-ready
microk8s.enable dns
microk8s.enable registry
curl -LO -s https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

minikube config set memory 6800
minikube start --kubernetes-version=v1.20.15

sh boilerplate/lyft/golang_test_targets/dep_install.sh

Expand Down
14 changes: 10 additions & 4 deletions integ/job_cancellation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func WaitUpdateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *

// tests the workflow of job cancellation without savepoint
func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {

log.Info("Starting test TestJobCancellationWithoutSavepoint")
testName := "cancelsuccess"
const finalizer = "simple.finalizers.test.com"

Expand All @@ -81,7 +81,7 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {
pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
c.Assert(len(pods.Items), Equals, 3)
c.Assert(len(pods.Items), Equals, 2)
for _, pod := range pods.Items {
c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image)
}
Expand All @@ -97,7 +97,7 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {
pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
c.Assert(len(pods.Items), Equals, 3)
c.Assert(len(pods.Items), Equals, 2)
for _, pod := range pods.Items {
c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage)
}
Expand Down Expand Up @@ -131,11 +131,13 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {
}
}
log.Info("All pods torn down")
log.Info("Completed test TestJobCancellationWithoutSavepoint")
}

// tests a job update with the existing job already in cancelled state.
// here, the new submitted job starts without a savepoint.
func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
log.Info("Starting test TestCancelledJobWithoutSavepoint")

testName := "invalidcancel"
config, err := s.Util.ReadFlinkApplication("test_app.yaml")
Expand All @@ -150,6 +152,7 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
Commentf("Failed to create flink application"))

c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)

currApp, _ := s.Util.GetFlinkApplication(config.Name)
Expand All @@ -163,7 +166,7 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
c.Assert(err, IsNil)

// wait a bit
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)

job = s.Util.GetJobOverview(currApp)
c.Assert(job["status"], Equals, "CANCELED")
Expand Down Expand Up @@ -205,10 +208,12 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
}
}
log.Info("All pods torn down")
log.Info("Completed test TestCancelledJobWithoutSavepoint")
}

// tests the recovery workflow of the job when savepoint is disabled.
func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) {
log.Info("Starting test TestJobRecoveryWithoutSavepoint")

const finalizer = "simple.finalizers.test.com"
const testName = "cancelrecovery"
Expand Down Expand Up @@ -300,4 +305,5 @@ func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) {
time.Sleep(100 * time.Millisecond)
}
log.Info("All pods torn down")
log.Info("Completed test TestJobRecoveryWithoutSavepoint")
}
Loading

0 comments on commit d3a6fe4

Please sign in to comment.