Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
imback82 committed Aug 7, 2020
1 parent e5b078f commit 89ad6ef
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
leftPartitioning: Partitioning,
rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
reorderJoinKeysRecursively(leftKeys, rightKeys, leftPartitioning, rightPartitioning)
reorderJoinKeysRecursively(
leftKeys,
rightKeys,
Some(leftPartitioning),
Some(rightPartitioning))
.getOrElse((leftKeys, rightKeys))
} else {
(leftKeys, rightKeys)
Expand All @@ -190,27 +194,27 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
private def reorderJoinKeysRecursively(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
leftPartitioning: Partitioning,
rightPartitioning: Partitioning): Option[(Seq[Expression], Seq[Expression])] = {
leftPartitioning: Option[Partitioning],
rightPartitioning: Option[Partitioning]): Option[(Seq[Expression], Seq[Expression])] = {
(leftPartitioning, rightPartitioning) match {
case (HashPartitioning(leftExpressions, _), _) =>
case (Some(HashPartitioning(leftExpressions, _)), _) =>
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leftExpressions, leftKeys)
.orElse(reorderJoinKeysRecursively(
leftKeys, rightKeys, UnknownPartitioning(0), rightPartitioning))
case (_, HashPartitioning(rightExpressions, _)) =>
leftKeys, rightKeys, None, rightPartitioning))
case (_, Some(HashPartitioning(rightExpressions, _))) =>
reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys)
.orElse(reorderJoinKeysRecursively(
leftKeys, rightKeys, leftPartitioning, UnknownPartitioning(0)))
case (PartitioningCollection(partitionings), _) =>
leftKeys, rightKeys, leftPartitioning, None))
case (Some(PartitioningCollection(partitionings)), _) =>
partitionings.foreach { p =>
reorderJoinKeysRecursively(leftKeys, rightKeys, p, rightPartitioning).map { k =>
reorderJoinKeysRecursively(leftKeys, rightKeys, Some(p), rightPartitioning).map { k =>
return Some(k)
}
}
reorderJoinKeysRecursively(leftKeys, rightKeys, UnknownPartitioning(0), rightPartitioning)
case (_, PartitioningCollection(partitionings)) =>
reorderJoinKeysRecursively(leftKeys, rightKeys, None, rightPartitioning)
case (_, Some(PartitioningCollection(partitionings))) =>
partitionings.foreach { p =>
reorderJoinKeysRecursively(leftKeys, rightKeys, leftPartitioning, p).map { k =>
reorderJoinKeysRecursively(leftKeys, rightKeys, leftPartitioning, Some(p)).map { k =>
return Some(k)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
private val exprB = Literal(2)
private val exprC = Literal(3)

test("EnsureRequirements.reorder should handle PartitioningCollection") {
test("reorder should handle PartitioningCollection") {
val plan1 = DummySparkPlan(
outputPartitioning = PartitioningCollection(Seq(
HashPartitioning(exprA :: exprB :: Nil, 5),
Expand Down Expand Up @@ -86,7 +86,7 @@ class EnsureRequirementsSuite extends SharedSparkSession {
}
}

test("EnsureRequirements.reorder should fallback to the other side partitioning") {
test("reorder should fallback to the other side partitioning") {
val plan1 = DummySparkPlan(
outputPartitioning = HashPartitioning(exprA :: exprB :: exprC :: Nil, 5))
val plan2 = DummySparkPlan(
Expand Down

0 comments on commit 89ad6ef

Please sign in to comment.