
[SPARK-15076][SQL] Add ReorderAssociativeOperator optimizer #12850

Closed · wants to merge 5 commits

Conversation

@dongjoon-hyun (Member) commented May 2, 2016

What changes were proposed in this pull request?

This PR adds a new optimizer rule, ReorderAssociativeOperator, that takes advantage of the associative property of integral arithmetic. Currently, Spark behaves as follows:

  1. It can optimize 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + a into 45 + a.
  2. It cannot optimize a + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9.

This PR handles Case 2 for Add/Multiply expressions whose data types are ByteType, ShortType, IntegerType, or LongType. The following compares the physical plans before and after this change.

Before

scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1)) a)").explain
== Physical Plan ==
WholeStageCodegen
:  +- Project [(((((((((a#7 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9) AS (((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)#8]
:     +- INPUT
+- Generate explode([1]), false, false, [a#7]
   +- Scan OneRowRelation[]
scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1)) a)").explain
== Physical Plan ==
*Project [(((((((((a#18 * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9) AS (((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9)#19]
+- Generate explode([1]), false, false, [a#18]
   +- Scan OneRowRelation[]

After

scala> sql("select a+1+2+3+4+5+6+7+8+9 from (select explode(array(1)) a)").explain
== Physical Plan ==
WholeStageCodegen
:  +- Project [(a#7 + 45) AS (((((((((a + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)#8]
:     +- INPUT
+- Generate explode([1]), false, false, [a#7]
   +- Scan OneRowRelation[]
scala> sql("select a*1*2*3*4*5*6*7*8*9 from (select explode(array(1)) a)").explain
== Physical Plan ==
*Project [(a#18 * 362880) AS (((((((((a * 1) * 2) * 3) * 4) * 5) * 6) * 7) * 8) * 9)#19]
+- Generate explode([1]), false, false, [a#18]
   +- Scan OneRowRelation[]
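
Conceptually, the rule flattens a chain of one associative operator into its operand list, folds the literal operands into a single constant, and rebuilds the expression around the remaining operands. A self-contained Scala sketch of that idea on a toy expression tree (illustrative only; the real rule works on Catalyst expressions):

// Toy expression tree, used only to illustrate the reassociation idea.
sealed trait Expr
case class Lit(v: Long) extends Expr
case class Ref(name: String) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

// Flatten a nested chain of Adds into its operand list.
def flattenAdd(e: Expr): Seq[Expr] = e match {
  case Add(l, r) => flattenAdd(l) ++ flattenAdd(r)
  case other     => Seq(other)
}

// Fold all literal operands into one constant and keep the rest in order.
def reorderAdd(e: Expr): Expr = {
  val (lits, others) = flattenAdd(e).partition(_.isInstanceOf[Lit])
  if (lits.size > 1) {
    val folded = Lit(lits.collect { case Lit(v) => v }.sum)
    if (others.isEmpty) folded else Add(others.reduce(Add(_, _)), folded)
  } else e
}

// a + 1 + 2 + 3 is rewritten to a + 6:
// reorderAdd(Add(Add(Add(Ref("a"), Lit(1)), Lit(2)), Lit(3)))
//   == Add(Ref("a"), Lit(6))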

This PR was greatly generalized by @cloud-fan's key ideas; he should be credited for that work.

How was this patch tested?

Passes the Jenkins tests, including a new test suite.

@SparkQA commented May 3, 2016

Test build #57567 has finished for PR 12850 at commit 71c3c73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 4, 2016

Test build #57781 has finished for PR 12850 at commit 6898e0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 6, 2016

Test build #58006 has finished for PR 12850 at commit a4a3ce3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 8, 2016

Test build #58113 has finished for PR 12850 at commit 3802255.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author):

Rebased to see the result with the re-enabled Hive queries.

@SparkQA commented May 10, 2016

Test build #58246 has finished for PR 12850 at commit 06e9b36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 13, 2016

Test build #58519 has finished for PR 12850 at commit 18f5a8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 16, 2016

Test build #58648 has finished for PR 12850 at commit 0b60464.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 19, 2016

Test build #58875 has finished for PR 12850 at commit 65c7db7.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author):

Rebased to trigger the Jenkins tests again.

@SparkQA commented May 19, 2016

Test build #58884 has finished for PR 12850 at commit 0ffd004.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 22, 2016

Test build #59114 has finished for PR 12850 at commit eeae56d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author):

Hi, @marmbrus and @rxin.
Could you review this PR when you have some time?

@SparkQA commented May 27, 2016

Test build #59531 has finished for PR 12850 at commit 8c8ea7a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Use associative property for integral type
case e if e.isInstanceOf[BinaryArithmetic] && e.dataType.isInstanceOf[IntegralType] =>
  e match {
    case Add(Add(a, b), c) if b.foldable && c.foldable => Add(a, Add(b, c))
Contributor:

What about a + 1 + b + 2? I think we need a more general approach, like reordering the Add nodes to put all the literals together.
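
That is, the desired rewrite would look like this (an illustrative sketch of the suggestion, in Catalyst-style notation):

// a + 1 + b + 2: gather the literals from anywhere in the Add chain
// and fold them into a single constant.
//   Add(Add(Add(a, Literal(1)), b), Literal(2))  ==>  Add(Add(a, b), Literal(3))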

Member Author:

Thank you for the review, @cloud-fan!
I see; that sounds great.
Let me think about how to eliminate all the constants, then.

@SparkQA commented May 31, 2016

Test build #59645 has finished for PR 12850 at commit 8956a1e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author) commented May 31, 2016

Hi, @cloud-fan.
Could you review again?
This PR now provides a more generalized way to handle all foldable constants in any order.

@@ -742,6 +742,23 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
* equivalent [[Literal]] values.
*/
object ConstantFolding extends Rule[LogicalPlan] {
private def isAssociativelyFoldable(e: Expression): Boolean =
Contributor:

Similar to ReorderJoin, we should have a new rule ReorderAssociativeOperator to do this optimization, instead of putting it in ConstantFolding.

Member Author:

Oh, that could work.

There is a difference in the level of granularity, though.

Join-related optimizers might later be improved into cost-based optimizers, while the ConstantFolding optimizer is just about removing constants within a single expression.

Do you think it is a good idea to put those different levels of concern together?

I can do this whichever way you decide. :)

Contributor:

I think this is OK; BooleanSimplification is also a kind of constant folding, but we made a new rule for it.

Member Author:

Thank you. I see!

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-15076][SQL] Improve ConstantFolding optimizer by using integral associative property [SPARK-15076][SQL] Add ReorderAssociativeOperator optimizer May 31, 2016
@dongjoon-hyun (Member, Author):

Hi, @cloud-fan.
I have now created a new rule, ReorderAssociativeOperator, as you recommended.
The JIRA issue and PR description have been updated as well.

@SparkQA commented May 31, 2016

Test build #59691 has finished for PR 12850 at commit 4e4845c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 31, 2016

Test build #59690 has finished for PR 12850 at commit d022904.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 1, 2016

Test build #59700 has finished for PR 12850 at commit 2ebc53c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member, Author):

Hi, @cloud-fan.
It's ready for review again; could you take a look when you have some time?
Thank you as always!

@cloud-fan (Contributor):

I discussed this with @davies offline, and here is our conclusion:

  1. This feature is not that important, as users can always do it manually, i.e. change the add/multiply order, which is not a lot of effort.
  2. Once we have this feature, users lose control of the execution order, e.g. they may add UDFs and literals together and want a deterministic execution order.
  3. There are other corner cases, like overflow.

In general, we think this feature brings too much nondeterminism compared to the benefit it provides. What do you think?

@dongjoon-hyun (Member, Author):

Thank you for the deep discussion on this. Here is my thinking.

For 1), there are machine-generated queries from BI tools, and these are an important category of queries. In many cases, BI tools (and other UI-driven tools) generate queries by simple rules, and those rules do not care about the shape of the output queries; optimization is the role of the DBMS or of Spark. So static optimizations are always important. This PR also reduces the size of the generated code.

For 2), other optimizers already remove or duplicate UDFs; Spark does not guarantee control over the execution order. As you know, we already concluded that we would leave an explicit note like the following about this (in SPARK-15282 and #13087).

Note that the user-defined functions must be deterministic. Due to optimization,
duplicate invocations may be eliminated or the function may even be invoked more times than
it is present in the query.

For 3), could you give some real problematic cases? This PR reorders only additions and multiplications, and I think it does not change the final result value: these integral types wrap on overflow, and addition and multiplication modulo 2^n are still associative and commutative, so reordering preserves the result. The following shows the behavior of current Spark (not this PR; you can see that in the physical plan).

scala> sql("select 2147483640 + a + 7 from (select explode(array(1,2,3)) a)").explain()
== Physical Plan ==
*Project [((2147483640 + a#8) + 7) AS ((2147483640 + a) + 7)#9]
+- Generate explode([1,2,3]), false, false, [a#8]
   +- Scan OneRowRelation[]

scala> sql("select 2147483640 + a + 7 from (select explode(array(1,2,3)) a)").collect()
res1: Array[org.apache.spark.sql.Row] = Array([-2147483648], [-2147483647], [-2147483646])

scala>  sql("select a + 2147483647 from (select explode(array(1,2,3)) a)").collect()
res2: Array[org.apache.spark.sql.Row] = Array([-2147483648], [-2147483647], [-2147483646])

scala> sql("select 214748364 * a from (select explode(array(1,2,3)) a)").collect()
res3: Array[org.apache.spark.sql.Row] = Array([214748364], [429496728], [644245092])

scala> sql("select 214748364 * a * 10 from (select explode(array(1,2,3)) a)").collect()
res4: Array[org.apache.spark.sql.Row] = Array([2147483640], [-16], [2147483624])

scala> sql("select a * 2147483640 from (select explode(array(1,2,3)) a)").collect()
res5: Array[org.apache.spark.sql.Row] = Array([2147483640], [-16], [2147483624])

The optimization in this PR will behave in exactly the same way.
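
As a quick sanity check of the wrap-around argument in plain Scala (values taken from the session above):

// Two's-complement Int arithmetic is arithmetic modulo 2^32, which is still
// associative and commutative, so reassociation cannot change the result.
val a = 1
assert(2147483640 + a + 7 == a + 2147483647)   // both wrap to Int.MinValue
assert(214748364 * a * 10 == a * 2147483640)   // same wrapped product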

@dongjoon-hyun (Member, Author):

Hi, @cloud-fan and @davies.
What do you think about the above?

@cloud-fan (Contributor):

A UDF was the first thing I came up with, and yes, it must be deterministic. But since we have the deterministic property on Expression, I think it's possible for users to create non-deterministic expressions, e.g. via ScalaUDAF or other APIs we may create in the future, and then the execution order matters.

You can still improve this PR to handle the non-deterministic cases, but that will make it more complex and harder to reason about, which may not be worth it.

cc @davies

@dongjoon-hyun (Member, Author):

Thank you for the feedback. I'm really happy to have your attention!
For the non-deterministic part, we can add a single condition in isAssociativelyFoldable:
if any operand of an expression is non-deterministic, the whole expression is non-deterministic. It's easy.
It's also future-proof: even if we later add a NondeterministicScalaUDF whose deterministic == false, this optimizer will not touch expressions containing it.
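
A minimal sketch of that guard (the method name and the e.deterministic condition come from this thread; the other conditions are assumptions based on the PR description, written as if inside Catalyst's optimizer where Expression, Add, Multiply, and IntegralType are in scope):

// Sketch only, not the exact code from the PR.
private def isAssociativelyFoldable(e: Expression): Boolean =
  e.deterministic &&                         // new check: skip non-deterministic trees
  e.dataType.isInstanceOf[IntegralType] &&   // ByteType/ShortType/IntegerType/LongType
  (e.isInstanceOf[Add] || e.isInstanceOf[Multiply])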

@dongjoon-hyun (Member, Author):

I added the missing part: the e.deterministic check in isAssociativelyFoldable.

}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown {
Contributor:

how about

def flattenAdd(e: Expression): Seq[Expression] = e match {
  case Add(l, r) => flattenAdd(l) ++ flattenAdd(r)
  case other => other :: Nil   // must return a Seq, not a bare Expression
}

...
plan transformAllExpressions {
  case a: Add if a.deterministic && a.dataType.isInstanceOf[IntegralType] =>
    val (foldables, others) = flattenAdd(a).partition(_.foldable)   // '=' here, not '=>'
    if (foldables.size > 1) {
      val foldableExpr = foldables.reduce(Add(_, _))
      val c = Literal.create(foldableExpr.eval(), a.dataType)
      if (others.isEmpty) c else Add(others.reduce(Add(_, _)), c)
    } else {
      a
    }
}

We can duplicate some code for Multiply, and I think this may be more readable than the current version.

Member Author:

I see; that could work.
We would also need to add isSingleOperatorExpr there, though.
Otherwise, flattenAdd(Add(Multiply(1, 2), 3)) -> (3).

Contributor:

flattenAdd(Add(Multiply(1, 2), 3)) will become [Multiply(1, 2), 3], so we won't get a wrong result: the recursion only descends into Add nodes, so Multiply(1, 2) stays a single operand.

Member Author:

Oh, I see. You've generalized my PR again! Great!

@cloud-fan (Contributor):

Looks like it's not that difficult to handle all the cases. This optimization LGTM.

'b * 1 * 2 * 3 * 4,
'a + 1 + 'b + 2 + 'c + 3,
Rand(0) * 1 * 2 * 3 * 4)

Member Author:

I already added the non-deterministic case here.

@dongjoon-hyun (Member, Author):

Thank you for reconsidering this PR. I'll update it soon according to your advice.

@dongjoon-hyun (Member, Author):

@cloud-fan, following your advice, I refactored the code and added mixed (addition + multiplication) test cases. The PR description is updated as well.
Thank you so much again.

@SparkQA commented Jun 2, 2016

Test build #59788 has finished for PR 12850 at commit 8b7a0bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 2, 2016

Test build #59795 has finished for PR 12850 at commit 0acb157.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case other => other :: Nil
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressionsDown {
Contributor:

We should do:

plan transform {
  case q: LogicalPlan => q transformExpressionsDown {
    ......
  }
}

Otherwise, here we only optimize the top-level plan.

Member Author:

My bad; I changed this in a hurry. I'll fix it soon.

@cloud-fan (Contributor):

cc @davies , can you take a look?

@rxin (Contributor) commented Jun 2, 2016

BTW it goes without saying ... if you do decide to merge this, don't merge it in branch-2.0.

@SparkQA commented Jun 2, 2016

Test build #59824 has finished for PR 12850 at commit 3959d57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@asfgit asfgit closed this in 63b7f12 Jun 2, 2016
@dongjoon-hyun (Member, Author):

Oh, thank you, @cloud-fan!

@dongjoon-hyun dongjoon-hyun deleted the SPARK-15076 branch July 20, 2016 07:37