From 07b7b4ff4d3760e67f6c286f51c423b90a305953 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 22 Aug 2019 13:17:52 -0700 Subject: [PATCH 1/5] Use error retrying system for savepoint errors during deletion --- pkg/controller/flinkapplication/flink_state_machine.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 5921950f..5cd67c35 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -2,6 +2,7 @@ package flinkapplication import ( "context" + "math" "time" "k8s.io/client-go/tools/record" @@ -676,6 +677,8 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause)) // clear the trigger id so that we can try again app.Spec.SavepointInfo.TriggerID = "" + return true, client.GetRetryableError(errors.New("failed to take savepoint"), + client.CancelJobWithSavepoint, "500", math.MaxInt32) } else if status.SavepointStatus.Status == client.SavePointCompleted { // we're done, clean up s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CanceledJob", From f53d63d787d0c8e99f390dfc76fc3ff2f5090c18 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 22 Aug 2019 14:55:57 -0700 Subject: [PATCH 2/5] Increase maxBackoffDuration for integration tests --- integ/utils/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integ/utils/utils.go b/integ/utils/utils.go index a35a90e6..a633f92b 100644 --- a/integ/utils/utils.go +++ b/integ/utils/utils.go @@ -139,7 +139,7 @@ func (f *TestUtil) CreateCRD() error { func (f *TestUtil) CreateOperator() error { configValue := make(map[string]string) - configValue["development"] = "operator:\n containerNameFormat: \"%s-unknown\"\n resyncPeriod: 5s\n baseBackoffDuration: 10ms\n maxBackoffDuration: 50ms\n maxErrDuration: 40s\n " + configValue["development"] = "operator:\n containerNameFormat: \"%s-unknown\"\n resyncPeriod: 5s\n baseBackoffDuration: 50ms\n maxBackoffDuration: 2s\n maxErrDuration: 40s\n " configMap := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ From 2b5944180649e3cd55767a5a435563f8ec36a815 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 22 Aug 2019 14:56:32 -0700 Subject: [PATCH 3/5] increase test timeout --- integ/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integ/test.sh b/integ/test.sh index 21e7bb3a..08724063 100755 --- a/integ/test.sh +++ b/integ/test.sh @@ -9,5 +9,5 @@ export OPERATOR_IMAGE=127.0.0.1:32000/flinkk8soperator:local umask 000 cd $(dirname "$0") -go test -timeout 22m -check.vv IntegSuite +go test -timeout 30m -check.vv IntegSuite From 76b3bff298f474965abd71b357c9a8081310a12b Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 22 Aug 2019 15:17:17 -0700 Subject: [PATCH 4/5] Add unit test --- .../flink_state_machine_test.go | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 1f072a73..60d33540 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -704,8 +704,10 @@ func TestDeleteWithSavepoint(t *testing.T) { if updateCount == 1 { assert.Equal(t, triggerID, application.Spec.SavepointInfo.TriggerID) } else if updateCount == 2 { - assert.Equal(t, savepointPath, application.Spec.SavepointInfo.SavepointLocation) + assert.NotNil(t, application.Status.LastSeenError) } else if updateCount == 3 { + assert.Equal(t, savepointPath, application.Spec.SavepointInfo.SavepointLocation) + } else if updateCount == 4 { assert.Equal(t, 0, len(app.Finalizers)) } @@ -717,21 +719,42 @@ func TestDeleteWithSavepoint(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 2, updateCount) + savepointStatusCount := 0 mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { - return &client.SavepointResponse{ - SavepointStatus: client.SavepointStatusResponse{ - Status: client.SavePointCompleted, - }, - Operation: client.SavepointOperationResponse{ - Location: "s3:///path/to/savepoint", - }, - }, nil + savepointStatusCount++ + + if savepointStatusCount == 1 { + return &client.SavepointResponse{ + SavepointStatus: client.SavepointStatusResponse{ + Status: client.SavePointCompleted, + }, + Operation: client.SavepointOperationResponse{ + FailureCause: client.FailureCause{ + Class: "java.util.concurrent.CompletionException", + StackTrace: "Exception", + }, + }, + }, nil + } else { + return &client.SavepointResponse{ + SavepointStatus: client.SavepointStatusResponse{ + Status: client.SavePointCompleted, + }, + Operation: client.SavepointOperationResponse{ + Location: "s3:///path/to/savepoint", + }, + }, nil + } } + // the first time we return an error from the savepointing status + err = stateMachineForTest.Handle(context.Background(), app.DeepCopy()) + assert.Error(t, err) + err = stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 3, updateCount) + assert.Equal(t, 4, updateCount) mockFlinkController.GetJobsForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (jobs []client.FlinkJob, err error) { return []client.FlinkJob{ @@ -745,7 +768,7 @@ func TestDeleteWithSavepoint(t *testing.T) { err = stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 4, updateCount) + assert.Equal(t, 5, updateCount) } From 30faac73a8983504f2938e18a77bc0f7b4fcdf33 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 22 Aug 2019 15:21:47 -0700 Subject: [PATCH 5/5] fix lint --- .../flink_state_machine_test.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 60d33540..6d76739f 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -730,21 +730,20 @@ func TestDeleteWithSavepoint(t *testing.T) { }, Operation: client.SavepointOperationResponse{ FailureCause: client.FailureCause{ - Class: "java.util.concurrent.CompletionException", + Class: "java.util.concurrent.CompletionException", StackTrace: "Exception", }, }, }, nil - } else { - return &client.SavepointResponse{ - SavepointStatus: client.SavepointStatusResponse{ - Status: client.SavePointCompleted, - }, - Operation: client.SavepointOperationResponse{ - Location: "s3:///path/to/savepoint", - }, - }, nil } + return &client.SavepointResponse{ + SavepointStatus: client.SavepointStatusResponse{ + Status: client.SavePointCompleted, + }, + Operation: client.SavepointOperationResponse{ + Location: "s3:///path/to/savepoint", + }, + }, nil } // the first time we return an error from the savepointing status