Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2213] [SQL] sort merge join for spark sql #5208

Closed
wants to merge 33 commits into from

Conversation

adrian-wang
Copy link
Contributor

Thanks for the initial work from @Ishiihara in #3173

This PR introduce a new join method of sort merge join, which firstly ensure that keys of same value are in the same partition, and inside each partition the Rows are sorted by key. Then we can run down both sides together, find matched rows using sort merge join. In this way, we don't have to store the whole hash table of one side as hash join, thus we have less memory usage. Also, this PR would benefit from #3438 , making the sorting phrase much more efficient.

We introduced a new configuration of "spark.sql.planner.sortMergeJoin" to switch between this(true) and ShuffledHashJoin(false), probably we want the default value of it be false at first.

@SparkQA
Copy link

SparkQA commented Mar 26, 2015

Test build #29224 has started for PR 5208 at commit b87df90.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Mar 26, 2015

Test build #29224 has finished for PR 5208 at commit b87df90.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29224/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Mar 30, 2015

Test build #29382 has started for PR 5208 at commit cb1e18d.

@SparkQA
Copy link

SparkQA commented Mar 30, 2015

Test build #29382 has finished for PR 5208 at commit cb1e18d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29382/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Mar 30, 2015

Test build #29383 has started for PR 5208 at commit 6df9f01.

@SparkQA
Copy link

SparkQA commented Mar 30, 2015

Test build #29383 has finished for PR 5208 at commit 6df9f01.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29383/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 1, 2015

Test build #29530 has started for PR 5208 at commit d7bfe07.

@SparkQA
Copy link

SparkQA commented Apr 1, 2015

Test build #29530 has finished for PR 5208 at commit d7bfe07.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29530/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 1, 2015

Test build #29532 has started for PR 5208 at commit c34c96e.

@SparkQA
Copy link

SparkQA commented Apr 1, 2015

Test build #29532 has finished for PR 5208 at commit c34c96e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29532/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 1, 2015

Test build #29533 has started for PR 5208 at commit f5f81db.

@SparkQA
Copy link

SparkQA commented Apr 1, 2015

Test build #29533 has finished for PR 5208 at commit f5f81db.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29533/
Test FAILed.

@adrian-wang
Copy link
Contributor Author

I am not getting this error locally... what's wrong?

@adrian-wang
Copy link
Contributor Author

This exception only exists on current master, I didn't get this locally because I was working on a March-26 master. This could be a potential bug we introduced during this period.

cc @chenghao-intel

@chenghao-intel
Copy link
Contributor

From the log, seems the output fields of the PhysicalRDD changed its order, can you rebase against the latest code and try again in your local?

== Physical Plan ==
Project [b#2957,a#2959]
 SortMergeJoin [a#2956], [b#2960], Inner
  Exchange (HashSortedPartitioning [a#2956], 200)
   PhysicalRDD [b#2957,a#2956], MapPartitionsRDD[1584] at map at FilteredScanSuite.scala:85
  Exchange (HashSortedPartitioning [b#2960], 200)
   PhysicalRDD [a#2959,b#2960], MapPartitionsRDD[1587] at map at FilteredScanSuite.scala:85

@adrian-wang
Copy link
Contributor Author

yes, after rebase i can see this exception

@SparkQA
Copy link

SparkQA commented Apr 2, 2015

Test build #29584 has started for PR 5208 at commit 7a869c5.

@SparkQA
Copy link

SparkQA commented Apr 2, 2015

Test build #29584 has finished for PR 5208 at commit 7a869c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ClusteredOrderedDistribution(clustering: Seq[Expression])
    • case class HashSortedPartitioning(expressions: Seq[Expression], numPartitions: Int)
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29584/
Test PASSed.

@adrian-wang
Copy link
Contributor Author

* By default it will choose sort merge join.
*/
private[spark] def autoSortMergeJoin: Boolean =
getConf(AUTO_SORTMERGEJOIN, true.toString).toBoolean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make it false as default, the SMJ should be experimental feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, just use true for Jenkins testing.


if (meetsRequirements && compatible) {
val withSort = if (needSort) {
Sort(rowOrdering, global = false, withShuffle)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like what we do in SparkStrategies, use execution.ExternalSort when sqlContext.conf.externalSortEnabled is true.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30309 has started for PR 5208 at commit f515cd2.

case (UnspecifiedDistribution, Seq(), child) =>
child
case (UnspecifiedDistribution, rowOrdering, child) =>
Sort(rowOrdering, global = false, child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use execution.ExternalSort when sqlContext.conf.externalSortEnabled is true.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30315 has started for PR 5208 at commit f91a2ae.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30309 has finished for PR 5208 at commit f515cd2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch adds the following new dependencies:
    • snappy-java-1.1.1.7.jar
  • This patch removes the following dependencies:
    • snappy-java-1.1.1.6.jar

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30309/
Test FAILed.

@@ -87,7 +126,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
implicit val ordering = new RowOrdering(sortingExpressions, child.output)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this line is redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see... For RangePartitioner..

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30315 has finished for PR 5208 at commit f91a2ae.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch adds the following new dependencies:
    • commons-math3-3.4.1.jar
    • snappy-java-1.1.1.7.jar
  • This patch removes the following dependencies:
    • commons-math3-3.1.1.jar
    • snappy-java-1.1.1.6.jar

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30315/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30319 has started for PR 5208 at commit 5049d88.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30319 has finished for PR 5208 at commit 5049d88.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30319/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30321 has started for PR 5208 at commit 2493b9f.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30304 has finished for PR 5208 at commit ec8061b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30304/
Test PASSed.

@SparkQA
Copy link

SparkQA commented Apr 15, 2015

Test build #30321 has finished for PR 5208 at commit 2493b9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Exchange(
    • case class SortMergeJoin(
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30321/
Test PASSed.

@marmbrus
Copy link
Contributor

I manually fixed the conflicts while merging to master. Thanks! I'm excited to test out the performance of this new feature :)

@asfgit asfgit closed this in 585638e Apr 15, 2015
@adrian-wang
Copy link
Contributor Author

Thanks!

@justmytwospence
Copy link

Is this feature limited to equi-joins?

@adrian-wang
Copy link
Contributor Author

@justmytwospence yes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants