# [SPARK-48817][SQL] Eagerly execute union multi commands together
### What changes were proposed in this pull request?

Eagerly execute the commands of a multi-command `Union` together, in a single SQL execution.
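For context, Spark analyzes a multi-insert statement into a `Union` whose children are all `Command` nodes. Before this change, `eagerlyExecuteCommands` executed each child command in its own SQL execution; with this change, a `Union` made up entirely of commands is executed as one unit. A rough sketch of the analyzed plan shape for the repro below (node names simplified; the concrete insert command depends on the table provider):

```
Union
├── InsertIntoSomeTableCommand(wangzhen_t2, Project(c1, Repartition(scan wangzhen_t1)))
└── InsertIntoSomeTableCommand(wangzhen_t3, Project(c1, Repartition(scan wangzhen_t1)))
```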

### Why are the changes needed?
A multi-insert is split into multiple SQL executions, so the exchange cannot be reused across the individual inserts.

SQL to reproduce:

```sql
create table wangzhen_t1(c1 int);
create table wangzhen_t2(c1 int);
create table wangzhen_t3(c1 int);
insert into wangzhen_t1 values (1), (2), (3);

from (select /*+ REPARTITION(3) */ c1 from wangzhen_t1)
insert overwrite table wangzhen_t2 select c1
insert overwrite table wangzhen_t3 select c1;
```

In Spark 3.1, there is a single SQL execution, and the exchange is reused:

![image](https://github.com/apache/spark/assets/17894939/5ff68392-aaa8-4e6b-8cac-1687880796b9)

However, in Spark 3.5, the statement is split into multiple executions and there is no ReusedExchange:

![image](https://github.com/apache/spark/assets/17894939/afdb14b6-5007-4923-802d-535149974ecf)
![image](https://github.com/apache/spark/assets/17894939/0d60e8db-9da7-4906-8d07-2b622b55e6ab)
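With this change, the behavior can be verified programmatically. A minimal sketch (assuming an active `SparkSession` named `spark` and the tables created above), mirroring the unit test added in this PR:

```scala
import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

// Run the multi-insert; with this change it is one eagerly executed command.
val df = spark.sql(
  """
    |FROM (SELECT /*+ REPARTITION(3) */ c1 FROM wangzhen_t1)
    |INSERT OVERWRITE TABLE wangzhen_t2 SELECT c1
    |INSERT OVERWRITE TABLE wangzhen_t3 SELECT c1
    |""".stripMargin)

// The root of the physical plan should be a single CommandResultExec.
val commandResults = df.queryExecution.executedPlan.collect {
  case c: CommandResultExec => c
}
assert(commandResults.size == 1)

// The repartition exchange feeding the second insert should be reused.
// (The unit test uses AdaptiveSparkPlanHelper.collect rather than plain
// TreeNode.collect so the match also descends into AQE sub-plans.)
val reusedExchanges = commandResults.head.commandPhysicalPlan.collect {
  case r: ReusedExchangeExec => r
}
assert(reusedExchanges.size == 1)
```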

### Does this PR introduce _any_ user-facing change?

Yes, multi-inserts are now executed in a single SQL execution.

### How was this patch tested?

Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47224 from wForget/SPARK-48817.

Authored-by: wforget <[email protected]>
Signed-off-by: youxiduo <[email protected]>
wForget authored and ulysses-you committed Jul 10, 2024
1 parent f738843 commit b5f3e1e
Showing 2 changed files with 47 additions and 7 deletions.
`QueryExecution.scala`:

```diff
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -110,15 +110,17 @@ class QueryExecution(
     case _ => "command"
   }
 
-  private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {
-    case c: Command =>
+  private def eagerlyExecuteCommands(p: LogicalPlan) = {
+    def eagerlyExecute(
+        p: LogicalPlan,
+        name: String,
+        mode: CommandExecutionMode.Value): LogicalPlan = {
       // Since Command execution will eagerly take place here,
       // and in most cases be the bulk of time and effort,
       // with the rest of processing of the root plan being just outputting command results,
       // for eagerly executed commands we mark this place as beginning of execution.
       tracker.setReadyForExecution()
-      val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)
-      val name = commandExecutionName(c)
+      val qe = sparkSession.sessionState.executePlan(p, mode)
       val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
         SQLExecution.withNewExecutionId(qe, Some(name)) {
           qe.executedPlan.executeCollect()
@@ -129,7 +131,14 @@ class QueryExecution(
         qe.commandExecuted,
         qe.executedPlan,
         result.toImmutableArraySeq)
-    case other => other
+    }
+    p transformDown {
+      case u @ Union(children, _, _) if children.forall(_.isInstanceOf[Command]) =>
+        eagerlyExecute(u, "multi-commands", CommandExecutionMode.SKIP)
+      case c: Command =>
+        val name = commandExecutionName(c)
+        eagerlyExecute(c, name, CommandExecutionMode.NON_ROOT)
+    }
   }
 
   // The plan that has been normalized by custom rules, so that it's more likely to hit cache.
```
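Note the mode passed in each branch: the multi-command `Union` is planned with `CommandExecutionMode.SKIP`, so that building its `QueryExecution` does not recursively eager-execute the child commands again, while a standalone command keeps the existing `NON_ROOT` behavior. For reference, the mode enumeration already defined in `QueryExecution.scala` (comments paraphrased):

```scala
// SKIP: do not eagerly execute commands nested in the plan.
// NON_ROOT: eagerly execute commands, except when the plan itself is the root command.
// ALL: eagerly execute all commands in the plan.
object CommandExecutionMode extends Enumeration {
  val SKIP, NON_ROOT, ALL = Value
}
```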
`SQLInsertTestSuite.scala`:

```diff
@@ -21,14 +21,17 @@ import org.apache.spark.{SparkConf, SparkNumberFormatException, SparkThrowable}
 import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
+import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * The base trait for SQL INSERT.
  */
-trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
+trait SQLInsertTestSuite extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper {
 
   import testImplicits._
 
@@ -519,6 +522,34 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-48817: test multi inserts") {
+    withTable("t1", "t2", "t3") {
+      createTable("t1", Seq("i"), Seq("int"))
+      createTable("t2", Seq("i"), Seq("int"))
+      createTable("t3", Seq("i"), Seq("int"))
+      sql(s"INSERT INTO t1 VALUES (1), (2), (3)")
+      val df = sql(
+        """
+          |FROM (select /*+ REPARTITION(3) */ i from t1)
+          |INSERT INTO t2 SELECT i
+          |INSERT INTO t3 SELECT i
+          |""".stripMargin
+      )
+      checkAnswer(spark.table("t2"), Seq(Row(1), Row(2), Row(3)))
+      checkAnswer(spark.table("t3"), Seq(Row(1), Row(2), Row(3)))
+
+      val commandResults = df.queryExecution.executedPlan.collect {
+        case c: CommandResultExec => c
+      }
+      assert(commandResults.size == 1)
+
+      val reusedExchanges = collect(commandResults.head.commandPhysicalPlan) {
+        case r: ReusedExchangeExec => r
+      }
+      assert(reusedExchanges.size == 1)
+    }
+  }
 }
 
 class FileSourceSQLInsertTestSuite extends SQLInsertTestSuite with SharedSparkSession {
```
