
[SPARK-32945][SQL] Avoid collapsing projects if reaching max allowed common exprs #29950

Closed
wants to merge 12 commits

Conversation

viirya
Member

@viirya viirya commented Oct 6, 2020

What changes were proposed in this pull request?

This patch proposes to avoid collapsing adjacent Project in query optimizer if the combined Project will duplicate too many common expressions. One SQL config spark.sql.optimizer.maxCommonExprsInCollapseProject is added to set up the maximum allowed number of common expressions.

Why are the changes needed?

In some edge cases, collapsing adjacent Projects hurts performance instead of improving it. We observed such behavior in our customer Spark jobs, where one expensive expression was duplicated many times. It is hard to write an optimizer rule that could decide whether to collapse two Projects, because we don't know the cost of each expression. For now we can provide a SQL config so users can change the optimizer's behavior regarding collapsing adjacent Projects.

Note that normally, in whole-stage codegen, the Project operator de-duplicates expressions internally, but in edge cases Spark cannot do whole-stage codegen and falls back to interpreted mode. In such cases, users can use this config to avoid duplicated expressions.
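To make the duplication concrete, here is a minimal, language-neutral sketch in plain Python (not Spark code; `expensive_parse` is a hypothetical stand-in for an expensive expression such as JsonToStructs) showing how collapsing two projection layers multiplies the evaluations of a common expression:

```python
# Count evaluations of a stand-in for an expensive expression (e.g. JsonToStructs).
calls = {"n": 0}

def expensive_parse(json):
    calls["n"] += 1
    return {"a": 1, "b": 2, "c": 3}

# Two-layer plan: the lower "project" computes the struct once,
# and the upper "project" reads three fields from the alias.
def two_layer(row):
    struct = expensive_parse(row)
    return (struct["a"], struct["b"], struct["c"])

# Collapsed plan: the alias is inlined into every use site,
# so the expensive expression is evaluated once per field.
def collapsed(row):
    return (expensive_parse(row)["a"],
            expensive_parse(row)["b"],
            expensive_parse(row)["c"])

calls["n"] = 0
two_layer("{}")
print(calls["n"])  # 1 evaluation before collapsing

calls["n"] = 0
collapsed("{}")
print(calls["n"])  # 3 evaluations after collapsing
```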

Does this PR introduce any user-facing change?

Yes. Users can change the optimizer's behavior regarding collapsing Projects by setting a SQL config.

How was this patch tested?

Unit test.

@@ -766,6 +768,23 @@ object CollapseProject extends Rule[LogicalPlan] {
})
}
Member Author

We could extend this to other cases like case p @ Project(_, agg: Aggregate), but I leave it untouched for now.

@SparkQA

SparkQA commented Oct 6, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34039/

@SparkQA

SparkQA commented Oct 6, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34039/

@maropu
Member

maropu commented Oct 6, 2020

Related to #29094 ?

@viirya
Member Author

viirya commented Oct 6, 2020

Related to #29094 ?

No, after a quick scan of that PR. That PR targets driver OOM caused by too many leaf expressions in the collapsed Project. This diff cares about duplicated common expressions in the collapsed Project. Different problems, I think.

@tanelk
Contributor

tanelk commented Oct 6, 2020

Perhaps the max number of common expressions is not the best metric here?

Let's compare two cases:

  1. On the lower Project you have a JsonToStructs and on the upper Project you get 3 fields from that struct. This means 2 redundant computations, and the "metric" you are looking at is 3.

  2. On the lower Project you have two JsonToStructs and on the upper Project you get 2 fields from both structs. This also means 2 redundant computations, and the "metric" you are looking at is 2.

Adding more JsonToStructs to the lower level would increase the number of redundant computations without increasing the max value.
So as an alternative I would propose "the number of redundant computations" (sum of values in the exprMap minus its size) as the metric to use.

Although I must admit that in that case we might cache more values for the number of extra computations we save.
So both of them have their benefits.
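The two candidate metrics can be sketched as follows (plain Python, not Spark code; `expr_map` stands for the exprMap mentioned above, mapping each common expression in the lower Project to how many times the collapsed Project would reference it):

```python
def max_common_refs(expr_map):
    # Metric used by the PR: the largest reference count of any
    # single common expression after collapsing.
    return max(expr_map.values(), default=0)

def redundant_computations(expr_map):
    # Proposed alternative: sum of the values minus the map size,
    # i.e. how many extra evaluations collapsing introduces in total.
    return sum(expr_map.values()) - len(expr_map)

case1 = {"json_to_structs": 3}      # one struct, 3 fields read from it
case2 = {"json_a": 2, "json_b": 2}  # two structs, 2 fields read from each

print(max_common_refs(case1), redundant_computations(case1))  # 3 2
print(max_common_refs(case2), redundant_computations(case2))  # 2 2
```

Both cases introduce 2 redundant computations, yet the max-based metric rates them differently, which is the asymmetry described above.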

val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject

if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
getLargestNumOfCommonOutput(p1.projectList, p2.projectList) >= maxCommonExprs) {
Contributor

Perhaps this comparison should be > instead of >=, because currently the actual max value is maxCommonExprs - 1.

@SparkQA

SparkQA commented Oct 6, 2020

Test build #129432 has finished for PR 29950 at commit f418714.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Oct 6, 2020

Perhaps the max number of common expressions is not the best metric here?

Let's compare two cases:

1. On the lower Project you have a `JsonToStructs` and on the upper Project you get 3 fields from that struct. This means 2 redundant computations, and the "metric" you are looking at is 3.

2. On the lower Project you have two `JsonToStructs` and on the upper Project you get 2 fields from both structs. This also means 2 redundant computations, and the "metric" you are looking at is 2.

Adding more JsonToStructs to the lower level would increase the number of redundant computations without increasing the max value.
So as an alternative I would propose "the number of redundant computations" (sum of values in the exprMap minus its size) as the metric to use.

Although I must admit that in that case we might cache more values for the number of extra computations we save.
So both of them have their benefits.

Yes, in the case you describe, each additional JsonToStructs adds to the number of redundant computations. But overall it should not cause a noticeable performance issue, because you simply triple the running cost of each JsonToStructs (assuming each JsonToStructs has 2 redundant computations).

The number of redundant computations is misleading. If we have 100 JsonToStructs in the lower Project, each with 2 redundant computations in the upper Project, it doesn't mean we multiply the running cost of JsonToStructs by 100. In other words, it is hard to tell the performance difference between 10 and 20 redundant computations. If the redundant computations come from the same expression, then we have 10x vs. 20x the running cost; but if they come from 10 expressions, we might only have 2x to 3x the running cost.
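A rough way to see this argument in numbers (a plain-Python sketch, under the simplifying assumption that all common expressions cost the same):

```python
# Slowdown factor for the common expressions after collapsing, given the
# per-expression reference counts. Without duplication each expression is
# evaluated once, so the factor is total evaluations / number of expressions.
def slowdown(expr_refs):
    return sum(expr_refs) / len(expr_refs)

# 10 redundant computations concentrated in a single expression: 11x its cost.
print(slowdown([11]))      # 11.0

# 20 redundant computations spread across 10 expressions: only 3x overall.
print(slowdown([3] * 10))  # 3.0
```

So a larger total count of redundant computations can still mean a smaller per-expression slowdown, which is why the PR bounds the per-expression maximum instead.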

@viirya
Member Author

viirya commented Oct 6, 2020

cc @cloud-fan @dongjoon-hyun too

@SparkQA

SparkQA commented Oct 7, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34091/

@SparkQA

SparkQA commented Oct 7, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34092/


@SparkQA

SparkQA commented Oct 7, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34091/

@SparkQA

SparkQA commented Oct 7, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34092/


Comment on lines 183 to 188
val query = relation.select(
JsonToStructs(schema, options, 'json).as("struct"))
.select(
GetStructField('struct, 0).as("a"),
GetStructField('struct, 1).as("b"),
GetStructField('struct, 2).as("c")).analyze
Contributor

When using the dataset API, then it would be very common to chain withColumn calls:

dataset
    .withColumn("json", ...)
    .withColumn("a", col("json").getField("a"))
    .withColumn("b", col("json").getField("b"))
    .withColumn("c", col("json").getField("c"))

In that case the query should look more like this:

        val query = relation
          .select('json, JsonToStructs(schema, options, 'json).as("struct"))
          .select('json, 'struct, GetStructField('struct, 0).as("a"))
          .select('json, 'struct, 'a, GetStructField('struct, 1).as("b"))
          .select('json, 'struct, 'a, 'b, GetStructField('struct, 2).as("c"))
          .analyze

The CollapseProject rule uses transformUp. It seems that in that case we do not get the expected results from this optimization.

Member Author

It seems this can be fixed by using transformDown instead? It seems to me that CollapseProject does not necessarily need to use transformUp, if I'm not missing anything. cc @cloud-fan @maropu

Contributor

@tanelk tanelk Oct 9, 2020

If there is a chain of Projects P1(P2(P3(P4(...)))), then using transformDown will first merge P1 and P2 into P12, then move to its child P3 and merge it with P4 into P34. Only on the second iteration will it merge all four of these.

In this case we want to merge P123 and then see that we can't merge with P4, because we would exceed maxCommonExprsInCollapseProject.

Contributor

I think the correct way would be to use transformDown in a manner similar to recursiveRemoveSort in #21072.
So basically, when you hit the first Project, you collect all consecutive Projects until you hit the maxCommonExprsInCollapseProject limit, and merge them.
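A sketch of that one-pass, top-down grouping (plain Python with a toy plan representation and a hypothetical cost function, not the actual CollapseProject code):

```python
# Collapse a chain of adjacent "projects" (outermost first) greedily:
# keep absorbing the next child while the merged group stays within the
# limit, otherwise start a new group, mirroring the recursiveRemoveSort idea.
def collapse_chain(projects, max_common, cost):
    merged = [[projects[0]]]
    for p in projects[1:]:
        if cost(merged[-1] + [p]) <= max_common:
            merged[-1].append(p)   # safe to merge into the current group
        else:
            merged.append([p])     # would exceed the limit; keep separate
    return merged

# Toy cost: pretend the common-expression count grows with group size
# (a stand-in for getLargestNumOfCommonOutput on the merged project lists).
cost = lambda group: len(group)

# P1(P2(P3(P4))) with a limit of 3: P1..P3 merge, P4 stays separate.
print(collapse_chain(["P1", "P2", "P3", "P4"], 3, cost))
# [['P1', 'P2', 'P3'], ['P4']]
```

Unlike the fixpoint transformUp approach, a single top-down pass decides each group boundary exactly once, so the limit applies to the fully merged group rather than to pairwise intermediate merges.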

Member

hm, it sounds fine, too. Rather, it seems a top-down transformation can collapse projects in one shot just like RemoveRedundantProjects?

Member Author

Seems like we need to change to transformDown and take a recursive approach like RemoveRedundantProjects and recursiveRemoveSort for collapsing Project.

"if merging two Project, Spark SQL will skip the merging.")
.version("3.1.0")
.intConf
.createWithDefault(20)
Member

Just a question. Is there a reason to choose 20?

Member Author

No, I just picked a number that seems bad for repeating an expression.

val maxCommonExprs = SQLConf.get.maxCommonExprsInCollapseProject

if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList) ||
getLargestNumOfCommonOutput(p1.projectList, p2.projectList) > maxCommonExprs) {
Member

indentation?

@@ -124,14 +128,34 @@ object ScanOperation extends OperationHelper with PredicateHelper {
}.exists(!_.deterministic))
}

def moreThanMaxAllowedCommonOutput(
expr: Seq[NamedExpression],
Member

@dongjoon-hyun dongjoon-hyun Oct 9, 2020

indentation? It seems that there is one more space here.

// do not have common non-deterministic expressions, or do not have equal to/more than
// maximum allowed common outputs.
if (!hasCommonNonDeterministic(fields, aliases)
|| !moreThanMaxAllowedCommonOutput(fields, aliases)) {
Member

nit, you may want to move || into line 157.

Member Author

sure. thanks.


@SparkQA

SparkQA commented Oct 9, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34198/

@viirya
Member Author

viirya commented Nov 1, 2020

gentle ping @dongjoon-hyun @cloud-fan

@dongjoon-hyun
Member

Oops. Sorry for being late, @viirya .

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Nov 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35578/

@SparkQA

SparkQA commented Nov 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35578/

@SparkQA

SparkQA commented Nov 12, 2020

Test build #130972 has finished for PR 29950 at commit 58e71d8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Nov 12, 2020

retest this please

@SparkQA

SparkQA commented Nov 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35589/

@SparkQA

SparkQA commented Nov 12, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35589/

@SparkQA

SparkQA commented Nov 12, 2020

Test build #130983 has finished for PR 29950 at commit 58e71d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.version("3.1.0")
.intConf
.checkValue(_ > 0, "The value of maxCommonExprsInCollapseProject must be larger than zero.")
.createWithDefault(20)
Member

If possible, can we introduce this configuration with Int.MaxValue in 3.1.0 first? We can reduce it later.

Member Author

Sure. It is safer.

Member

+1

@SparkQA

SparkQA commented Nov 13, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35630/

@SparkQA

SparkQA commented Nov 13, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35630/

@SparkQA

SparkQA commented Nov 13, 2020

Test build #131024 has finished for PR 29950 at commit bbaae3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

The GitHub Action's flakiness at sql - slow tests is fixed via #30365 .

} else {
p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
}
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
Member

Is there a reason to change from transformUp to transformDown? If the all test passed, it would be safe if we keep the original one.

Member

I found the previous comment about supporting withColumn. If this is designed for that, shall we add a test case for that?

// If we collapse two Projects, `JsonToStructs` will be repeated three times.
val relation = LocalRelation('json.string)
val query1 = relation.select(
JsonToStructs(schema, options, 'json).as("struct"))
Member

@dongjoon-hyun dongjoon-hyun Nov 13, 2020

indentation? Maybe, the following is better?

- val query1 = relation.select(
-   JsonToStructs(schema, options, 'json).as("struct"))
-   .select(
+ val query1 = relation.select(JsonToStructs(schema, options, 'json).as("struct"))
+   .select(

"the physical planning.")
.version("3.1.0")
.intConf
.checkValue(_ > 0, "The value of maxCommonExprsInCollapseProject must be larger than zero.")
Member

larger than zero -> positive.

})
}

// Whether the largest times common outputs from lower operator used in upper operators is
Member

upper operators -> upper operator?

}

// Whether the largest times common outputs from lower operator used in upper operators is
// larger than allowed.
Member

than allowed -> than the maximum?

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Nov 25, 2020

Test build #131729 has finished for PR 29950 at commit bbaae3e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Nov 25, 2020

retest this please

@SparkQA

SparkQA commented Nov 25, 2020

Test build #131780 has finished for PR 29950 at commit bbaae3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Dec 7, 2020

I recently generalized the subexpression elimination feature to interpreted Project and Predicate. So now both whole-stage codegen and interpreted execution support subexpression elimination, which avoids the performance issue caused by duplicating common expressions when collapsing Projects.

That said, I think this patch is less useful now. I'm closing it.

@viirya viirya closed this Dec 7, 2020
@dongjoon-hyun
Member

Thank you for your decision, @viirya !

@HyukjinKwon
Member

Thanks @viirya

@viirya viirya deleted the SPARK-32945 branch December 27, 2023 18:28
6 participants