Skip to content

Commit

Permalink
Failures during job startup should trigger immediate rollback (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Sep 6, 2019
1 parent b427995 commit b1607d0
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 21 deletions.
13 changes: 10 additions & 3 deletions pkg/controller/flink/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
Expand Down Expand Up @@ -227,16 +228,22 @@ func (c *FlinkJobManagerClient) SubmitJob(ctx context.Context, url string, jarID
c.metrics.submitJobFailureCounter.Inc(ctx)
logger.Warnf(ctx, fmt.Sprintf("Job submission failed with response %v", response))
if response.StatusCode() > 499 {
return nil, GetRetryableError(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, string(response.Body()))
// Flink returns a 500 when the entry class doesn't exist or crashes on start, but we want to fail fast
// in those cases
body := response.String()
if strings.Contains(body, "org.apache.flink.client.program.ProgramInvocationException") {
return nil, GetNonRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), body)
}
return nil, GetRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, string(response.Body()))
}

return nil, GetNonRetryableError(err, v1beta1.SubmitJob, response.Status(), string(response.Body()))
return nil, GetNonRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), string(response.Body()))
}

var submitJobResponse SubmitJobResponse
if err = json.Unmarshal(response.Body(), &submitJobResponse); err != nil {
logger.Errorf(ctx, "Unable to Unmarshal submitJobResponse %v, err: %v", response, err)
return nil, GetRetryableError(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, JSONUnmarshalError)
return nil, GetRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, JSONUnmarshalError)
}

c.metrics.submitJobSuccessCounter.Inc(ctx)
Expand Down
44 changes: 36 additions & 8 deletions pkg/controller/flink/client/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"

"github.com/jarcoal/httpmock"
mockScope "github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
Expand All @@ -17,6 +19,7 @@ import (

const testURL = "http://abc.com"
const invalidTestResponse = "invalid response"
const wrongEntryClassResponse = `{"errors":["Internal server error.","<Exception on server side:\norg.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'com.lyft.streamingplatform.OperatorTestAppX' was not found in the jar file.\n\tat org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:617)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:199)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:149)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:125)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.ClassNotFoundException: com.lyft.streamingplatform.OperatorTestAppX\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat java.lang.Class.forName0(Native Method)\n\tat java.lang.Class.forName(Class.java:348)\n\tat org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:614)\n\t... 8 more\n\nEnd of exception on server side>"]}`
const fakeJobsURL = "http://abc.com/jobs"
const fakeOverviewURL = "http://abc.com/overview"
const fakeJobConfigURL = "http://abc.com/jobs/1/config"
Expand Down Expand Up @@ -79,7 +82,7 @@ func TestGetJobs500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetJobs(ctx, testURL)
assert.Nil(t, resp)
assert.EqualError(t, err, "GetJobs call failed with status 500 and message []")
assert.EqualError(t, err, "GetJobs call failed with status 500 and message ''")
}

func TestGetJobsError(t *testing.T) {
Expand Down Expand Up @@ -150,7 +153,7 @@ func TestGetCluster500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetClusterOverview(ctx, testURL)
assert.Nil(t, resp)
assert.EqualError(t, err, "GetClusterOverview call failed with status 500 and message []")
assert.EqualError(t, err, "GetClusterOverview call failed with status 500 and message ''")
}

func TestGetCluster503Response(t *testing.T) {
Expand All @@ -163,7 +166,7 @@ func TestGetCluster503Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetClusterOverview(ctx, testURL)
assert.Nil(t, resp)
assert.EqualError(t, err, "GetClusterOverview call failed with status 503 and message []")
assert.EqualError(t, err, "GetClusterOverview call failed with status 503 and message ''")
}

func TestGetClusterOverviewError(t *testing.T) {
Expand Down Expand Up @@ -218,7 +221,7 @@ func TestGetJobConfig500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.GetJobConfig(ctx, testURL, "1")
assert.Nil(t, resp)
assert.EqualError(t, err, "GetJobConfig call failed with status 500 and message []")
assert.EqualError(t, err, "GetJobConfig call failed with status 500 and message ''")
}

func TestGetJobConfigError(t *testing.T) {
Expand Down Expand Up @@ -275,7 +278,7 @@ func TestCheckSavepoint500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.CheckSavepointStatus(ctx, testURL, "1", "2")
assert.Nil(t, resp)
assert.EqualError(t, err, "CheckSavepointStatus call failed with status 500 and message []")
assert.EqualError(t, err, "CheckSavepointStatus call failed with status 500 and message ''")
}

func TestCheckSavepointError(t *testing.T) {
Expand Down Expand Up @@ -336,7 +339,32 @@ func TestSubmitJob500Response(t *testing.T) {
Parallelism: 10,
})
assert.Nil(t, resp)
assert.EqualError(t, err, "SubmitJob call failed with status 500 and message [could not submit]")

assert.EqualError(t, err, "SubmitJob call failed with status 500 and message 'could not submit'")
}

func TestSubmitStartupFail(t *testing.T) {
// Tests the case where we submit a job that fails to start up we want to immediately roll back, even though
// the Flink API unhelpfully returns a 500 in that case
httpmock.Activate()
defer httpmock.DeactivateAndReset()
ctx := context.Background()

//var message json.RawMessage
//err := json.Unmarshal([]byte(wrongEntryClassResponse), message)
responder := httpmock.NewStringResponder(500, wrongEntryClassResponse)
httpmock.RegisterResponder("POST", fakeSubmitURL, responder)

client := getTestJobManagerClient()
resp, err := client.SubmitJob(ctx, testURL, "1", SubmitJobRequest{
Parallelism: 10,
})
assert.Nil(t, resp)
flinkAppError, _ := err.(*v1beta1.FlinkApplicationError)
assert.True(t, flinkAppError.IsFailFast)

assert.EqualError(t, err, "SubmitJob call failed with status 500 and message '"+
wrongEntryClassResponse+"'")
}

func TestSubmitJobError(t *testing.T) {
Expand Down Expand Up @@ -393,7 +421,7 @@ func TestCancelJob500Response(t *testing.T) {
client := getTestJobManagerClient()
resp, err := client.CancelJobWithSavepoint(ctx, testURL, "1")
assert.Empty(t, resp)
assert.EqualError(t, err, "CancelJobWithSavepoint call failed with status 500 and message []")
assert.EqualError(t, err, "CancelJobWithSavepoint call failed with status 500 and message ''")
}

func TestCancelJobError(t *testing.T) {
Expand Down Expand Up @@ -426,7 +454,7 @@ func TestHttpGetNon200Response(t *testing.T) {
client := getTestJobManagerClient()
_, err := client.GetJobs(ctx, testURL)
assert.NotNil(t, err)
assert.EqualError(t, err, "GetJobs call failed with status 500 and message []")
assert.EqualError(t, err, "GetJobs call failed with status 500 and message ''")
}

func TestClientInvalidMethod(t *testing.T) {
Expand Down
19 changes: 14 additions & 5 deletions pkg/controller/flink/client/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,31 @@ const (
NoRetries = 0
)

func GetRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32, message ...string) error {
func GetRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32) error {
return GetRetryableErrorWithMessage(err, method, errorCode, maxRetries, "")
}

func GetRetryableErrorWithMessage(err error, method v1beta1.FlinkMethod, errorCode string, maxRetries int32, message string) error {
appError := getErrorValue(err, method, errorCode, message)
return NewFlinkApplicationError(appError.Error(), method, errorCode, true, false, maxRetries)
}

func GetNonRetryableError(err error, method v1beta1.FlinkMethod, errorCode string, message ...string) error {
func GetNonRetryableError(err error, method v1beta1.FlinkMethod, errorCode string) error {
return GetNonRetryableErrorWithMessage(err, method, errorCode, "")
}

func GetNonRetryableErrorWithMessage(err error, method v1beta1.FlinkMethod, errorCode string, message string) error {
appError := getErrorValue(err, method, errorCode, message)
return NewFlinkApplicationError(appError.Error(), method, errorCode, false, true, NoRetries)
}

func getErrorValue(err error, method v1beta1.FlinkMethod, errorCode string, message []string) error {
func getErrorValue(err error, method v1beta1.FlinkMethod, errorCode string, message string) error {
if err == nil {
return errors.New(fmt.Sprintf("%v call failed with status %v and message %v", method, errorCode, message))
return errors.New(fmt.Sprintf("%v call failed with status %v and message '%s'", method, errorCode, message))
}
return errors.Wrapf(err, "%v call failed with status %v and message %v", method, errorCode, message)
return errors.Wrapf(err, "%v call failed with status %v and message '%s'", method, errorCode, message)
}

func min(a, b int) int {
if a < b {
return a
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/flink/client/error_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ func getTestRetryer() RetryHandler {
func TestGetError(t *testing.T) {
testErr := errors.New("Service unavailable")
ferr := GetNonRetryableError(testErr, "GetTest", "500")
assert.Equal(t, "GetTest call failed with status 500 and message []: Service unavailable", ferr.Error())
assert.Equal(t, "GetTest call failed with status 500 and message '': Service unavailable", ferr.Error())

//nil error
ferrNil := GetNonRetryableError(nil, "GetTest", "500")
assert.Equal(t, "GetTest call failed with status 500 and message []", ferrNil.Error())
assert.Equal(t, "GetTest call failed with status 500 and message ''", ferrNil.Error())

testWrappedErr := errors.Wrap(testErr, "Wrapped errors")
ferrWrapped := GetNonRetryableError(testWrappedErr, "GetTestWrapped", "400")
assert.Equal(t, "GetTestWrapped call failed with status 400 and message []: Wrapped errors: Service unavailable", ferrWrapped.Error())
assert.Equal(t, "GetTestWrapped call failed with status 400 and message '': Wrapped errors: Service unavailable", ferrWrapped.Error())

testMessageErr := errors.New("Test Error")
ferrMessage := GetNonRetryableError(testMessageErr, "GetTest", "500", "message1", "message2")
assert.Equal(t, "GetTest call failed with status 500 and message [message1 message2]: Test Error", ferrMessage.Error())
ferrMessage := GetNonRetryableErrorWithMessage(testMessageErr, "GetTest", "500", "message")
assert.Equal(t, "GetTest call failed with status 500 and message 'message': Test Error", ferrMessage.Error())
}

func TestErrors(t *testing.T) {
Expand Down

0 comments on commit b1607d0

Please sign in to comment.