This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

[WIP] Support for Spark 3.0 #85

Closed
wants to merge 15 commits
45 changes: 8 additions & 37 deletions azure-pipelines.yml
@@ -6,47 +6,18 @@ trigger:
- master

jobs:
- job: Build_2_11
displayName: 'Build sources and run unit tests for Scala 2.11'
pool:
vmImage: 'ubuntu-latest'
steps:
- script: sbt ++2.11.12 clean
displayName: 'Running $sbt clean'
- script: sbt ++2.11.12 update
displayName: 'Running $sbt update'
- script: sbt ++2.11.12 compile
displayName: 'Running $sbt compile'
- script: sbt ++2.11.12 test
displayName: 'Running $sbt test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
- script: sbt ++2.11.12 package
displayName: 'Running $sbt package'
- task: CopyFiles@2
displayName: 'Copy hyperspace-core JAR'
inputs:
sourceFolder: '$(Build.SourcesDirectory)/target/'
contents: '**/*.jar'
targetFolder: '$(Build.ArtifactStagingDirectory)/hyperspace-core/'
- task: PublishBuildArtifacts@1
displayName: 'Publish Hyperspace artifacts'
inputs:
artifactName: 'hyperspace-core'
pathtoPublish: '$(Build.ArtifactStagingDirectory)/hyperspace-core/'

- job: Build_2_12
displayName: 'Build sources and run unit tests for Scala 2.12'
pool:
vmImage: 'ubuntu-latest'
steps:
- script: sbt ++2.12.8 clean
- script: sbt ++2.12.10 clean
displayName: 'Running $sbt clean'
- script: sbt ++2.12.8 update
- script: sbt ++2.12.10 update
displayName: 'Running $sbt update'
- script: sbt ++2.12.8 compile
- script: sbt ++2.12.10 compile
displayName: 'Running $sbt compile'
- script: sbt ++2.12.8 test
- script: sbt ++2.12.10 test
displayName: 'Running $sbt test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
@@ -69,11 +40,11 @@ jobs:
pool:
vmImage: 'ubuntu-latest'
steps:
- script: sbt ++2.11.12 clean
- script: sbt ++2.12.10 clean
displayName: 'Running $sbt clean'
- script: sbt ++2.11.12 update
- script: sbt ++2.12.10 update
displayName: 'Running $sbt update'
- script: sbt ++2.11.12 compile
- script: sbt ++2.12.10 compile
displayName: 'Running $sbt compile'
- task: Bash@3
inputs:
@@ -85,4 +56,4 @@ jobs:
scriptPath: 'run-tests.py'
displayName: 'Running python tests'
env:
SPARK_HOME: $(Build.SourcesDirectory)/spark-2.4.2-bin-hadoop2.7
SPARK_HOME: $(Build.SourcesDirectory)/spark-3.0.0-bin-hadoop2.7
14 changes: 4 additions & 10 deletions build.sbt
@@ -16,22 +16,16 @@

name := "hyperspace-core"

sparkVersion := "2.4.2"
sparkVersion := "3.0.1"

lazy val scala212 = "2.12.8"
lazy val scala211 = "2.11.12"
lazy val supportedScalaVersions = List(scala212, scala211)

scalaVersion := scala212

crossScalaVersions := supportedScalaVersions
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" withSources (),
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" withSources (),
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" withSources (),
"io.delta" %% "delta-core" % "0.6.1" % "provided" withSources (),
"org.apache.iceberg" % "iceberg-spark-runtime" % "0.11.0" % "provided" withSources (),
"io.delta" %% "delta-core" % "0.8.0" % "provided" withSources (),
"org.apache.iceberg" % "iceberg-spark3-runtime" % "0.11.0" % "provided" withSources (),
// Test dependencies
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.mockito" %% "mockito-scala" % "0.4.0" % "test",
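For downstream users, a minimal sketch of an sbt build matching the Spark 3 stack this change moves to (versions are taken from the diff above; the hyperspace-core coordinate and version placeholder are illustrative, not prescribed by this PR):

// Hypothetical downstream build.sbt aligned with the versions in this change.
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.0.1" % "provided",
  "io.delta" %% "delta-core" % "0.8.0",
  "org.apache.iceberg" % "iceberg-spark3-runtime" % "0.11.0",
  "com.microsoft.hyperspace" %% "hyperspace-core" % "x.y.z" // illustrative coordinate
)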
2 changes: 1 addition & 1 deletion script/download_spark.sh
@@ -19,7 +19,7 @@
# A utility script for build pipeline to download and install spark binaries for
# python tests to run.

SPARK_VERSION="2.4.2"
SPARK_VERSION="3.0.0"
HADOOP_VERSION="2.7"
SPARK_DIR="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}"

@@ -75,7 +75,7 @@ object DataFrameWriterExtensions {
private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
val qe = session.sessionState.executePlan(command)
// Call `QueryExecution.toRDD` to trigger the execution of commands.
SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
SQLExecution.withNewExecutionId(qe)(qe.toRdd)
}
}
}
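For context, a minimal sketch of the helper above with its imports spelled out (the object name is hypothetical; the call shape follows the Spark 3.0 SQLExecution API, which dropped the explicit SparkSession argument that the 2.4 overload required):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution

object RunCommandSketch {
  def runCommand(session: SparkSession)(command: LogicalPlan): Unit = {
    val qe = session.sessionState.executePlan(command)
    // Spark 2.4 form: SQLExecution.withNewExecutionId(session, qe)(qe.toRdd)
    // Spark 3.0 resolves the session from the QueryExecution, so only qe is passed.
    SQLExecution.withNewExecutionId(qe)(qe.toRdd)
  }
}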
@@ -55,7 +55,7 @@ object JoinIndexRule
with HyperspaceEventLogging
with ActiveSparkSession {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case join @ Join(l, r, _, Some(condition)) if isApplicable(l, r, condition) =>
case join @ Join(l, r, _, Some(condition), _) if isApplicable(l, r, condition) =>
try {
getBestIndexPair(l, r, condition)
.map {
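For context: in Spark 3.0 the Catalyst Join node gained a fifth field carrying join hints, which is why the pattern above matches an extra underscore. A minimal sketch of the new extractor shape (hypothetical helper, not part of this PR):

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

object JoinMatchSketch {
  // Returns the first join condition found, ignoring the JoinHint field added in Spark 3.0.
  def firstJoinCondition(plan: LogicalPlan): Option[Expression] = plan collectFirst {
    case Join(_, _, _, Some(condition), _) => condition
  }
}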
@@ -17,7 +17,7 @@
package com.microsoft.hyperspace.index.sources.default

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.hyperspace.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -159,7 +159,7 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
.map { path =>
val hdfsPath = new Path(path)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
qualified.toString -> SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
qualified.toString -> SparkHadoopUtil.globPathIfNecessary(fs, qualified)
}
.toMap

@@ -46,8 +46,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
override def allFiles: Seq[FileStatus] = plan.relation match {
case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) =>
location
.getSnapshot(stalenessAcceptable = false)
.filesForScan(projection = Nil, location.partitionFilters, keepStats = false)
.getSnapshot
.filesForScan(projection = Nil, location.partitionFilters)
Comment on lines +49 to +50
@sezruby In Delta 0.8.0, there were some changes in the API. Could you confirm if these changes are fine?

.files
.map { f =>
toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path))
@@ -73,8 +73,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
plan.relation match {
case HadoopFsRelation(location: TahoeLogFileIndex, _, dataSchema, _, _, options) =>
val files = location
.getSnapshot(stalenessAcceptable = false)
.filesForScan(projection = Nil, location.partitionFilters, keepStats = false)
.getSnapshot
.filesForScan(projection = Nil, location.partitionFilters)
.files
.map { f =>
toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path))
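For reference, a hedged sketch of the Delta Lake 0.8.0 call shape this hunk adopts, with the 0.6.x-era arguments noted in comments (assumes the Delta internals named above; the helper object is hypothetical):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.files.TahoeLogFileIndex

object DeltaListingSketch {
  // Lists the data file paths of a Delta table through its snapshot.
  def deltaFilePaths(location: TahoeLogFileIndex): Seq[Path] =
    location.getSnapshot // Delta 0.6.x: getSnapshot(stalenessAcceptable = false)
      .filesForScan(projection = Nil, location.partitionFilters) // 0.6.x also took keepStats
      .files
      .map(f => new Path(location.path, f.path))
}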
@@ -0,0 +1,25 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.hyperspace

import org.apache.hadoop.fs.{FileSystem, Path}

/**
 * Thin wrapper exposing Spark's internal SparkHadoopUtil.globPathIfNecessary to Hyperspace.
 * It lives under the org.apache.spark namespace presumably because SparkHadoopUtil is
 * private[spark] in Spark 3.0 and can no longer be called from Hyperspace packages directly.
 */
object SparkHadoopUtil {
def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
org.apache.spark.deploy.SparkHadoopUtil.get.globPathIfNecessary(fs, pattern)
}
}
@@ -90,20 +90,6 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
}
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns. This is copied from SparkFunSuite.scala in Spark 3.0.
*
* TODO: This can be removed when we support Spark 3.0.
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir()
try f(dir)
finally {
Utils.deleteRecursively(dir)
}
}

protected def withTempPathAsString(f: String => Unit): Unit = {
// The following is from SQLHelper.withTempPath with a modification to pass
// String instead of File to "f". The reason this is copied instead of extending
@@ -113,7 +113,7 @@ class IndexSignatureProviderTest extends SparkFunSuite with SparkInvolvedSuite {
t2Schema)

val joinCondition = EqualTo(t1c3, t2c2)
val joinNode = Join(r1, r2, JoinType("inner"), Some(joinCondition))
val joinNode = Join(r1, r2, JoinType("inner"), Some(joinCondition), JoinHint.NONE)

val filterCondition = And(EqualTo(t1c1, Literal("ABC")), IsNotNull(t1c1))
val filterNode = Filter(filterCondition, joinNode)