
[SPARK-30298][SQL] Respect aliases in output partitioning of projects and aggregates #26943

Closed
wants to merge 13 commits

Conversation

imback82
Contributor

What changes were proposed in this pull request?

Currently, in the following scenario, bucket join is not utilized:

val df = (0 until 20).map(i => (i, i)).toDF("i", "j").as("df")
df.write.format("parquet").bucketBy(8, "i").saveAsTable("t")
sql("CREATE VIEW v AS SELECT * FROM t")
sql("SELECT * FROM t a JOIN v b ON a.i = b.i").explain
== Physical Plan ==
*(4) SortMergeJoin [i#13], [i#15], Inner
:- *(1) Sort [i#13 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i#13, j#14]
:     +- *(1) Filter isnotnull(i#13)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
+- *(3) Sort [i#15 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#15, 8), true, [id=#64] <----- Exchange node introduced
      +- *(2) Project [i#13 AS i#15, j#14 AS j#16]
         +- *(2) Filter isnotnull(i#13)
            +- *(2) ColumnarToRow
               +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8

Notice that an Exchange node is present. This is because Project introduces aliases, and EnsureRequirements cannot match the child's outputPartitioning against the join's requiredChildDistribution when aliases are involved. This PR addresses the issue so that this scenario can use a bucket join.
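
For context, a minimal sketch of the idea, assuming the AliasAwareOutputPartitioning trait this PR introduces (the body below is an illustrative reconstruction, not the exact merged code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.UnaryExecNode

// Illustrative reconstruction: rewrite the child's hash-partitioning
// expressions through this node's aliases so that EnsureRequirements can
// match the output partitioning against the parent's required distribution.
trait AliasAwareOutputPartitioning extends UnaryExecNode {
  protected def outputExpressions: Seq[NamedExpression]

  final override def outputPartitioning: Partitioning =
    child.outputPartitioning match {
      case HashPartitioning(expressions, numPartitions) =>
        val newExpressions = expressions.map { expr =>
          outputExpressions.collectFirst {
            // If the projection aliases a partitioning expression, expose
            // the alias's attribute as the partitioning expression instead.
            case a @ Alias(aliased, _) if aliased.semanticEquals(expr) => a.toAttribute
          }.getOrElse(expr)
        }
        HashPartitioning(newExpressions, numPartitions)
      case other => other
    }
}
```

With something like this in place, the Project in the plan above reports hashpartitioning(i#15, 8) instead of hashpartitioning(i#13, 8), so the join's required distribution is already satisfied.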

Why are the changes needed?

This allows bucket join to be utilized in the above example.

Does this PR introduce any user-facing change?

Yes. With the fix, the explain output is as follows:

== Physical Plan ==
*(3) SortMergeJoin [i#13], [i#15], Inner
:- *(1) Sort [i#13 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i#13, j#14]
:     +- *(1) Filter isnotnull(i#13)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:.., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [i#15 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#13 AS i#15, j#14 AS j#16]
      +- *(2) Filter isnotnull(i#13)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:.., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8

Note that the Exchange is no longer present.

How was this patch tested?

Unit tests were added (see the plan assertions in the review discussion below).

@SparkQA

SparkQA commented Dec 19, 2019

Test build #115540 has finished for PR 26943 at commit d3559b4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 19, 2019

Test build #115545 has finished for PR 26943 at commit d3559b4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 19, 2019

Test build #115576 has finished for PR 26943 at commit d3559b4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Dec 20, 2019

Have you checked SPARK-25951, SPARK-19981, and SPARK-19468? I think the other project-like operators (e.g., aggregate) have the same issue. See also #22957.

@imback82
Contributor Author

imback82 commented Dec 20, 2019

Thanks @maropu for the info! Adding the same outputPartitioning logic to HashAggregateExec correctly handles the cases reported in #22957 and #17400. I can make this more generic, similar to what was proposed in those PRs. But could you explain why those two PRs never got merged? In particular, this PR seems very similar to #17400 in terms of how Alias is removed. :)
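
For illustration, a hypothetical repro of the aggregate case (assumed shape, not taken from those PRs):

```scala
// Hypothetical repro: the aggregate itself aliases its grouping key, so
// without the fix the join below adds an Exchange on top of an aggregate
// that is already hash-partitioned by the (aliased) grouping key.
spark.range(10).selectExpr("id AS a", "id AS b").createOrReplaceTempView("t")
val aggregated = spark.sql("SELECT a AS a2, sum(b) AS s FROM t GROUP BY a")
val other = spark.range(10).selectExpr("id AS a2", "id AS c")
aggregated.join(other, "a2").explain()
```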

@maropu
Member

maropu commented Dec 20, 2019

I think we just couldn't reach a consensus on how to fix this issue at that time.

@imback82
Contributor Author

I see. @maropu Do you mind if I take your ideas from #17400 like AliasAwareOutputPartitioning and put them here? I would like to try one more time with the fix if possible.

@maropu
Member

maropu commented Dec 20, 2019

yea, of course not! Feel free to take them over.

@imback82
Contributor Author

imback82 commented Jan 3, 2020

@cloud-fan @gatorsmile @viirya This addresses the same issues brought up in #22957 and #17400. I understand those two PRs didn't get merged, but I wanted to give it another shot. (We had a few customers asking why bucket join was not respected when aliases were used.) Could you help review? Thanks in advance!

@SparkQA

SparkQA commented Jan 3, 2020

Test build #116062 has finished for PR 26943 at commit c24789d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@imback82
Contributor Author

imback82 commented Jan 3, 2020

retest this please

@SparkQA

SparkQA commented Jan 3, 2020

Test build #116066 has finished for PR 26943 at commit c24789d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait AliasAwareOutputPartitioning extends UnaryExecNode

@cloud-fan
Contributor

Looks fine to me. Shall we also consider some corner cases, like `a + 1 AS b` where the child's output partitioning is `hash(a + 1)`?

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116197 has finished for PR 26943 at commit fcd2186.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116198 has finished for PR 26943 at commit 1762a96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jan 7, 2020

I'll check later and thanks for the rework, @imback82! cc: @mgaido91


final override def outputPartitioning: Partitioning = {
  child.outputPartitioning match {
    case HashPartitioning(expressions, numPartitions) =>
      ...
      HashPartitioning(newExpressions, numPartitions)
    case other => other
  }
}

Contributor

A corner case is that the child output partitioning is `a + 1` and the project list has `a + 1 AS b`; then the final output partitioning should be `b`.

I'm not sure how common it is; maybe it's fine to ignore it.
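
A hypothetical repro of this corner case (assumed, not from the PR):

```scala
// Hypothetical corner case: the child is hash-partitioned by the
// expression id + 1, and the projection aliases that same expression;
// ideally the project's output partitioning would become hash(b).
import spark.implicits._  // already in scope in spark-shell
val df = spark.range(10).repartition($"id" + 1)  // child: hashpartitioning((id + 1))
val projected = df.select(($"id" + 1).as("b"))   // could report hashpartitioning(b)
projected.explain()
```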
Member

How about the other partitioning cases, e.g., range?

Contributor Author

PartitioningCollection is constructed as PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning)), so aliases should have already been removed if the partitioning was HashPartitioning. But we could add one similar to your solution (https://github.com/apache/spark/pull/17400/files#diff-342789ab9c8c0154b412dd1c719c9397R82-R86) to future-proof it.

For RangePartitioning, your change (https://github.com/apache/spark/pull/17400/files#diff-342789ab9c8c0154b412dd1c719c9397R72-R78) makes sense, but I couldn't come up with an actual example to test against. Do you have one in mind?
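
A sketch of how the same rewrite could extend to those partitionings, mirroring the approach linked above (replaceAlias and rewrite are hypothetical helpers, not PR code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, RangePartitioning}

// Hypothetical helper: map an expression to its aliased attribute, if any.
def replaceAlias(expr: Expression, outputs: Seq[NamedExpression]): Expression =
  outputs.collectFirst {
    case a @ Alias(aliased, _) if aliased.semanticEquals(expr) => a.toAttribute
  }.getOrElse(expr)

// Hypothetical extension of the rewrite beyond HashPartitioning.
def rewrite(partitioning: Partitioning, outputs: Seq[NamedExpression]): Partitioning =
  partitioning match {
    case HashPartitioning(exprs, n) =>
      HashPartitioning(exprs.map(replaceAlias(_, outputs)), n)
    case RangePartitioning(ordering, n) =>
      RangePartitioning(ordering.map(o => o.copy(child = replaceAlias(o.child, outputs))), n)
    case PartitioningCollection(partitionings) =>
      PartitioningCollection(partitionings.map(rewrite(_, outputs)))
    case other => other
  }
```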

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  withTable("t") {
    withView("v") {
      val df = (0 until 20).map(i => (i, i)).toDF("i", "j").as("df")
Member

nit: Do we need .as("df") for this test? I think you can just write it like:

spark.range(20).selectExpr("id as i", "id as j").write.bucketBy(8, "i").saveAsTable("t")

Contributor Author

Thanks, updated.

val plan1 = sql("SELECT * FROM t a JOIN t b ON a.i = b.i").queryExecution.executedPlan
assert(plan1.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)

val plan2 = sql("SELECT * FROM t a JOIN v b ON a.i = b.i").queryExecution.executedPlan
Member

What does this test mean? Does it improve the test coverage?

Contributor Author

I will remove plan1, which is a benign case.

@SparkQA

SparkQA commented Jan 23, 2020

Test build #117276 has finished for PR 26943 at commit b877de7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

| (SELECT key AS k from df2) t2
|ON t1.k = t2.k
""".stripMargin).queryExecution.executedPlan
val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
Contributor

I was confused about why there is only one shuffle, then realized it's exchange reuse.

Can we join different data frames, e.g., spark.range(10) and spark.range(20)?
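
A hypothetical shape of that suggestion (assumed names):

```scala
// Two different inputs, so ReuseExchange cannot collapse the two
// shuffles into one and the test counts only real exchanges.
spark.range(10).selectExpr("id AS key").createOrReplaceTempView("df1")
spark.range(20).selectExpr("id AS key").createOrReplaceTempView("df2")
```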

Contributor Author

Thanks for pointing that out. I updated it, and it now generates two ShuffleExchangeExec nodes instead of four.

| (SELECT key + 1 AS k2 from df2) t2
|ON t1.k1 = t2.k2
|""".stripMargin).queryExecution.executedPlan
val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
Contributor

ditto

Contributor

@cloud-fan cloud-fan left a comment

LGTM except for a minor comment on the test.

@SparkQA

SparkQA commented Jan 23, 2020

Test build #117319 has finished for PR 26943 at commit fbafedf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu changed the title [SPARK-30298][SQL] Bucket join should work for self-join with views [SPARK-30298][SQL] Respect aliases in output partitioning of projects and aggregates Jan 23, 2020
Member

@maropu maropu left a comment

Looks nice, and thanks for the work, @imback82!

@maropu maropu closed this in 4847f73 Jan 23, 2020
@maropu
Member

maropu commented Jan 23, 2020

Thanks! Merged to master.

@maropu
Member

maropu commented Jan 23, 2020

@imback82 I think outputOrdering has the same issue and do you wanna work on that, too?
#17400 (comment)
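
For context, the analogous idea for outputOrdering might look like the sketch below (assumed names and body; not necessarily what a follow-up would merge):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression, SortOrder}
import org.apache.spark.sql.execution.UnaryExecNode

// Sketch: rewrite the child's sort order through this node's aliases,
// mirroring the AliasAwareOutputPartitioning trait added by this PR.
trait AliasAwareOutputOrdering extends UnaryExecNode {
  protected def outputExpressions: Seq[NamedExpression]

  final override def outputOrdering: Seq[SortOrder] =
    child.outputOrdering.map { order =>
      val newChild = outputExpressions.collectFirst {
        case a @ Alias(aliased, _) if aliased.semanticEquals(order.child) => a.toAttribute
      }.getOrElse(order.child)
      order.copy(child = newChild)
    }
}
```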

@imback82
Contributor Author

imback82 commented Jan 24, 2020

> @imback82 I think outputOrdering has the same issue and do you wanna work on that, too?
> #17400 (comment)

Yes, I will work on it.

Thanks @cloud-fan and @maropu for review and guidance!

@maropu
Member

maropu commented Jan 24, 2020

> Yes, I will work on it.

Thanks in advance, @imback82!

@maropu
Member

maropu commented Mar 6, 2020

> @imback82 I think outputOrdering has the same issue and do you wanna work on that, too?
> #17400 (comment)
>
> Yes, I will work on it.

@imback82 You're still working on that?

@imback82
Contributor Author

imback82 commented Mar 6, 2020

> @imback82 You're still working on that?

Yes! I tried the failing example from that comment, but it works fine in the latest Spark. I will look into this further.

@maropu
Member

maropu commented Mar 6, 2020

yea, thanks!

@imback82
Contributor Author

imback82 commented Mar 7, 2020

@maropu I created a PR for outputOrdering: #27842
