diff --git a/.gitignore b/.gitignore index d74cd78..dc2f5d4 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ # vendor/ .DS_Store .idea/* +.vscode/* vendor dist @@ -24,4 +25,4 @@ gen/pb_python/flyteidl.egg-info/ .virtualgo /build/ -.venv \ No newline at end of file +.venv diff --git a/go.mod b/go.mod index a51eaa9..70e8ef6 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/hashicorp/go-version v1.4.0 github.com/pkg/errors v0.9.1 - github.com/spotify/flink-on-k8s-operator v0.4.0-beta.8 + github.com/spotify/flink-on-k8s-operator v0.4.1-beta.9 gotest.tools v2.2.0+incompatible k8s.io/api v0.22.8 k8s.io/apimachinery v0.22.8 @@ -48,7 +48,7 @@ require ( github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/go-cmp v0.5.6 // indirect + github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect @@ -85,10 +85,10 @@ require ( golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect + golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93 // indirect - golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect - golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect + golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/tools v0.1.10 // indirect diff --git a/go.sum b/go.sum index a96ef09..55948f9 100644 --- a/go.sum +++ b/go.sum @@ -373,8 +373,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -601,7 +602,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE= +github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -721,8 +722,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spotify/flink-on-k8s-operator v0.4.0-beta.8 h1:37h2XKi0VaSdAYk7o7U7t576axiVQJijieNrtC1zutE= -github.com/spotify/flink-on-k8s-operator v0.4.0-beta.8/go.mod h1:5KgtNwfw9SZyULzvkmgBGr1BK/3iTOQ3hpU9wIuHS6c= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.9 h1:GS+UOOIbAkbyBFTYwqoH+Id+KcuKX2Xan02hRh3CsSs= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.9/go.mod h1:oxkVvYFg/tZ7zyl97fk5q0GOzd9LdmAsMDVNQz0WnrA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -909,8 +910,9 @@ golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211209124913-491a49abca63 h1:iocB37TsdFuN6IBRZ+ry36wrkoV51/tl5vOWqkcPGvY= golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1005,12 +1007,14 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1269,8 +1273,8 @@ gorm.io/gorm v1.22.4/go.mod h1:1aeVC+pe9ZmvKZban/gW4QPra7PRoTEssyc922qCAkk= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= -gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= +gotest.tools/v3 v3.1.0 h1:rVV8Tcg/8jHUkPUorwjaMTtemIMVXfIPKiOqnhEhakk= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/flink/config.go b/pkg/flink/config.go index dc591ad..495cfe7 100644 --- a/pkg/flink/config.go +++ b/pkg/flink/config.go @@ -26,6 +26,7 @@ type Config struct { LogConfig logs.LogConfig `json:"logs"` GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` + NonRetryableExitCodes []int32 `json:"nonRetryableExitCodes" pfFlag:"Defines which job submitter exit codes should not be retried"` } func GetFlinkConfig() *Config { diff --git a/pkg/flink/constants.go b/pkg/flink/constants.go index 8bfbc99..456c543 100644 --- a/pkg/flink/constants.go +++ b/pkg/flink/constants.go @@ -41,6 +41,7 @@ const ( var ( regexpFlinkClusterName = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`) generatedNameMaxLength = 50 + nonRetryableExitCodes = []int32{} defaultServiceAccount = "default" defaultConfig = Config{ DefaultFlinkCluster: flinkOp.FlinkCluster{ @@ -66,6 +67,7 @@ var ( }, }, GeneratedNameMaxLength: &generatedNameMaxLength, + NonRetryableExitCodes: nonRetryableExitCodes, } flinkConfigSection = pluginsConfig.MustRegisterSubSection("flink", &defaultConfig) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index a8c2ada..456c5dc 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -140,7 +140,9 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task var abortBehavior k8s.AbortBehavior annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) + if err != nil { + logger.Error(ctx, "error observed in abort", err) return abortBehavior, err } @@ -236,6 +238,17 @@ func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluste }, nil } +func isSubmitterExitCodeRetryable(ctx context.Context, exitCode int32) bool { + config := GetFlinkConfig() + for _, ec := range config.NonRetryableExitCodes { + if exitCode == ec { + logger.Infof(ctx, "Found non-retryable exit code: %v", ec) + return false + } + } + return true +} + func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, occurredAt time.Time, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { logger.Infof(ctx, "job_state: %s", jobStatus.State) @@ -252,7 +265,15 @@ func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, case flinkOp.JobStateUpdating, flinkOp.JobStatePending, flinkOp.JobStateDeploying, flinkOp.JobStateRestarting: return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, msg, info) case flinkOp.JobStateSucceeded: - return pluginsCore.PhaseInfoSuccess(info) + if jobStatus.SubmitterExitCode == 0 { + return pluginsCore.PhaseInfoSuccess(info) + } + if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) { + reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (retryable)", jobStatus.FailureReasons) + return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) + } + reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (non-retryable)", jobStatus.FailureReasons) + return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, reason, info) default: msg := fmt.Sprintf("job id: %s with unknown state: %s", jobStatus.ID, jobStatus.State) return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)