[SPARK-19981][SQL] Respect aliases in output partitioning of projects and aggregates #17400

Closed
maropu wants to merge 3 commits

@maropu
Member

maropu commented Mar 23, 2017

What changes were proposed in this pull request?

The current master may add unnecessary shuffle operations when projects and aggregates in physical plans have aliases in their output expressions. A concrete example follows:

scala> :paste
sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
spark.range(10).selectExpr("id AS key", "0").repartition($"key").createOrReplaceTempView("df1")
spark.range(10).selectExpr("id AS key", "0").repartition($"key").createOrReplaceTempView("df2")
sql("""
  SELECT * FROM
    (SELECT key AS k from df1) t1
  INNER JOIN
    (SELECT key AS k from df2) t2
  ON t1.k = t2.k
""").explain

== Physical Plan ==
*SortMergeJoin [k#56L], [k#57L], Inner
:- *Sort [k#56L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(k#56L, 200)     // <--- Unnecessary ShuffleExchange
:     +- *Project [key#39L AS k#56L]
:        +- Exchange hashpartitioning(key#39L, 200)
:           +- *Project [id#36L AS key#39L]
:              +- *Range (0, 10, step=1, splits=Some(4))
+- *Sort [k#57L ASC NULLS FIRST], false, 0
   +- ReusedExchange [k#57L], Exchange hashpartitioning(k#56L, 200)

In this query, the upper Exchange (hashpartitioning(k#56L, 200)) is unnecessary. The root cause is that the planner treats key and k as different attributes because they have different exprIds, so the distribution requirement check in EnsureRequirements fails. This PR proposes handling these aliases in EnsureRequirements so that it can correctly check whether operators satisfy their output distribution requirements.
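To make the root cause concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark code: `Attr`, `Alias`, `HashPartitioning`, `satisfies`, and `respectAliases` are hypothetical stand-ins for the Catalyst classes and for whatever the fix ends up looking like. It only illustrates why a check keyed on exprId fails across a renaming Project, and how rewriting the partitioning keys through the aliases could make the check pass.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's Catalyst classes.
object AliasAwarePartitioningSketch {
  // An attribute is identified by its exprId (the #39L / #56L suffix in the plan).
  final case class Attr(name: String, exprId: Long)

  // A projection entry that renames `child` to `output` (key#39L AS k#56L).
  final case class Alias(child: Attr, output: Attr)

  final case class HashPartitioning(keys: Seq[Attr], numPartitions: Int)

  // Naive check: the required keys must equal the keys the data is already partitioned on.
  def satisfies(have: HashPartitioning, requiredKeys: Seq[Attr]): Boolean =
    have.keys == requiredKeys

  // Rewrite the partitioning keys through the aliases introduced by a projection.
  def respectAliases(p: HashPartitioning, aliases: Seq[Alias]): HashPartitioning = {
    val renamed: Map[Long, Attr] = aliases.map(a => a.child.exprId -> a.output).toMap
    p.copy(keys = p.keys.map(k => renamed.getOrElse(k.exprId, k)))
  }

  val key: Attr = Attr("key", 39L)
  val k: Attr = Attr("k", 56L)

  // What the child Exchange reports: hashpartitioning(key#39L, 200).
  val childPartitioning: HashPartitioning = HashPartitioning(Seq(key), 200)

  // What the Project could report once aliases are taken into account.
  val projected: HashPartitioning = respectAliases(childPartitioning, Seq(Alias(key, k)))
}
```

In this model, `satisfies(childPartitioning, Seq(k))` is false, which corresponds to the planner inserting the extra Exchange, while `satisfies(projected, Seq(k))` is true.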

How was this patch tested?

Added tests in SQLQueryTestSuite.

@SparkQA

SparkQA commented Mar 23, 2017

Test build #75104 has finished for PR 17400 at commit b5d1038.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MasterDDT
Contributor

MasterDDT commented Mar 28, 2017

Just curious, how come the fix is not in this code?

https://github.com/maropu/spark/blob/b5d1038edffff5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41

So anywhere we compare expressions for semantic equality, we can say a#1 is the same as a#1 as b#2? (where 1 and 2 are the expressionIds)

@MasterDDT
Contributor

MasterDDT commented Mar 28, 2017

Here is a sort example with 1 partition. I believe the extra sort on newA is unnecessary.

scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala> val df2 = df1.selectExpr("a as newA", "b")
df2: org.apache.spark.sql.DataFrame = [newA: int, b: int]

scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan)
*SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)]
:- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:  +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:     +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
+- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
   +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
      +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
         +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
            +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]

@maropu
Member Author

maropu commented Mar 28, 2017

ISTM the solution you suggested does not work, because the planner actually compares references (that is, the AttributeReferences output by child.outputPartitioning) rather than the Alias itself (see the related code for details). So, IMO, we cannot rely on the semantic-equality checks you suggested.

@allengeorge

I suggest the following code for outputOrdering:

override def outputOrdering: Seq[SortOrder] = child.outputOrdering.map {
  case s @ SortOrder(e, _) =>
    s.copy(child = maybeReplaceExpr(e))
  case s =>
    s
}

@maropu
Member Author

maropu commented Apr 17, 2017

@allengeorge yea, we could do that there. But I think we should first settle how to fix this issue; I'm not sure the approach in this PR is the best. cc: @gatorsmile

@maropu maropu force-pushed the SPARK-19981 branch 2 times, most recently from 91a412e to 0492c0f Compare May 10, 2017 08:03
@SparkQA

SparkQA commented May 10, 2017

Test build #76738 has finished for PR 17400 at commit 91a412e.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 10, 2017

Test build #76740 has finished for PR 17400 at commit 0492c0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented May 16, 2017

ping

@@ -36,6 +36,12 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryExecNode with CodegenSupport {

@transient private lazy val aliasesInProjects = projectList.flatMap(_.collectFirst {
Member

Could we fix the issue in EnsureRequirements? Aggregate operators can also introduce aliases.

Member Author

Aha, OK. I'll reconsider.

@maropu maropu changed the title from "[SPARK-19981][SQL] Update output partitioning info. in ProjectExec when having aliases" to "[SPARK-19981][SQL] Update output partitioning info. when children having aliases" on May 17, 2017
@SparkQA

SparkQA commented May 17, 2017

Test build #77030 has finished for PR 17400 at commit ac3ffe4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class AggregateExec extends UnaryExecNode
  • case class ProjectExec(

@maropu
Member Author

maropu commented May 18, 2017

Jenkins, retest this please.

@SparkQA

SparkQA commented May 18, 2017

Test build #77053 has finished for PR 17400 at commit 49a1732.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class AggregateExec extends UnaryExecNode
  • case class ProjectExec(

@maropu
Member Author

maropu commented May 20, 2017

@gatorsmile ping

@maropu
Member Author

maropu commented May 23, 2017

ping

@SparkQA

SparkQA commented Jul 11, 2017

Test build #79520 has finished for PR 17400 at commit a7dd063.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class AggregateExec extends UnaryExecNode
  • case class ProjectExec(

@SparkQA

SparkQA commented Dec 18, 2017

Test build #85034 has finished for PR 17400 at commit e6ce117.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class AggregateExec extends UnaryExecNode
  • case class ProjectExec(

@eyalfa

eyalfa commented Aug 14, 2018

@maropu , any reason why this is on hold for so long?

@@ -321,6 +321,58 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}
}

private def updatePartitioningByAliases(exprs: Seq[NamedExpression], partioning: Partitioning)
: Partitioning = {
val aliasSeq = exprs.flatMap(_.collectFirst {

This might do more than you'd like it to (at least if collectFirst behaves the way I understand it), e.g.
df.select($"x" as "x1", struct($"a" as "a1", $"b" as "b1") as "s1")

x1 and s1 are top-level aliases; a1 and b1 are not. It could get even more complicated if there were an a1 alias in the top-level projection list.

Member Author

This PR focuses only on aliases, so the point you described above is out of scope here. IMO more complicated cases should be fixed in follow-ups.


@maropu, I wasn't asking for support for complex partitioning expressions (which deserves its own separate PR). I meant that this code could introduce regressions by 'over-capturing' nested aliases.

  • My specific example is wrong, since struct is transformed into a named struct (the alias is replaced by an explicit name).
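The 'over-capturing' concern can be illustrated with a small sketch in plain Scala. This is a toy expression tree, not Catalyst: `Expr`, `Attr`, `Alias`, `Add`, and the helper functions are hypothetical, and the `collectFirst` here is only written in the spirit of a pre-order tree search. Whether nested aliases like this actually survive analysis in Spark is a separate question (the correction above notes that struct's nested aliases are rewritten).

```scala
// Toy expression tree; hypothetical stand-ins, not Spark's Catalyst classes.
object CollectFirstSketch {
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr {
    def children: Seq[Expr] = Nil
  }
  final case class Alias(child: Expr, name: String) extends Expr {
    def children: Seq[Expr] = Seq(child)
  }
  final case class Add(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }

  // Pre-order search over the whole tree: returns the first node the partial function matches.
  def collectFirst[A](e: Expr)(pf: PartialFunction[Expr, A]): Option[A] =
    pf.lift(e).orElse(
      e.children.foldLeft(Option.empty[A])((acc, c) => acc.orElse(collectFirst(c)(pf)))
    )

  // Tree-wide lookup: also finds aliases buried inside a larger expression.
  def anyAlias(e: Expr): Option[(String, String)] =
    collectFirst(e) { case Alias(Attr(from), to) => from -> to }

  // Top-level-only lookup: matches only when the projection entry itself is an alias.
  def topLevelAlias(e: Expr): Option[(String, String)] = e match {
    case Alias(Attr(from), to) => Some(from -> to)
    case _                     => None
  }

  // Hypothetical projection entry: (a AS a1) + b, with no alias at the top level.
  val nested: Expr = Add(Alias(Attr("a"), "a1"), Attr("b"))
}
```

In this model, `anyAlias(nested)` reports a mapping from a to a1 even though the projection does not output a column named a1, while `topLevelAlias(nested)` finds nothing. Matching only on the top-level node is one way to avoid that kind of regression.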

@maropu
Member Author

maropu commented Aug 14, 2018

I think that's because the priority is not very high. Does this issue cause any problems in your queries?

@eyalfa

eyalfa commented Aug 14, 2018

@maropu , yes it does :-)

@maropu
Member Author

maropu commented Aug 14, 2018

If possible, could you describe the problem in your case? That would help motivate this work.

@eyalfa

eyalfa commented Aug 14, 2018

In my use case, I aggregate a dataset, then use select to align the columns with a case class. I later try to join the resulting dataset on the same columns that were used for the aggregation.
The join introduces shuffles (Exchange nodes).

@SparkQA

SparkQA commented Aug 21, 2018

Test build #94999 has finished for PR 17400 at commit 1813738.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95009 has finished for PR 17400 at commit 089f218.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu changed the title from "[SPARK-19981][SQL] Update output partitioning info. when children having aliases" to "[SPARK-19981][SQL] Respect aliases in output partitioning of projects and aggregates" on Aug 21, 2018
@maropu maropu force-pushed the SPARK-19981 branch 2 times, most recently from e288288 to ec3e6d9 Compare August 22, 2018 03:31
@SparkQA

SparkQA commented Aug 22, 2018

Test build #95079 has finished for PR 17400 at commit ec3e6d9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95078 has finished for PR 17400 at commit e288288.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95080 has finished for PR 17400 at commit c67d11a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95097 has finished for PR 17400 at commit 91809e5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95102 has finished for PR 17400 at commit 5482b1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@maropu
Member Author

maropu commented Sep 14, 2018

retest this please

@SparkQA

SparkQA commented Sep 14, 2018

Test build #96061 has finished for PR 17400 at commit 5482b1b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@maropu
Member Author

maropu commented Sep 17, 2018

retest this please

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.physical._

trait AliasAwareOutputPartitioning extends UnaryExecNode {
Member

We might need a general utility class for this. cc @maryannxue, who did similar things for other projects in the past. Maybe @maryannxue can help deliver such a utility class?

Member Author

OK, I'll wait for @maryannxue's suggestion.

@SparkQA

SparkQA commented Sep 18, 2018

Test build #96161 has finished for PR 17400 at commit 5482b1b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@maropu
Member Author

maropu commented Sep 18, 2018

retest this please

@SparkQA

SparkQA commented Sep 18, 2018

Test build #96164 has finished for PR 17400 at commit 5482b1b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@SparkQA

SparkQA commented Oct 22, 2018

Test build #97746 has finished for PR 17400 at commit 5482b1b.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@SparkQA

SparkQA commented Oct 22, 2018

Test build #97753 has finished for PR 17400 at commit 5482b1b.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@SparkQA

SparkQA commented Oct 22, 2018

Test build #97788 has finished for PR 17400 at commit 5482b1b.

  • This patch fails build dependency tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

7 participants