[STRMCMP-1639] Integration Test Fix #277

Merged 68 commits on Mar 31, 2023

Commits
a0a90a0
local test working for TestJobCancellationWithoutSavepoint
sethsaperstein-lyft Mar 21, 2023
0cc931c
comment back in tests
sethsaperstein-lyft Mar 21, 2023
3859dc3
waits between cancel
sethsaperstein-lyft Mar 22, 2023
33234ca
5 min timeout
sethsaperstein-lyft Mar 22, 2023
a43addf
additional logging
sethsaperstein-lyft Mar 22, 2023
698bc34
remove local debug changes
sethsaperstein-lyft Mar 22, 2023
4e81a8d
test app on flink 1.11
sethsaperstein-lyft Mar 23, 2023
cb21077
remove local changes
sethsaperstein-lyft Mar 23, 2023
8a7811b
fix docker location
sethsaperstein-lyft Mar 23, 2023
68b2330
move to proper dir
sethsaperstein-lyft Mar 23, 2023
b0eede8
break waiting after 30s so errors show
sethsaperstein-lyft Mar 23, 2023
490f385
log all events at failure
sethsaperstein-lyft Mar 23, 2023
f821e91
forgot to add microk8s docker build and push
sethsaperstein-lyft Mar 23, 2023
3e6dbfe
wait longer for running app
sethsaperstein-lyft Mar 23, 2023
5101df2
tm logs first
sethsaperstein-lyft Mar 23, 2023
e904616
see if microk8s can take the memory
sethsaperstein-lyft Mar 23, 2023
0196be5
bump memory for tm due to 20mb allocated to task heap
sethsaperstein-lyft Mar 24, 2023
abc7186
increase flink tm heap size
sethsaperstein-lyft Mar 24, 2023
3965f45
set memory param. likely oom on second cluster
sethsaperstein-lyft Mar 24, 2023
74bebca
fix typo
sethsaperstein-lyft Mar 24, 2023
bca42b4
change memory configs
sethsaperstein-lyft Mar 24, 2023
02ebaef
add limits to containers. add concurrency limit to go test
sethsaperstein-lyft Mar 24, 2023
c469f10
limit cpu
sethsaperstein-lyft Mar 24, 2023
bd5a7b7
describe k8s resources on failure
sethsaperstein-lyft Mar 24, 2023
b3fabc6
add get pods to view restarts
sethsaperstein-lyft Mar 24, 2023
f425da7
try with minikube
sethsaperstein-lyft Mar 24, 2023
5350c37
flink 1.8 not 1.11
sethsaperstein-lyft Mar 24, 2023
3110b64
make executable
sethsaperstein-lyft Mar 24, 2023
f6cc040
remove microk8s from proxy kube
sethsaperstein-lyft Mar 24, 2023
15787c6
fix minikube command
sethsaperstein-lyft Mar 25, 2023
521bac5
disable rbac
sethsaperstein-lyft Mar 25, 2023
2c09fb2
workaround auth
sethsaperstein-lyft Mar 25, 2023
4cb11e5
fix file writing
sethsaperstein-lyft Mar 25, 2023
80b8251
raise timeout
sethsaperstein-lyft Mar 25, 2023
38d31d6
all integ tests passing local direct
sethsaperstein-lyft Mar 27, 2023
9ea26a8
bump minikube mem
sethsaperstein-lyft Mar 27, 2023
9a2f36b
decrease minikube mem
sethsaperstein-lyft Mar 27, 2023
6a8b657
removing permissions crashes minikube api
sethsaperstein-lyft Mar 27, 2023
f4bfd5b
add role
sethsaperstein-lyft Mar 27, 2023
f1d51ff
update timeouts
sethsaperstein-lyft Mar 27, 2023
532906d
raise timeout for cluster start. add tags to tests
sethsaperstein-lyft Mar 27, 2023
b5d6626
add better gc. add longer timeout
sethsaperstein-lyft Mar 27, 2023
eb7ede2
comment out scale up test as it uses too much cpu
sethsaperstein-lyft Mar 28, 2023
238f0ce
try to increase cpus. 2 should be max but lets find out
sethsaperstein-lyft Mar 28, 2023
cd0b4b1
yep 2 cpus in max
sethsaperstein-lyft Mar 28, 2023
5fc7c04
refactor clean up
sethsaperstein-lyft Mar 28, 2023
09b094d
update readme
sethsaperstein-lyft Mar 28, 2023
54c61af
update local dev docs
sethsaperstein-lyft Mar 28, 2023
13ac8c0
update ubuntu
sethsaperstein-lyft Mar 28, 2023
138b0a4
update ubuntu
sethsaperstein-lyft Mar 28, 2023
73a17cf
see if kube config directory issue is due to ubuntu upgrade
sethsaperstein-lyft Mar 29, 2023
b1dfd96
remove unused minikube command
sethsaperstein-lyft Mar 29, 2023
c928440
fix namespace in clusterrolebinding
sethsaperstein-lyft Mar 29, 2023
de9651f
fix lint issues
sethsaperstein-lyft Mar 29, 2023
1dd4788
move import for lint
sethsaperstein-lyft Mar 29, 2023
63b6797
upgrade ubuntu again
sethsaperstein-lyft Mar 29, 2023
1cbc3f6
attempt to fix ubuntu upgrade issue
sethsaperstein-lyft Mar 29, 2023
3db9f0c
after cluster start
sethsaperstein-lyft Mar 29, 2023
6e86f70
check if kube dir exists
sethsaperstein-lyft Mar 29, 2023
7fab7cc
set kube config location
sethsaperstein-lyft Mar 29, 2023
11fbbe8
check kube config env var
sethsaperstein-lyft Mar 30, 2023
69ccd27
move env var to test.sh since sudo
sethsaperstein-lyft Mar 30, 2023
21b7f7b
update docs
sethsaperstein-lyft Mar 30, 2023
2579200
upgrade newer ubuntu. all setup sudo so home is same
sethsaperstein-lyft Mar 30, 2023
48b2ecf
run tests without sudo
sethsaperstein-lyft Mar 30, 2023
aba457f
fix format
sethsaperstein-lyft Mar 30, 2023
db88cf9
remove commented env var
sethsaperstein-lyft Mar 30, 2023
1db3074
jira and test skip fix
sethsaperstein-lyft Mar 31, 2023
51 changes: 25 additions & 26 deletions .github/workflows/actions.yml
@@ -6,7 +6,7 @@ on:
branches: [ master ]
jobs:
unit-tests:
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04
defaults:
run:
working-directory: go/src/github.com/lyft/flinkk8soperator
@@ -27,7 +27,7 @@ jobs:
- name: test
run: make test_unit
lint:
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04
defaults:
run:
working-directory: go/src/github.com/lyft/flinkk8soperator
@@ -47,27 +47,26 @@
run: make install
- name: test
run: make lint
# TODO: restore this test
# integration-tests:
# runs-on: ubuntu-18.04
# defaults:
# run:
# working-directory: go/src/github.com/lyft/flinkk8soperator
# env:
# GOPATH: "/home/runner/work/flinkk8soperator/flinkk8soperator/go/"
# steps:
# - name: checkout
# uses: actions/checkout@v2
# with:
# fetch-depth: 1
# path: go/src/github.com/lyft/flinkk8soperator
# - name: install go
# uses: actions/setup-go@v2
# with:
# go-version: 1.12
# - name: install
# run: integ/install.sh
# - name: setup
# run: integ/setup.sh
# - name: test
# run: sudo "PATH=$PATH" "GOPATH=$GOPATH" integ/test.sh
integration-tests:
runs-on: ubuntu-20.04
defaults:
run:
working-directory: go/src/github.com/lyft/flinkk8soperator
env:
GOPATH: "/home/runner/work/flinkk8soperator/flinkk8soperator/go/"
steps:
- name: checkout
uses: actions/checkout@v2
with:
fetch-depth: 1
path: go/src/github.com/lyft/flinkk8soperator
- name: install go
uses: actions/setup-go@v2
with:
go-version: 1.12
- name: install
run: integ/install.sh
- name: setup
run: integ/setup.sh
- name: test
run: sudo "PATH=$PATH" "GOPATH=$GOPATH" integ/test.sh
14 changes: 10 additions & 4 deletions docs/local_dev.md
@@ -7,11 +7,11 @@ to develop their applications locally.

## Run the operator

### Install [Docker for Mac](https://docs.docker.com/docker-for-mac/install/)
### Install [Minikube](https://minikube.sigs.k8s.io/docs/start/#what-youll-need)

You will want to start minikube on <=1.20, for example:
`minikube start --kubernetes-version=v1.20.15`

Once installed and running, enabled Kuberenetes in settings (from the
docker icon in the menu bar, click Preferences -> Kubernetes -> Enable
Kubernetes).

### (Optional) Setup kubernetes dashboard

@@ -46,6 +46,12 @@ $ cd flinkk8soperator
$ kubectl create -f deploy/crd.yaml
```

### Install permissions
``` bash
$ kubectl create -f deploy/role.yaml
$ kubectl create -f deploy/role-binding.yaml
```

### Start the operator

#### Option 1: run outside the kubernetes cluster
37 changes: 37 additions & 0 deletions integ/README.md
@@ -79,3 +79,40 @@ variables. Supported options include:
You can also pass [gocheck](http://labix.org/gocheck) options to the
test runner. Particularly useful is `-check.vv` which will output logs
from the operator and Flink pods to help debugging test failures.

### Minikube Setup

Ideally we'd use k8s 1.16 to match the deployed k8s version; however, this
is non-trivial due to cgroup configurations. Instead, we use a version that
is compatible with v1beta1 CRDs, which corresponds to <1.22. The v1 CRD API
is only available with client >=1.16, but the client used here is 1.14 and
the upgrade is non-trivial.
Contributor: Maybe just add a TODO here with the jira to upgrade client?

Contributor (Author): Created a ticket for the 1.22 upgrade and added



1. Install dependencies:
   `dep ensure -vendor-only`

2. Start minikube:
   `minikube start --kubernetes-version=v1.20.15`

3. Proxy minikube:
   `kubectl proxy --port 8001 &`

4. Set up test app images and operator image:
   `integ/setup.sh`

5. Set the following for the Go test:
   Package path: `github.com/lyft/flinkk8soperator/integ`
   Env: `INTEGRATION=true;OPERATOR_IMAGE=flinkk8soperator:local;RUN_DIRECT=true`
   Program Args: `-timeout 40m -check.vv IntegTest`
Contributor: I thought IntegTest is not needed.

Contributor (Author): I removed this when attempting to target a single test. No luck there for now, so leaving it as is, since this is what was there for the integration test prior. In the future we'll figure out how to target a single test, as I'll likely write an integration test for fallback without savepoint.
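
For orientation, the Go test settings in the last step above follow the usual pattern of driving a gocheck suite through `go test`. The sketch below is illustrative only; the `INTEGRATION` gate and skip message are assumptions, not the repo's actual entry point.

```go
package integ

import (
	"os"
	"testing"

	. "gopkg.in/check.v1"
)

// Test is the single entry point invoked by `go test`; TestingT runs every
// registered gocheck suite, and gocheck options such as -check.vv are
// handled by the gocheck runner.
func Test(t *testing.T) {
	// Hypothetical guard: only run the suite when INTEGRATION is set,
	// mirroring the Env value configured above.
	if os.Getenv("INTEGRATION") == "" {
		t.Skip("set INTEGRATION=true to run the integration suite")
	}
	TestingT(t)
}
```

With an entry point like this, the IDE configuration above is roughly equivalent to exporting the same environment variables and running `go test` on the `integ` package with `-timeout 40m -check.vv`.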



Helpers:
- Kill kube proxy
ps -ef | grep "kubectl proxy"
kill -9 <process_id>
- Kill stuck flink app
kubectl patch FlinkApplication invalidcanceljob -p '{"metadata":{"finalizers":[]}}' --type=merge
- Set default namespace
kubectl config set-context --current --namespace=flinkoperatortest

6 changes: 4 additions & 2 deletions integ/blue_green_deployment_test.go
@@ -33,6 +33,7 @@ func WaitForUpdate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.
}

func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
log.Info("Starting test TestUpdateWithBlueGreenDeploymentMode")

testName := "bluegreenupdate"
const finalizer = "bluegreen.finalizers.test.com"
@@ -55,7 +56,7 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
c.Assert(len(pods.Items), Equals, 3)
c.Assert(len(pods.Items), Equals, 2)
for _, pod := range pods.Items {
c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image)
}
@@ -72,7 +73,7 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
// We have 2 applications running
c.Assert(len(pods.Items), Equals, 6)
c.Assert(len(pods.Items), Equals, 4)
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDualRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.GetJobID(newApp), NotNil)
c.Assert(newApp.Status.UpdatingVersion, Equals, v1beta1.BlueFlinkApplication)
@@ -153,4 +154,5 @@ func (s *IntegSuite) TestUpdateWithBlueGreenDeploymentMode(c *C) {
}
}
log.Info("All pods torn down")
log.Info("Completed test TestUpdateWithBlueGreenDeploymentMode")
}
14 changes: 9 additions & 5 deletions integ/checkpoint_failure_test.go
@@ -2,8 +2,6 @@ package integ

import (
"fmt"
"io/ioutil"
"os"
"time"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
@@ -67,18 +65,24 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {

// Tests that we correctly handle updating a job with task failures
func (s *IntegSuite) TestJobWithTaskFailures(c *C) {
log.Info("Starting test TestJobWithTaskFailures")

failingJobTest(s, c, "taskfailure", func() {
f, err := os.OpenFile(s.Util.CheckpointDir+"/fail", os.O_RDONLY|os.O_CREATE, 0666)
err := s.Util.ExecuteCommand("minikube", "ssh", "touch /tmp/checkpoints/fail && chmod 0644 /tmp/checkpoints/fail")
Contributor: I don't get this test, you're creating and modifying privs on a file and then just asserting the commands ran without errors?

c.Assert(err, IsNil)
c.Assert(f.Close(), IsNil)
})
log.Info("Completed test TestJobWithTaskFailures")
}

// Tests that we correctly handle updating a job with a checkpoint timeout
func (s *IntegSuite) TestCheckpointTimeout(c *C) {
log.Info("Starting test TestCheckpointTimeout")

failingJobTest(s, c, "checkpointtimeout", func() {
// cause checkpoints to take 120 seconds
err := ioutil.WriteFile(s.Util.CheckpointDir+"/checkpoint_delay", []byte("120000"), 0644)
err := s.Util.ExecuteCommand("minikube", "ssh", "echo 120000 >> /tmp/checkpoints/checkpoint_delay && sudo chmod 0644 /tmp/checkpoints/checkpoint_delay")
c.Assert(err, IsNil)
})
log.Info("Completed test TestCheckpointTimeout")

}
11 changes: 7 additions & 4 deletions integ/install.sh
@@ -2,10 +2,13 @@

set -e

sudo snap install microk8s --classic --channel=1.13/stable
microk8s.status --wait-ready
microk8s.enable dns
microk8s.enable registry
curl -LO -s https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

minikube config set memory 6800
minikube start --kubernetes-version=v1.20.15

export KUBERNETES_CONFIG=/home/runner/.kube/config

sh boilerplate/lyft/golang_test_targets/dep_install.sh

14 changes: 10 additions & 4 deletions integ/job_cancellation_test.go
@@ -58,7 +58,7 @@ func WaitUpdateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *

// tests the workflow of job cancellation without savepoint
func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {

log.Info("Starting test TestJobCancellationWithoutSavepoint")
testName := "cancelsuccess"
const finalizer = "simple.finalizers.test.com"

@@ -81,7 +81,7 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {
pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
c.Assert(len(pods.Items), Equals, 3)
c.Assert(len(pods.Items), Equals, 2)
for _, pod := range pods.Items {
c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image)
}
@@ -97,7 +97,7 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {
pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
List(v1.ListOptions{LabelSelector: "integTest=" + testName})
c.Assert(err, IsNil)
c.Assert(len(pods.Items), Equals, 3)
c.Assert(len(pods.Items), Equals, 2)
for _, pod := range pods.Items {
c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage)
}
@@ -131,11 +131,13 @@ func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {
}
}
log.Info("All pods torn down")
log.Info("Completed test TestJobCancellationWithoutSavepoint")
}

// tests a job update with the existing job already in cancelled state.
// here, the new submitted job starts without a savepoint.
func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
log.Info("Starting test TestCancelledJobWithoutSavepoint")

testName := "invalidcancel"
config, err := s.Util.ReadFlinkApplication("test_app.yaml")
@@ -150,6 +152,7 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
Commentf("Failed to create flink application"))

c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)

currApp, _ := s.Util.GetFlinkApplication(config.Name)
@@ -163,7 +166,7 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
c.Assert(err, IsNil)

// wait a bit
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
Contributor: Don't have to change anything, but generally speaking I am never a fan of such sleep timers in tests (it just makes them flaky). I would always promote changing this to a polling method with exponential delay (and an exit criterion). Again, no need to do that now, but I wanted to point it out.

Contributor (Author): Agreed. Will leave as a future update, or at least keep it in mind while writing future integration tests for this repo.
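
To illustrate the suggestion above, a minimal polling helper with exponential backoff and a deadline as the exit criterion might look like this (a hypothetical sketch, not code from this PR):

```go
package integ

import (
	"fmt"
	"time"
)

// pollUntil retries condition with exponential backoff until it returns true,
// returns an error, or the timeout elapses. This is only a sketch of the
// reviewer's suggestion; the name and defaults are assumptions.
func pollUntil(timeout time.Duration, condition func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	delay := 100 * time.Millisecond
	for {
		done, err := condition()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition not met within %s", timeout)
		}
		time.Sleep(delay)
		if delay < 5*time.Second {
			delay *= 2 // back off exponentially, capped at 5s
		}
	}
}
```

The fixed sleep before the status check could then be replaced by a `pollUntil` call that wraps the `CANCELED` status assertion.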


job = s.Util.GetJobOverview(currApp)
c.Assert(job["status"], Equals, "CANCELED")
@@ -205,10 +208,12 @@ func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
}
}
log.Info("All pods torn down")
log.Info("Completed test TestCancelledJobWithoutSavepoint")
}

// tests the recovery workflow of the job when savepoint is disabled.
func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) {
log.Info("Starting test TestJobRecoveryWithoutSavepoint")

const finalizer = "simple.finalizers.test.com"
const testName = "cancelrecovery"
@@ -300,4 +305,5 @@ func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) {
time.Sleep(100 * time.Millisecond)
}
log.Info("All pods torn down")
log.Info("Completed test TestJobRecoveryWithoutSavepoint")
}
54 changes: 41 additions & 13 deletions integ/main_test.go
@@ -51,6 +51,7 @@ func (s *IntegSuite) SetUpSuite(c *C) {
}

kubeconfig := os.Getenv("KUBERNETES_CONFIG")
fmt.Printf("Kube config: %s", kubeconfig)
if kubeconfig == "" {
kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
err := os.Setenv("KUBERNETES_CONFIG", kubeconfig)
@@ -79,7 +80,7 @@ func (s *IntegSuite) SetUpSuite(c *C) {
LimitNamespace: namespace,
UseProxy: true,
ResyncPeriod: flyteConfig.Duration{Duration: 3 * time.Second},
MaxErrDuration: flyteConfig.Duration{Duration: 30 * time.Second},
MaxErrDuration: flyteConfig.Duration{Duration: 60 * time.Second},
MetricsPrefix: "flinkk8soperator",
ProxyPort: flyteConfig.Port{Port: 8001},
}
@@ -92,6 +93,18 @@ func (s *IntegSuite) SetUpSuite(c *C) {
}
}()
} else {
if err = s.Util.CreateClusterRole(); err != nil && !k8sErrors.IsAlreadyExists(err) {
c.Fatalf("Failed to create role: %v", err)
}

if err = s.Util.CreateServiceAccount(); err != nil && !k8sErrors.IsAlreadyExists(err) {
c.Fatalf("Failed to create service account: %v", err)
}

if err = s.Util.CreateClusterRoleBinding(); err != nil && !k8sErrors.IsAlreadyExists(err) {
c.Fatalf("Failed to create cluster role binding: %v", err)
}

if err = s.Util.CreateOperator(); err != nil {
c.Fatalf("Failed to create operator: %v", err)
}
@@ -111,18 +124,12 @@ func (s *IntegSuite) TearDownSuite(c *C) {

func (s *IntegSuite) SetUpTest(c *C) {
// create checkpoint directory
if _, err := os.Stat(s.Util.CheckpointDir); os.IsNotExist(err) {
c.Assert(os.Mkdir(s.Util.CheckpointDir, 0777), IsNil)
if err := s.Util.ExecuteCommand("minikube", "ssh", "sudo mkdir /tmp/checkpoints && sudo chmod -R 0777 /tmp/checkpoints"); err != nil {
c.Fatalf("Failed to create checkpoint directory: %v", err)
}
}

func (s *IntegSuite) TearDownTest(c *C) {
jm, err := s.Util.GetJobManagerPod()
if err == nil {
fmt.Printf("\n\n######### JobManager logs for debugging #########\n---------------------------\n")
_ = s.Util.GetLogs(jm, nil)
}

tms, err := s.Util.GetTaskManagerPods()
if err == nil {
for i, tm := range tms {
Expand All @@ -132,13 +139,34 @@ func (s *IntegSuite) TearDownTest(c *C) {
}
}

jm, err := s.Util.GetJobManagerPod()
if err == nil {
fmt.Printf("\n\n######### JobManager logs for debugging #########\n---------------------------\n")
_ = s.Util.GetLogs(jm, nil)
}

fmt.Printf("\n\n######### Nodes for debugging #########\n---------------------------\n")
err = s.Util.ExecuteCommand("kubectl", "describe", "nodes")
c.Assert(err, IsNil)

fmt.Printf("\n\n######### Pods for debugging #########\n---------------------------\n")
err = s.Util.ExecuteCommand("kubectl", "get", "pods", "-n", "flinkoperatortest")
c.Assert(err, IsNil)

fmt.Printf("\n\n######### Pod details for debugging #########\n---------------------------\n")
err = s.Util.ExecuteCommand("kubectl", "describe", "pods", "-n", "flinkoperatortest")
c.Assert(err, IsNil)

fmt.Printf("\n\n######### Flink Applications for debugging #########\n---------------------------\n")
err = s.Util.ExecuteCommand("kubectl", "describe", "flinkapplications", "-n", "flinkoperatortest")
c.Assert(err, IsNil)

err = s.Util.FlinkApps().DeleteCollection(nil, v1.ListOptions{})
if err != nil {
log.Fatalf("Failed to clean up flink applications")
log.Fatalf("Failed to clean up flink applications: %v", err)
}

err = os.RemoveAll(s.Util.CheckpointDir)
if err != nil {
log.Fatalf("Failed to clean up checkpoints directory: %v", err)
if err := s.Util.ExecuteCommand("minikube", "ssh", "sudo rm -rf /tmp/checkpoints"); err != nil {
c.Fatalf("Failed to delete checkpoint directory: %v", err)
}
}