Skip to content

Commit

Permalink
Spark features should not drop spark configs if no features are found (
Browse files Browse the repository at this point in the history
  • Loading branch information
catalinii authored Feb 9, 2021
1 parent 8c66bdd commit 275483f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
7 changes: 5 additions & 2 deletions go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,24 +217,27 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
func addConfig(sparkConfig map[string]string, key string, value string) {

if strings.ToLower(strings.TrimSpace(value)) != "true" {
sparkConfig[key] = value
return
}

matches := featureRegex.FindAllStringSubmatch(key, -1)
if len(matches) == 0 || len(matches[0]) == 0 {
sparkConfig[key] = value
return
}
featureName := matches[0][len(matches[0])-1]

// Use the first matching feature in-case of duplicates.
for _, feature := range GetSparkConfig().Features {
if feature.Name == featureName {
for k, v := range feature.SparkConfig {
sparkConfig[k] = v
}
break
return
}

}
sparkConfig[key] = value
}

// Convert SparkJob ApplicationType to Operator CRD ApplicationType
Expand Down
4 changes: 3 additions & 1 deletion go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
"spark.executor.memory": "500M",
"spark.flyte.feature1.enabled": "true",
"spark.lyft.feature2.enabled": "true",
"spark.lyft.feature3.enabled": "true",
}

dummyEnvVars = []*core.KeyValuePair{
Expand Down Expand Up @@ -391,7 +392,7 @@ func TestBuildResourceSpark(t *testing.T) {
for confKey, confVal := range dummySparkConf {
exists := false

if featureRegex.MatchString(confKey) {
if featureRegex.MatchString(confKey) && confKey != "spark.lyft.feature3.enabled" {
match := featureRegex.FindAllStringSubmatch(confKey, -1)
feature := match[0][len(match[0])-1]
assert.True(t, feature == "feature1" || feature == "feature2")
Expand All @@ -417,6 +418,7 @@ func TestBuildResourceSpark(t *testing.T) {
assert.Equal(t, dummySparkConf["spark.driver.cores"], sparkApp.Spec.SparkConf["spark.kubernetes.driver.limit.cores"])
assert.Equal(t, dummySparkConf["spark.executor.cores"], sparkApp.Spec.SparkConf["spark.kubernetes.executor.limit.cores"])
assert.Greater(t, len(sparkApp.Spec.SparkConf["spark.kubernetes.driverEnv.FLYTE_START_TIME"]), 1)
assert.Equal(t, dummySparkConf["spark.lyft.feature3.enabled"], sparkApp.Spec.SparkConf["spark.lyft.feature3.enabled"])

assert.Equal(t, len(sparkApp.Spec.Driver.EnvVars["FLYTE_MAX_ATTEMPTS"]), 1)

Expand Down

0 comments on commit 275483f

Please sign in to comment.