Skip to content

Commit

Permalink
update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
imback82 committed Jul 12, 2020
1 parent a0366f2 commit 99493e4
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}

/**
* Recursively reorders the join keys based on the partitioning. It starts reordering
* keys to match HashPartitioning on either side, followed by PartitioningCollection.
* Recursively reorders the join keys based on partitioning. It starts reordering the
* join keys to match HashPartitioning on either side, followed by PartitioningCollection.
*/
private def reorderJoinKeysRecursively(
leftKeys: Seq[Expression],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,37 +943,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
}
}

// Exploratory test: saves two parquet tables bucketed into 4 buckets on two
// columns each, joins them with both predicates anchored on t1.i1, and prints
// the resulting plan. It makes no assertions.
test("terry - hashpartitioning") {
  withTable("t1", "t2") {
    val settings = Seq(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
      SQLConf.SHUFFLE_PARTITIONS.key -> "4")
    withSQLConf(settings: _*) {
      // Build both inputs first, then persist them as bucketed tables.
      val leftRows = for (i <- 0 until 10) yield (i % 5, i % 13)
      val rightRows = for (i <- 0 until 10) yield (i % 7, i % 11)
      val leftDf = leftRows.toDF("i1", "j1")
      val rightDf = rightRows.toDF("i2", "j2")

      leftDf.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")
      rightDf.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")

      val bucketedLeft = spark.table("t1")
      val bucketedRight = spark.table("t2")

      // Both conjuncts compare t1.i1, once against t2.j2 and once against t2.i2.
      val i1MatchesJ2 = bucketedLeft("i1") === bucketedRight("j2")
      val i1MatchesI2 = bucketedLeft("i1") === bucketedRight("i2")
      val joined = bucketedLeft.join(bucketedRight, i1MatchesJ2 && i1MatchesI2)
      joined.explain()
    }
  }
}


// Exploratory test: chains two joins over in-memory DataFrames, where the
// second join lists its key predicates in the opposite order (j before i)
// from the first, and prints the resulting plan. It makes no assertions.
test("terry - collectionpartition") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
    // Rows of (i % iMod, i % jMod, i.toString) for i in [0, 100).
    def rows(iMod: Int, jMod: Int) =
      for (i <- 0 until 100) yield (i % iMod, i % jMod, i.toString)

    val first = rows(5, 13).toDF("i1", "j1", "k1")
    val second = rows(7, 11).toDF("i2", "j2", "k2")
    val third = rows(5, 13).toDF("i3", "j3", "k3")

    val firstJoinCond = (first("i1") === second("i2")) && (first("j1") === second("j2"))
    val firstJoin = first.join(second, firstJoinCond)

    val secondJoinCond = (firstJoin("j1") === third("j3")) && (firstJoin("i1") === third("i3"))
    val secondJoin = firstJoin.join(third, secondJoinCond)
    secondJoin.explain()
  }
}

test("bucket coalescing is applied when join expressions match with partitioning expressions") {
withTable("t1", "t2") {
df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
Expand Down

0 comments on commit 99493e4

Please sign in to comment.