From f1d0c1ea7e2cb2d4f15240ae3a21882b3e716d66 Mon Sep 17 00:00:00 2001 From: Nicholas Jiang Date: Tue, 24 Sep 2024 18:56:46 +0800 Subject: [PATCH] [BLAZE-587] Replace spark pattern with spark- for maven profile and shim name (#588) --- .github/workflows/build-ce7-releases.yml | 2 +- .github/workflows/tpcds.yml | 39 ++++----- README.md | 18 ++-- benchmark-results/20240701-blaze300.md | 4 +- ...-vs-blaze300-query-time-20240701-tpch.png} | Bin ...k-3.3-vs-blaze300-query-time-20240701.png} | Bin dev/docker-build/docker-compose.yml | 3 +- pom.xml | 36 +++----- release-docker.sh | 2 +- .../blaze/InterceptedValidateSparkPlan.scala | 9 +- .../apache/spark/sql/blaze/ShimsImpl.scala | 80 +++++++----------- .../blaze/plan/ConvertToNativeExec.scala | 6 +- .../execution/blaze/plan/NativeAggExec.scala | 14 ++- .../plan/NativeBroadcastExchangeExec.scala | 6 +- .../blaze/plan/NativeExpandExec.scala | 6 +- .../blaze/plan/NativeFilterExec.scala | 6 +- .../blaze/plan/NativeGenerateExec.scala | 6 +- .../blaze/plan/NativeGlobalLimitExec.scala | 6 +- .../blaze/plan/NativeLocalLimitExec.scala | 6 +- ...NativeParquetInsertIntoHiveTableExec.scala | 20 ++--- .../blaze/plan/NativeParquetSinkExec.scala | 6 +- .../plan/NativePartialTakeOrderedExec.scala | 6 +- .../plan/NativeProjectExecProvider.scala | 13 ++- .../NativeRenameColumnsExecProvider.scala | 13 ++- .../plan/NativeShuffleExchangeExec.scala | 12 ++- .../execution/blaze/plan/NativeSortExec.scala | 6 +- .../blaze/plan/NativeTakeOrderedExec.scala | 6 +- .../blaze/plan/NativeUnionExec.scala | 6 +- .../blaze/plan/NativeWindowExec.scala | 6 +- .../BlazeBlockStoreShuffleReader.scala | 5 +- .../shuffle/BlazeRssShuffleManagerBase.scala | 6 +- .../blaze/shuffle/BlazeShuffleManager.scala | 14 ++- .../blaze/shuffle/BlazeShuffleWriter.scala | 4 +- .../blaze/plan/NativeBroadcastJoinExec.scala | 18 ++-- .../NativeShuffledHashJoinExecProvider.scala | 8 +- .../NativeSortMergeJoinExecProvider.scala | 6 +- 36 files changed, 157 insertions(+), 247 deletions(-) rename benchmark-results/{spark333-vs-blaze300-query-time-20240701-tpch.png => spark-3.3-vs-blaze300-query-time-20240701-tpch.png} (100%) rename benchmark-results/{spark333-vs-blaze300-query-time-20240701.png => spark-3.3-vs-blaze300-query-time-20240701.png} (100%) diff --git a/.github/workflows/build-ce7-releases.yml b/.github/workflows/build-ce7-releases.yml index 21fb6ded4..d5c1df209 100644 --- a/.github/workflows/build-ce7-releases.yml +++ b/.github/workflows/build-ce7-releases.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - sparkver: [spark303, spark320, spark324, spark333, spark351] + sparkver: [spark-3.0, spark-3.1, spark-3.2, spark-3.3, spark-3.5] blazever: [3.0.1] steps: diff --git a/.github/workflows/tpcds.yml b/.github/workflows/tpcds.yml index de37515bc..f710f3775 100644 --- a/.github/workflows/tpcds.yml +++ b/.github/workflows/tpcds.yml @@ -5,44 +5,37 @@ on: push: jobs: - test-spark303: - name: Test Spark303 + test-spark-30: + name: Test spark-3.0 uses: ./.github/workflows/tpcds-reusable.yml with: - sparkver: spark303 + sparkver: spark-3.0 sparkurl: https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz - test-spark313: - name: Test Spark313 + test-spark-31: + name: Test spark-3.1 uses: ./.github/workflows/tpcds-reusable.yml with: - sparkver: spark313 + sparkver: spark-3.1 sparkurl: https://archive.apache.org/dist/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz - test-spark320: - name: Test Spark320 + test-spark-32: + name: Test spark-3.2 uses: 
./.github/workflows/tpcds-reusable.yml with: - sparkver: spark320 - sparkurl: https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz - - test-spark324: - name: Test Spark324 - uses: ./.github/workflows/tpcds-reusable.yml - with: - sparkver: spark324 + sparkver: spark-3.2 sparkurl: https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz - test-spark333: - name: Test Spark333 + test-spark-33: + name: Test spark-3.3 uses: ./.github/workflows/tpcds-reusable.yml with: - sparkver: spark333 + sparkver: spark-3.3 sparkurl: https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz - test-spark351: - name: Test Spark351 + test-spark-35: + name: Test spark-3.5 uses: ./.github/workflows/tpcds-reusable.yml with: - sparkver: spark351 - sparkurl: https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz + sparkver: spark-3.5 + sparkurl: https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz diff --git a/README.md b/README.md index 8506c8bdf..1526ac31c 100644 --- a/README.md +++ b/README.md @@ -76,17 +76,17 @@ Specify shims package of which spark version that you would like to run on. Currently we have supported these shims: -* spark303 - for spark3.0.x -* spark313 - for spark3.1.x -* spark324 - for spark3.2.x -* spark333 - for spark3.3.x -* spark351 - for spark3.5.x. +* spark-3.0 - for spark3.0.x +* spark-3.1 - for spark3.1.x +* spark-3.2 - for spark3.2.x +* spark-3.3 - for spark3.3.x +* spark-3.5 - for spark3.5.x. You could either build Blaze in pre mode for debugging or in release mode to unlock the full potential of Blaze. ```shell -SHIM=spark333 # or spark303/spark313/spark320/spark324/spark333/spark351 +SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.5 MODE=release # or pre mvn package -P"${SHIM}" -P"${MODE}" ``` @@ -98,7 +98,7 @@ directory. You can use the following command to build a centos-7 compatible release: ```shell -SHIM=spark333 MODE=release ./release-docker.sh +SHIM=spark-3.3 MODE=release ./release-docker.sh ``` ## Run Spark Job with Blaze Accelerator @@ -132,10 +132,10 @@ comparison with vanilla Spark 3.3.3. The benchmark result shows that Blaze save Stay tuned and join us for more upcoming thrilling numbers. TPC-DS Query time: ([How can I run TPC-DS benchmark?](./tpcds/README.md)) -![20240701-query-time-tpcds](./benchmark-results/spark333-vs-blaze300-query-time-20240701.png) +![20240701-query-time-tpcds](./benchmark-results/spark-3.3-vs-blaze300-query-time-20240701.png) TPC-H Query time: -![20240701-query-time-tpch](./benchmark-results/spark333-vs-blaze300-query-time-20240701-tpch.png) +![20240701-query-time-tpch](./benchmark-results/spark-3.3-vs-blaze300-query-time-20240701-tpch.png) We also encourage you to benchmark Blaze and share the results with us. 🤗 diff --git a/benchmark-results/20240701-blaze300.md b/benchmark-results/20240701-blaze300.md index 1106d5451..2a0121ff5 100644 --- a/benchmark-results/20240701-blaze300.md +++ b/benchmark-results/20240701-blaze300.md @@ -60,7 +60,7 @@ spark.sql.readSideCharPadding false ### TPC-DS Results Blaze saved 46% total query time comparing to spark, benchmarks using the above configuration. 
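All of the renamed profiles and shims above follow a single `spark-<major>.<minor>` convention. As a minimal sketch of that convention (a hypothetical helper, not part of this patch), the shim name for a given Spark release can be derived from its full version string:

```scala
object ShimName {
  // Hypothetical helper illustrating the naming convention introduced here:
  // map a full Spark version ("3.3.3") to its shim/profile name ("spark-3.3").
  def forSparkVersion(version: String): String = {
    val Array(major, minor, _*) = version.split("\\.")
    s"spark-$major.$minor"
  }
}

// e.g. ShimName.forSparkVersion("3.3.3") == "spark-3.3"
```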
Query time comparison (seconds): -![spark333-vs-blaze300-query-time-20240701.png](spark333-vs-blaze300-query-time-20240701.png) +![spark-3.3-vs-blaze300-query-time-20240701.png](spark-3.3-vs-blaze300-query-time-20240701.png) | | Blaze | Spark | Speedup(x) | | ------ | -------- | -------- | ---------- | @@ -172,7 +172,7 @@ Query time comparison (seconds): ### TPC-H Results Blaze saved 55% total query time comparing to spark, benchmarks using the above configuration. Query time comparison (seconds): -![spark333-vs-blaze300-query-time-20240701-tpch.png](spark333-vs-blaze300-query-time-20240701-tpch.png) +![spark-3.3-vs-blaze300-query-time-20240701-tpch.png](spark-3.3-vs-blaze300-query-time-20240701-tpch.png) | | Blaze | Spark | Speedup(x) | | ------ | ------- | -------- | ---------- | diff --git a/benchmark-results/spark333-vs-blaze300-query-time-20240701-tpch.png b/benchmark-results/spark-3.3-vs-blaze300-query-time-20240701-tpch.png similarity index 100% rename from benchmark-results/spark333-vs-blaze300-query-time-20240701-tpch.png rename to benchmark-results/spark-3.3-vs-blaze300-query-time-20240701-tpch.png diff --git a/benchmark-results/spark333-vs-blaze300-query-time-20240701.png b/benchmark-results/spark-3.3-vs-blaze300-query-time-20240701.png similarity index 100% rename from benchmark-results/spark333-vs-blaze300-query-time-20240701.png rename to benchmark-results/spark-3.3-vs-blaze300-query-time-20240701.png diff --git a/dev/docker-build/docker-compose.yml b/dev/docker-build/docker-compose.yml index 444d9b4c2..039b08f95 100644 --- a/dev/docker-build/docker-compose.yml +++ b/dev/docker-build/docker-compose.yml @@ -11,8 +11,7 @@ services: - ./../../:/blaze:rw - ./../../target-docker:/blaze/target:rw - ./../../target-docker/spark-extension-target:/blaze/spark-extension/target:rw - - ./../../target-docker/spark-extension-shims-spark303-target:/blaze/spark-extension-shims-spark303/target:rw - - ./../../target-docker/spark-extension-shims-spark241kwaiae-target:/blaze/spark-extension-shims-spark241kwaiae/target:rw + - ./../../target-docker/spark-extension-shims-spark-3.0-target:/blaze/spark-extension-shims-spark-3.0/target:rw - ./../../target-docker/build-helper-proto-target:/blaze/build-helper/proto/target:rw - ./../../target-docker/build-helper-assembly-target:/blaze/build-helper/assembly/target:rw environment: diff --git a/pom.xml b/pom.xml index eaab07c43..53ff1a18d 100644 --- a/pom.xml +++ b/pom.xml @@ -261,9 +261,9 @@ - spark303 + spark-3.0 - spark303 + spark-3.0 spark-extension-shims-spark3 1.8 2.12 @@ -275,9 +275,9 @@ - spark313 + spark-3.1 - spark313 + spark-3.1 spark-extension-shims-spark3 1.8 2.12 @@ -289,23 +289,9 @@ - spark320 + spark-3.2 - spark320 - spark-extension-shims-spark3 - 1.8 - 2.12 - 2.12.15 - 3.2.9 - 3.0.0 - 3.2.0 - - - - - spark324 - - spark324 + spark-3.2 spark-extension-shims-spark3 1.8 2.12 @@ -317,9 +303,9 @@ - spark333 + spark-3.3 - spark333 + spark-3.3 spark-extension-shims-spark3 1.8 2.12 @@ -331,16 +317,16 @@ - spark351 + spark-3.5 - spark351 + spark-3.5 spark-extension-shims-spark3 1.8 2.12 2.12.15 3.2.9 3.0.0 - 3.5.1 + 3.5.2 diff --git a/release-docker.sh b/release-docker.sh index fdf2ac679..12821d630 100755 --- a/release-docker.sh +++ b/release-docker.sh @@ -1,6 +1,6 @@ #!/bin/bash -export SHIM="${SHIM:-spark303}" +export SHIM="${SHIM:-spark-3.0}" export MODE="${MODE:-release}" docker-compose -f dev/docker-build/docker-compose.yml up diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala index b34db7c75..590d904d6 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala @@ -22,7 +22,7 @@ import com.thoughtworks.enableIf object InterceptedValidateSparkPlan extends Logging { - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) def validate(plan: SparkPlan): Unit = { import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.blaze.plan.NativeRenameColumnsBase @@ -70,13 +70,12 @@ object InterceptedValidateSparkPlan extends Logging { } } - @enableIf(Seq("spark303", "spark313", "spark320").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) def validate(plan: SparkPlan): Unit = { - throw new UnsupportedOperationException( - "validate is not supported in spark 3.0.3 or 3.1.3 or spark 3.2.0") + throw new UnsupportedOperationException("validate is not supported in spark 3.0.3 or 3.1.3") } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def errorOnInvalidBroadcastQueryStage(plan: SparkPlan): Unit = { import org.apache.spark.sql.execution.adaptive.InvalidAQEPlanException throw InvalidAQEPlanException("Invalid broadcast query stage", plan) diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala index ab187380b..2d554c66d 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala @@ -115,20 +115,18 @@ import com.thoughtworks.enableIf class ShimsImpl extends Shims with Logging { - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) - override def shimVersion: String = "spark303" - @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim"))) - override def shimVersion: String = "spark313" - @enableIf(Seq("spark320").contains(System.getProperty("blaze.shim"))) - override def shimVersion: String = "spark320" - @enableIf(Seq("spark324").contains(System.getProperty("blaze.shim"))) - override def shimVersion: String = "spark324" - @enableIf(Seq("spark333").contains(System.getProperty("blaze.shim"))) - override def shimVersion: String = "spark333" - @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim"))) - override def shimVersion: String = "spark351" - - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) + override def shimVersion: String = "spark-3.0" + @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim"))) + override def shimVersion: String = "spark-3.1" + @enableIf(Seq("spark-3.2").contains(System.getProperty("blaze.shim"))) + override def shimVersion: String = "spark-3.2" + @enableIf(Seq("spark-3.3").contains(System.getProperty("blaze.shim"))) + override def 
shimVersion: String = "spark-3.3" + @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) + override def shimVersion: String = "spark-3.5" + + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def initExtension(): Unit = { ValidateSparkPlanInjector.inject() @@ -143,7 +141,7 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark303", "spark320").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def initExtension(): Unit = { if (BlazeConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) { logWarning(s"${BlazeConf.FORCE_SHUFFLED_HASH_JOIN.key} is not supported in $shimVersion") @@ -370,9 +368,7 @@ class ShimsImpl extends Shims with Logging { length: Long, numRecords: Long): FileSegment = new FileSegment(file, offset, length) - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def commit( dep: ShuffleDependency[_, _, _], shuffleBlockResolver: IndexShuffleBlockResolver, @@ -392,7 +388,7 @@ class ShimsImpl extends Shims with Logging { MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId) } - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def commit( dep: ShuffleDependency[_, _, _], shuffleBlockResolver: IndexShuffleBlockResolver, @@ -513,23 +509,19 @@ class ShimsImpl extends Shims with Logging { expr.asInstanceOf[AggregateExpression].filter } - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def isAQEShuffleRead(exec: SparkPlan): Boolean = { import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec exec.isInstanceOf[AQEShuffleReadExec] } - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) private def isAQEShuffleRead(exec: SparkPlan): Boolean = { import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec exec.isInstanceOf[CustomShuffleReaderExec] } - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = { import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec @@ -619,7 +611,7 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim"))) private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = { import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec @@ -698,7 +690,7 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = { 
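Every variant in `ShimsImpl` is selected at compile time by `com.thoughtworks.enableIf`: the macro evaluates its predicate while `scalac` runs, reading the `blaze.shim` system property set by the active Maven profile, and drops the non-matching definitions. A minimal sketch of the pattern, assuming `blaze.shim` is set in the compiler JVM as the build does here:

```scala
import com.thoughtworks.enableIf

class VersionGated {
  // Only one of these definitions survives compilation; the predicate is
  // evaluated by the enableIf macro against the compiler JVM's properties.
  @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
  def shimFamily: String = "spark 3.2+"

  @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
  def shimFamily: String = "spark 3.0/3.1"
}
```

Because the discarded definition is removed during macro expansion, each surviving branch may freely import and reference APIs that exist only on that Spark version's classpath, which is why these methods import version-specific classes inside their bodies.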
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec @@ -787,13 +779,11 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def getSqlContext(sparkPlan: SparkPlan): SQLContext = sparkPlan.session.sqlContext - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def getSqlContext(sparkPlan: SparkPlan): SQLContext = sparkPlan.sqlContext override def createNativeExprWrapper( @@ -804,7 +794,7 @@ class ShimsImpl extends Shims with Logging { } @enableIf( - Seq("spark303", "spark313", "spark320", "spark324", "spark333").contains( + Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains( System.getProperty("blaze.shim"))) private def convertPromotePrecision( e: Expression, @@ -818,13 +808,13 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) private def convertPromotePrecision( e: Expression, isPruningExpr: Boolean, fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = None - @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = { import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate agg match { @@ -844,12 +834,10 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf( - Seq("spark303", "spark313", "spark320", "spark324").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1", "spark-3.2").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None - @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterMightContain( e: Expression, isPruningExpr: Boolean, @@ -870,9 +858,7 @@ class ShimsImpl extends Shims with Logging { } } - @enableIf( - Seq("spark303", "spark313", "spark320", "spark324").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1", "spark-3.2").contains(System.getProperty("blaze.shim"))) private def convertBloomFilterMightContain( e: Expression, isPruningExpr: Boolean, @@ -883,13 +869,11 @@ class ShimsImpl extends Shims with Logging { case class ForceNativeExecutionWrapper(override val child: SparkPlan) extends ForceNativeExecutionWrapperBase(child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } @@ -900,8 
+884,6 @@ case class NativeExprWrapper( override val nullable: Boolean) extends NativeExprWrapperBase(nativeExpr, dataType, nullable) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy() } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala index 4f5270fa5..e44ea1cdd 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala @@ -21,13 +21,11 @@ import com.thoughtworks.enableIf case class ConvertToNativeExec(override val child: SparkPlan) extends ConvertToNativeBase(child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala index 11e55ebd9..2f91b1fe4 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala @@ -48,12 +48,12 @@ case class NativeAggExec( with BaseAggregateExec { @enableIf( - Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( System.getProperty("blaze.shim"))) override val requiredChildDistributionExpressions: Option[Seq[Expression]] = theRequiredChildDistributionExpressions - @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override val initialInputBufferOffset: Int = theInitialInputBufferOffset override def output: Seq[Attribute] = @@ -65,21 +65,19 @@ case class NativeAggExec( ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID)) } - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def isStreaming: Boolean = false - @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def numShufflePartitions: Option[Int] = None override def resultExpressions: Seq[NamedExpression] = output - @enableIf( - Seq("spark320", "spark324", "spark333", 
"spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala index e380875f5..8838d8bc8 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala @@ -42,13 +42,11 @@ case class NativeBroadcastExchangeExec(mode: BroadcastMode, override val child: relationFuturePromise.future } - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala index 629a52e1a..b04057518 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala @@ -27,13 +27,11 @@ case class NativeExpandExec( override val child: SparkPlan) extends NativeExpandBase(projections, output, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala index 1e63d9a1a..2a4b06c6a 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala @@ -23,13 +23,11 @@ import 
com.thoughtworks.enableIf case class NativeFilterExec(condition: Expression, override val child: SparkPlan) extends NativeFilterBase(condition, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala index e5a4a2738..3168036fb 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala @@ -29,13 +29,11 @@ case class NativeGenerateExec( override val child: SparkPlan) extends NativeGenerateBase(generator, requiredChildOutput, outer, generatorOutput, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala index 3861a29d3..25c387e06 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala @@ -22,13 +22,11 @@ import com.thoughtworks.enableIf case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan) extends NativeGlobalLimitBase(limit, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala index 
282a0867c..faf3b28b0 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala @@ -22,13 +22,11 @@ import com.thoughtworks.enableIf case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan) extends NativeLocalLimitBase(limit, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala index a6eb27783..00b8e6d1b 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala @@ -32,7 +32,7 @@ case class NativeParquetInsertIntoHiveTableExec( extends NativeParquetInsertIntoHiveTableBase(cmd, child) { @enableIf( - Seq("spark303", "spark313", "spark320", "spark324", "spark333").contains( + Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains( System.getProperty("blaze.shim"))) override protected def getInsertIntoHiveTableCommand( table: CatalogTable, @@ -52,7 +52,7 @@ case class NativeParquetInsertIntoHiveTableExec( metrics) } - @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def getInsertIntoHiveTableCommand( table: CatalogTable, partition: Map[String, Option[String]], @@ -71,18 +71,16 @@ case class NativeParquetInsertIntoHiveTableExec( metrics) } - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) @enableIf( - Seq("spark303", "spark313", "spark320", "spark324", "spark333").contains( + Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains( System.getProperty("blaze.shim"))) class BlazeInsertIntoHiveTable303( table: CatalogTable, @@ -108,7 +106,7 @@ case class NativeParquetInsertIntoHiveTableExec( super.run(sparkSession, nativeParquetSink) } - @enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim"))) override def 
basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = { import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker @@ -134,7 +132,7 @@ case class NativeParquetInsertIntoHiveTableExec( } } - @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim"))) override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = { import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker @@ -172,7 +170,7 @@ case class NativeParquetInsertIntoHiveTableExec( } } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = { import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker @@ -202,7 +200,7 @@ case class NativeParquetInsertIntoHiveTableExec( } } - @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) class BlazeInsertIntoHiveTable351( table: CatalogTable, partition: Map[String, Option[String]], diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala index 0faba5021..aa673db86 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala @@ -30,13 +30,11 @@ case class NativeParquetSinkExec( override val metrics: Map[String, SQLMetric]) extends NativeParquetSinkBase(sparkSession, table, partition, child, metrics) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala index d54323d77..c30350db1 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala @@ -28,13 +28,11 @@ case class NativePartialTakeOrderedExec( override val metrics: Map[String, SQLMetric]) extends NativePartialTakeOrderedBase(limit, sortOrder, child, metrics) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", 
"spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala index a1eafbb58..3943516d0 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.SparkPlan import com.thoughtworks.enableIf case object NativeProjectExecProvider { - @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) def provide( projectList: Seq[NamedExpression], child: SparkPlan, @@ -49,9 +49,7 @@ case object NativeProjectExecProvider { NativeProjectExec(projectList, child, addTypeCast) } - @enableIf( - Seq("spark313", "spark320", "spark324", "spark333").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.1", "spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim"))) def provide( projectList: Seq[NamedExpression], child: SparkPlan, @@ -67,12 +65,11 @@ case object NativeProjectExecProvider { with AliasAwareOutputPartitioning with AliasAwareOutputOrdering { - @enableIf( - Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) @@ -85,7 +82,7 @@ case object NativeProjectExecProvider { NativeProjectExec(projectList, child, addTypeCast) } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) def provide( projectList: Seq[NamedExpression], child: SparkPlan, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala index 5d8e9d30c..853b76686 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.SparkPlan import com.thoughtworks.enableIf case object NativeRenameColumnsExecProvider { - @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) def 
provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = { import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.expressions.SortOrder @@ -44,9 +44,7 @@ case object NativeRenameColumnsExecProvider { NativeRenameColumnsExec(child, renamedColumnNames) } - @enableIf( - Seq("spark313", "spark320", "spark324", "spark333").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.1", "spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim"))) def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = { import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.expressions.SortOrder @@ -60,12 +58,11 @@ case object NativeRenameColumnsExecProvider { with AliasAwareOutputPartitioning with AliasAwareOutputOrdering { - @enableIf( - Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) @@ -76,7 +73,7 @@ case object NativeRenameColumnsExecProvider { NativeRenameColumnsExec(child, renamedColumnNames) } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = { case class NativeRenameColumnsExec( override val child: SparkPlan, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala index acd93fe91..78c8dd481 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala @@ -148,28 +148,26 @@ case class NativeShuffleExchangeExec( // for databricks testing val causedBroadcastJoinBuildOOM = false - @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim"))) override def advisoryPartitionSize: Option[Long] = None // If users specify the num partitions via APIs like `repartition`, we shouldn't change it. // For `SinglePartition`, it requires exactly one partition and we can't change it either. 
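`NativeProjectExecProvider` and `NativeRenameColumnsExecProvider` above (and the join providers further below) share one shape: each `@enableIf`-gated `provide` declares a version-specific exec node locally and returns it as the common base type, overriding `withNewChildInternal` on Spark 3.2+ and the legacy `withNewChildren` on Spark 3.0/3.1. A condensed sketch, with a hypothetical `DemoBase` standing in for the `Native*Base` classes:

```scala
import org.apache.spark.sql.execution.SparkPlan
import com.thoughtworks.enableIf

case object DemoExecProvider {
  @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
  def provide(child: SparkPlan): DemoBase = {
    case class DemoExec(override val child: SparkPlan) extends DemoBase(child) {
      // Spark 3.2+ TreeNode API
      override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
        copy(child = newChild)
    }
    DemoExec(child)
  }

  @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
  def provide(child: SparkPlan): DemoBase = {
    case class DemoExec(override val child: SparkPlan) extends DemoBase(child) {
      // pre-3.2 TreeNode API
      override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
        copy(child = newChildren.head)
    }
    DemoExec(child)
  }
}
```

Defining the case class inside the gated method keeps version-specific TreeNode overrides out of the shared sources entirely.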
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) override def canChangeNumPartitions: Boolean = outputPartitioning != SinglePartition @enableIf( - Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def shuffleOrigin = org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala index 39186b71f..7e569837a 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala @@ -26,13 +26,11 @@ case class NativeSortExec( override val child: SparkPlan) extends NativeSortBase(sortOrder, global, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala index fda51d776..03afc5e31 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala @@ -26,13 +26,11 @@ case class NativeTakeOrderedExec( override val child: SparkPlan) extends NativeTakeOrderedBase(limit, sortOrder, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff 
--git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala index 73aadc5f3..be790c558 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala @@ -22,13 +22,11 @@ import com.thoughtworks.enableIf case class NativeUnionExec(override val children: Seq[SparkPlan]) extends NativeUnionBase(children) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = copy(children = newChildren) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(children = newChildren) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala index 235d5c02c..0b2f4a0d9 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala @@ -29,13 +29,11 @@ case class NativeWindowExec( override val child: SparkPlan) extends NativeWindowBase(windowExpression, partitionSpec, orderSpec, child) { - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala index e9a48bbf7..9d9d10032 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala @@ -45,8 +45,7 @@ class BlazeBlockStoreShuffleReader[K, C]( override def readBlocks(): Iterator[(BlockId, InputStream)] = { @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) def fetchIterator = new ShuffleBlockFetcherIterator( context, blockManager.blockStoreClient, @@ -67,7 +66,7 @@ class BlazeBlockStoreShuffleReader[K, C]( readMetrics, 
fetchContinuousBlocksInBatch).toCompletionIterator - @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim"))) def fetchIterator = new ShuffleBlockFetcherIterator( context, blockManager.blockStoreClient, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala index 662358544..96a6b3f34 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala @@ -76,7 +76,7 @@ abstract class BlazeRssShuffleManagerBase(conf: SparkConf) extends ShuffleManage metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] @enableIf( - Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains( + Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains( System.getProperty("blaze.shim"))) override def getReader[K, C]( handle: ShuffleHandle, @@ -108,7 +108,7 @@ abstract class BlazeRssShuffleManagerBase(conf: SparkConf) extends ShuffleManage } } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) override def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, @@ -123,7 +123,7 @@ abstract class BlazeRssShuffleManagerBase(conf: SparkConf) extends ShuffleManage } } - @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim"))) override def getReaderForRange[K, C]( handle: ShuffleHandle, startMapIndex: Int, diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala index 75c74affb..06c8fb2c3 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala @@ -48,9 +48,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { sortShuffleManager.registerShuffle(shuffleId, dependency) } - @enableIf( - Seq("spark320", "spark324", "spark333", "spark351").contains( - System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) override def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, @@ -63,9 +61,9 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { if (isArrowShuffle(handle)) { val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]] - @enableIf(Seq("spark320", "spark324").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.2").contains(System.getProperty("blaze.shim"))) def shuffleMergeFinalized = baseShuffleHandle.dependency.shuffleMergeFinalized - @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim"))) + @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim"))) def shuffleMergeFinalized = 
         baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked
 
       val (blocksByAddress, canEnableBatchFetch) =
@@ -108,7 +106,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
     }
   }
 
-  @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
   override def getReader[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
@@ -145,7 +143,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
     }
   }
 
-  @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
   override def getReader[K, C](
       handle: ShuffleHandle,
       startPartition: Int,
@@ -172,7 +170,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
     }
   }
 
-  @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
   override def getReaderForRange[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala
index 04043b70d..8ad299f34 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala
@@ -22,8 +22,6 @@ import com.thoughtworks.enableIf
 class BlazeShuffleWriter[K, V](metrics: ShuffleWriteMetricsReporter)
     extends BlazeShuffleWriterBase[K, V](metrics) {
 
-  @enableIf(
-    Seq("spark320", "spark324", "spark333", "spark351").contains(
-      System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
   override def getPartitionLengths(): Array[Long] = partitionLengths
 }
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
index ec6b88171..45b6f55f8 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
@@ -48,7 +48,7 @@ case class NativeBroadcastJoinExec(
   override val condition: Option[Expression] = None
 
   @enableIf(
-    Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+    Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
       System.getProperty("blaze.shim")))
   override def buildSide: org.apache.spark.sql.catalyst.optimizer.BuildSide =
     broadcastSide match {
@@ -56,14 +56,14 @@ case class NativeBroadcastJoinExec(
       case BroadcastRight => org.apache.spark.sql.catalyst.optimizer.BuildRight
     }
 
-  @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
   override val buildSide: org.apache.spark.sql.execution.joins.BuildSide = broadcastSide match {
     case BroadcastLeft => org.apache.spark.sql.execution.joins.BuildLeft
     case BroadcastRight => org.apache.spark.sql.execution.joins.BuildRight
   }
 
   @enableIf(
-    Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+    Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
       System.getProperty("blaze.shim")))
   override def requiredChildDistribution = {
     import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution
@@ -80,19 +80,19 @@ case class NativeBroadcastJoinExec(
   }
 
   @enableIf(
-    Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+    Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
       System.getProperty("blaze.shim")))
   override def supportCodegen: Boolean = false
 
   @enableIf(
-    Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+    Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
       System.getProperty("blaze.shim")))
   override def inputRDDs() = {
     throw new NotImplementedError("NativeBroadcastJoin does not support codegen")
   }
 
   @enableIf(
-    Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+    Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
       System.getProperty("blaze.shim")))
   override protected def prepareRelation(
       ctx: org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext)
@@ -100,15 +100,13 @@ case class NativeBroadcastJoinExec(
     throw new NotImplementedError("NativeBroadcastJoin does not support codegen")
   }
 
-  @enableIf(
-    Seq("spark320", "spark324", "spark333", "spark351").contains(
-      System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
     copy(left = newLeft, right = newRight)
 
-  @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
   override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
     copy(left = newChildren(0), right = newChildren(1))
 }
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala
index 6582154b5..2e04013ac 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala
@@ -25,9 +25,7 @@ import com.thoughtworks.enableIf
 
 case object NativeShuffledHashJoinExecProvider {
 
-  @enableIf(
-    Seq("spark320", "spark324", "spark333", "spark351").contains(
-      System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
   def provide(
       left: SparkPlan,
       right: SparkPlan,
@@ -74,7 +72,7 @@ case object NativeShuffledHashJoinExecProvider {
     NativeShuffledHashJoinExec(left, right, leftKeys, rightKeys, joinType, buildSide)
   }
 
-  @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
   def provide(
       left: SparkPlan,
       right: SparkPlan,
@@ -120,7 +118,7 @@ case object NativeShuffledHashJoinExecProvider {
     NativeShuffledHashJoinExec(left, right, leftKeys, rightKeys, joinType, buildSide)
   }
 
-  @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
   def provide(
       left: SparkPlan,
       right: SparkPlan,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala
index 42b8787f5..50fbd98b5 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala
@@ -24,9 +24,7 @@ import com.thoughtworks.enableIf
 
 case object NativeSortMergeJoinExecProvider {
 
-  @enableIf(
-    Seq("spark320", "spark324", "spark333", "spark351").contains(
-      System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
   def provide(
       left: SparkPlan,
       right: SparkPlan,
@@ -71,7 +69,7 @@ case object NativeSortMergeJoinExecProvider {
     NativeSortMergeJoinExec(left, right, leftKeys, rightKeys, joinType)
   }
 
-  @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+  @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
   def provide(
       left: SparkPlan,
       right: SparkPlan,
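
Note on the pattern this patch renames throughout: each `@enableIf` annotation (from `com.thoughtworks.enableIf`) evaluates its predicate at compile time, so a member guarded by `Seq(...).contains(System.getProperty("blaze.shim"))` is only compiled into the shim whose `blaze.shim` value matches, presumably set by the corresponding Maven profile. Below is a minimal, hypothetical sketch of the idiom with the new shim names; `ShimGatedReader` and `readerDescription` are illustrative and not part of the codebase.

```scala
import com.thoughtworks.enableIf

// Hypothetical sketch (assumes macro annotations are enabled in the build):
// the same member defined twice, gated on disjoint shim names, mirroring the
// renamed @enableIf guards in this patch.
class ShimGatedReader {

  // Compiled only when building with -Dblaze.shim=spark-3.2, spark-3.3 or spark-3.5.
  @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
  def readerDescription: String = "getReader with startMapIndex/endMapIndex"

  // Fallback compiled only for the spark-3.0 shim, which predates that API.
  @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
  def readerDescription: String = "getReader with startPartition/endPartition only"
}
```

Because the predicate is decided when the shim module is compiled, exactly one definition survives per build; this is why the shim-specific `getReader`, `buildSide`, and `provide` overloads in the files above can share a name without clashing.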