From c5f157be1914b6eac33abbb3e9e44b35a71fb60a Mon Sep 17 00:00:00 2001 From: Barna Kutassy Date: Tue, 21 Jun 2022 15:03:09 +0200 Subject: [PATCH 01/14] Add vscode gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index d74cd78..dc2f5d4 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ # vendor/ .DS_Store .idea/* +.vscode/* vendor dist @@ -24,4 +25,4 @@ gen/pb_python/flyteidl.egg-info/ .virtualgo /build/ -.venv \ No newline at end of file +.venv From 036eb71a7a22b15d00517ab0acf75531ef9a5e47 Mon Sep 17 00:00:00 2001 From: Barna Kutassy Date: Tue, 21 Jun 2022 15:03:26 +0200 Subject: [PATCH 02/14] Update flink-on-k8s-operator --- go.mod | 10 +++++----- go.sum | 20 ++++++++++++-------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index a51eaa9..647f6b7 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/hashicorp/go-version v1.4.0 github.com/pkg/errors v0.9.1 - github.com/spotify/flink-on-k8s-operator v0.4.0-beta.8 + github.com/spotify/flink-on-k8s-operator v0.4.1-beta.6 gotest.tools v2.2.0+incompatible k8s.io/api v0.22.8 k8s.io/apimachinery v0.22.8 @@ -48,7 +48,7 @@ require ( github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/go-cmp v0.5.6 // indirect + github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.2.0 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect @@ -85,10 +85,10 @@ require ( golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect + golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93 // indirect - golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect - golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect + golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/tools v0.1.10 // indirect diff --git a/go.sum b/go.sum index a96ef09..42e06bd 100644 --- a/go.sum +++ b/go.sum @@ -373,8 +373,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -601,7 +602,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE= +github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -721,8 +722,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spotify/flink-on-k8s-operator v0.4.0-beta.8 h1:37h2XKi0VaSdAYk7o7U7t576axiVQJijieNrtC1zutE= -github.com/spotify/flink-on-k8s-operator v0.4.0-beta.8/go.mod h1:5KgtNwfw9SZyULzvkmgBGr1BK/3iTOQ3hpU9wIuHS6c= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.6 h1:9L9uxOEM7TJ4ciGrbC5ljPSUUdwo+AvRl4rGKGeBmtI= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.6/go.mod h1:oxkVvYFg/tZ7zyl97fk5q0GOzd9LdmAsMDVNQz0WnrA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -909,8 +910,9 @@ golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211209124913-491a49abca63 h1:iocB37TsdFuN6IBRZ+ry36wrkoV51/tl5vOWqkcPGvY= golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1005,12 +1007,14 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1269,8 +1273,8 @@ gorm.io/gorm v1.22.4/go.mod h1:1aeVC+pe9ZmvKZban/gW4QPra7PRoTEssyc922qCAkk= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= -gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= +gotest.tools/v3 v3.1.0 h1:rVV8Tcg/8jHUkPUorwjaMTtemIMVXfIPKiOqnhEhakk= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= From 709b862059b9aaa1a5d94ace4f602ccc0c8b6571 Mon Sep 17 00:00:00 2001 From: Barna Kutassy Date: Tue, 21 Jun 2022 15:05:11 +0200 Subject: [PATCH 03/14] Add retryable exit codes to config --- pkg/flink/config.go | 1 + pkg/flink/constants.go | 2 ++ pkg/flink/handler.go | 16 ++++++++++++++++ 3 files changed, 19 insertions(+) diff --git a/pkg/flink/config.go b/pkg/flink/config.go index dc591ad..65efc4e 100644 --- a/pkg/flink/config.go +++ b/pkg/flink/config.go @@ -26,6 +26,7 @@ type Config struct { LogConfig logs.LogConfig `json:"logs"` GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` + NonRetriableExitCodes []int32 `json:"nonRetriableExitCodes" pfFlag:"Defines which job submitter exit codes should not be retried"` } func GetFlinkConfig() *Config { diff --git a/pkg/flink/constants.go b/pkg/flink/constants.go index 8bfbc99..96946cf 100644 --- a/pkg/flink/constants.go +++ b/pkg/flink/constants.go @@ -41,6 +41,7 @@ const ( var ( regexpFlinkClusterName = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`) generatedNameMaxLength = 50 + nonRetriableExitCodes = []int32{} defaultServiceAccount = "default" defaultConfig = Config{ DefaultFlinkCluster: flinkOp.FlinkCluster{ @@ -66,6 +67,7 @@ var ( }, }, GeneratedNameMaxLength: &generatedNameMaxLength, + NonRetriableExitCodes: nonRetriableExitCodes, } flinkConfigSection = pluginsConfig.MustRegisterSubSection("flink", &defaultConfig) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index a8c2ada..e03369d 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -236,6 +236,15 @@ func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluste }, nil } +func isSubmitterExitCodeRetryable(exitCode int32) bool { + for _, ec := range GetFlinkConfig().NonRetriableExitCodes { + if exitCode == ec { + return false + } + } + return true +} + func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, occurredAt time.Time, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { logger.Infof(ctx, "job_state: %s", jobStatus.State) @@ -244,6 +253,13 @@ func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, switch jobStatus.State { case flinkOp.JobStateCancelled: return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info) + case flinkOp.JobStateSubmitterFailed: + if isSubmitterExitCodeRetryable(jobStatus.SubmitterExitCode) { + reason := fmt.Sprintf("Flink jobsubmitter exited with retryable non-zero exit code: %v", jobStatus.FailureReasons) + return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) + } + reason := fmt.Sprintf("Flink jobsubmitter exited with non-retryable non-zero exit code: %v", jobStatus.FailureReasons) + return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, reason, info) case flinkOp.JobStateFailed, flinkOp.JobStateDeployFailed, flinkOp.JobStateLost: reason := fmt.Sprintf("Flink Job Failed with Error: %v", jobStatus.FailureReasons) return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) From 0bc1178a2966ed4f4483e3909c1d3bc6b0866f47 Mon Sep 17 00:00:00 2001 From: Barna Kutassy Date: Tue, 21 Jun 2022 17:08:32 +0200 Subject: [PATCH 04/14] Add debug logging --- pkg/flink/handler.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index e03369d..1bb7973 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -236,9 +236,13 @@ func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluste }, nil } -func isSubmitterExitCodeRetryable(exitCode int32) bool { - for _, ec := range GetFlinkConfig().NonRetriableExitCodes { +func isSubmitterExitCodeRetryable(ctx context.Context, exitCode int32) bool { + logger.Errorf(ctx, "job submitter failed: %v", exitCode) + config := GetFlinkConfig() + logger.Errorf(ctx, "Config: %v", config) + for _, ec := range config.NonRetriableExitCodes { if exitCode == ec { + logger.Errorf(ctx, "Found non-retriable exit CODE: %v", ec) return false } } @@ -254,11 +258,11 @@ func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, case flinkOp.JobStateCancelled: return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info) case flinkOp.JobStateSubmitterFailed: - if isSubmitterExitCodeRetryable(jobStatus.SubmitterExitCode) { - reason := fmt.Sprintf("Flink jobsubmitter exited with retryable non-zero exit code: %v", jobStatus.FailureReasons) + if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) { + reason := fmt.Sprintf("Flink jobsubmitter exited with retriable non-zero exit code: %v", jobStatus.FailureReasons) return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) } - reason := fmt.Sprintf("Flink jobsubmitter exited with non-retryable non-zero exit code: %v", jobStatus.FailureReasons) + reason := fmt.Sprintf("Flink jobsubmitter exited with non-retriable non-zero exit code: %v", jobStatus.FailureReasons) return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, reason, info) case flinkOp.JobStateFailed, flinkOp.JobStateDeployFailed, flinkOp.JobStateLost: reason := fmt.Sprintf("Flink Job Failed with Error: %v", jobStatus.FailureReasons) From c1e7d1694a0e2b4a056d36c2b14e81c017460d32 Mon Sep 17 00:00:00 2001 From: live-wire Date: Wed, 22 Jun 2022 15:16:53 +0200 Subject: [PATCH 05/14] Check if job was already terminated onAbort --- pkg/flink/handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 1bb7973..e585274 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -17,6 +17,7 @@ package flink import ( "context" "fmt" + "strings" "time" "github.com/flyteorg/flyteplugins/go/tasks/errors" @@ -140,7 +141,7 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task var abortBehavior k8s.AbortBehavior annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) - if err != nil { + if err != nil && !strings.Contains(fmt.Sprint(err), fmt.Sprintf(flinkOp.InvalidJobStateForJobCancelMsg, flinkOp.ControlAnnotation)) { return abortBehavior, err } From a41e331efaee23535a0bc45b4849b172da3a20a9 Mon Sep 17 00:00:00 2001 From: Barna Kutassy Date: Thu, 23 Jun 2022 13:19:47 +0200 Subject: [PATCH 06/14] Remove new job status and rely on exit code --- go.mod | 2 +- go.sum | 4 ++-- pkg/flink/config.go | 2 +- pkg/flink/constants.go | 4 ++-- pkg/flink/handler.go | 21 +++++++++++---------- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 647f6b7..db623aa 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/hashicorp/go-version v1.4.0 github.com/pkg/errors v0.9.1 - github.com/spotify/flink-on-k8s-operator v0.4.1-beta.6 + github.com/spotify/flink-on-k8s-operator v0.4.1-beta.7 gotest.tools v2.2.0+incompatible k8s.io/api v0.22.8 k8s.io/apimachinery v0.22.8 diff --git a/go.sum b/go.sum index 42e06bd..425a7b2 100644 --- a/go.sum +++ b/go.sum @@ -722,8 +722,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spotify/flink-on-k8s-operator v0.4.1-beta.6 h1:9L9uxOEM7TJ4ciGrbC5ljPSUUdwo+AvRl4rGKGeBmtI= -github.com/spotify/flink-on-k8s-operator v0.4.1-beta.6/go.mod h1:oxkVvYFg/tZ7zyl97fk5q0GOzd9LdmAsMDVNQz0WnrA= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.7 h1:yMBFHwNMRxCnWLPCP6PzlFHj4P52QmUaAVoM+9c2X1A= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.7/go.mod h1:oxkVvYFg/tZ7zyl97fk5q0GOzd9LdmAsMDVNQz0WnrA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/pkg/flink/config.go b/pkg/flink/config.go index 65efc4e..495cfe7 100644 --- a/pkg/flink/config.go +++ b/pkg/flink/config.go @@ -26,7 +26,7 @@ type Config struct { LogConfig logs.LogConfig `json:"logs"` GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` - NonRetriableExitCodes []int32 `json:"nonRetriableExitCodes" pfFlag:"Defines which job submitter exit codes should not be retried"` + NonRetryableExitCodes []int32 `json:"nonRetryableExitCodes" pfFlag:"Defines which job submitter exit codes should not be retried"` } func GetFlinkConfig() *Config { diff --git a/pkg/flink/constants.go b/pkg/flink/constants.go index 96946cf..456c543 100644 --- a/pkg/flink/constants.go +++ b/pkg/flink/constants.go @@ -41,7 +41,7 @@ const ( var ( regexpFlinkClusterName = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`) generatedNameMaxLength = 50 - nonRetriableExitCodes = []int32{} + nonRetryableExitCodes = []int32{} defaultServiceAccount = "default" defaultConfig = Config{ DefaultFlinkCluster: flinkOp.FlinkCluster{ @@ -67,7 +67,7 @@ var ( }, }, GeneratedNameMaxLength: &generatedNameMaxLength, - NonRetriableExitCodes: nonRetriableExitCodes, + NonRetryableExitCodes: nonRetryableExitCodes, } flinkConfigSection = pluginsConfig.MustRegisterSubSection("flink", &defaultConfig) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index e585274..35c90b6 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -241,9 +241,9 @@ func isSubmitterExitCodeRetryable(ctx context.Context, exitCode int32) bool { logger.Errorf(ctx, "job submitter failed: %v", exitCode) config := GetFlinkConfig() logger.Errorf(ctx, "Config: %v", config) - for _, ec := range config.NonRetriableExitCodes { + for _, ec := range config.NonRetryableExitCodes { if exitCode == ec { - logger.Errorf(ctx, "Found non-retriable exit CODE: %v", ec) + logger.Errorf(ctx, "Found non-retryable exit CODE: %v", ec) return false } } @@ -258,13 +258,6 @@ func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, switch jobStatus.State { case flinkOp.JobStateCancelled: return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info) - case flinkOp.JobStateSubmitterFailed: - if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) { - reason := fmt.Sprintf("Flink jobsubmitter exited with retriable non-zero exit code: %v", jobStatus.FailureReasons) - return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) - } - reason := fmt.Sprintf("Flink jobsubmitter exited with non-retriable non-zero exit code: %v", jobStatus.FailureReasons) - return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, reason, info) case flinkOp.JobStateFailed, flinkOp.JobStateDeployFailed, flinkOp.JobStateLost: reason := fmt.Sprintf("Flink Job Failed with Error: %v", jobStatus.FailureReasons) return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) @@ -273,7 +266,15 @@ func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, case flinkOp.JobStateUpdating, flinkOp.JobStatePending, flinkOp.JobStateDeploying, flinkOp.JobStateRestarting: return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, msg, info) case flinkOp.JobStateSucceeded: - return pluginsCore.PhaseInfoSuccess(info) + if jobStatus.SubmitterExitCode == 0 { + return pluginsCore.PhaseInfoSuccess(info) + } + if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) { + reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (retryable)", jobStatus.FailureReasons) + return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info) + } + reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (non-retryable)", jobStatus.FailureReasons) + return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, reason, info) default: msg := fmt.Sprintf("job id: %s with unknown state: %s", jobStatus.ID, jobStatus.State) return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info) From 31faadadb1057a7fe82e3637e98d441b4583f371 Mon Sep 17 00:00:00 2001 From: live-wire Date: Thu, 23 Jun 2022 17:04:41 +0200 Subject: [PATCH 07/14] More logs --- pkg/flink/handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 35c90b6..4a3b51f 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -141,6 +141,7 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task var abortBehavior k8s.AbortBehavior annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) + logger.Errorf(ctx, "Error in Annotation Patch: %s sprint[%s]", err.Error(), fmt.Sprint(err)) if err != nil && !strings.Contains(fmt.Sprint(err), fmt.Sprintf(flinkOp.InvalidJobStateForJobCancelMsg, flinkOp.ControlAnnotation)) { return abortBehavior, err } From c1adb3fe981650eebe101be2e6cc32e95e234feb Mon Sep 17 00:00:00 2001 From: live-wire Date: Thu, 23 Jun 2022 17:22:11 +0200 Subject: [PATCH 08/14] Fix panic in logging --- pkg/flink/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 4a3b51f..1e602f8 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -141,7 +141,7 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task var abortBehavior k8s.AbortBehavior annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) - logger.Errorf(ctx, "Error in Annotation Patch: %s sprint[%s]", err.Error(), fmt.Sprint(err)) + logger.Errorf(ctx, "Error in Annotation Patch: %s", fmt.Sprint(err)) if err != nil && !strings.Contains(fmt.Sprint(err), fmt.Sprintf(flinkOp.InvalidJobStateForJobCancelMsg, flinkOp.ControlAnnotation)) { return abortBehavior, err } From 99eaa7f4f7624981f140bf5bc0df8c96166855ec Mon Sep 17 00:00:00 2001 From: live-wire Date: Thu, 23 Jun 2022 17:35:54 +0200 Subject: [PATCH 09/14] Better logs --- pkg/flink/handler.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 1e602f8..b992152 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -141,8 +141,10 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task var abortBehavior k8s.AbortBehavior annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) - logger.Errorf(ctx, "Error in Annotation Patch: %s", fmt.Sprint(err)) - if err != nil && !strings.Contains(fmt.Sprint(err), fmt.Sprintf(flinkOp.InvalidJobStateForJobCancelMsg, flinkOp.ControlAnnotation)) { + if err != nil { + logger.Errorf(ctx, "Error in Annotation Patch: %v", err) + } + if err != nil && !strings.Contains(err.Error(), fmt.Sprintf(flinkOp.InvalidJobStateForJobCancelMsg, flinkOp.ControlAnnotation)) { return abortBehavior, err } From 165aca4e132ad53539de7fd0c7bde39cd030b9fc Mon Sep 17 00:00:00 2001 From: live-wire Date: Thu, 23 Jun 2022 18:58:38 +0200 Subject: [PATCH 10/14] Cleanup logging --- pkg/flink/handler.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index b992152..1dd286e 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -17,7 +17,6 @@ package flink import ( "context" "fmt" - "strings" "time" "github.com/flyteorg/flyteplugins/go/tasks/errors" @@ -141,10 +140,8 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task var abortBehavior k8s.AbortBehavior annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) + if err != nil { - logger.Errorf(ctx, "Error in Annotation Patch: %v", err) - } - if err != nil && !strings.Contains(err.Error(), fmt.Sprintf(flinkOp.InvalidJobStateForJobCancelMsg, flinkOp.ControlAnnotation)) { return abortBehavior, err } @@ -241,12 +238,11 @@ func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluste } func isSubmitterExitCodeRetryable(ctx context.Context, exitCode int32) bool { - logger.Errorf(ctx, "job submitter failed: %v", exitCode) + logger.Infof(ctx, "job submitter failed: %v", exitCode) config := GetFlinkConfig() - logger.Errorf(ctx, "Config: %v", config) for _, ec := range config.NonRetryableExitCodes { if exitCode == ec { - logger.Errorf(ctx, "Found non-retryable exit CODE: %v", ec) + logger.Infof(ctx, "Found non-retryable exit code: %v", ec) return false } } From 021c4d934db5607b797b434d73060d4b1ad9d087 Mon Sep 17 00:00:00 2001 From: live-wire Date: Mon, 27 Jun 2022 15:59:26 +0200 Subject: [PATCH 11/14] Error observed in abort --- pkg/flink/handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 1dd286e..0a3a5da 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -142,6 +142,7 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task annotationPatch, err := NewAnnotationPatch(flinkOp.ControlAnnotation, flinkOp.ControlNameJobCancel) if err != nil { + logger.Error(ctx, "error observed in abort", err) return abortBehavior, err } From 17a2e489b41f649181a2f70467d82f7f20a9fadd Mon Sep 17 00:00:00 2001 From: Barna Kutassy Date: Tue, 28 Jun 2022 10:35:21 +0200 Subject: [PATCH 12/14] Update operator version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index db623aa..70e8ef6 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/hashicorp/go-version v1.4.0 github.com/pkg/errors v0.9.1 - github.com/spotify/flink-on-k8s-operator v0.4.1-beta.7 + github.com/spotify/flink-on-k8s-operator v0.4.1-beta.9 gotest.tools v2.2.0+incompatible k8s.io/api v0.22.8 k8s.io/apimachinery v0.22.8 diff --git a/go.sum b/go.sum index 425a7b2..55948f9 100644 --- a/go.sum +++ b/go.sum @@ -722,8 +722,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spotify/flink-on-k8s-operator v0.4.1-beta.7 h1:yMBFHwNMRxCnWLPCP6PzlFHj4P52QmUaAVoM+9c2X1A= -github.com/spotify/flink-on-k8s-operator v0.4.1-beta.7/go.mod h1:oxkVvYFg/tZ7zyl97fk5q0GOzd9LdmAsMDVNQz0WnrA= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.9 h1:GS+UOOIbAkbyBFTYwqoH+Id+KcuKX2Xan02hRh3CsSs= +github.com/spotify/flink-on-k8s-operator v0.4.1-beta.9/go.mod h1:oxkVvYFg/tZ7zyl97fk5q0GOzd9LdmAsMDVNQz0WnrA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= From 2ee5de026bfaa3cca28f5e2d71dfa8620a40e353 Mon Sep 17 00:00:00 2001 From: Barnabas Kutassy Date: Tue, 28 Jun 2022 10:55:14 +0200 Subject: [PATCH 13/14] Update pkg/flink/handler.go Co-authored-by: Filipe Regadas --- pkg/flink/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 0a3a5da..f482291 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -239,7 +239,7 @@ func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluste } func isSubmitterExitCodeRetryable(ctx context.Context, exitCode int32) bool { - logger.Infof(ctx, "job submitter failed: %v", exitCode) + logger.Infof(ctx, "job submitter failed: %d", exitCode) config := GetFlinkConfig() for _, ec := range config.NonRetryableExitCodes { if exitCode == ec { From c2fe52c40e6e67575f93451f4f1c307475ba7b97 Mon Sep 17 00:00:00 2001 From: live-wire Date: Tue, 28 Jun 2022 12:00:08 +0200 Subject: [PATCH 14/14] Removed logging from helper func --- pkg/flink/handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index f482291..456c5dc 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -239,7 +239,6 @@ func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluste } func isSubmitterExitCodeRetryable(ctx context.Context, exitCode int32) bool { - logger.Infof(ctx, "job submitter failed: %d", exitCode) config := GetFlinkConfig() for _, ec := range config.NonRetryableExitCodes { if exitCode == ec {