diff --git a/pkg/controller/flink/client/api.go b/pkg/controller/flink/client/api.go
index 3b93ca4e..fe173045 100644
--- a/pkg/controller/flink/client/api.go
+++ b/pkg/controller/flink/client/api.go
@@ -37,6 +37,7 @@ const httpPatch = "PATCH"
 const retryCount = 3
 const httpGetTimeOut = 5 * time.Second
 const defaultTimeOut = 1 * time.Minute
+const checkSavepointStatusRetries = 3
 
 // ProgramInvocationException is thrown when the entry class doesn't exist or throws an exception
 const programInvocationException = "org.apache.flink.client.program.ProgramInvocationException"
@@ -264,17 +265,17 @@ func (c *FlinkJobManagerClient) CheckSavepointStatus(ctx context.Context, url st
 	response, err := c.executeRequest(ctx, httpGet, url, nil)
 	if err != nil {
 		c.metrics.checkSavepointFailureCounter.Inc(ctx)
-		return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, GlobalFailure, DefaultRetries)
+		return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, GlobalFailure, checkSavepointStatusRetries)
 	}
 	if response != nil && !response.IsSuccess() {
 		c.metrics.checkSavepointFailureCounter.Inc(ctx)
 		logger.Errorf(ctx, fmt.Sprintf("Check savepoint status failed with response %v", response))
-		return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, response.Status(), DefaultRetries)
+		return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, response.Status(), checkSavepointStatusRetries)
 	}
 	var savepointResponse SavepointResponse
 	if err = json.Unmarshal(response.Body(), &savepointResponse); err != nil {
 		logger.Errorf(ctx, "Unable to Unmarshal savepointResponse %v, err: %v", response, err)
-		return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, JSONUnmarshalError, DefaultRetries)
+		return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, JSONUnmarshalError, checkSavepointStatusRetries)
 	}
 	c.metrics.cancelJobSuccessCounter.Inc(ctx)
 	return &savepointResponse, nil
diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go
index 2482e906..c5f868f3 100644
--- a/pkg/controller/flinkapplication/flink_state_machine.go
+++ b/pkg/controller/flinkapplication/flink_state_machine.go
@@ -160,7 +160,7 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
 		return statusChanged, nil
 	}
 
-	if s.IsTimeToHandlePhase(application) {
+	if s.IsTimeToHandlePhase(application, appPhase) {
 		if !v1beta1.IsRunningPhase(application.Status.Phase) {
 			logger.Infof(ctx, "Handling state for application")
 		}
@@ -193,7 +193,14 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
 	return updateApplication || updateLastSeenError, appErr
 }
 
-func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplication) bool {
+func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplication, phase v1beta1.FlinkApplicationPhase) bool {
+	if phase == v1beta1.FlinkApplicationDeleting {
+		// Reset lastSeenError and retryCount in case the application was failing in its previous phase.
+		// A Deleting phase should always be handled.
+		application.Status.LastSeenError = nil
+		application.Status.RetryCount = 0
+		return true
+	}
 	lastSeenError := application.Status.LastSeenError
 	if application.Status.LastSeenError == nil || !s.retryHandler.IsErrorRetryable(application.Status.LastSeenError) {
 		return true
@@ -312,18 +319,32 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
 
 	// check the savepoints in progress
 	savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash)
-	if err != nil {
+	// Here shouldRollback() is used as a proxy to identify whether we have exhausted all retries of GetSavepointStatus.
+	// The application is NOT actually rolled back, because at this point there may be no application running and we don't
+	// have information about the last successful savepoint.
+	// If rollback is true, we don't return immediately but let the remaining steps proceed as normal.
+	rollback, reason := s.shouldRollback(ctx, application)
+
+	if err != nil && !rollback {
 		return statusUnchanged, err
 	}
 
+	if err != nil && rollback {
+		s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointStatusCheckFailed",
+			fmt.Sprintf("Exhausted retries trying to get status for savepoint for job %s: %v",
+				application.Status.JobStatus.JobID, reason))
+	}
+
 	var restorePath string
-	if savepointStatusResponse.Operation.Location == "" &&
-		savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress {
+	if rollback || (savepointStatusResponse.Operation.Location == "" &&
+		savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress) {
 		// Savepointing failed
 		// TODO: we should probably retry this a few times before failing
-		s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed",
-			fmt.Sprintf("Failed to take savepoint for job %s: %v",
-				application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause))
+		if savepointStatusResponse != nil {
+			s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed",
+				fmt.Sprintf("Failed to take savepoint for job %s: %v",
+					application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause))
+		}
 
 		// try to find an externalized checkpoint
 		path, err := s.flinkController.FindExternalizedCheckpoint(ctx, application, application.Status.DeployHash)
@@ -340,7 +361,7 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
 				path, flink.HashForApplication(application)))
 
 		restorePath = path
-	} else if savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted {
+	} else if savepointStatusResponse != nil && savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted {
 		s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CanceledJob",
 			fmt.Sprintf("Canceled job with savepoint %s",
 				savepointStatusResponse.Operation.Location))
diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go
index 113360e2..24a140a6 100644
--- a/pkg/controller/flinkapplication/flink_state_machine_test.go
+++ b/pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -1263,3 +1263,126 @@ func TestLastSeenErrTimeIsNil(t *testing.T) {
 
 	assert.Nil(t, err)
 }
+
+func TestCheckSavepointStatusFailing(t *testing.T) {
+	oldHash := "old-hash-fail"
+	maxRetries := int32(1)
+	retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1)
+	app := v1beta1.FlinkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-app",
+			Namespace: "flink",
+		},
+		Spec: v1beta1.FlinkApplicationSpec{
+			JarName:     "job.jar",
+			Parallelism: 5,
+			EntryClass:  "com.my.Class",
+			ProgramArgs: "--test",
+		},
+		Status: v1beta1.FlinkApplicationStatus{
+			Phase:              v1beta1.FlinkApplicationSavepointing,
+			DeployHash:         oldHash,
+			LastSeenError:      retryableErr.(*v1beta1.FlinkApplicationError),
+			SavepointTriggerID: "trigger",
+		},
+	}
+	app.Status.LastSeenError.LastErrorUpdateTime = nil
+
+	stateMachineForTest := getTestStateMachine()
+	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) {
+		return nil, retryableErr.(*v1beta1.FlinkApplicationError)
+	}
+
+	mockFlinkController.FindExternalizedCheckpointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
+		return "/tmp/checkpoint", nil
+	}
+	mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler)
+	mockRetryHandler.IsErrorRetryableFunc = func(err error) bool {
+		return true
+	}
+	mockRetryHandler.IsTimeToRetryFunc = func(clock clock.Clock, lastUpdatedTime time.Time, retryCount int32) bool {
+		return true
+	}
+	mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool {
+		return retryCount < maxRetries
+	}
+
+	err := stateMachineForTest.Handle(context.Background(), &app)
+	// 1 retry left
+	assert.NotNil(t, err)
+	assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase)
+
+	// No retries left for CheckSavepointStatus
+	// The app should therefore try to recover from an externalized checkpoint
+	err = stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+	assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, app.Status.Phase)
+	assert.Equal(t, "/tmp/checkpoint", app.Status.SavepointPath)
+	assert.Equal(t, "", app.Status.JobStatus.JobID)
+}
+
+func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) {
+	retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1)
+	app := v1beta1.FlinkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-app",
+			Namespace: "flink",
+		},
+		Spec: v1beta1.FlinkApplicationSpec{
+			JarName:     "job.jar",
+			Parallelism: 5,
+			EntryClass:  "com.my.Class",
+			ProgramArgs: "--test",
+		},
+		Status: v1beta1.FlinkApplicationStatus{
+			Phase:              v1beta1.FlinkApplicationSavepointing,
+			DeployHash:         "appHash",
+			LastSeenError:      retryableErr.(*v1beta1.FlinkApplicationError),
+			SavepointTriggerID: "trigger",
+		},
+	}
+	app.Status.LastSeenError.LastErrorUpdateTime = nil
+
+	stateMachineForTest := getTestStateMachine()
+	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) {
+		return nil, retryableErr.(*v1beta1.FlinkApplicationError)
+	}
+	mockFlinkController.CancelWithSavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (s string, e error) {
+		return "triggerId", nil
+	}
+	mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler)
+	mockRetryHandler.IsErrorRetryableFunc = func(err error) bool {
+		return true
+	}
+	mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool {
+		return true
+	}
+	err := stateMachineForTest.Handle(context.Background(), &app)
+	assert.NotNil(t, err)
+	assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase)
+	assert.NotNil(t, app.Status.LastSeenError)
+	// Try to force-delete the app while it's in a Savepointing state (with errors)
+	// The delete should still be handled
+	app.Status.Phase = v1beta1.FlinkApplicationDeleting
+	app.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
+
+	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
+		assert.Equal(t, "appHash", hash)
+		return &client.FlinkJobOverview{
+			JobID: "jobID",
+			State: client.Failing,
+		}, nil
+	}
+
+	mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error {
+		return nil
+	}
+	err = stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+	assert.Nil(t, app.Status.LastSeenError)
+	assert.Equal(t, int32(0), app.Status.RetryCount)
+	assert.Nil(t, app.GetFinalizers())
+
+}
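Not part of the patch: a minimal, self-contained Go sketch of the retry budget this change relies on. The fixedBudget type and the loop below are illustrative assumptions, not operator code; fixedBudget stands in for the operator's retry handler the way the new tests stub IsRetryRemainingFunc. With a budget analogous to checkSavepointStatusRetries = 3, the Savepointing phase keeps retrying the savepoint status check until the budget is spent, after which shouldRollback reports true and handleApplicationSavepointing falls back to an externalized checkpoint instead of failing the deploy.

package main

import (
	"errors"
	"fmt"
)

// fixedBudget is an assumed, simplified retry handler: it allows a fixed
// number of retries, mirroring how the tests above stub IsRetryRemainingFunc.
type fixedBudget struct{ maxRetries int32 }

func (f fixedBudget) IsRetryRemaining(_ error, retryCount int32) bool {
	return retryCount < f.maxRetries
}

func main() {
	budget := fixedBudget{maxRetries: 3} // analogous to checkSavepointStatusRetries
	err := errors.New("CheckSavepointStatus failed")

	retryCount := int32(0)
	for budget.IsRetryRemaining(err, retryCount) {
		fmt.Printf("attempt %d: retrying savepoint status check\n", retryCount+1)
		retryCount++
	}
	// At this point the real state machine stops retrying: shouldRollback
	// reports true and the handler looks for an externalized checkpoint.
	fmt.Printf("retry budget of %d exhausted\n", retryCount)
}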