
[SPARK-34906] Refactor TreeNode's children handling methods into specialized traits #31932

Conversation


@dbaliafroozeh dbaliafroozeh commented Mar 22, 2021

What changes were proposed in this pull request?

The Spark query plan node hierarchy has specialized traits (or abstract classes) for handling nodes with a fixed number of children; for example, UnaryExpression, UnaryNode, and UnaryExec represent an expression, a logical plan, and a physical plan with exactly one child, respectively. This PR refactors the TreeNode hierarchy by extracting the children-handling functionality into the following traits. UnaryExpression and the other similar classes now extend the corresponding new trait:

trait LeafLike[T <: TreeNode[T]] { self: TreeNode[T] =>
  override final def children: Seq[T] = Nil
}

trait UnaryLike[T <: TreeNode[T]] { self: TreeNode[T] =>
  def child: T
  @transient override final lazy val children: Seq[T] = child :: Nil
}

trait BinaryLike[T <: TreeNode[T]] { self: TreeNode[T] =>
  def left: T
  def right: T
  @transient override final lazy val children: Seq[T] = left :: right :: Nil
}

trait TernaryLike[T <: TreeNode[T]] { self: TreeNode[T] =>
  def first: T
  def second: T
  def third: T
  @transient override final lazy val children: Seq[T] = first :: second :: third :: Nil
}
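To make the pattern concrete, here is a minimal, self-contained sketch (SimpleNode, Expr, Lit, and Neg are hypothetical stand-ins for illustration, not Spark classes) showing how a case-class parameter satisfies a trait's abstract child member:

```scala
// Hypothetical miniature of the TreeNode hierarchy above, for illustration only.
abstract class SimpleNode[T <: SimpleNode[T]] {
  def children: Seq[T]
}

trait LeafLike[T <: SimpleNode[T]] { self: SimpleNode[T] =>
  override final def children: Seq[T] = Nil
}

trait UnaryLike[T <: SimpleNode[T]] { self: SimpleNode[T] =>
  def child: T
  override final lazy val children: Seq[T] = child :: Nil
}

sealed abstract class Expr extends SimpleNode[Expr]
// The case-class parameter `child` implements the trait's abstract `def child`.
case class Lit(value: Int) extends Expr with LeafLike[Expr]
case class Neg(child: Expr) extends Expr with UnaryLike[Expr]

object TraitsDemo extends App {
  assert(Lit(1).children.isEmpty)            // leaf: no children
  assert(Neg(Lit(1)).children == Seq(Lit(1))) // unary: exactly one child
}
```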

This refactoring, which is part of a bigger effort to make tree transformations in Spark more efficient, has two benefits:

  • It moves the children-handling methods to a single place, instead of spreading them across specific subclasses, which will help future optimizations of tree traversals.
  • It allows mixing these traits into concrete node types that could not extend the previous classes. For example, expressions with one child that extend AggregateFunction cannot extend UnaryExpression, because AggregateFunction declares the foldable method as final while UnaryExpression defines it as non-final. With the new traits, such concrete classes can mix in UnaryLike directly. Classes with more specific child handling make tree traversal methods faster.

In this PR we have also updated many concrete node types to extend these traits to benefit from more specific child handling.
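As a hedged illustration of the foldable conflict described above (class bodies here are simplified stand-ins, not Spark's actual definitions), a trait that carries only children handling mixes into an AggregateFunction subclass without any method clash:

```scala
// Simplified stand-ins for illustration; real Spark class bodies are much larger.
abstract class Expression {
  def foldable: Boolean = false
  def children: Seq[Expression]
}

// `foldable` is final here, so subclasses cannot also extend a class
// (like UnaryExpression) that overrides `foldable` non-finally.
abstract class AggregateFunction extends Expression {
  final override def foldable: Boolean = false
}

// The trait defines only children handling, so it mixes in without conflict.
trait UnaryLike { self: Expression =>
  def child: Expression
  override final lazy val children: Seq[Expression] = child :: Nil
}

case class Lit(value: Int) extends Expression {
  val children: Seq[Expression] = Nil
}
case class MyAgg(child: Expression) extends AggregateFunction with UnaryLike

object MixinDemo extends App {
  assert(MyAgg(Lit(1)).children == Seq(Lit(1)))
  assert(!MyAgg(Lit(1)).foldable)
}
```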

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

This is a refactoring, passes existing tests.

@github-actions github-actions bot added the SQL label Mar 22, 2021

SparkQA commented Mar 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40944/


SparkQA commented Mar 22, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40944/


SparkQA commented Mar 22, 2021

Test build #136360 has finished for PR 31932 at commit 8928cce.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Grouping(child: Expression) extends Expression with Unevaluable


SparkQA commented Mar 23, 2021

Test build #136409 has finished for PR 31932 at commit b27bb4d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Mar 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40996/


SparkQA commented Mar 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40996/


SparkQA commented Mar 29, 2021

Test build #136655 has finished for PR 31932 at commit 1ac45f5.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends LeafParsedStatement
  • case class ShowCurrentNamespaceStatement() extends LeafParsedStatement
  • trait V2WriteCommand extends UnaryCommand
  • trait V2PartitionCommand extends UnaryCommand
  • case class RefreshTable(child: LogicalPlan) extends UnaryCommand
  • case class ShowCurrentNamespace(catalogManager: CatalogManager) extends LeafCommand
  • case class CommentOnNamespace(child: LogicalPlan, comment: String) extends UnaryCommand
  • case class CommentOnTable(child: LogicalPlan, comment: String) extends UnaryCommand
  • case class RefreshFunction(child: LogicalPlan) extends UnaryCommand
  • case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends UnaryCommand
  • case class RecoverPartitions(child: LogicalPlan) extends UnaryCommand
  • case class TruncateTable(table: LogicalPlan) extends UnaryCommand
  • trait RunnableCommand extends LeafCommand


SparkQA commented Mar 29, 2021

Test build #136660 has finished for PR 31932 at commit 6e3f54a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • sealed trait SpecialFrameBoundary extends LeafExpression with Unevaluable
  • case class RowNumber() extends RowNumberLike with LeafLike[Expression]
  • case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction with LeafLike[Expression]
  • case class Assignment(key: Expression, value: Expression) extends Expression


SparkQA commented Mar 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41242/


SparkQA commented Mar 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41242/


SparkQA commented Mar 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41256/


SparkQA commented Mar 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41256/


SparkQA commented Mar 30, 2021

Test build #136674 has finished for PR 31932 at commit 3c0e507.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class UnaryExpression extends Expression with UnaryLike[Expression]
  • abstract class BinaryExpression extends Expression with BinaryLike[Expression]
  • abstract class TernaryExpression extends Expression with TernaryLike[Expression]

@dbaliafroozeh dbaliafroozeh changed the title [WIP] Introduce specialized traits for TreeNode children handling [WIP] Refactor TreeNode's children handling methods into specialized traits Mar 30, 2021
@dbaliafroozeh dbaliafroozeh changed the title [WIP] Refactor TreeNode's children handling methods into specialized traits [SPARK-34906] Refactor TreeNode's children handling methods into specialized traits Mar 30, 2021

@hvanhovell hvanhovell left a comment


LGTM

@hvanhovell

Merging to master. Thanks!

@@ -32,7 +33,8 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
* df.writeTo("catalog.db.table").partitionedBy($"category", days($"timestamp")).create()
* }}}
*/
abstract class PartitionTransformExpression extends Expression with Unevaluable {
abstract class PartitionTransformExpression extends Expression with Unevaluable
with UnaryLike[Expression] {

@@ -34,12 +35,11 @@ import org.apache.spark.sql.types._
""",
group = "agg_funcs",
since = "1.0.0")
case class Average(child: Expression) extends DeclarativeAggregate with ImplicitCastInputTypes {
case class Average(child: Expression) extends DeclarativeAggregate with ImplicitCastInputTypes
with UnaryLike[Expression] {

2 space indent

@@ -34,10 +35,12 @@ import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType, Long
""",
group = "agg_funcs",
since = "3.0.0")
case class CountIf(predicate: Expression) extends UnevaluableAggregate with ImplicitCastInputTypes {
case class CountIf(predicate: Expression) extends UnevaluableAggregate with ImplicitCastInputTypes
with UnaryLike[Expression] {

2 space indent


override def children: Seq[Expression] = child :: Nil
case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCastInputTypes
with UnaryLike[Expression] {

2 space indent

import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegralType}

abstract class BitAggregate extends DeclarativeAggregate with ExpectsInputTypes {
abstract class BitAggregate extends DeclarativeAggregate with ExpectsInputTypes
with UnaryLike[Expression] {

2 space indent

@@ -33,12 +34,11 @@ import org.apache.spark.sql.types._
* We have to store all the collected elements in memory, and so notice that too many elements
* can cause GC paused and eventually OutOfMemory Errors.
*/
abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T] {
abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImperativeAggregate[T]
with UnaryLike[Expression] {

2 space indent

@@ -101,11 +102,11 @@ case class Rollup(groupByExprs: Seq[Expression]) extends GroupingSet {}
since = "2.0.0",
group = "agg_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class Grouping(child: Expression) extends Expression with Unevaluable {
case class Grouping(child: Expression) extends Expression with Unevaluable
with UnaryLike[Expression] {

2 space indent

@@ -734,10 +741,12 @@ case class NthValue(input: Expression, offset: Expression, ignoreNulls: Boolean)
since = "2.0.0",
group = "window_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction {
case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction
with UnaryLike[Expression] {

2 space indent

@@ -428,10 +430,12 @@ case class InsertAction(
override def children: Seq[Expression] = condition.toSeq ++ assignments
}

case class Assignment(key: Expression, value: Expression) extends Expression with Unevaluable {
case class Assignment(key: Expression, value: Expression) extends Expression
with Unevaluable with BinaryLike[Expression] {

2 space indent

@@ -27,9 +28,10 @@ import org.apache.spark.sql.types._
* When applied on empty data (i.e., count is zero), it returns NULL.
*/
abstract class Covariance(x: Expression, y: Expression, nullOnDivideByZero: Boolean)

We can simply do `val left: Expression, val right: Expression` here.

@dbaliafroozeh

@MaxGekk @cloud-fan I'll do your suggestions in a follow up PR

hvanhovell pushed a commit that referenced this pull request Apr 7, 2021
…dren handling methods into specialized traits

### What changes were proposed in this pull request?

This is a followup for #31932.
In this PR we:
- Introduce the `QuaternaryLike` trait for node types with 4 children.
- Specialize more node types
- Fix a number of style errors that were introduced in the original PR.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

This is a refactoring, passes existing tests.

Closes #32065 from dbaliafroozeh/FollowupSPARK-34906.

Authored-by: Ali Afroozeh <[email protected]>
Signed-off-by: herman <[email protected]>
hvanhovell pushed a commit that referenced this pull request Apr 9, 2021
…ren methods

### What changes were proposed in this pull request?
One of the main performance bottlenecks in query compilation is the overly-generic tree transformation methods, namely `mapChildren` and `withNewChildren` (defined in `TreeNode`). These methods have an overly-generic implementation that iterates over the children and relies on reflection to create new instances. We have observed that, especially for queries with large query plans, a significant number of CPU cycles is wasted in these methods. In this PR we make these methods more efficient by delegating the iteration and instantiation to concrete node types. The benchmarks show that we can expect significant performance improvements in total query compilation time for queries with large query plans (from 30% to 80%), and about 20% on average.

#### Problem detail
The `mapChildren` method in `TreeNode` is overly generic and costly. To be more specific, this method:
- iterates over all the fields of a node using Scala’s product iterator. While the iteration is not reflection-based, thanks to the Scala compiler generating code for `Product`, we create many anonymous functions and visit many nested structures (recursive calls).
The anonymous functions (presumably compiled to Java anonymous inner classes) also show up quite high on the list in the object allocation profiles, so we are putting unnecessary pressure on GC here.
- does a lot of comparisons. Basically for each element returned from the product iterator, we check if it is a child (contained in the list of children) and then transform it. We can avoid that by just iterating over children, but in the current implementation, we need to gather all the fields (only transform the children) so that we can instantiate the object using the reflection.
- creates objects using reflection, by delegating to the `makeCopy` method, which is several orders of magnitude slower than using the constructor.

#### Solution
The proposed solution in this PR is rather straightforward: we rewrite the `mapChildren` method using the `children` and `withNewChildren` methods. The default `withNewChildren` method suffers from the same problems as `mapChildren`, and we need to make it more efficient by specializing it in concrete classes. Similar to how each concrete query plan node already defines its children, it should also define how it can be constructed given a new list of children. The implementation is actually quite simple in most cases, a one-liner thanks to the `copy` method present in Scala case classes. Note that we cannot abstract over the `copy` method; it is generated by the compiler for case classes if no other type higher in the hierarchy defines it. For most concrete nodes, the implementation of `withNewChildren` looks like this:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = copy(children = newChildren)
```
The current `withNewChildren` method has two properties that we should preserve:

- It returns the same instance if the provided children are the same as its children, i.e., it preserves referential equality.
- It copies tags and maintains the origin links when a new copy is created.

These properties are hard to enforce in the concrete node type implementation. Therefore, we propose a template method `withNewChildrenInternal` that should be rewritten by the concrete classes and let the `withNewChildren` method take care of referential equality and copying:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = {
  if (childrenFastEquals(children, newChildren)) {
    this
  } else {
    CurrentOrigin.withOrigin(origin) {
      val res = withNewChildrenInternal(newChildren)
      res.copyTagsFrom(this)
      res
    }
  }
}
```

With the refactoring done in a previous PR (#31932), most tree node types fall into one of the categories `Leaf`, `Unary`, `Binary`, or `Ternary`. These traits have a more efficient implementation of `mapChildren` and define a more specialized version of `withNewChildrenInternal` that avoids creating unnecessary lists. For example, the `mapChildren` method in `UnaryLike` is defined as follows:
```
  override final def mapChildren(f: T => T): T = {
    val newChild = f(child)
    if (newChild fastEquals child) {
      this.asInstanceOf[T]
    } else {
      CurrentOrigin.withOrigin(origin) {
        val res = withNewChildInternal(newChild)
        res.copyTagsFrom(this.asInstanceOf[T])
        res
      }
    }
  }
```
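A minimal runnable sketch of the same pattern (Node, Lit, and Neg are hypothetical stand-ins; Spark's real implementation also copies tags and maintains origin links) shows how the specialized `mapChildren` preserves referential equality while delegating instantiation to the case class's compiler-generated `copy`:

```scala
// Hypothetical miniature of the specialized traversal, for illustration only.
abstract class Node { def children: Seq[Node] }

trait UnaryNodeLike { self: Node =>
  def child: Node
  override final lazy val children: Seq[Node] = child :: Nil

  // Template method: the concrete class supplies a cheap, copy-based rebuild.
  def withNewChildInternal(newChild: Node): Node

  // No product iterator, no reflection: touch the single child directly.
  final def mapChildren(f: Node => Node): Node = {
    val newChild = f(child)
    if (newChild eq child) this // preserve referential equality
    else withNewChildInternal(newChild)
  }
}

case class Lit(value: Int) extends Node { val children: Seq[Node] = Nil }
case class Neg(child: Node) extends Node with UnaryNodeLike {
  // One-liner thanks to the compiler-generated copy method.
  override def withNewChildInternal(newChild: Node): Node = copy(child = newChild)
}

object MapChildrenDemo extends App {
  val n = Neg(Lit(1))
  assert(n.mapChildren(identity) eq n)              // unchanged: same instance
  assert(n.mapChildren(_ => Lit(2)) == Neg(Lit(2))) // changed: rebuilt via copy
}
```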

#### Results
With this PR, we have observed significant performance improvements in query compilation time, more specifically in the analysis and optimization phases. The table below shows the TPC-DS queries that had more than a 25% speedup in compilation time. The biggest speedups are observed in queries with large query plans.
| Query  | Speedup |
| ------------- | ------------- |
|q4    |29%|
|q9    |81%|
|q14a  |31%|
|q14b  |28%|
|q22   |33%|
|q33   |29%|
|q34   |25%|
|q39   |27%|
|q41   |27%|
|q44   |26%|
|q47   |28%|
|q48   |76%|
|q49   |46%|
|q56   |26%|
|q58   |43%|
|q59   |46%|
|q60   |50%|
|q65   |59%|
|q66   |46%|
|q67   |52%|
|q69   |31%|
|q70   |30%|
|q96   |26%|
|q98   |32%|

#### Binary incompatibility
Changing `withNewChildren` in `TreeNode` breaks the binary compatibility of code compiled against older versions of Spark, because concrete `TreeNode` subclasses are now expected to implement the `withNewChildrenInternal` method. This is a problem, for example, when users write custom expressions. The change is still the right choice, since it forces all expressions newly added to Catalyst to implement the method efficiently and will prevent future regressions.
Please note that we have not completely removed the old implementation; it has been renamed to `legacyWithNewChildren`. This method will be removed in the future and for now helps the transition. There are expressions, such as `UpdateFields`, that have a complex way of defining children; writing `withNewChildren` for them requires refactoring the expression. For now, these expressions use the old, slow method. A future PR will address them.

### Does this PR introduce _any_ user-facing change?

This PR does not introduce user-facing changes, but it may break binary compatibility of code compiled against older versions. See the binary incompatibility section above.

### How was this patch tested?

This PR is mainly a refactoring and passes existing tests.

Closes #32030 from dbaliafroozeh/ImprovedMapChildren.

Authored-by: Ali Afroozeh <[email protected]>
Signed-off-by: herman <[email protected]>