diff --git a/pkg/controller/flink/client/api.go b/pkg/controller/flink/client/api.go index 527f82e0..3f88b1a7 100644 --- a/pkg/controller/flink/client/api.go +++ b/pkg/controller/flink/client/api.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" @@ -227,16 +228,22 @@ func (c *FlinkJobManagerClient) SubmitJob(ctx context.Context, url string, jarID c.metrics.submitJobFailureCounter.Inc(ctx) logger.Warnf(ctx, fmt.Sprintf("Job submission failed with response %v", response)) if response.StatusCode() > 499 { - return nil, GetRetryableError(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, string(response.Body())) + // Flink returns a 500 when the entry class doesn't exist or crashes on start, but we want to fail fast + // in those cases + body := response.String() + if strings.Contains(body, "org.apache.flink.client.program.ProgramInvocationException") { + return nil, GetNonRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), body) + } + return nil, GetRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, string(response.Body())) } - return nil, GetNonRetryableError(err, v1beta1.SubmitJob, response.Status(), string(response.Body())) + return nil, GetNonRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), string(response.Body())) } var submitJobResponse SubmitJobResponse if err = json.Unmarshal(response.Body(), &submitJobResponse); err != nil { logger.Errorf(ctx, "Unable to Unmarshal submitJobResponse %v, err: %v", response, err) - return nil, GetRetryableError(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, JSONUnmarshalError) + return nil, GetRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, JSONUnmarshalError) } c.metrics.submitJobSuccessCounter.Inc(ctx) diff --git a/pkg/controller/flink/client/api_test.go b/pkg/controller/flink/client/api_test.go index c177217f..c3953921 100644 --- a/pkg/controller/flink/client/api_test.go +++ b/pkg/controller/flink/client/api_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" + "github.com/jarcoal/httpmock" mockScope "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/promutils/labeled" @@ -17,6 +19,7 @@ import ( const testURL = "http://abc.com" const invalidTestResponse = "invalid response" +const wrongEntryClassResponse = `{"errors":["Internal server error.","(PackagedProgram.java:199)\n\tat org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:149)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:125)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.ClassNotFoundException: com.lyft.streamingplatform.OperatorTestAppX\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat java.lang.Class.forName0(Native Method)\n\tat java.lang.Class.forName(Class.java:348)\n\tat org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:614)\n\t... 8 more\n\nEnd of exception on server side>"]}` const fakeJobsURL = "http://abc.com/jobs" const fakeOverviewURL = "http://abc.com/overview" const fakeJobConfigURL = "http://abc.com/jobs/1/config" @@ -79,7 +82,7 @@ func TestGetJobs500Response(t *testing.T) { client := getTestJobManagerClient() resp, err := client.GetJobs(ctx, testURL) assert.Nil(t, resp) - assert.EqualError(t, err, "GetJobs call failed with status 500 and message []") + assert.EqualError(t, err, "GetJobs call failed with status 500 and message ''") } func TestGetJobsError(t *testing.T) { @@ -150,7 +153,7 @@ func TestGetCluster500Response(t *testing.T) { client := getTestJobManagerClient() resp, err := client.GetClusterOverview(ctx, testURL) assert.Nil(t, resp) - assert.EqualError(t, err, "GetClusterOverview call failed with status 500 and message []") + assert.EqualError(t, err, "GetClusterOverview call failed with status 500 and message ''") } func TestGetCluster503Response(t *testing.T) { @@ -163,7 +166,7 @@ func TestGetCluster503Response(t *testing.T) { client := getTestJobManagerClient() resp, err := client.GetClusterOverview(ctx, testURL) assert.Nil(t, resp) - assert.EqualError(t, err, "GetClusterOverview call failed with status 503 and message []") + assert.EqualError(t, err, "GetClusterOverview call failed with status 503 and message ''") } func TestGetClusterOverviewError(t *testing.T) { @@ -218,7 +221,7 @@ func TestGetJobConfig500Response(t *testing.T) { client := getTestJobManagerClient() resp, err := client.GetJobConfig(ctx, testURL, "1") assert.Nil(t, resp) - assert.EqualError(t, err, "GetJobConfig call failed with status 500 and message []") + assert.EqualError(t, err, "GetJobConfig call failed with status 500 and message ''") } func TestGetJobConfigError(t *testing.T) { @@ -275,7 +278,7 @@ func TestCheckSavepoint500Response(t *testing.T) { client := getTestJobManagerClient() resp, err := client.CheckSavepointStatus(ctx, testURL, "1", "2") assert.Nil(t, resp) - assert.EqualError(t, err, "CheckSavepointStatus call failed with status 500 and message []") + assert.EqualError(t, err, "CheckSavepointStatus call failed with status 500 and message ''") } func TestCheckSavepointError(t *testing.T) { @@ -336,7 +339,32 @@ func TestSubmitJob500Response(t *testing.T) { Parallelism: 10, }) assert.Nil(t, resp) - assert.EqualError(t, err, "SubmitJob call failed with status 500 and message [could not submit]") + + assert.EqualError(t, err, "SubmitJob call failed with status 500 and message 'could not submit'") +} + +func TestSubmitStartupFail(t *testing.T) { + // Tests the case where we submit a job that fails to start up we want to immediately roll back, even though + // the Flink API unhelpfully returns a 500 in that case + httpmock.Activate() + defer httpmock.DeactivateAndReset() + ctx := context.Background() + + //var message json.RawMessage + //err := json.Unmarshal([]byte(wrongEntryClassResponse), message) + responder := httpmock.NewStringResponder(500, wrongEntryClassResponse) + httpmock.RegisterResponder("POST", fakeSubmitURL, responder) + + client := getTestJobManagerClient() + resp, err := client.SubmitJob(ctx, testURL, "1", SubmitJobRequest{ + Parallelism: 10, + }) + assert.Nil(t, resp) + flinkAppError, _ := err.(*v1beta1.FlinkApplicationError) + assert.True(t, flinkAppError.IsFailFast) + + assert.EqualError(t, err, "SubmitJob call failed with status 500 and message '"+ + wrongEntryClassResponse+"'") } func TestSubmitJobError(t *testing.T) { @@ -393,7 +421,7 @@ func TestCancelJob500Response(t *testing.T) { client := getTestJobManagerClient() resp, err := client.CancelJobWithSavepoint(ctx, testURL, "1") assert.Empty(t, resp) - assert.EqualError(t, err, "CancelJobWithSavepoint call failed with status 500 and message []") + assert.EqualError(t, err, "CancelJobWithSavepoint call failed with status 500 and message ''") } func TestCancelJobError(t *testing.T) { @@ -426,7 +454,7 @@ func TestHttpGetNon200Response(t *testing.T) { client := getTestJobManagerClient() _, err := client.GetJobs(ctx, testURL) assert.NotNil(t, err) - assert.EqualError(t, err, "GetJobs call failed with status 500 and message []") + assert.EqualError(t, err, "GetJobs call failed with status 500 and message ''") } func TestClientInvalidMethod(t *testing.T) { diff --git a/pkg/controller/flink/client/error_handler.go b/pkg/controller/flink/client/error_handler.go index 175f14e2..83e3f5c2 100644 --- a/pkg/controller/flink/client/error_handler.go +++ b/pkg/controller/flink/client/error_handler.go @@ -21,22 +21,31 @@ const ( NoRetries = 0 ) -func GetRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32, message ...string) error { +func GetRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32) error { + return GetRetryableErrorWithMessage(err, method, errorCode, maxRetries, "") +} + +func GetRetryableErrorWithMessage(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32, message string) error { appError := getErrorValue(err, method, errorCode, message) return NewFlinkApplicationError(appError.Error(), method, errorCode, true, false, maxRetries) } -func GetNonRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, message ...string) error { +func GetNonRetryableError(err error, method v1beta1.FlinkMethod, errorCode string) error { + return GetNonRetryableErrorWithMessage(err, method, errorCode, "") +} + +func GetNonRetryableErrorWithMessage(err error, method v1beta1.FlinkMethod, errorCode string, message string) error { appError := getErrorValue(err, method, errorCode, message) return NewFlinkApplicationError(appError.Error(), method, errorCode, false, true, NoRetries) } -func getErrorValue(err error, method v1beta1.FlinkMethod, errorCode string, message []string) error { +func getErrorValue(err error, method v1beta1.FlinkMethod, errorCode string, message string) error { if err == nil { - return errors.New(fmt.Sprintf("%v call failed with status %v and message %v", method, errorCode, message)) + return errors.New(fmt.Sprintf("%v call failed with status %v and message '%s'", method, errorCode, message)) } - return errors.Wrapf(err, "%v call failed with status %v and message %v", method, errorCode, message) + return errors.Wrapf(err, "%v call failed with status %v and message '%s'", method, errorCode, message) } + func min(a, b int) int { if a < b { return a diff --git a/pkg/controller/flink/client/error_handler_test.go b/pkg/controller/flink/client/error_handler_test.go index 075f64a0..fd6ee52a 100644 --- a/pkg/controller/flink/client/error_handler_test.go +++ b/pkg/controller/flink/client/error_handler_test.go @@ -18,19 +18,19 @@ func getTestRetryer() RetryHandler { func TestGetError(t *testing.T) { testErr := errors.New("Service unavailable") ferr := GetNonRetryableError(testErr, "GetTest", "500") - assert.Equal(t, "GetTest call failed with status 500 and message []: Service unavailable", ferr.Error()) + assert.Equal(t, "GetTest call failed with status 500 and message '': Service unavailable", ferr.Error()) //nil error ferrNil := GetNonRetryableError(nil, "GetTest", "500") - assert.Equal(t, "GetTest call failed with status 500 and message []", ferrNil.Error()) + assert.Equal(t, "GetTest call failed with status 500 and message ''", ferrNil.Error()) testWrappedErr := errors.Wrap(testErr, "Wrapped errors") ferrWrapped := GetNonRetryableError(testWrappedErr, "GetTestWrapped", "400") - assert.Equal(t, "GetTestWrapped call failed with status 400 and message []: Wrapped errors: Service unavailable", ferrWrapped.Error()) + assert.Equal(t, "GetTestWrapped call failed with status 400 and message '': Wrapped errors: Service unavailable", ferrWrapped.Error()) testMessageErr := errors.New("Test Error") - ferrMessage := GetNonRetryableError(testMessageErr, "GetTest", "500", "message1", "message2") - assert.Equal(t, "GetTest call failed with status 500 and message [message1 message2]: Test Error", ferrMessage.Error()) + ferrMessage := GetNonRetryableErrorWithMessage(testMessageErr, "GetTest", "500", "message") + assert.Equal(t, "GetTest call failed with status 500 and message 'message': Test Error", ferrMessage.Error()) } func TestErrors(t *testing.T) {