Skip to content

Commit

Permalink
Merge pull request #169 from rayhaanbhikha/rayhaanb/skip-default-conf…
Browse files Browse the repository at this point in the history
…ig-flag

Skip FlinkCluster Defaults flag added
  • Loading branch information
jto authored May 25, 2022
2 parents 4d6f83b + 4667618 commit e97751c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 29 deletions.
6 changes: 0 additions & 6 deletions pkg/flink/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,6 @@ func NewFlinkCluster(config *Config, taskCtx FlinkTaskContext) (*flinkOp.FlinkCl

// fill in defaults
resource := flinkOp.FlinkCluster(cluster)
resource.Default()

err := resource.ValidateCreate()
if err != nil {
return nil, err
}

return &resource, nil
}
Expand Down
87 changes: 64 additions & 23 deletions pkg/flink/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
flinkOp "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
flinkIdl "github.com/spotify/flyte-flink-plugin/gen/pb-go/flyteidl-flink"
"gotest.tools/assert"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var artifacts = []string{"gs://bucket/a.jar", "gs://bucket/b.jar", "gs://bucket/c.jar"}
Expand Down Expand Up @@ -127,29 +129,6 @@ func TestWithPersistentVolume(t *testing.T) {
assert.Assert(t, cluster.Spec.FlinkProperties[flinkIoTmpDirsProperty] == volumeClaimMountPath)
}

func TestBuildFlinkClusterSpecInvalid(t *testing.T) {
job := flinkIdl.FlinkJob{
JarFiles: artifacts,
FlinkProperties: map[string]string{
"taskmanager.numberOfTaskSlots": "1",
},
}

// Use empty config
config := &Config{}

flinkCtx := FlinkTaskContext{
ClusterName: ClusterName("generated-name"),
Namespace: "test-namespace",
Annotations: make(map[string]string),
Labels: make(map[string]string),
Job: job,
}

_, err := NewFlinkCluster(config, flinkCtx)
assert.Error(t, err, "image name is unspecified")
}

func TestBuildFlinkClusterSpecServiceAccount(t *testing.T) {
job := flinkIdl.FlinkJob{
JarFiles: artifacts,
Expand Down Expand Up @@ -293,9 +272,15 @@ func TestBuildFlinkClusterSpecJobCommand(t *testing.T) {

func TestBuildAnnotationPatch(t *testing.T) {
patch, err := NewAnnotationPatch("testKey", "testValue")
if err != nil {
t.Error(err)
}
assert.Equal(t, patch.Type(), types.MergePatchType)

bytes, err := patch.Data(nil)
if err != nil {
t.Error(err)
}

var jsonData map[string]interface{}
err = json.Unmarshal(bytes, &jsonData)
Expand All @@ -308,3 +293,59 @@ func TestBuildAnnotationPatch(t *testing.T) {

assert.NilError(t, err)
}

func TestPartialFlinkCluster(t *testing.T) {
parallelism := int32(10)
mainClass := "SomeClass"
jobIdl := flinkIdl.FlinkJob{
JarFiles: artifacts,
MainClass: mainClass,
Parallelism: parallelism,
FlinkProperties: map[string]string{
"taskmanager.numberOfTaskSlots": "1",
"metrics.reporter.promgateway.groupingKey": `namespace={{.Namespace}};cluster={{.ClusterName}};execution_id={{index .Labels "execution-id"}}`,
},
}

config := &Config{}

flinkCtx := FlinkTaskContext{
ClusterName: ClusterName("generated-name"),
Namespace: "test-namespace",
Annotations: make(map[string]string),
Labels: map[string]string{"execution-id": "1"},
Job: jobIdl,
}

cluster, err := NewFlinkCluster(config, flinkCtx)
if err != nil {
t.Error(err)
}

assert.DeepEqual(t, cluster.ObjectMeta, metav1.ObjectMeta{
Name: "generated-name",
Namespace: "test-namespace",
Labels: map[string]string{"execution-id": "1"},
Annotations: make(map[string]string),
})

assert.DeepEqual(t, cluster.TypeMeta, metav1.TypeMeta{
Kind: "FlinkCluster",
APIVersion: "flinkoperator.k8s.io/v1beta1",
})

assert.DeepEqual(t, cluster.Spec, flinkOp.FlinkClusterSpec{
Job: &flinkOp.JobSpec{
ClassName: &mainClass,
Parallelism: &parallelism,
InitContainers: []corev1.Container{},
PodAnnotations: map[string]string{},
PodLabels: map[string]string{"execution-id": "1"},
},
EnvVars: []corev1.EnvVar{{Name: "STAGED_JARS", Value: "gs://bucket/a.jar gs://bucket/b.jar gs://bucket/c.jar"}},
FlinkProperties: map[string]string{
"metrics.reporter.promgateway.groupingKey": "namespace=test-namespace;cluster=generated-name;execution_id=1",
"taskmanager.numberOfTaskSlots": "1",
},
})
}

0 comments on commit e97751c

Please sign in to comment.