diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala new file mode 100644 index 000000000..0edee450b --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceSparkSessionExtension.scala @@ -0,0 +1,69 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace + +import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +import com.microsoft.hyperspace.index.execution.BucketUnionStrategy +import com.microsoft.hyperspace.index.rules.ApplyHyperspace + +/** + * An extension for Spark SQL to activate Hyperspace. + * + * Example to run a `spark-submit` with Hyperspace enabled: + * {{{ + * spark-submit -c spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension + * }}} + * + * Example to create a `SparkSession` with Hyperspace enabled: + * {{{ + * val spark = SparkSession + * .builder() + * .appName("...") + * .master("...") + * .config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension") + * .getOrCreate() + * }}} + */ +class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) { + + /** + * If HyperspaceRule is injected directly to OptimizerRule with HyperspaceExtension, + * the order of applying rule is different from without HyperspaceExtension + * (i.e., explicitly calling enableHyperspace). To make behavior consistently, + * current implementation of HyperspaceExtension uses a trick to call enableHyperspace + * before rule is applied. Since the interface of injectOptimizerRule should return rule builder, + * it returns a dummy rule that does nothing. It may increase overhead slightly + * because enableHyperspace is called once for each evaluation of spark plan. + */ + private case object DummyRule extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan + } + } + + override def apply(extensions: SparkSessionExtensions): Unit = { + extensions.injectOptimizerRule { sparkSession => + // Enable Hyperspace to leverage indexes. + sparkSession.addOptimizationsIfNeeded() + // Return a dummy rule to fit in interface of injectOptimizerRule + DummyRule + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index b415fb548..c65c2e092 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -19,6 +19,10 @@ package com.microsoft.hyperspace.index import org.apache.spark.sql.internal.SQLConf object IndexConstants { + // If it is set as false, Hyperspace will not be applied. + val HYPERSPACE_APPLY_ENABLED = "spark.hyperspace.apply.enabled" + val HYPERSPACE_APPLY_ENABLED_DEFAULT = "true" + val INDEXES_DIR = "indexes" // Config used for setting the system path, which is considered as a "root" path for Hyperspace; diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala index a9ae97acd..60967c1ad 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala @@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.IndexLogEntry import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging +import com.microsoft.hyperspace.util.HyperspaceConf /** * Transform the given plan to use Hyperspace indexes. @@ -42,7 +43,7 @@ object ApplyHyperspace private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean] override def apply(plan: LogicalPlan): LogicalPlan = { - if (disableForIndexMaintenance.get) { + if (!HyperspaceConf.hyperspaceApplyEnabled(spark) || disableForIndexMaintenance.get) { return plan } diff --git a/src/main/scala/com/microsoft/hyperspace/package.scala b/src/main/scala/com/microsoft/hyperspace/package.scala index fb57bdb35..7d7caa4e4 100644 --- a/src/main/scala/com/microsoft/hyperspace/package.scala +++ b/src/main/scala/com/microsoft/hyperspace/package.scala @@ -18,8 +18,10 @@ package com.microsoft import org.apache.spark.sql.SparkSession +import com.microsoft.hyperspace.HyperspaceSparkSessionExtension import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.rules.ApplyHyperspace +import com.microsoft.hyperspace.util.HyperspaceConf package object hyperspace { @@ -29,42 +31,65 @@ package object hyperspace { implicit class Implicits(sparkSession: SparkSession) { /** - * Plug in Hyperspace-specific rules. + * Enable Hyperspace indexes. + * + * Plug in Hyperspace-specific rules and set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as true. * * @return a spark session that contains Hyperspace-specific rules. */ def enableHyperspace(): SparkSession = { - disableHyperspace - sparkSession.sessionState.experimentalMethods.extraOptimizations ++= - ApplyHyperspace :: Nil - sparkSession.sessionState.experimentalMethods.extraStrategies ++= - BucketUnionStrategy :: Nil + HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true) + addOptimizationsIfNeeded() sparkSession } /** - * Plug out Hyperspace-specific rules. + * Disable Hyperspace indexes. + * + * Set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as false + * to stop applying Hyperspace indexes. * - * @return a spark session that does not contain Hyperspace-specific rules. + * @return a spark session that `IndexConstants.HYPERSPACE_APPLY_ENABLED` is set as false. */ def disableHyperspace(): SparkSession = { - val experimentalMethods = sparkSession.sessionState.experimentalMethods - experimentalMethods.extraOptimizations = - experimentalMethods.extraOptimizations.filterNot(ApplyHyperspace.equals) - experimentalMethods.extraStrategies = - experimentalMethods.extraStrategies.filterNot(BucketUnionStrategy.equals) + HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, false) sparkSession } /** * Checks if Hyperspace is enabled or not. * + * Note that Hyperspace is enabled when: + * 1) `ApplyHyperspace` exists in extraOptimization + * 2) `BucketUnionStrate` exists in extraStrategies and + * 3) `IndexConstants.HYPERSPACE_APPLY_ENABLED` is true. + * * @return true if Hyperspace is enabled or false otherwise. */ def isHyperspaceEnabled(): Boolean = { val experimentalMethods = sparkSession.sessionState.experimentalMethods experimentalMethods.extraOptimizations.contains(ApplyHyperspace) && - experimentalMethods.extraStrategies.contains(BucketUnionStrategy) + experimentalMethods.extraStrategies.contains(BucketUnionStrategy) && + HyperspaceConf.hyperspaceApplyEnabled(sparkSession) + } + + /** + * Add ApplyHyperspace and BucketUnionStrategy into extraOptimization + * and extraStrategies, respectively, to make Spark can use Hyperspace. + * + * @param sparkSession Spark session that will use Hyperspace + */ + private[hyperspace] def addOptimizationsIfNeeded(): Unit = { + if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains( + ApplyHyperspace)) { + sparkSession.sessionState.experimentalMethods.extraOptimizations ++= + ApplyHyperspace :: Nil + } + if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains( + BucketUnionStrategy)) { + sparkSession.sessionState.experimentalMethods.extraStrategies ++= + BucketUnionStrategy :: Nil + } } } } diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index 2fcc56519..8b071c439 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -25,6 +25,22 @@ import com.microsoft.hyperspace.index.IndexConstants * Helper class to extract Hyperspace-related configs from SparkSession. */ object HyperspaceConf { + + /** + * Returns the config value whether hyperspace is enabled or not. + */ + def hyperspaceApplyEnabled(spark: SparkSession): Boolean = { + spark.conf + .get( + IndexConstants.HYPERSPACE_APPLY_ENABLED, + IndexConstants.HYPERSPACE_APPLY_ENABLED_DEFAULT) + .toBoolean + } + + def setHyperspaceApplyEnabled(spark: SparkSession, apply: Boolean): Unit = { + spark.conf.set(IndexConstants.HYPERSPACE_APPLY_ENABLED, apply.toString) + } + def hybridScanEnabled(spark: SparkSession): Boolean = { spark.conf .get( diff --git a/src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala b/src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala new file mode 100644 index 000000000..f51564d3c --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala @@ -0,0 +1,198 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.microsoft.hyperspace + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation} + +import com.microsoft.hyperspace.index.{Content, FileIdTracker, HyperspaceSuite, IndexConfig, IndexConstants} +import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY +import com.microsoft.hyperspace.util.FileUtils + +class HyperspaceExtensionTest extends HyperspaceSuite { + private val sampleDeptDataLocation = inTempDir("dept") + private val sampleEmpDataLocation = inTempDir("emp") + + private val departments = Seq( + (10, "Accounting", "New York"), + (20, "Research", "Dallas"), + (30, "Sales", "Chicago"), + (40, "Operations", "Boston")) + + private val employees = Seq( + (7369, "SMITH", 20), + (7499, "ALLEN", 30), + (7521, "WARD", 30), + (7566, "JONES", 20), + (7698, "BLAKE", 30), + (7782, "CLARK", 10), + (7788, "SCOTT", 20), + (7839, "KING", 10), + (7844, "TURNER", 30), + (7876, "ADAMS", 20), + (7900, "JAMES", 30), + (7934, "MILLER", 10), + (7902, "FORD", 20), + (7654, "MARTIN", 30)) + + override protected lazy val spark: SparkSession = SparkSession + .builder() + .master(s"local[$numParallelism]") + .config(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger") + .config("delta.log.cacheSize", "3") + .config("spark.databricks.delta.snapshotPartitions", "2") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .config( + "spark.sql.extensions", + "io.delta.sql.DeltaSparkSessionExtension," + + "com.microsoft.hyperspace.HyperspaceSparkSessionExtension") + .config("spark.sql.shuffle.partitions", "5") + .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5") + .config("spark.ui.enabled", "false") + .config("spark.ui.showConsoleProgress", "false") + .appName(suiteName) + .getOrCreate() + + override def beforeAll(): Unit = { + super.beforeAll() + + val sparkSession = spark + import sparkSession.implicits._ + FileUtils.delete(new Path(sampleDeptDataLocation)) + FileUtils.delete(new Path(sampleEmpDataLocation)) + + departments + .toDF("deptId", "deptName", "location") + .write + .mode("overwrite") + .parquet(sampleDeptDataLocation) + + employees + .toDF("empId", "empName", "deptId") + .write + .mode("overwrite") + .parquet(sampleEmpDataLocation) + } + + override def afterAll(): Unit = { + FileUtils.delete(new Path(sampleDeptDataLocation)) + FileUtils.delete(new Path(sampleEmpDataLocation)) + super.beforeAll() + } + + test("Verify ApplyHyperspace is used with hyperspace extension session") { + MockEventLogger.reset() + + val deptDF = spark.read.parquet(sampleDeptDataLocation) + val empDF = spark.read.parquet(sampleEmpDataLocation) + + val deptIndexConfig = IndexConfig("deptIndex", Seq("deptId"), Seq("deptName")) + val empIndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName")) + + // Create Hyperspace indexes. + val hs = new Hyperspace(spark) + hs.createIndex(deptDF, deptIndexConfig) + hs.createIndex(empDF, empIndexConfig) + + // Make sure new index is available to all. + assert(Hyperspace.getContext(spark).indexCollectionManager.indexes.count == 2) + + def filterQuery(): DataFrame = deptDF.filter("deptId == '30'").select("deptId", "deptName") + + verifyIndexUsage(filterQuery, getIndexFilesPath(deptIndexConfig.indexName)) + + def eqJoinQuery(): DataFrame = + empDF + .join(deptDF, empDF("deptId") === deptDF("deptId")) + .select(empDF("empName"), deptDF("deptName")) + + verifyIndexUsage( + eqJoinQuery, + getIndexFilesPath(deptIndexConfig.indexName) ++ getIndexFilesPath(empIndexConfig.indexName)) + } + + /** + * Verify that the query plan has the expected rootPaths. + * + * @param optimizedPlan the optimized query plan. + * @param expectedPaths the expected paths in the query plan. + */ + private def verifyQueryPlanHasExpectedRootPaths( + optimizedPlan: LogicalPlan, + expectedPaths: Seq[Path]): Unit = { + assert(getAllRootPaths(optimizedPlan).sortBy(_.getName) === expectedPaths.sortBy(_.getName)) + } + + /** + * Get all rootPaths from a query plan. + * + * @param optimizedPlan the optimized query plan. + * @return a sequence of [[Path]]. + */ + private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = { + optimizedPlan.collect { + case LogicalRelation( + HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), + _, + _, + _) => + location.rootPaths + }.flatten + } + + private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = { + versions.flatMap { v => + Content + .fromDirectory( + new Path(systemPath, s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$v"), + new FileIdTracker, + new Configuration) + .files + } + } + + /** + * Gets the sorted rows from the given dataframe to make it easy to compare with + * other dataframe. + * + * @param df dataframe to collect rows from. + * @return sorted rows. + */ + private def getSortedRows(df: DataFrame): Array[Row] = { + df.orderBy(df.columns.head, df.columns.tail: _*).collect() + } + + private def verifyIndexUsage(f: () => DataFrame, expectedRootPaths: Seq[Path]): Unit = { + spark.disableHyperspace() + val dfWithHyperspaceDisabled = f() + val schemaWithHyperspaceDisabled = dfWithHyperspaceDisabled.schema + val sortedRowsWithHyperspaceDisabled = getSortedRows(dfWithHyperspaceDisabled) + + spark.enableHyperspace() + val dfWithHyperspaceEnabled = f() + + verifyQueryPlanHasExpectedRootPaths( + dfWithHyperspaceEnabled.queryExecution.optimizedPlan, + expectedRootPaths) + + assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema)) + assert(sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfWithHyperspaceEnabled))) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala index ec1c2841f..ae66ccb15 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala @@ -33,7 +33,7 @@ import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig import com.microsoft.hyperspace.index.dataskipping.sketches.MinMaxSketch import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.rules.{ApplyHyperspace, CandidateIndexCollector} -import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils} class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { private val testDir = inTempDir("e2eTests") @@ -91,12 +91,16 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite { spark.sessionState.experimentalMethods.extraStrategies .containsSlice(expectedOptimizationStrategy)) + // Since applyHyperspace is called before, extraOptimization contains ApplyHyperspace + // This behavior has changed according to following discussion: + // https://github.com/microsoft/hyperspace/pull/504/files#r740278070 spark.disableHyperspace() + assert(!HyperspaceConf.hyperspaceApplyEnabled(spark)) assert( - !spark.sessionState.experimentalMethods.extraOptimizations + spark.sessionState.experimentalMethods.extraOptimizations .containsSlice(expectedOptimizationRuleBatch)) assert( - !spark.sessionState.experimentalMethods.extraStrategies + spark.sessionState.experimentalMethods.extraStrategies .containsSlice(expectedOptimizationStrategy)) }