From 93cd8c844dee424c782b986308e867308dc98c66 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 8 Jul 2020 21:47:28 -0700 Subject: [PATCH 1/8] initial commit --- build.sbt | 10 +- .../index/DataFrameWriterExtensions.scala | 2 +- .../hyperspace/index/IndexLogEntry.scala | 22 ++ .../index/rules/JoinIndexRule.scala | 2 +- .../index/serde/LogicalPlanSerDeUtils.scala | 6 +- .../hyperspace/index/serde/package.scala | 4 +- .../hyperspace/index/IndexLogEntryTest.scala | 5 +- .../index/IndexLogManagerImplTest.scala | 2 +- .../hyperspace/index/IndexTests.scala | 5 +- .../index/LogicalPlanSerDeTests.scala | 9 +- .../index/plananalysis/ExplainTest.scala | 240 ++++++++++-------- .../index/rules/JoinIndexRuleTest.scala | 59 +++-- .../hyperspace/util/JsonUtilsTests.scala | 5 +- 13 files changed, 217 insertions(+), 154 deletions(-) diff --git a/build.sbt b/build.sbt index a1333612d..c5d46722a 100644 --- a/build.sbt +++ b/build.sbt @@ -16,15 +16,9 @@ name := "hyperspace-core" -lazy val scala212 = "2.12.8" -lazy val scala211 = "2.11.12" -lazy val supportedScalaVersions = List(scala212, scala211) +lazy val sparkVersion = "3.0.0" -lazy val sparkVersion = "2.4.2" - -scalaVersion := scala212 - -crossScalaVersions := supportedScalaVersions +scalaVersion := "2.12.10" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" withSources(), diff --git a/src/main/scala/com/microsoft/hyperspace/index/DataFrameWriterExtensions.scala b/src/main/scala/com/microsoft/hyperspace/index/DataFrameWriterExtensions.scala index 152c86ef8..e520bb78f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/DataFrameWriterExtensions.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/DataFrameWriterExtensions.scala @@ -74,7 +74,7 @@ object DataFrameWriterExtensions { private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = { val qe = session.sessionState.executePlan(command) // Call `QueryExecution.toRDD` to trigger the execution of commands. - SQLExecution.withNewExecutionId(session, qe)(qe.toRdd) + SQLExecution.withNewExecutionId(qe)(qe.toRdd) } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 5e676f870..785f76819 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{DataType, StructType} +import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils @@ -60,7 +61,28 @@ object LogicalPlanFingerprint { // IndexLogEntry-specific SparkPlan that represents the source plan. 
case class SparkPlan(properties: SparkPlan.Properties) { val kind = "Spark" + + override def equals(o: Any): Boolean = o match { + case that: SparkPlan => + val isPlanEqual = if (!properties.rawPlan.isEmpty) { + val sparkSession = SparkSession.getActiveSession.getOrElse { + throw HyperspaceException("Could not find active SparkSession.") + } + val thisPlan = LogicalPlanSerDeUtils.deserialize(properties.rawPlan, sparkSession) + val thatPlan = LogicalPlanSerDeUtils.deserialize(that.properties.rawPlan, sparkSession) + thisPlan.fastEquals(thatPlan) + } else { + true + } + isPlanEqual && properties.fingerprint.equals(that.properties.fingerprint) + case _ => false + } + + override def hashCode(): Int = { + properties.fingerprint.hashCode + } } + object SparkPlan { case class Properties(rawPlan: String, fingerprint: LogicalPlanFingerprint) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala index fb3d0f43b..67c1ef171 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/JoinIndexRule.scala @@ -53,7 +53,7 @@ import com.microsoft.hyperspace.index.rankers.JoinIndexRanker */ object JoinIndexRule extends Rule[LogicalPlan] with Logging { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case join @ Join(l, r, _, Some(condition)) if isApplicable(l, r, condition) => + case join @ Join(l, r, _, Some(condition), _) if isApplicable(l, r, condition) => try { getUsableIndexPair(l, r, condition) .map { diff --git a/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala index 91542a06b..f5fb237d7 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala @@ -93,8 +93,7 @@ object LogicalPlanSerDeUtils { e.function, e.dataType, e.children, - e.inputsNullSafe, - e.inputTypes, + e.inputEncoders, e.udfName, e.nullable, e.udfDeterministic) @@ -206,8 +205,7 @@ object LogicalPlanSerDeUtils { e.function, e.dataType, e.children, - e.inputsNullSafe, - e.inputTypes, + e.inputEncoders, e.udfName, e.nullable, e.udfDeterministic) diff --git a/src/main/scala/com/microsoft/hyperspace/index/serde/package.scala b/src/main/scala/com/microsoft/hyperspace/index/serde/package.scala index 8c125fe86..b23503bb5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/serde/package.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/serde/package.scala @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, ExprId, Predicate, Unevaluable} import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, LeafNode, LogicalPlan} import org.apache.spark.sql.execution.FileRelation @@ -103,8 +104,7 @@ package object serde { function: AnyRef, dataType: DataType, children: Seq[Expression], - inputsNullSafe: Seq[Boolean], - inputTypes: Seq[DataType], + inputEncoders: Seq[Option[ExpressionEncoder[_]]], udfName: Option[String], nullable: Boolean, udfDeterministic: Boolean) diff --git 
a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index 0860f3392..b006d35b4 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -19,9 +19,10 @@ package com.microsoft.hyperspace.index import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types.{StringType, StructField, StructType} +import com.microsoft.hyperspace.SparkInvolvedSuite import com.microsoft.hyperspace.util.JsonUtils -class IndexLogEntryTest extends SparkFunSuite { +class IndexLogEntryTest extends SparkFunSuite with SparkInvolvedSuite { test("IndexLogEntry spec example") { val schemaString = """{\"type\":\"struct\", @@ -53,7 +54,7 @@ class IndexLogEntryTest extends SparkFunSuite { | "plan" : { | "kind" : "Spark", | "properties" : { - | "rawPlan" : "planString", + | "rawPlan" : "", | "fingerprint" : { | "kind" : "LogicalPlan", | "properties" : { diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala index 06e567a2c..f9635d612 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala @@ -44,7 +44,7 @@ class IndexLogManagerImplTest extends SparkFunSuite with SparkInvolvedSuite with Source( SparkPlan( SparkPlan.Properties( - rawPlan = "spark plan", + rawPlan = "", LogicalPlanFingerprint( LogicalPlanFingerprint.Properties(Seq(Signature("provider", "signature")))))), Seq( diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala index 12850ff17..7a6e89870 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala @@ -19,9 +19,10 @@ package com.microsoft.hyperspace.index import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import com.microsoft.hyperspace.SparkInvolvedSuite import com.microsoft.hyperspace.actions.Constants -class IndexTests extends SparkFunSuite { +class IndexTests extends SparkFunSuite with SparkInvolvedSuite { val indexConfig1 = IndexConfig("myIndex1", Array("id"), Seq("name")) val indexConfig2 = IndexConfig("myIndex2", Array("id"), Seq("school")) @@ -31,7 +32,7 @@ class IndexTests extends SparkFunSuite { schema: StructType, numBuckets: Int): IndexLogEntry = { val sourcePlanProperties = SparkPlan.Properties( - "plan", + "", LogicalPlanFingerprint( LogicalPlanFingerprint.Properties(Seq(Signature("signatureProvider", "dfSignature"))))) val sourceDataProperties = diff --git a/src/test/scala/com/microsoft/hyperspace/index/LogicalPlanSerDeTests.scala b/src/test/scala/com/microsoft/hyperspace/index/LogicalPlanSerDeTests.scala index a6a6d4621..638649aca 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/LogicalPlanSerDeTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/LogicalPlanSerDeTests.scala @@ -151,7 +151,7 @@ class LogicalPlanSerDeTests extends SparkFunSuite with SparkInvolvedSuite { } test("Serde query with scala UDF.") { - val intUdf = ScalaUDF(null, IntegerType, Literal(1) :: Nil, true :: Nil) + val intUdf = ScalaUDF(null, IntegerType, Literal(1) :: Nil) val plan = Filter(intUdf, scanNode) verifyPlanSerde(plan, 
"scalaUdf.plan") } @@ -179,7 +179,12 @@ class LogicalPlanSerDeTests extends SparkFunSuite with SparkInvolvedSuite { test("Serde query with join.") { val joinCondition = EqualTo(c1, c2) - val plan = Join(singleTablePlan, singleTablePlan, JoinType("inner"), Some(joinCondition)) + val plan = Join( + singleTablePlan, + singleTablePlan, + JoinType("inner"), + Some(joinCondition), + JoinHint.NONE) verifyPlanSerde(plan, "join.plan") } diff --git a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala index 6c4a2e0e6..1642c7bac 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkFunSuite import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.internal.SQLConf import com.microsoft.hyperspace.{Hyperspace, Implicits} import com.microsoft.hyperspace.index.{HyperspaceSuite, IndexConfig, IndexConstants} @@ -37,6 +38,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { val sparkSession = spark spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, indexStorageLocation) spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + spark.sessionState.conf.setConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true) import sparkSession.implicits._ hyperspace = new Hyperspace(sparkSession) @@ -51,6 +53,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { override def afterAll(): Unit = { fileSystem.delete(new Path(sampleParquetDataLocation), true) + spark.sessionState.conf.unsetConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING) super.afterAll() } @@ -76,50 +79,58 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { // The format of the explain output looks as follows: // scalastyle:off filelinelengthchecker /** - *============================================================= - *Plan with indexes: - *============================================================= - * SortMergeJoin [Col1#11], [Col1#21], Inner - * <----:- *(1) Project [Col1#11, Col2#12]----> - * <----: +- *(1) Filter isnotnull(Col1#11)----> - * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200----> - * <----+- *(2) Project [Col1#21, Col2#22]----> - * <----+- *(2) Filter isnotnull(Col1#21)----> - * <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200----> + * ============================================================= + * Plan with indexes: + * ============================================================= + * SortMergeJoin [Col1#13], [Col1#23], Inner + * <----:- *(1) Project [Col1#13, Col2#14]----> + * <----: +- *(1) Filter isnotnull(Col1#13)----> + * <----: +- *(1) ColumnarToRow----> + * <----: +- FileScan parquet [Col1#13,Col2#14] Batched: true, DataFilters: [isnotnull(Col1#13)], Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: 
[IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200----> + * <----+- *(2) Project [Col1#23, Col2#24]----> + * <----+- *(2) Filter isnotnull(Col1#23)----> + * <----+- *(2) ColumnarToRow----> + * <----+- FileScan parquet [Col1#23,Col2#24] Batched: true, DataFilters: [isnotnull(Col1#23)], Format: Parquet, Location: InMemoryFileIndex[src/test/resources/indexLocation/joinIndex/v__=0], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200----> * - *============================================================= - *Plan without indexes: - *============================================================= - * SortMergeJoin [Col1#11], [Col1#21], Inner - * <----:- *(2) Sort [Col1#11 ASC NULLS FIRST], false, 0----> - * <----: +- Exchange hashpartitioning(Col1#11, 200)----> - * <----: +- *(1) Project [Col1#11, Col2#12]----> - * <----: +- *(1) Filter isnotnull(Col1#11)----> - * <----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[src/test/resources/sampleparquet], PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct----> - * <----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0----> - * <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 200)----> + * ============================================================= + * Plan without indexes: + * ============================================================= + * SortMergeJoin [Col1#13], [Col1#23], Inner + * <----:- *(2) Sort [Col1#13 ASC NULLS FIRST], false, 0----> + * <----: +- Exchange hashpartitioning(Col1#13, 200), true, [id=#68]----> + * <----: +- *(1) Project [Col1#13, Col2#14]----> + * <----: +- *(1) Filter isnotnull(Col1#13)----> + * <----: +- *(1) ColumnarToRow----> + * <----: +- FileScan parquet [Col1#13,Col2#14] Batched: true, DataFilters: [isnotnull(Col1#13)], Format: Parquet, Location: InMemoryFileIndex[src/test/resources/samplepa..., PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct----> + * <----+- *(4) Sort [Col1#23 ASC NULLS FIRST], false, 0----> + * <----+- ReusedExchange [Col1#23, Col2#24], Exchange hashpartitioning(Col1#13, 200), true, [id=#68]----> * - *============================================================= - *Indexes used: - *============================================================= - *joinIndex:src/test/resources/indexLocation/joinIndex/v__=0 + * ============================================================= + * Indexes used: + * ============================================================= + * joinIndex:src/test/resources/indexLocation/joinIndex/v__=0 * * ============================================================= * Physical operator stats: * ============================================================= - * +------------------+-------------------+------------------+----------+ - * | Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference| - * +------------------+-------------------+------------------+----------+ - * | *Filter| 1| 2| 1| - * | *InputAdapter| 4| 2| -2| - * | *Project| 1| 2| 1| - * | *ReusedExchange| 1| 0| -1| - * | *Scan parquet| 1| 2| 1| - * | *ShuffleExchange| 1| 0| -1| - * | *Sort| 2| 0| -2| - * |*WholeStageCodegen| 4| 3| -1| - * | SortMergeJoin| 1| 1| 0| - * +------------------+-------------------+------------------+----------+ + * +----------------------+-------------------+------------------+----------+ + * | Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference| + * 
+----------------------+-------------------+------------------+----------+ + * | *ColumnarToRow| 1| 2| 1| + * | *Filter| 1| 2| 1| + * | *InputAdapter| 5| 4| -1| + * | *Project| 1| 2| 1| + * | *ReusedExchange| 1| 0| -1| + * | *Scan parquet| 1| 2| 1| + * | *ShuffleExchange| 1| 0| -1| + * | *Sort| 2| 0| -2| + * |*WholeStageCodegen (3)| 0| 1| 1| + * |*WholeStageCodegen (4)| 1| 0| -1| + * |*WholeStageCodegen (5)| 1| 0| -1| + * | SortMergeJoin| 1| 1| 0| + * | WholeStageCodegen (1)| 1| 1| 0| + * | WholeStageCodegen (2)| 1| 1| 0| + * +----------------------+-------------------+------------------+----------+ */ // scalastyle:on filelinelengthchecker @@ -139,7 +150,9 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Filter isnotnull(Col1#11)---->") .append(defaultDisplayMode.newLine) - .append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + + .append("<----: +- *(1) ColumnarToRow---->") + .append(defaultDisplayMode.newLine) + .append("<----: +- FileScan parquet [Col1#11,Col2#12] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexPath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -147,7 +160,9 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append(" <----+- *(2) Filter isnotnull(Col1#21)---->") .append(defaultDisplayMode.newLine) - .append(s" <----+- *(2) FileScan parquet [Col1#21,Col2#22] Batched: true, Format: Parquet, Location: " + + .append(" <----+- *(2) ColumnarToRow---->") + .append(defaultDisplayMode.newLine) + .append(" <----+- FileScan parquet [Col1#21,Col2#22] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$joinIndexPath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200---->") .append(defaultDisplayMode.newLine) @@ -162,19 +177,21 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("<----:- *(2) Sort [Col1#11 ASC NULLS FIRST], false, 0---->") .append(defaultDisplayMode.newLine) - .append("<----: +- Exchange hashpartitioning(Col1#11, 200)---->") + .append("<----: +- Exchange hashpartitioning(Col1#11, 200), true, [id=#]---->") .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Project [Col1#11, Col2#12]---->") .append(defaultDisplayMode.newLine) .append("<----: +- *(1) Filter isnotnull(Col1#11)---->") .append(defaultDisplayMode.newLine) - .append(s"<----: +- *(1) FileScan parquet [Col1#11,Col2#12] Batched: true, Format: Parquet, Location: " + + .append("<----: +- *(1) ColumnarToRow---->") + .append(defaultDisplayMode.newLine) + .append("<----: +- FileScan parquet [Col1#11,Col2#12] Batched: true, DataFilters: [isnotnull(Col1#)], Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct---->") .append(defaultDisplayMode.newLine) .append("<----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->") .append(defaultDisplayMode.newLine) - .append(" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 200)---->") + .append(" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 200), 
true, [id=#]---->") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) .append("=============================================================") @@ -192,31 +209,41 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(defaultDisplayMode.newLine) .append("=============================================================") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("+----------------------+-------------------+------------------+----------+") + .append(defaultDisplayMode.newLine) + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append(defaultDisplayMode.newLine) + .append("+----------------------+-------------------+------------------+----------+") + .append(defaultDisplayMode.newLine) + .append("| *ColumnarToRow| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("| *Filter| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("| *InputAdapter| 5| 4| -1|") .append(defaultDisplayMode.newLine) - .append("| *Filter| 1| 2| 1|") + .append("| *Project| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *InputAdapter| 4| 2| -2|") + .append("| *ReusedExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Project| 1| 2| 1|") + .append("| *Scan parquet| 1| 2| 1|") .append(defaultDisplayMode.newLine) - .append("| *ReusedExchange| 1| 0| -1|") + .append("| *ShuffleExchange| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| *Scan parquet| 1| 2| 1|") + .append("| *Sort| 2| 0| -2|") .append(defaultDisplayMode.newLine) - .append("| *ShuffleExchange| 1| 0| -1|") + .append("|*WholeStageCodegen (3)| 0| 1| 1|") .append(defaultDisplayMode.newLine) - .append("| *Sort| 2| 0| -2|") + .append("|*WholeStageCodegen (4)| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("|*WholeStageCodegen| 4| 3| -1|") + .append("|*WholeStageCodegen (5)| 1| 0| -1|") .append(defaultDisplayMode.newLine) - .append("| SortMergeJoin| 1| 1| 0|") + .append("| SortMergeJoin| 1| 1| 0|") .append(defaultDisplayMode.newLine) - .append("+------------------+-------------------+------------------+----------+") + .append("| WholeStageCodegen (1)| 1| 1| 0|") + .append(defaultDisplayMode.newLine) + .append("| WholeStageCodegen (2)| 1| 1| 0|") + .append(defaultDisplayMode.newLine) + .append("+----------------------+-------------------+------------------+----------+") .append(defaultDisplayMode.newLine) .append(defaultDisplayMode.newLine) // scalastyle:on filelinelengthchecker @@ -297,36 +324,29 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append("Project [Col1#135]") .append(displayMode.newLine) - .append("+- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145))") + .append("+- Filter (isnotnull(Col1#13) AND (Col1#13 = Subquery scalar-subquery#23, [id=#83]))") + .append(displayMode.newLine) + .append(" : +- Subquery scalar-subquery#23, [id=#83]") .append(displayMode.newLine) - .append(" : +- Subquery subquery145") + .append(" : +- *(1) Project [Col1#13]") .append(displayMode.newLine) - .append(" : +- *(1) Project [Col1#135]") + .append(" : +- *(1) Filter (isnotnull(Col2#14) AND (Col2#14 = 1))") .append(displayMode.newLine) - .append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") + .append(" : +- *(1) 
ColumnarToRow") .append(displayMode.newLine) - .append(" <----: +- *(1) FileScan parquet [Col2#136,Col1#135]") - .append(" Batched: true, Format: Parquet, Location: " + + .append(" <----: +- FileScan parquet [Col2#136,Col1#135]") + .append(" Batched: true, DataFilters: [isnotnull(Col2#14), (Col2#14 = 1)], Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") .append("ReadSchema: struct---->") .append(displayMode.newLine) - .append(" +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: " + + .append(" +- ColumnarToRow") + .append(displayMode.newLine) + .append(" +- FileScan parquet [Col1#135] Batched: true, DataFilters: [isnotnull(Col1#13)], " + + "Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct") .append(displayMode.newLine) - .append(" +- Subquery subquery145") - .append(displayMode.newLine) - .append(" +- *(1) Project [Col1#135]") - .append(displayMode.newLine) - .append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") - .append(displayMode.newLine) - .append(" <----+- *(1) FileScan parquet [Col2#136,Col1#135] " + - "Batched: true, Format: Parquet, Location: " + - truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") + - ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") - .append("ReadSchema: struct---->") - .append(displayMode.newLine) .append(displayMode.newLine) .append("=============================================================") .append(displayMode.newLine) @@ -336,37 +356,29 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append("Project [Col1#135]") .append(displayMode.newLine) - .append("+- Filter (isnotnull(Col1#135) && (Col1#135 = Subquery subquery145))") + .append("+- Filter (isnotnull(Col1#13) AND (Col1#13 = Subquery scalar-subquery#23, [id=#53]))") .append(displayMode.newLine) - .append(" : +- Subquery subquery145") + .append(" : +- Subquery scalar-subquery#23, [id=#53]") .append(displayMode.newLine) - .append(" : +- *(1) Project [Col1#135]") + .append(" : +- *(1) Project [Col1#13]") .append(displayMode.newLine) - .append(" : +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") + .append(" : +- *(1) Filter (isnotnull(Col2#14) AND (Col2#14 = 1))") .append(displayMode.newLine) - .append(" <----: +- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, " + - "Format: Parquet, Location: " + + .append(" : +- *(1) ColumnarToRow") + .append(displayMode.newLine) + .append(" <----: +- FileScan parquet [Col1#135,Col2#136] Batched: true, " + + "DataFilters: [isnotnull(Col2#14), (Col2#14 = 1)], Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ") .append("ReadSchema: struct---->") .append(displayMode.newLine) - .append(" +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: " + + .append(" +- ColumnarToRow") + .append(displayMode.newLine) + .append(" +- FileScan parquet [Col1#135] Batched: true, DataFilters: [isnotnull(Col1#13)], " + + "Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") + ", PartitionFilters: [], PushedFilters: [IsNotNull(Col1)], ReadSchema: struct") .append(displayMode.newLine) - .append(" +- Subquery subquery145") - 
.append(displayMode.newLine) - .append(" +- *(1) Project [Col1#135]") - .append(displayMode.newLine) - .append(" +- *(1) Filter (isnotnull(Col2#136) && (Col2#136 = 1))") - .append(displayMode.newLine) - .append( - " <----+- *(1) FileScan parquet [Col1#135,Col2#136] Batched: true, " + - "Format: Parquet, Location: ") - .append(truncate("InMemoryFileIndex[" + sampleParquetDataFullPath + "]") + - ", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], " + - "ReadSchema: struct---->") - .append(displayMode.newLine) .append(displayMode.newLine) .append("=============================================================") .append(displayMode.newLine) @@ -383,21 +395,25 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append("=============================================================") .append(displayMode.newLine) - .append("+-----------------+-------------------+------------------+----------+") + .append("+---------------------+-------------------+------------------+----------+") .append(displayMode.newLine) - .append("|Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") + .append("| Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|") .append(displayMode.newLine) - .append("+-----------------+-------------------+------------------+----------+") + .append("+---------------------+-------------------+------------------+----------+") .append(displayMode.newLine) - .append("| Filter| 1| 1| 0|") + .append("| ColumnarToRow| 1| 1| 0|") .append(displayMode.newLine) - .append("| Project| 1| 1| 0|") + .append("| Filter| 1| 1| 0|") .append(displayMode.newLine) - .append("| Scan parquet| 1| 1| 0|") + .append("| InputAdapter| 1| 1| 0|") .append(displayMode.newLine) - .append("|WholeStageCodegen| 1| 1| 0|") + .append("| Project| 1| 1| 0|") .append(displayMode.newLine) - .append("+-----------------+-------------------+------------------+----------+") + .append("| Scan parquet| 1| 1| 0|") + .append(displayMode.newLine) + .append("|WholeStageCodegen (1)| 1| 1| 0|") + .append(displayMode.newLine) + .append("+---------------------+-------------------+------------------+----------+") .append(displayMode.newLine) .append(displayMode.newLine) // scalastyle:on filelinelengthchecker @@ -475,10 +491,13 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append("Project [Col1#]") .append(displayMode.newLine) - .append("+- Filter (isnotnull(Col2#) && (Col2# = 2))") + .append("+- Filter (isnotnull(Col2#) AND (Col2# = 2))") + .append(displayMode.newLine) + .append(" +- ColumnarToRow") .append(displayMode.newLine) - .append(" " + displayMode.highlightTag.open ++ "+- FileScan parquet [Col2#,Col1#] ") - .append("Batched: true, Format: Parquet, Location: " + + .append(" " + displayMode.highlightTag.open ++ "+- FileScan parquet [Col2#,Col1#] ") + .append("Batched: true, DataFilters: [isnotnull(Col2#14), (Col2#14 = 2)], " + + "Format: Parquet, Location: " + truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]")) .append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ") .append("ReadSchema: struct" + displayMode.highlightTag.close) @@ -492,10 +511,13 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite { .append(displayMode.newLine) .append("Project [Col1#]") .append(displayMode.newLine) - .append("+- Filter (isnotnull(Col2#) && (Col2# = 2))") + .append("+- Filter (isnotnull(Col2#) AND (Col2# = 2))") + .append(displayMode.newLine) + 
.append(" +- ColumnarToRow") .append(displayMode.newLine) - .append(" " + displayMode.highlightTag.open + "+- FileScan parquet [Col1#,Col2#] ") - .append("Batched: true, Format: Parquet, Location: ") + .append(" " + displayMode.highlightTag.open + "+- FileScan parquet [Col1#,Col2#] ") + .append("Batched: true, DataFilters: [isnotnull(Col2#14), (Col2#14 = 2)], ") + .append("Format: Parquet, Location: ") // Note: The below conversion converts relative path to absolute path for comparison. .append(truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") + ", ") .append("PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ") diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala index 4680d53a7..a05678847 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import com.microsoft.hyperspace.actions.Constants @@ -117,7 +118,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { test("Join rule works if indexes exist and configs are set correctly") { val joinCondition = EqualTo(t1c1, t2c1) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(!updatedPlan.equals(originalPlan)) @@ -129,34 +131,38 @@ class JoinIndexRuleTest extends HyperspaceSuite { spark.conf.unset(IndexConstants.INDEX_SYSTEM_PATH) val joinCondition = EqualTo(t1c1, t2c1) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } test("Join rule does not update plan if join condition does not exist") { - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), None) + val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), None, JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } test("Join rule does not update plan if join condition is not equality") { val joinCondition = GreaterThan(t1c1, t2c1) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } test("Join rule does not update plan if join condition contains And or Or") { val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c2)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) 
assert(updatedPlan.equals(originalPlan)) } test("Join rule does not update plan if join condition contains Literals") { val joinCondition = EqualTo(t1c2, Literal(10, IntegerType)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } @@ -170,7 +176,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { // Index exists with t1c2 as indexed columns but not for t2c2. Plan should not update val joinCondition = EqualTo(t1c2, t2c2) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } @@ -182,7 +189,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { val t1ProjectNode = Project(Seq(t1c1, t1c4), t1FilterNode) val t2ProjectNode = Project(Seq(t2c1, t2c4), t2FilterNode) val joinCondition = EqualTo(t1c1, t2c1) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) // The plan requires t1c4 and t4c4 columns for projection. These columns are not part of any // index. Since no index satisfies the requirement, the plan should not change. @@ -199,7 +207,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { // t1c1, t1c2, t1c3, t1c4, t2c1, t2c2, t2c3, t2c4. // The below query is same as // SELECT * FROM T1, T2 WHERE T1.C1 = T2.C1 - val originalPlan = Join(t1FilterNode, t2FilterNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1FilterNode, t2FilterNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) { // Test: should not update plan if no index exist to cover all implicit columns @@ -234,7 +243,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { val t1ProjectNode = Project(Seq(t1c1Alias, t1c3), t1FilterNode) val joinCondition = EqualTo(t1c1Alias.toAttribute, t2c1) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } @@ -254,7 +264,7 @@ class JoinIndexRuleTest extends HyperspaceSuite { // Here, join condition contains a column from a LocalRelation and one from a LogicalRelation val joinCondition = EqualTo(t1c1, localCol1) val originalPlan = - Join(t1ProjectNode, localProjectNode, JoinType("inner"), Some(joinCondition)) + Join(t1ProjectNode, localProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) @@ -268,7 +278,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { // FROM t1, t2 // WHERE t1c1 = t2c1 and t1c2 = t2c2 val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c2)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(!updatedPlan.equals(originalPlan)) @@ -285,7 +296,8 @@ class 
JoinIndexRuleTest extends HyperspaceSuite { // WHERE t1c2 = t2c2 and t1c1 = t2c1 >> order of predicates changed. The rule should make sure // if any usable index can be found irrespective of order of predicates val joinCondition = And(EqualTo(t1c2, t2c2), EqualTo(t1c1, t2c1)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(!updatedPlan.equals(originalPlan)) @@ -301,7 +313,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { // FROM t1, t2 // WHERE t1c1 = t2c1 and t2c2 = t1c2 >> Swapped order of query columns val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t2c2, t1c2)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(!updatedPlan.equals(originalPlan)) @@ -319,7 +332,7 @@ class JoinIndexRuleTest extends HyperspaceSuite { // WHERE t1c1 = t2c1 and t1c1 = t2c2 >> t1c1 compared against both t2c1 and t2c2 val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c1, t2c2)) val originalPlan = - Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } @@ -329,7 +342,7 @@ class JoinIndexRuleTest extends HyperspaceSuite { // WHERE t1c1 = t2c1 and t1c2 = t2c1 >> t2c1 compared against both t1c1 and t1c2 val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c1)) val originalPlan = - Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } @@ -343,7 +356,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { // FROM t1, t2 // WHERE t1c1 = t2c1 and t1c1 = t2c2 and t1c1 = t2c1 >> one predicate repeated twice val joinCondition = And(And(EqualTo(t1c1, t2c1), EqualTo(t1c2, t2c2)), EqualTo(t1c1, t2c1)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(!updatedPlan.equals(originalPlan)) @@ -359,7 +373,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { // FROM t1, t2 // WHERE t1c1 = t1c2 and t1c1 = t2c2 >> two columns of t1 compared against each other val joinCondition = And(EqualTo(t1c1, t1c2), EqualTo(t1c2, t2c2)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } @@ -374,7 +389,8 @@ class JoinIndexRuleTest extends HyperspaceSuite { val t2c1Qualified = t2c1.copy()(t2c1.exprId, Seq("Table2")) val joinCondition = EqualTo(t1c1Qualified, t2c1Qualified) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), 
Some(joinCondition), JoinHint.NONE)
     val updatedPlan = JoinIndexRule(originalPlan)
     assert(!updatedPlan.equals(originalPlan))
@@ -403,7 +419,10 @@ class JoinIndexRuleTest extends HyperspaceSuite {
           case _: LogicalRelation =>
             plan2(i).isInstanceOf[LogicalRelation]
           // for other node types, we compare exact matching between original and updated plans
-          case node => node.simpleString.equals(plan2(i).simpleString)
+          case node =>
+            // The default is 25, and should be big enough for this purpose.
+            val maxFields = SQLConf.get.maxToStringFields
+            node.simpleString(maxFields).equals(plan2(i).simpleString(maxFields))
         }
       }
     } else {
diff --git a/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala b/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala
index 60236cb42..e1a1b0d6f 100644
--- a/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala
@@ -19,10 +19,11 @@ package com.microsoft.hyperspace.util
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
+import com.microsoft.hyperspace.SparkInvolvedSuite
 import com.microsoft.hyperspace.actions.Constants
 import com.microsoft.hyperspace.index._
 
-class JsonUtilsTests extends SparkFunSuite {
+class JsonUtilsTests extends SparkFunSuite with SparkInvolvedSuite {
   test("Test for JsonUtils.") {
     val schema = StructType(
       Seq(
         StructField("id", IntegerType),
         StructField("name", StringType),
         StructField("school", StringType)))
     val sourcePlanProperties = SparkPlan.Properties(
-      "plan",
+      "",
       LogicalPlanFingerprint(
         LogicalPlanFingerprint.Properties(Seq(Signature("signatureProvider", "dfSignature")))))
     val sourceDataProperties =

From 2f232cbf589b888312d8aae2012d6fe155986b6a Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Wed, 8 Jul 2020 21:54:41 -0700
Subject: [PATCH 2/8] add comment

---
 .../scala/com/microsoft/hyperspace/index/IndexLogEntry.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index 785f76819..0f22c6852 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -64,6 +64,9 @@ case class SparkPlan(properties: SparkPlan.Properties) {
 
   override def equals(o: Any): Boolean = o match {
     case that: SparkPlan =>
+      // When a logical plan is serialized, some of the fields can be 'lazy' (e.g.,
+      // Metadata._hashCode), meaning that the same logical plan can result in different serialized
+      // strings. Thus, the serialized strings are deserialized to logical plans and compared.
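+      // For example (illustrative; the names p1 and p2 are hypothetical): two entries captured
+      // from the same DataFrame may satisfy p1.properties.rawPlan != p2.properties.rawPlan,
+      // yet still compare equal here, because the deserialized plans are compared with fastEquals.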
val isPlanEqual = if (!properties.rawPlan.isEmpty) { val sparkSession = SparkSession.getActiveSession.getOrElse { throw HyperspaceException("Could not find active SparkSession.") From b20097bc0908ddca2489f5b38e34516779c3cb65 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 11 Aug 2020 14:17:09 -0700 Subject: [PATCH 3/8] more fixes --- .../index/IndexSignatureProviderTest.scala | 2 +- .../hyperspace/index/IndexTests.scala | 3 +-- .../index/rules/JoinIndexRuleTest.scala | 18 ++++++++---------- .../hyperspace/index/rules/RuleUtilsTest.scala | 4 ++-- .../hyperspace/util/JsonUtilsTests.scala | 3 +-- 5 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexSignatureProviderTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexSignatureProviderTest.scala index 912e67cd6..6c1851f89 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexSignatureProviderTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexSignatureProviderTest.scala @@ -113,7 +113,7 @@ class IndexSignatureProviderTest extends SparkFunSuite with SparkInvolvedSuite { t2Schema) val joinCondition = EqualTo(t1c3, t2c2) - val joinNode = Join(r1, r2, JoinType("inner"), Some(joinCondition)) + val joinNode = Join(r1, r2, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val filterCondition = And(EqualTo(t1c1, Literal("ABC")), IsNotNull(t1c1)) val filterNode = Filter(filterCondition, joinNode) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala index e8b00af92..28d7a91d0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexTests.scala @@ -19,10 +19,9 @@ package com.microsoft.hyperspace.index import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import com.microsoft.hyperspace.SparkInvolvedSuite import com.microsoft.hyperspace.actions.Constants -class IndexTests extends SparkFunSuite with SparkInvolvedSuite { +class IndexTests extends SparkFunSuite { val indexConfig1 = IndexConfig("myIndex1", Array("id"), Seq("name")) val indexConfig2 = IndexConfig("myIndex2", Array("id"), Seq("school")) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala index 881de4176..b9f094a4c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.index._ @@ -115,14 +116,9 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { test("Join rule works if indexes exist for case insensitive index and query") { val t1c1Caps = t1c1.withName("T1C1") -<<<<<<< HEAD - val joinCondition = EqualTo(t1c1, t2c1) + val joinCondition = EqualTo(t1c1Caps, t2c1) val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) -======= - val joinCondition = EqualTo(t1c1Caps, t2c1) - val originalPlan 
= Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) ->>>>>>> master val updatedPlan = JoinIndexRule(originalPlan) assert(!updatedPlan.equals(originalPlan)) @@ -134,7 +130,7 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { withSQLConf(IndexConstants.INDEX_SYSTEM_PATH -> "") { val joinCondition = EqualTo(t1c1, t2c1) val originalPlan = - Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(updatedPlan.equals(originalPlan)) } @@ -349,8 +345,9 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { } } - test("Join rule updates plan if columns have one-to-one mapping with repeated " + - "case-insensitive predicates") { + test( + "Join rule updates plan if columns have one-to-one mapping with repeated " + + "case-insensitive predicates") { val t1ProjectNode = Project(Seq(t1c1, t1c3), t1FilterNode) val t2ProjectNode = Project(Seq(t2c1, t2c3), t2FilterNode) @@ -358,7 +355,8 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { val t2c1Caps = t2c1.withName("T2C1") val joinCondition = And(EqualTo(t1c1, t2c1), EqualTo(t1c1Caps, t2c1Caps)) - val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val originalPlan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) val updatedPlan = JoinIndexRule(originalPlan) assert(!updatedPlan.equals(originalPlan)) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index f7d685b77..1a7c675b5 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -19,7 +19,7 @@ package com.microsoft.hyperspace.index.rules import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.expressions.{AttributeReference, IsNotNull} import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, JoinHint, LogicalPlan, Project} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache} import org.apache.spark.sql.types.{IntegerType, StringType} @@ -104,7 +104,7 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite { } test("Verify get logical relation for non-linear plan.") { - val joinNode = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), None) + val joinNode = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), None, JoinHint.NONE) val r = RuleUtils.getLogicalRelation(Project(Seq(t1c3, t2c3), joinNode)) assert(r.isEmpty) } diff --git a/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala b/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala index 4b52aca07..9385cb603 100644 --- a/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/util/JsonUtilsTests.scala @@ -19,11 +19,10 @@ package com.microsoft.hyperspace.util import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import com.microsoft.hyperspace.SparkInvolvedSuite import com.microsoft.hyperspace.actions.Constants 
import com.microsoft.hyperspace.index._ -class JsonUtilsTests extends SparkFunSuite with SparkInvolvedSuite { +class JsonUtilsTests extends SparkFunSuite { test("Test for JsonUtils.") { val schema = StructType( Seq( From 8ba43f1033cbd6d5420c44a6546867ffea3b29d0 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 11 Aug 2020 14:18:53 -0700 Subject: [PATCH 4/8] clean up --- .../scala/com/microsoft/hyperspace/index/IndexLogEntry.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 4c7cc0b8b..8173bdc21 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -18,7 +18,6 @@ package com.microsoft.hyperspace.index import org.apache.spark.sql.types.{DataType, StructType} -import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. From 7dd6cfe912389a9e514ae43d62c217e777f6bda9 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 11 Aug 2020 14:32:36 -0700 Subject: [PATCH 5/8] update python test --- azure-pipelines.yml | 2 +- script/download_spark.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 665f03c3d..7cf60ae00 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -29,7 +29,7 @@ jobs: scriptPath: 'run-tests.py' displayName: 'Running python tests' env: - SPARK_HOME: $(Build.SourcesDirectory)/spark-2.4.2-bin-hadoop2.7 + SPARK_HOME: $(Build.SourcesDirectory)/spark-3.0.0-bin-hadoop2.7 # If not a pull request, publish artifacts. - ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: - script: sbt +package diff --git a/script/download_spark.sh b/script/download_spark.sh index 39fec55bd..ca202c6ca 100644 --- a/script/download_spark.sh +++ b/script/download_spark.sh @@ -19,7 +19,7 @@ # A utility script for build pipeline to download and install spark binaries for # python tests to run. 
-SPARK_VERSION="2.4.2" +SPARK_VERSION="3.0.0" HADOOP_VERSION="2.7" SPARK_DIR="spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" From 8555775460303aa21f2c9e9dce78fd5dbe6208e6 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Thu, 13 Aug 2020 22:39:17 -0700 Subject: [PATCH 6/8] clean up --- .../com/microsoft/hyperspace/index/IndexLogEntryTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index a22e946d6..f91d33300 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -19,10 +19,9 @@ package com.microsoft.hyperspace.index import org.apache.spark.SparkFunSuite import org.apache.spark.sql.types.{StringType, StructField, StructType} -import com.microsoft.hyperspace.SparkInvolvedSuite import com.microsoft.hyperspace.util.JsonUtils -class IndexLogEntryTest extends SparkFunSuite with SparkInvolvedSuite { +class IndexLogEntryTest extends SparkFunSuite { test("IndexLogEntry spec example") { val schemaString = """{\"type\":\"struct\", From ce8a5f2954639fe1f31fd2c9cfd79eecddfef981 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 7 Oct 2020 17:55:25 -0700 Subject: [PATCH 7/8] fix tests --- .../hyperspace/index/HyperspaceSuite.scala | 14 -------------- .../hyperspace/index/rules/JoinIndexRuleTest.scala | 7 +++---- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala index b6a374845..1b60da6d5 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala @@ -90,20 +90,6 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite { } } - /** - * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` - * returns. This is copied from SparkFunSuite.scala in Spark 3.0. - * - * TODO: This can be removed when we support Spark 3.0. - */ - protected def withTempDir(f: File => Unit): Unit = { - val dir = Utils.createTempDir() - try f(dir) - finally { - Utils.deleteRecursively(dir) - } - } - protected def withTempPathAsString(f: String => Unit): Unit = { // The following is from SQLHelper.withTempPath with a modification to pass // String instead of File to "f". The reason this is copied instead of extending diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala index e8b765986..3b8ae36a0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala @@ -417,15 +417,14 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { test("Join rule is not applied for modified plan.") { val joinCondition = EqualTo(t1c1, t2c1) - val plan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition)) + val plan = + Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition), JoinHint.NONE) assert(!JoinIndexRule(plan).equals(plan)) // Mark the relation that the rule is applied and verify the plan does not change. 
val newPlan = plan transform { case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) => - r.copy( - relation = - h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) + r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) } assert(JoinIndexRule(newPlan).equals(newPlan)) } From a51dda0f5bdce2153d0e7095ed20168a56921827 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 23 Feb 2021 09:46:06 -0800 Subject: [PATCH 8/8] update --- build.sbt | 6 ++--- .../default/DefaultFileBasedRelation.scala | 4 +-- .../sources/delta/DeltaLakeRelation.scala | 8 +++--- .../deploy/hyperspace/SparkHadoopUtil.scala | 25 +++++++++++++++++++ 4 files changed, 34 insertions(+), 9 deletions(-) create mode 100644 src/main/scala/org/apache/spark/deploy/hyperspace/SparkHadoopUtil.scala diff --git a/build.sbt b/build.sbt index eeb8c8759..cbed04552 100644 --- a/build.sbt +++ b/build.sbt @@ -16,7 +16,7 @@ name := "hyperspace-core" -sparkVersion := "3.0.0" +sparkVersion := "3.0.1" scalaVersion := "2.12.10" @@ -24,8 +24,8 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" withSources (), "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" withSources (), "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" withSources (), - "io.delta" %% "delta-core" % "0.6.1" % "provided" withSources (), - "org.apache.iceberg" % "iceberg-spark-runtime" % "0.11.0" % "provided" withSources (), + "io.delta" %% "delta-core" % "0.8.0" % "provided" withSources (), + "org.apache.iceberg" % "iceberg-spark3-runtime" % "0.11.0" % "provided" withSources (), // Test dependencies "org.scalatest" %% "scalatest" % "3.0.5" % "test", "org.mockito" %% "mockito-scala" % "0.4.0" % "test", diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala index 448b8c943..1c3ad0a2e 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala @@ -17,7 +17,7 @@ package com.microsoft.hyperspace.index.sources.default import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.hyperspace.SparkHadoopUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -159,7 +159,7 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe .map { path => val hdfsPath = new Path(path) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - qualified.toString -> SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) + qualified.toString -> SparkHadoopUtil.globPathIfNecessary(fs, qualified) } .toMap diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala index 3871e07dc..b800a4b80 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala @@ -46,8 +46,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation) override def allFiles: Seq[FileStatus] = plan.relation 
match { case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => location - .getSnapshot(stalenessAcceptable = false) - .filesForScan(projection = Nil, location.partitionFilters, keepStats = false) + .getSnapshot + .filesForScan(projection = Nil, location.partitionFilters) .files .map { f => toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path)) @@ -73,8 +73,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation) plan.relation match { case HadoopFsRelation(location: TahoeLogFileIndex, _, dataSchema, _, _, options) => val files = location - .getSnapshot(stalenessAcceptable = false) - .filesForScan(projection = Nil, location.partitionFilters, keepStats = false) + .getSnapshot + .filesForScan(projection = Nil, location.partitionFilters) .files .map { f => toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path)) diff --git a/src/main/scala/org/apache/spark/deploy/hyperspace/SparkHadoopUtil.scala b/src/main/scala/org/apache/spark/deploy/hyperspace/SparkHadoopUtil.scala new file mode 100644 index 000000000..856a8a4c8 --- /dev/null +++ b/src/main/scala/org/apache/spark/deploy/hyperspace/SparkHadoopUtil.scala @@ -0,0 +1,25 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.hyperspace + +import org.apache.hadoop.fs.{FileSystem, Path} + +object SparkHadoopUtil { + def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = { + org.apache.spark.deploy.SparkHadoopUtil.get.globPathIfNecessary(fs, pattern) + } +}
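Note: Spark's own org.apache.spark.deploy.SparkHadoopUtil is private[spark], which is presumably why this wrapper is placed under an org.apache.spark.* package. Below is a minimal usage sketch of the shim, assuming a local SparkSession; the object name and the glob pattern are hypothetical placeholders, not part of the patch.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.deploy.hyperspace.SparkHadoopUtil
    import org.apache.spark.sql.SparkSession

    object GlobPathExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("glob-example").getOrCreate()
        // Hypothetical path pattern; any glob-containing path behaves the same way.
        val pattern = new Path("/tmp/sampleparquet/v__=*")
        val fs = pattern.getFileSystem(spark.sessionState.newHadoopConf())
        // Expands the pattern only if it contains glob characters; otherwise returns it unchanged.
        val resolved: Seq[Path] = SparkHadoopUtil.globPathIfNecessary(fs, pattern)
        resolved.foreach(println)
        spark.stop()
      }
    }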