[BLAZE-573] Support Spark 3.4 version (#589)
SteNicholas authored Sep 25, 2024
1 parent 4e9d948 commit 5fd90b4
Showing 31 changed files with 151 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-ce7-releases.yml
@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- sparkver: [spark-3.0, spark-3.1, spark-3.2, spark-3.3, spark-3.5]
+ sparkver: [spark-3.0, spark-3.1, spark-3.2, spark-3.3, spark-3.4, spark-3.5]
blazever: [3.0.1]

steps:
11 changes: 9 additions & 2 deletions .github/workflows/tpcds.yml
@@ -31,11 +31,18 @@ jobs:
uses: ./.github/workflows/tpcds-reusable.yml
with:
sparkver: spark-3.3
- sparkurl: https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
+ sparkurl: https://archive.apache.org/dist/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz

+ test-spark-34:
+   name: Test spark-3.4
+   uses: ./.github/workflows/tpcds-reusable.yml
+   with:
+     sparkver: spark-3.4
+     sparkurl: https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz

test-spark-35:
name: Test spark-3.5
uses: ./.github/workflows/tpcds-reusable.yml
with:
sparkver: spark-3.5
- sparkurl: https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
+ sparkurl: https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
3 changes: 2 additions & 1 deletion README.md
@@ -80,13 +80,14 @@ Currently we have supported these shims:
* spark-3.1 - for spark3.1.x
* spark-3.2 - for spark3.2.x
* spark-3.3 - for spark3.3.x
+ * spark-3.4 - for spark3.4.x
* spark-3.5 - for spark3.5.x.

You could either build Blaze in pre mode for debugging or in release mode to unlock the full potential of
Blaze.

```shell
- SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.5
+ SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.4/spark-3.5
MODE=release # or pre
mvn package -P"${SHIM}" -P"${MODE}"
```
18 changes: 16 additions & 2 deletions pom.xml
@@ -318,7 +318,21 @@
<scalaLongVersion>2.12.15</scalaLongVersion>
<scalaTestVersion>3.2.9</scalaTestVersion>
<scalafmtVersion>3.0.0</scalafmtVersion>
- <sparkVersion>3.3.3</sparkVersion>
+ <sparkVersion>3.3.4</sparkVersion>
</properties>
</profile>

+ <profile>
+   <id>spark-3.4</id>
+   <properties>
+     <shimName>spark-3.4</shimName>
+     <shimPkg>spark-extension-shims-spark3</shimPkg>
+     <javaVersion>1.8</javaVersion>
+     <scalaVersion>2.12</scalaVersion>
+     <scalaLongVersion>2.12.15</scalaLongVersion>
+     <scalaTestVersion>3.2.9</scalaTestVersion>
+     <scalafmtVersion>3.0.0</scalafmtVersion>
+     <sparkVersion>3.4.3</sparkVersion>
+   </properties>
+ </profile>

@@ -332,7 +346,7 @@
<scalaLongVersion>2.12.15</scalaLongVersion>
<scalaTestVersion>3.2.9</scalaTestVersion>
<scalafmtVersion>3.0.0</scalafmtVersion>
- <sparkVersion>3.5.2</sparkVersion>
+ <sparkVersion>3.5.3</sparkVersion>
</properties>
</profile>
</profiles>
@@ -22,7 +22,9 @@ import com.thoughtworks.enableIf

object InterceptedValidateSparkPlan extends Logging {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
def validate(plan: SparkPlan): Unit = {
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.blaze.plan.NativeRenameColumnsBase
@@ -75,7 +77,9 @@ object InterceptedValidateSparkPlan extends Logging {
throw new UnsupportedOperationException("validate is not supported in spark 3.0.3 or 3.1.3")
}

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
private def errorOnInvalidBroadcastQueryStage(plan: SparkPlan): Unit = {
import org.apache.spark.sql.execution.adaptive.InvalidAQEPlanException
throw InvalidAQEPlanException("Invalid broadcast query stage", plan)
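Most hunks in this commit are the same mechanical change: adding "spark-3.4" to the shim list of an `@enableIf` annotation. The annotation comes from the ThoughtWorks enableIf.scala macro library and keeps a definition only when its predicate is true at compile time, evaluated here against the `blaze.shim` system property set by the active Maven profile. A minimal sketch of the pattern, with illustrative names (`ShimGatedExample` and `aqeShuffleReadClass` are not from the Blaze codebase; the shim names and the AQE operator split are):

```scala
import com.thoughtworks.enableIf

object ShimGatedExample {
  // Kept only when building against spark-3.2 or newer shims,
  // where AQE shuffle reads are exposed as AQEShuffleReadExec.
  @enableIf(
    Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
      System.getProperty("blaze.shim")))
  def aqeShuffleReadClass: String = "AQEShuffleReadExec"

  // Alternative body kept for the older shims, where the equivalent
  // operator was still named CustomShuffleReaderExec.
  @enableIf(
    Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
  def aqeShuffleReadClass: String = "CustomShuffleReaderExec"
}
```

Because the predicate is evaluated during compilation, each shim build contains exactly one body and never references classes missing from its Spark version; supporting 3.4 is therefore mostly a matter of widening these lists.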
@@ -123,10 +123,14 @@ class ShimsImpl extends Shims with Logging {
override def shimVersion: String = "spark-3.2"
@enableIf(Seq("spark-3.3").contains(System.getProperty("blaze.shim")))
override def shimVersion: String = "spark-3.3"
@enableIf(Seq("spark-3.4").contains(System.getProperty("blaze.shim")))
override def shimVersion: String = "spark-3.4"
@enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
override def shimVersion: String = "spark-3.5"

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def initExtension(): Unit = {
ValidateSparkPlanInjector.inject()

@@ -135,7 +139,7 @@
}

// disable MultiCommutativeOp suggested in spark3.4+
if (shimVersion >= "spark340") {
if (shimVersion >= "spark-3.4") {
val confName = "spark.sql.analyzer.canonicalization.multiCommutativeOpMemoryOptThreshold"
SparkEnv.get.conf.set(confName, Int.MaxValue.toString)
}
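The change above is a real bug fix, not just a rename: the guard compares shim version strings lexicographically, and since '-' sorts before any digit, every "spark-3.x" value compared less than the old literal "spark340", so the threshold override could never fire. A quick REPL-style illustration (results shown as comments):

```scala
// Shim versions are strings like "spark-3.4"; '-' (0x2d) sorts before
// '3' (0x33), so all of them compare less than the old literal "spark340".
val shims = Seq("spark-3.3", "spark-3.4", "spark-3.5")

shims.map(v => v -> (v >= "spark340"))
// List((spark-3.3,false), (spark-3.4,false), (spark-3.5,false)) -- never fired

shims.map(v => v -> (v >= "spark-3.4"))
// List((spark-3.3,false), (spark-3.4,true), (spark-3.5,true)) -- as intended
```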
@@ -368,7 +372,9 @@
length: Long,
numRecords: Long): FileSegment = new FileSegment(file, offset, length)

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def commit(
dep: ShuffleDependency[_, _, _],
shuffleBlockResolver: IndexShuffleBlockResolver,
@@ -509,7 +515,9 @@
expr.asInstanceOf[AggregateExpression].filter
}

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
private def isAQEShuffleRead(exec: SparkPlan): Boolean = {
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
exec.isInstanceOf[AQEShuffleReadExec]
@@ -521,7 +529,9 @@
exec.isInstanceOf[CustomShuffleReaderExec]
}

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = {
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec
@@ -779,7 +789,9 @@
}
}

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def getSqlContext(sparkPlan: SparkPlan): SQLContext =
sparkPlan.session.sqlContext

@@ -808,13 +820,13 @@
}
}

@enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def convertPromotePrecision(
e: Expression,
isPruningExpr: Boolean,
fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = None

@enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark-3.3", "spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = {
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
agg match {
@@ -837,7 +849,7 @@
@enableIf(Seq("spark-3.0", "spark-3.1", "spark-3.2").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None

@enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark-3.3", "spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterMightContain(
e: Expression,
isPruningExpr: Boolean,
@@ -869,7 +881,9 @@
case class ForceNativeExecutionWrapper(override val child: SparkPlan)
extends ForceNativeExecutionWrapperBase(child) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -884,6 +898,8 @@ case class NativeExprWrapper(
override val nullable: Boolean)
extends NativeExprWrapperBase(nativeExpr, dataType, nullable) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy()
}
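The long run of near-identical hunks that follows extends the same shim list on `withNewChildInternal`/`withNewChildrenInternal` overrides. Spark 3.2 introduced these methods as the hooks a `TreeNode` uses to rebuild itself with new children, so every shim from 3.2 onward, now including 3.4, must compile the override, while spark-3.0/3.1 must compile it out. A sketch of the shape all of these operators share, with `NativeSomethingExec` as a placeholder for the real node names:

```scala
import com.thoughtworks.enableIf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Placeholder operator; the real Native*Exec nodes follow this exact pattern.
case class NativeSomethingExec(override val child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  override protected def doExecute(): RDD[InternalRow] = child.execute()

  // Required on spark-3.2+ so Catalyst can rebuild the node immutably;
  // compiled out on spark-3.0/3.1, where the method does not exist.
  @enableIf(
    Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
      System.getProperty("blaze.shim")))
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}
```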
@@ -21,7 +21,9 @@ import com.thoughtworks.enableIf

case class ConvertToNativeExec(override val child: SparkPlan) extends ConvertToNativeBase(child) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -48,12 +48,12 @@ case class NativeAggExec(
with BaseAggregateExec {

@enableIf(
Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override val requiredChildDistributionExpressions: Option[Seq[Expression]] =
theRequiredChildDistributionExpressions

@enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark-3.3", "spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
override val initialInputBufferOffset: Int = theInitialInputBufferOffset

override def output: Seq[Attribute] =
@@ -65,15 +65,21 @@
ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID))
}

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def isStreaming: Boolean = false

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def numShufflePartitions: Option[Int] = None

override def resultExpressions: Seq[NamedExpression] = output

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -42,7 +42,9 @@ case class NativeBroadcastExchangeExec(mode: BroadcastMode, override val child:
relationFuturePromise.future
}

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -27,7 +27,9 @@ case class NativeExpandExec(
override val child: SparkPlan)
extends NativeExpandBase(projections, output, child) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -23,7 +23,9 @@ import com.thoughtworks.enableIf
case class NativeFilterExec(condition: Expression, override val child: SparkPlan)
extends NativeFilterBase(condition, child) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -29,7 +29,9 @@ case class NativeGenerateExec(
override val child: SparkPlan)
extends NativeGenerateBase(generator, requiredChildOutput, outer, generatorOutput, child) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -22,7 +22,9 @@ import com.thoughtworks.enableIf
case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan)
extends NativeGlobalLimitBase(limit, child) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -22,7 +22,9 @@ import com.thoughtworks.enableIf
case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan)
extends NativeLocalLimitBase(limit, child) {

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -42,7 +42,7 @@ case class NativeParquetInsertIntoHiveTableExec(
ifPartitionNotExists: Boolean,
outputColumnNames: Seq[String],
metrics: Map[String, SQLMetric]): InsertIntoHiveTable = {
- new BlazeInsertIntoHiveTable303(
+ new BlazeInsertIntoHiveTable30(
table,
partition,
query,
Expand All @@ -52,7 +52,7 @@ case class NativeParquetInsertIntoHiveTableExec(
metrics)
}

@enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def getInsertIntoHiveTableCommand(
table: CatalogTable,
partition: Map[String, Option[String]],
Expand All @@ -61,7 +61,7 @@ case class NativeParquetInsertIntoHiveTableExec(
ifPartitionNotExists: Boolean,
outputColumnNames: Seq[String],
metrics: Map[String, SQLMetric]): InsertIntoHiveTable = {
- new BlazeInsertIntoHiveTable351(
+ new BlazeInsertIntoHiveTable34(
table,
partition,
query,
Expand All @@ -71,7 +71,9 @@ case class NativeParquetInsertIntoHiveTableExec(
metrics)
}

@enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
@enableIf(
Seq("spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)

@@ -82,7 +84,7 @@ case class NativeParquetInsertIntoHiveTableExec(
@enableIf(
Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains(
System.getProperty("blaze.shim")))
- class BlazeInsertIntoHiveTable303(
+ class BlazeInsertIntoHiveTable30(
table: CatalogTable,
partition: Map[String, Option[String]],
query: LogicalPlan,
@@ -200,8 +202,8 @@
}
}

@enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
class BlazeInsertIntoHiveTable351(
@enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
class BlazeInsertIntoHiveTable34(
table: CatalogTable,
partition: Map[String, Option[String]],
query: LogicalPlan,
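The renames in this last file track the widened coverage: `BlazeInsertIntoHiveTable303` (presumably named for 3.0.3, the first release it targeted) actually serves all of spark-3.0 through 3.3, hence `BlazeInsertIntoHiveTable30`, and `BlazeInsertIntoHiveTable351` becomes `BlazeInsertIntoHiveTable34` now that it backs both 3.4 and 3.5. As the hunks show, `@enableIf` works on classes as well as methods, so only the variant matching the active shim is ever compiled, and call sites are gated the same way. A toy sketch of this selection under hypothetical names (`HiveWriteShim30`/`HiveWriteShim34` are illustrative, not Blaze classes):

```scala
import com.thoughtworks.enableIf

// Both variants are declared, but only the one matching the active shim
// survives compilation, so neither compiles against missing Spark APIs.
@enableIf(
  Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains(
    System.getProperty("blaze.shim")))
class HiveWriteShim30 { val coveredShims = "spark-3.0 through spark-3.3" }

@enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
class HiveWriteShim34 { val coveredShims = "spark-3.4 and spark-3.5" }

object HiveWriteShimSelector {
  // Call sites carry the same guard, so each build references only the
  // class that actually exists after macro expansion.
  @enableIf(
    Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains(
      System.getProperty("blaze.shim")))
  def command: AnyRef = new HiveWriteShim30

  @enableIf(Seq("spark-3.4", "spark-3.5").contains(System.getProperty("blaze.shim")))
  def command: AnyRef = new HiveWriteShim34
}
```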