From 39bb2457ddfc09cfcfbb0ff6b2d90649b1f05de2 Mon Sep 17 00:00:00 2001 From: rayhaanbhikha Date: Tue, 24 May 2022 15:12:16 +0100 Subject: [PATCH 1/2] Skip FlinkCluster Defaults flag added. ValidateCreate method removed. --- pkg/flink/config.go | 11 ++--- pkg/flink/resources.go | 7 +-- pkg/flink/resources_test.go | 89 +++++++++++++++++++++++++++---------- 3 files changed, 74 insertions(+), 33 deletions(-) diff --git a/pkg/flink/config.go b/pkg/flink/config.go index dc591ad..2e259b1 100644 --- a/pkg/flink/config.go +++ b/pkg/flink/config.go @@ -21,11 +21,12 @@ import ( // Config ... Flink-specific configs type Config struct { - DefaultFlinkCluster flinkOp.FlinkCluster `json:"defaultFlinkCluster"` - FlinkPropertiesOverride map[string]string `json:"flinkPropertiesOverride" pflag:",Key value pairs of flink properties to be overridden in every FlinkJob"` - LogConfig logs.LogConfig `json:"logs"` - GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` - RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` + DefaultFlinkCluster flinkOp.FlinkCluster `json:"defaultFlinkCluster"` + SkipFlinkClusterDefaults bool `json:"skipFlinkClusterDefaults"` + FlinkPropertiesOverride map[string]string `json:"flinkPropertiesOverride" pflag:",Key value pairs of flink properties to be overridden in every FlinkJob"` + LogConfig logs.LogConfig `json:"logs"` + GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` + RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` } func GetFlinkConfig() *Config { diff --git a/pkg/flink/resources.go b/pkg/flink/resources.go index 15e5e6d..26d5cd0 100644 --- a/pkg/flink/resources.go +++ b/pkg/flink/resources.go @@ -339,11 +339,8 @@ func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCl // fill in defaults resource := flinkOp.FlinkCluster(cluster) - resource.Default() - - err := resource.ValidateCreate() - if err != nil { - return nil, err + if !config.SkipFlinkClusterDefaults { + resource.Default() } return &resource, nil diff --git a/pkg/flink/resources_test.go b/pkg/flink/resources_test.go index 52e06df..5324af4 100644 --- a/pkg/flink/resources_test.go +++ b/pkg/flink/resources_test.go @@ -25,7 +25,9 @@ import ( flinkOp "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1" flinkIdl "github.com/spotify/flyte-flink-plugin/gen/pb-go/flyteidl-flink" "gotest.tools/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var artifacts = []string{"gs://bucket/a.jar", "gs://bucket/b.jar", "gs://bucket/c.jar"} @@ -127,29 +129,6 @@ func TestWithPersistentVolume(t *testing.T) { assert.Assert(t, cluster.Spec.FlinkProperties[flinkIoTmpDirsProperty] == volumeClaimMountPath) } -func TestBuildFlinkClusterSpecInvalid(t *testing.T) { - job := flinkIdl.FlinkJob{ - JarFiles: artifacts, - FlinkProperties: map[string]string{ - "taskmanager.numberOfTaskSlots": "1", - }, - } - - // Use empty config - config := &Config{} - - flinkCtx := FlinkTaskContext{ - ClusterName: ClusterName("generated-name"), - Namespace: "test-namespace", - Annotations: make(map[string]string), - Labels: make(map[string]string), - Job: job, - } - - _, err := NewFlinkCluster(config, flinkCtx) - assert.Error(t, err, "image name is unspecified") -} - func TestBuildFlinkClusterSpecServiceAccount(t *testing.T) { job := flinkIdl.FlinkJob{ JarFiles: artifacts, @@ -293,9 +272,15 @@ func TestBuildFlinkClusterSpecJobCommand(t *testing.T) { func TestBuildAnnotationPatch(t *testing.T) { patch, err := NewAnnotationPatch("testKey", "testValue") + if err != nil { + t.Error(err) + } assert.Equal(t, patch.Type(), types.MergePatchType) bytes, err := patch.Data(nil) + if err != nil { + t.Error(err) + } var jsonData map[string]interface{} err = json.Unmarshal(bytes, &jsonData) @@ -308,3 +293,61 @@ func TestBuildAnnotationPatch(t *testing.T) { assert.NilError(t, err) } + +func TestPartialFlinkCluster(t *testing.T) { + parallelism := int32(10) + mainClass := "SomeClass" + jobIdl := flinkIdl.FlinkJob{ + JarFiles: artifacts, + MainClass: mainClass, + Parallelism: parallelism, + FlinkProperties: map[string]string{ + "taskmanager.numberOfTaskSlots": "1", + "metrics.reporter.promgateway.groupingKey": `namespace={{.Namespace}};cluster={{.ClusterName}};execution_id={{index .Labels "execution-id"}}`, + }, + } + + config := &Config{ + SkipFlinkClusterDefaults: true, + } + + flinkCtx := FlinkTaskContext{ + ClusterName: ClusterName("generated-name"), + Namespace: "test-namespace", + Annotations: make(map[string]string), + Labels: map[string]string{"execution-id": "1"}, + Job: jobIdl, + } + + cluster, err := NewFlinkCluster(config, flinkCtx) + if err != nil { + t.Error(err) + } + + assert.DeepEqual(t, cluster.ObjectMeta, metav1.ObjectMeta{ + Name: "generated-name", + Namespace: "test-namespace", + Labels: map[string]string{"execution-id": "1"}, + Annotations: make(map[string]string), + }) + + assert.DeepEqual(t, cluster.TypeMeta, metav1.TypeMeta{ + Kind: "FlinkCluster", + APIVersion: "flinkoperator.k8s.io/v1beta1", + }) + + assert.DeepEqual(t, cluster.Spec, flinkOp.FlinkClusterSpec{ + Job: &flinkOp.JobSpec{ + ClassName: &mainClass, + Parallelism: ¶llelism, + InitContainers: []corev1.Container{}, + PodAnnotations: map[string]string{}, + PodLabels: map[string]string{"execution-id": "1"}, + }, + EnvVars: []corev1.EnvVar{{Name: "STAGED_JARS", Value: "gs://bucket/a.jar gs://bucket/b.jar gs://bucket/c.jar"}}, + FlinkProperties: map[string]string{ + "metrics.reporter.promgateway.groupingKey": "namespace=test-namespace;cluster=generated-name;execution_id=1", + "taskmanager.numberOfTaskSlots": "1", + }, + }) +} From 4667618e0857b1c560f94dfe959faba48565eb14 Mon Sep 17 00:00:00 2001 From: rayhaanbhikha Date: Wed, 25 May 2022 10:29:36 +0100 Subject: [PATCH 2/2] removed skipFlinkClusterDefault flag and resource defaulting. --- pkg/flink/config.go | 11 +++++------ pkg/flink/resources.go | 3 --- pkg/flink/resources_test.go | 4 +--- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/flink/config.go b/pkg/flink/config.go index 2e259b1..dc591ad 100644 --- a/pkg/flink/config.go +++ b/pkg/flink/config.go @@ -21,12 +21,11 @@ import ( // Config ... Flink-specific configs type Config struct { - DefaultFlinkCluster flinkOp.FlinkCluster `json:"defaultFlinkCluster"` - SkipFlinkClusterDefaults bool `json:"skipFlinkClusterDefaults"` - FlinkPropertiesOverride map[string]string `json:"flinkPropertiesOverride" pflag:",Key value pairs of flink properties to be overridden in every FlinkJob"` - LogConfig logs.LogConfig `json:"logs"` - GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` - RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` + DefaultFlinkCluster flinkOp.FlinkCluster `json:"defaultFlinkCluster"` + FlinkPropertiesOverride map[string]string `json:"flinkPropertiesOverride" pflag:",Key value pairs of flink properties to be overridden in every FlinkJob"` + LogConfig logs.LogConfig `json:"logs"` + GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"` + RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"` } func GetFlinkConfig() *Config { diff --git a/pkg/flink/resources.go b/pkg/flink/resources.go index 26d5cd0..c9de92e 100644 --- a/pkg/flink/resources.go +++ b/pkg/flink/resources.go @@ -339,9 +339,6 @@ func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCl // fill in defaults resource := flinkOp.FlinkCluster(cluster) - if !config.SkipFlinkClusterDefaults { - resource.Default() - } return &resource, nil } diff --git a/pkg/flink/resources_test.go b/pkg/flink/resources_test.go index 5324af4..b3fca20 100644 --- a/pkg/flink/resources_test.go +++ b/pkg/flink/resources_test.go @@ -307,9 +307,7 @@ func TestPartialFlinkCluster(t *testing.T) { }, } - config := &Config{ - SkipFlinkClusterDefaults: true, - } + config := &Config{} flinkCtx := FlinkTaskContext{ ClusterName: ClusterName("generated-name"),