Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STRMCMP-569] Failures during job startup should trigger immediate rollback #100

Merged
merged 1 commit into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions pkg/controller/flink/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
Expand Down Expand Up @@ -227,16 +228,22 @@ func (c *FlinkJobManagerClient) SubmitJob(ctx context.Context, url string, jarID
c.metrics.submitJobFailureCounter.Inc(ctx)
logger.Warnf(ctx, fmt.Sprintf("Job submission failed with response %v", response))
if response.StatusCode() > 499 {
return nil, GetRetryableError(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, string(response.Body()))
// Flink returns a 500 when the entry class doesn't exist or crashes on start, but we want to fail fast
// in those cases
body := response.String()
if strings.Contains(body, "org.apache.flink.client.program.ProgramInvocationException") {
return nil, GetNonRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), body)
}
return nil, GetRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, string(response.Body()))
}

return nil, GetNonRetryableError(err, v1beta1.SubmitJob, response.Status(), string(response.Body()))
return nil, GetNonRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), string(response.Body()))
}

var submitJobResponse SubmitJobResponse
if err = json.Unmarshal(response.Body(), &submitJobResponse); err != nil {
logger.Errorf(ctx, "Unable to Unmarshal submitJobResponse %v, err: %v", response, err)
return nil, GetRetryableError(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, JSONUnmarshalError)
return nil, GetRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, JSONUnmarshalError)
}

c.metrics.submitJobSuccessCounter.Inc(ctx)
Expand Down
44 changes: 36 additions & 8 deletions pkg/controller/flink/client/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"

"github.com/jarcoal/httpmock"
mockScope "github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
Expand All @@ -17,6 +19,7 @@ import (

const testURL = "http://abc.com"
const invalidTestResponse = "invalid response"
const wrongEntryClassResponse = `{"errors":["Internal server error.","<Exception on server side:\norg.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'com.lyft.streamingplatform.OperatorTestAppX' was not found in the jar file.\n\tat org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:617)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:199)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:149)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:125)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.ClassNotFoundException: com.lyft.streamingplatform.OperatorTestAppX\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat java.lang.Class.forName0(Native Method)\n\tat java.lang.Class.forName(Class.java:348)\n\tat org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:614)\n\t... 8 more\n\nEnd of exception on server side>"]}`
const fakeJobsURL = "http://abc.com/jobs"
const fakeOverviewURL = "http://abc.com/overview"
const fakeJobConfigURL = "http://abc.com/jobs/1/config"
Expand Down Expand Up @@ -79,7 +82,7 @@ func TestGetJobs500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetJobs(ctx, testURL)
assert.Nil(t, resp)
assert.EqualError(t, err, "GetJobs call failed with status 500 and message []")
assert.EqualError(t, err, "GetJobs call failed with status 500 and message ''")
}

func TestGetJobsError(t *testing.T) {
Expand Down Expand Up @@ -150,7 +153,7 @@ func TestGetCluster500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetClusterOverview(ctx, testURL)
assert.Nil(t, resp)
assert.EqualError(t, err, "GetClusterOverview call failed with status 500 and message []")
assert.EqualError(t, err, "GetClusterOverview call failed with status 500 and message ''")
}

func TestGetCluster503Response(t *testing.T) {
Expand All @@ -163,7 +166,7 @@ func TestGetCluster503Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetClusterOverview(ctx, testURL)
assert.Nil(t, resp)
assert.EqualError(t, err, "GetClusterOverview call failed with status 503 and message []")
assert.EqualError(t, err, "GetClusterOverview call failed with status 503 and message ''")
}

func TestGetClusterOverviewError(t *testing.T) {
Expand Down Expand Up @@ -218,7 +221,7 @@ func TestGetJobConfig500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetJobConfig(ctx, testURL, "1")
assert.Nil(t, resp)
assert.EqualError(t, err, "GetJobConfig call failed with status 500 and message []")
assert.EqualError(t, err, "GetJobConfig call failed with status 500 and message ''")
}

func TestGetJobConfigError(t *testing.T) {
Expand Down Expand Up @@ -275,7 +278,7 @@ func TestCheckSavepoint500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.CheckSavepointStatus(ctx, testURL, "1", "2")
assert.Nil(t, resp)
assert.EqualError(t, err, "CheckSavepointStatus call failed with status 500 and message []")
assert.EqualError(t, err, "CheckSavepointStatus call failed with status 500 and message ''")
}

func TestCheckSavepointError(t *testing.T) {
Expand Down Expand Up @@ -336,7 +339,32 @@ func TestSubmitJob500Response(t *testing.T) {
Parallelism: 10,
})
assert.Nil(t, resp)
assert.EqualError(t, err, "SubmitJob call failed with status 500 and message [could not submit]")

assert.EqualError(t, err, "SubmitJob call failed with status 500 and message 'could not submit'")
}

// TestSubmitStartupFail covers a job submission that fails while the job is starting up
// (here: the entry class is missing from the jar). The mocked Flink API answers with a 500
// in that case, and the test checks that the resulting error is nevertheless flagged as
// fail-fast and carries the full response body in its message.
func TestSubmitStartupFail(t *testing.T) {
	httpmock.Activate()
	defer httpmock.DeactivateAndReset()
	ctx := context.Background()

	responder := httpmock.NewStringResponder(500, wrongEntryClassResponse)
	httpmock.RegisterResponder("POST", fakeSubmitURL, responder)

	client := getTestJobManagerClient()
	resp, err := client.SubmitJob(ctx, testURL, "1", SubmitJobRequest{
		Parallelism: 10,
	})
	assert.Nil(t, resp)

	// Check the type assertion explicitly: if err is nil or of another type, the
	// asserted pointer is nil and reading IsFailFast would panic instead of
	// producing a readable test failure.
	flinkAppError, ok := err.(*v1beta1.FlinkApplicationError)
	if !ok || flinkAppError == nil {
		t.Fatalf("expected a non-nil *v1beta1.FlinkApplicationError, got %T: %v", err, err)
	}
	assert.True(t, flinkAppError.IsFailFast)

	assert.EqualError(t, err, "SubmitJob call failed with status 500 and message '"+
		wrongEntryClassResponse+"'")
}

func TestSubmitJobError(t *testing.T) {
Expand Down Expand Up @@ -393,7 +421,7 @@ func TestCancelJob500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.CancelJobWithSavepoint(ctx, testURL, "1")
assert.Empty(t, resp)
assert.EqualError(t, err, "CancelJobWithSavepoint call failed with status 500 and message []")
assert.EqualError(t, err, "CancelJobWithSavepoint call failed with status 500 and message ''")
}

func TestCancelJobError(t *testing.T) {
Expand Down Expand Up @@ -426,7 +454,7 @@ func TestHttpGetNon200Response(t *testing.T) {
client := getTestJobManagerClient()
_, err := client.GetJobs(ctx, testURL)
assert.NotNil(t, err)
assert.EqualError(t, err, "GetJobs call failed with status 500 and message []")
assert.EqualError(t, err, "GetJobs call failed with status 500 and message ''")
}

func TestClientInvalidMethod(t *testing.T) {
Expand Down
19 changes: 14 additions & 5 deletions pkg/controller/flink/client/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ const (
NoRetries = 0
)

func GetRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32, message ...string) error {
func GetRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32) error {
return GetRetryableErrorWithMessage(err, method, errorCode, maxRetries, "")
}

// GetRetryableErrorWithMessage formats an error description for the given method, errorCode
// and message via getErrorValue (wrapping err when it is non-nil) and returns it as a Flink
// application error created with NewFlinkApplicationError and the supplied maxRetries.
func GetRetryableErrorWithMessage(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32, message string) error {
	description := getErrorValue(err, method, errorCode, message).Error()
	// NOTE(review): the two boolean arguments are presumably (retryable, fail-fast) — confirm
	// against NewFlinkApplicationError.
	return NewFlinkApplicationError(description, method, errorCode, true, false, maxRetries)
}

func GetNonRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, message ...string) error {
func GetNonRetryableError(err error, method v1beta1.FlinkMethod, errorCode string) error {
return GetNonRetryableErrorWithMessage(err, method, errorCode, "")
}

// GetNonRetryableErrorWithMessage formats an error description for the given method, errorCode
// and message via getErrorValue (wrapping err when it is non-nil) and returns it as a Flink
// application error created with NewFlinkApplicationError and NoRetries.
func GetNonRetryableErrorWithMessage(err error, method v1beta1.FlinkMethod, errorCode string, message string) error {
	description := getErrorValue(err, method, errorCode, message).Error()
	// NOTE(review): the two boolean arguments are presumably (retryable, fail-fast) — confirm
	// against NewFlinkApplicationError.
	return NewFlinkApplicationError(description, method, errorCode, false, true, NoRetries)
}

func getErrorValue(err error, method v1beta1.FlinkMethod, errorCode string, message []string) error {
func getErrorValue(err error, method v1beta1.FlinkMethod, errorCode string, message string) error {
if err == nil {
return errors.New(fmt.Sprintf("%v call failed with status %v and message %v", method, errorCode, message))
return errors.New(fmt.Sprintf("%v call failed with status %v and message '%s'", method, errorCode, message))
}
return errors.Wrapf(err, "%v call failed with status %v and message %v", method, errorCode, message)
return errors.Wrapf(err, "%v call failed with status %v and message '%s'", method, errorCode, message)
}

func min(a, b int) int {
if a < b {
return a
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/flink/client/error_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ func getTestRetryer() RetryHandler {
func TestGetError(t *testing.T) {
testErr := errors.New("Service unavailable")
ferr := GetNonRetryableError(testErr, "GetTest", "500")
assert.Equal(t, "GetTest call failed with status 500 and message []: Service unavailable", ferr.Error())
assert.Equal(t, "GetTest call failed with status 500 and message '': Service unavailable", ferr.Error())

//nil error
ferrNil := GetNonRetryableError(nil, "GetTest", "500")
assert.Equal(t, "GetTest call failed with status 500 and message []", ferrNil.Error())
assert.Equal(t, "GetTest call failed with status 500 and message ''", ferrNil.Error())

testWrappedErr := errors.Wrap(testErr, "Wrapped errors")
ferrWrapped := GetNonRetryableError(testWrappedErr, "GetTestWrapped", "400")
assert.Equal(t, "GetTestWrapped call failed with status 400 and message []: Wrapped errors: Service unavailable", ferrWrapped.Error())
assert.Equal(t, "GetTestWrapped call failed with status 400 and message '': Wrapped errors: Service unavailable", ferrWrapped.Error())

testMessageErr := errors.New("Test Error")
ferrMessage := GetNonRetryableError(testMessageErr, "GetTest", "500", "message1", "message2")
assert.Equal(t, "GetTest call failed with status 500 and message [message1 message2]: Test Error", ferrMessage.Error())
ferrMessage := GetNonRetryableErrorWithMessage(testMessageErr, "GetTest", "500", "message")
assert.Equal(t, "GetTest call failed with status 500 and message 'message': Test Error", ferrMessage.Error())
}

func TestErrors(t *testing.T) {
Expand Down