diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 96d23ba7557b3..c565f914ab677 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -681,26 +681,29 @@ case class SortMergeJoinExec( val cond = BindReferences.bindReference( condition.get, streamedPlan.output ++ bufferedPlan.output).genCode(ctx) // Evaluate the columns those used by condition before loop - val before = - s""" - |boolean $loaded = false; - |$streamedBefore - """.stripMargin - - val loadStreamed = - s""" - |if (!$loaded) { - | $loaded = true; - | $streamedAfter - |} + val before = joinType match { + case LeftAnti => + // No need to initialize `loaded` variable for Left Anti join. + streamedBefore.trim + case _ => + s""" + |boolean $loaded = false; + |$streamedBefore """.stripMargin + } val loadStreamedAfterCondition = joinType match { case LeftAnti => // No need to evaluate columns not used by condition from streamed side, as for Left Anti // join, streamed row with match is not outputted. "" - case _ => loadStreamed + case _ => + s""" + |if (!$loaded) { + | $loaded = true; + | $streamedAfter + |} + """.stripMargin } val loadBufferedAfterCondition = joinType match { @@ -722,7 +725,7 @@ case class SortMergeJoinExec( |$loadStreamedAfterCondition |$loadBufferedAfterCondition """.stripMargin - (before, checking.trim, loadStreamed) + (before, checking.trim, streamedAfter.trim) } else { (evaluateVariables(streamedVars), "", "") }