From 0bc8686ffe05ca8b3687ff740689bd50ea72c439 Mon Sep 17 00:00:00 2001 From: glaksh100 Date: Sun, 8 Dec 2019 18:13:30 -0800 Subject: [PATCH 1/6] [WIP] Fix for deploy getting stuck --- pkg/controller/flink/client/api.go | 7 +- .../flinkapplication/flink_state_machine.go | 23 +++- .../flink_state_machine_test.go | 128 ++++++++++++++++++ 3 files changed, 150 insertions(+), 8 deletions(-) diff --git a/pkg/controller/flink/client/api.go b/pkg/controller/flink/client/api.go index 3b93ca4e..fe173045 100644 --- a/pkg/controller/flink/client/api.go +++ b/pkg/controller/flink/client/api.go @@ -37,6 +37,7 @@ const httpPatch = "PATCH" const retryCount = 3 const httpGetTimeOut = 5 * time.Second const defaultTimeOut = 1 * time.Minute +const checkSavepointStatusRetries = 3 // ProgramInvocationException is thrown when the entry class doesn't exist or throws an exception const programInvocationException = "org.apache.flink.client.program.ProgramInvocationException" @@ -264,17 +265,17 @@ func (c *FlinkJobManagerClient) CheckSavepointStatus(ctx context.Context, url st response, err := c.executeRequest(ctx, httpGet, url, nil) if err != nil { c.metrics.checkSavepointFailureCounter.Inc(ctx) - return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, GlobalFailure, DefaultRetries) + return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, GlobalFailure, checkSavepointStatusRetries) } if response != nil && !response.IsSuccess() { c.metrics.checkSavepointFailureCounter.Inc(ctx) logger.Errorf(ctx, fmt.Sprintf("Check savepoint status failed with response %v", response)) - return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, response.Status(), DefaultRetries) + return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, response.Status(), checkSavepointStatusRetries) } var savepointResponse SavepointResponse if err = json.Unmarshal(response.Body(), &savepointResponse); err != nil { logger.Errorf(ctx, "Unable to Unmarshal savepointResponse %v, err: %v", response, err) - return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, JSONUnmarshalError, DefaultRetries) + return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, JSONUnmarshalError, checkSavepointStatusRetries) } c.metrics.cancelJobSuccessCounter.Inc(ctx) return &savepointResponse, nil diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 2482e906..a79ae5c2 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -160,7 +160,7 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli return statusChanged, nil } - if s.IsTimeToHandlePhase(application) { + if s.IsTimeToHandlePhase(application, appPhase) { if !v1beta1.IsRunningPhase(application.Status.Phase) { logger.Infof(ctx, "Handling state for application") } @@ -193,7 +193,14 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli return updateApplication || updateLastSeenError, appErr } -func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplication) bool { +func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplication, phase v1beta1.FlinkApplicationPhase) bool { + if phase == v1beta1.FlinkApplicationDeleting { + // reset lastSeenError and retryCount in case the application was failing in its previous phase + // We always want a Deleting phase to be handled + application.Status.LastSeenError = nil + application.Status.RetryCount = 0 + return true + } lastSeenError := application.Status.LastSeenError if application.Status.LastSeenError == nil || !s.retryHandler.IsErrorRetryable(application.Status.LastSeenError) { return true @@ -312,12 +319,18 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a // check the savepoints in progress savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash) - if err != nil { + // Here shouldRollback() is used as a proxy to identify if we have exhausted all retries trying to GetSavepointStatus + // The application is NOT actually rolled back because we're in a spot where there's no application running and we don't + // have information on the last successful savepoint. + // In the event that rollback evaluates to True, we don't return immediately but allow for the remaining steps to proceed as normal. + rollback, _ := s.shouldRollback(ctx, application) + + if err != nil && !rollback { return statusUnchanged, err } var restorePath string - if savepointStatusResponse.Operation.Location == "" && + if savepointStatusResponse != nil && savepointStatusResponse.Operation.Location == "" && savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress { // Savepointing failed // TODO: we should probably retry this a few times before failing @@ -340,7 +353,7 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a path, flink.HashForApplication(application))) restorePath = path - } else if savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted { + } else if savepointStatusResponse != nil && savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted { s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CanceledJob", fmt.Sprintf("Canceled job with savepoint %s", savepointStatusResponse.Operation.Location)) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 113360e2..28643bf1 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -1263,3 +1263,131 @@ func TestLastSeenErrTimeIsNil(t *testing.T) { assert.Nil(t, err) } + +func TestCheckSavepointStatusFailing(t *testing.T) { + oldHash := "old-hash-force-nil" + maxRetries := int32(1) + retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1) + app := v1beta1.FlinkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "flink", + }, + Spec: v1beta1.FlinkApplicationSpec{ + JarName: "job.jar", + Parallelism: 5, + EntryClass: "com.my.Class", + ProgramArgs: "--test", + }, + Status: v1beta1.FlinkApplicationStatus{ + Phase: v1beta1.FlinkApplicationSavepointing, + DeployHash: oldHash, + LastSeenError: retryableErr.(*v1beta1.FlinkApplicationError), + SavepointTriggerID: "trigger", + }, + } + app.Status.LastSeenError.LastErrorUpdateTime = nil + + stateMachineForTest := getTestStateMachine() + mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) + mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { + return &client.SavepointResponse{ + SavepointStatus: client.SavepointStatusResponse{ + Status: client.SavePointInvalid, + }, + Operation: client.SavepointOperationResponse{ + Location: "", + }}, retryableErr.(*v1beta1.FlinkApplicationError) + } + + mockFlinkController.FindExternalizedCheckpointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { + return "/tmp/checkpoint", nil + } + mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler) + mockRetryHandler.IsErrorRetryableFunc = func(err error) bool { + return true + } + mockRetryHandler.IsTimeToRetryFunc = func(clock clock.Clock, lastUpdatedTime time.Time, retryCount int32) bool { + return true + } + mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool { + return retryCount < maxRetries + } + + err := stateMachineForTest.Handle(context.Background(), &app) + // 1 retry left + assert.NotNil(t, err) + assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase) + + // No retries left for CheckSavepointStatus + // The app should hence try to recover from an externalized checkpoint + err = stateMachineForTest.Handle(context.Background(), &app) + assert.Nil(t, err) + assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, app.Status.Phase) +} + +func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { + oldHash := "old-hash" + retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1) + app := v1beta1.FlinkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "flink", + }, + Spec: v1beta1.FlinkApplicationSpec{ + JarName: "job.jar", + Parallelism: 5, + EntryClass: "com.my.Class", + ProgramArgs: "--test", + }, + Status: v1beta1.FlinkApplicationStatus{ + Phase: v1beta1.FlinkApplicationSavepointing, + DeployHash: oldHash, + LastSeenError: retryableErr.(*v1beta1.FlinkApplicationError), + SavepointTriggerID: "trigger", + }, + } + app.Status.LastSeenError.LastErrorUpdateTime = nil + + stateMachineForTest := getTestStateMachine() + mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) + mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { + return &client.SavepointResponse{ + SavepointStatus: client.SavepointStatusResponse{ + Status: client.SavePointInvalid, + }, + Operation: client.SavepointOperationResponse{ + Location: "", + }}, retryableErr.(*v1beta1.FlinkApplicationError) + } + mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler) + mockRetryHandler.IsErrorRetryableFunc = func(err error) bool { + return true + } + mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool { + return true + } + err := stateMachineForTest.Handle(context.Background(), &app) + assert.NotNil(t, err) + assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase) + // Try to force delete the app while it's in a savepointing state (with errors) + // We should handle the delete here + app.Status.Phase = v1beta1.FlinkApplicationDeleting + app.Spec.DeleteMode = v1beta1.DeleteModeForceCancel + + mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) { + assert.Equal(t, "old-hash", hash) + return &client.FlinkJobOverview{ + JobID: "jobID", + State: client.Failing, + }, nil + } + + mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { + return nil + } + err = stateMachineForTest.Handle(context.Background(), &app) + assert.Nil(t, err) + assert.Nil(t, app.GetFinalizers()) + +} From 5a73cc5b4841d530538cfee5328019030abcd4e1 Mon Sep 17 00:00:00 2001 From: glaksh100 Date: Sun, 8 Dec 2019 18:34:20 -0800 Subject: [PATCH 2/6] fix lint --- .../flinkapplication/flink_state_machine_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 28643bf1..6bf916ca 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -1265,7 +1265,7 @@ func TestLastSeenErrTimeIsNil(t *testing.T) { } func TestCheckSavepointStatusFailing(t *testing.T) { - oldHash := "old-hash-force-nil" + oldHash := "old-hash-fail" maxRetries := int32(1) retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1) app := v1beta1.FlinkApplication{ @@ -1327,7 +1327,6 @@ func TestCheckSavepointStatusFailing(t *testing.T) { } func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { - oldHash := "old-hash" retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1) app := v1beta1.FlinkApplication{ ObjectMeta: metav1.ObjectMeta{ @@ -1342,7 +1341,7 @@ func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { }, Status: v1beta1.FlinkApplicationStatus{ Phase: v1beta1.FlinkApplicationSavepointing, - DeployHash: oldHash, + DeployHash: "appHash", LastSeenError: retryableErr.(*v1beta1.FlinkApplicationError), SavepointTriggerID: "trigger", }, @@ -1376,7 +1375,7 @@ func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { app.Spec.DeleteMode = v1beta1.DeleteModeForceCancel mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) { - assert.Equal(t, "old-hash", hash) + assert.Equal(t, "appHash", hash) return &client.FlinkJobOverview{ JobID: "jobID", State: client.Failing, From cb9bb95619e30eb7469aa5311c999eec4f843485 Mon Sep 17 00:00:00 2001 From: glaksh100 Date: Mon, 9 Dec 2019 09:55:42 -0800 Subject: [PATCH 3/6] Refine comment --- pkg/controller/flinkapplication/flink_state_machine.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index a79ae5c2..aa28e15d 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -320,15 +320,19 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a // check the savepoints in progress savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash) // Here shouldRollback() is used as a proxy to identify if we have exhausted all retries trying to GetSavepointStatus - // The application is NOT actually rolled back because we're in a spot where there's no application running and we don't + // The application is NOT actually rolled back because we're in a spot where there may be no application running and we don't // have information on the last successful savepoint. // In the event that rollback evaluates to True, we don't return immediately but allow for the remaining steps to proceed as normal. - rollback, _ := s.shouldRollback(ctx, application) + rollback, reason := s.shouldRollback(ctx, application) if err != nil && !rollback { return statusUnchanged, err } + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointStatusCheckFailed", + fmt.Sprintf("Exhausted retries trying to get status for savepoint for job %s: %v", + application.Status.JobStatus.JobID, reason)) + var restorePath string if savepointStatusResponse != nil && savepointStatusResponse.Operation.Location == "" && savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress { From 69b4e2392777e4ab6a8c80c057b3efd8acd91c88 Mon Sep 17 00:00:00 2001 From: glaksh100 Date: Mon, 9 Dec 2019 09:58:20 -0800 Subject: [PATCH 4/6] Add more asserts --- pkg/controller/flinkapplication/flink_state_machine_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 6bf916ca..c123c805 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -1369,6 +1369,7 @@ func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { err := stateMachineForTest.Handle(context.Background(), &app) assert.NotNil(t, err) assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase) + assert.NotNil(t, app.Status.LastSeenError) // Try to force delete the app while it's in a savepointing state (with errors) // We should handle the delete here app.Status.Phase = v1beta1.FlinkApplicationDeleting @@ -1387,6 +1388,8 @@ func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { } err = stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) + assert.Nil(t, app.Status.LastSeenError) + assert.Equal(t, int32(0), app.Status.RetryCount) assert.Nil(t, app.GetFinalizers()) } From f20136fb01b96d60b6280f84c088abe83be22e57 Mon Sep 17 00:00:00 2001 From: glaksh100 Date: Mon, 9 Dec 2019 17:44:57 -0800 Subject: [PATCH 5/6] Fix log event --- pkg/controller/flinkapplication/flink_state_machine.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index aa28e15d..bee1515b 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -329,9 +329,11 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a return statusUnchanged, err } - s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointStatusCheckFailed", - fmt.Sprintf("Exhausted retries trying to get status for savepoint for job %s: %v", - application.Status.JobStatus.JobID, reason)) + if err != nil && rollback { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointStatusCheckFailed", + fmt.Sprintf("Exhausted retries trying to get status for savepoint for job %s: %v", + application.Status.JobStatus.JobID, reason)) + } var restorePath string if savepointStatusResponse != nil && savepointStatusResponse.Operation.Location == "" && From e8b5facc955ae867cbb16e76921fd17eb52a027b Mon Sep 17 00:00:00 2001 From: glaksh100 Date: Mon, 9 Dec 2019 18:00:33 -0800 Subject: [PATCH 6/6] Fix nil response checks --- .../flinkapplication/flink_state_machine.go | 12 ++++++----- .../flink_state_machine_test.go | 21 +++++++------------ 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index bee1515b..c5f868f3 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -336,13 +336,15 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a } var restorePath string - if savepointStatusResponse != nil && savepointStatusResponse.Operation.Location == "" && - savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress { + if rollback || (savepointStatusResponse.Operation.Location == "" && + savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress) { // Savepointing failed // TODO: we should probably retry this a few times before failing - s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed", - fmt.Sprintf("Failed to take savepoint for job %s: %v", - application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause)) + if savepointStatusResponse != nil { + s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed", + fmt.Sprintf("Failed to take savepoint for job %s: %v", + application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause)) + } // try to find an externalized checkpoint path, err := s.flinkController.FindExternalizedCheckpoint(ctx, application, application.Status.DeployHash) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index c123c805..24a140a6 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -1291,13 +1291,7 @@ func TestCheckSavepointStatusFailing(t *testing.T) { stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { - return &client.SavepointResponse{ - SavepointStatus: client.SavepointStatusResponse{ - Status: client.SavePointInvalid, - }, - Operation: client.SavepointOperationResponse{ - Location: "", - }}, retryableErr.(*v1beta1.FlinkApplicationError) + return nil, retryableErr.(*v1beta1.FlinkApplicationError) } mockFlinkController.FindExternalizedCheckpointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { @@ -1324,6 +1318,8 @@ func TestCheckSavepointStatusFailing(t *testing.T) { err = stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, app.Status.Phase) + assert.Equal(t, "/tmp/checkpoint", app.Status.SavepointPath) + assert.Equal(t, "", app.Status.JobStatus.JobID) } func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { @@ -1351,13 +1347,10 @@ func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) { stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { - return &client.SavepointResponse{ - SavepointStatus: client.SavepointStatusResponse{ - Status: client.SavePointInvalid, - }, - Operation: client.SavepointOperationResponse{ - Location: "", - }}, retryableErr.(*v1beta1.FlinkApplicationError) + return nil, retryableErr.(*v1beta1.FlinkApplicationError) + } + mockFlinkController.CancelWithSavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (s string, e error) { + return "triggerId", nil } mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler) mockRetryHandler.IsErrorRetryableFunc = func(err error) bool {