add spark session extension
add config to control enabling Hyperspace
add dummy rule to avoid behavior differences between the extension and explicitly enabling Hyperspace
add test for the Hyperspace extension
update documentation on the methods to enable Hyperspace
Yoonjae Park authored and paryoja committed Nov 4, 2021
1 parent 4d93770 commit 923872a
Showing 6 changed files with 323 additions and 1 deletion.
38 changes: 38 additions & 0 deletions docs/_docs/01-ug-quick-start-guide.md
@@ -240,6 +240,44 @@ hs.explain(query, verbose = True)
#### Enable Hyperspace

Now that you have created an index that your query can utilize, you can enable Hyperspace and execute your query.
There are two ways to enable Hyperspace.

1. Using the Hyperspace extension.

You can start a session with the Hyperspace extension.
The following examples show how to launch Spark™ with Hyperspace.

Spark-submit:
```
spark-submit -c spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension ...
```

Scala:
```scala
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
.getOrCreate()
```

Python:
```python
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("...") \
.master("...") \
.config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension") \
.getOrCreate()
```


2. Calling the enableHyperspace function explicitly.

If you explicitly call enableHyperspace (for Scala) or Hyperspace.enable (for Python), Spark will use Hyperspace indexes whenever they are applicable.

Scala:
```scala
spark.enableHyperspace()
```
@@ -0,0 +1,66 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

/**
* An extension for Spark SQL to activate Hyperspace.
*
* Example to run a `spark-submit` with Hyperspace enabled:
* {{{
* spark-submit -c spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension
* }}}
*
* Example to create a `SparkSession` with Hyperspace enabled:
* {{{
* val spark = SparkSession
* .builder()
* .appName("...")
* .master("...")
* .config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
* .getOrCreate()
* }}}
*
* NOTE: If HyperspaceRule were injected directly as an optimizer rule by this extension,
* the order in which rules are applied would differ from the non-extension path
* (i.e., explicitly calling enableHyperspace). To keep the behavior consistent, the
* current implementation uses a trick: it calls enableHyperspace before any rule is
* applied. Since injectOptimizerRule must return a rule builder, the builder returns a
* dummy rule that does nothing. This may add slight overhead because enableHyperspace
* is called once for each evaluation of a Spark plan.
*/

class DummyRule extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan
}
}

class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) {
type RuleBuilder = SparkSession => Rule[LogicalPlan]
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectOptimizerRule { sparkSession =>
// Enable Hyperspace to leverage indexes.
sparkSession.enableHyperspace()
// Return a dummy rule to satisfy the injectOptimizerRule interface.
new DummyRule
}
}
}
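
For illustration only (not part of this commit): the extension can be registered alongside other session extensions by listing the classes comma-separated under `spark.sql.extensions`, mirroring the configuration used in the test suite below. The Delta extension, app name, and master here are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable Hyperspace together with another extension by
// chaining both classes in spark.sql.extensions.
val spark = SparkSession
  .builder()
  .appName("hyperspace-extension-demo") // placeholder
  .master("local[*]")                   // placeholder
  .config(
    "spark.sql.extensions",
    "io.delta.sql.DeltaSparkSessionExtension," +
      "com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
  .getOrCreate()
```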
@@ -19,6 +19,11 @@ package com.microsoft.hyperspace.index
import org.apache.spark.sql.internal.SQLConf

object IndexConstants {
// If set to true, Hyperspace indexes will not be applied,
// even when the user has explicitly called "spark.enableHyperspace()".
val APPLY_HYPERSPACE_BYPASS = "spark.hyperspace.applyhyperspace.bypass"
val APPLY_HYPERSPACE_BYPASS_DEFAULT = "false"

val INDEXES_DIR = "indexes"

// Config used for setting the system path, which is considered as a "root" path for Hyperspace;
@@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
import com.microsoft.hyperspace.util.HyperspaceConf

/**
* Transform the given plan to use Hyperspace indexes.
@@ -42,10 +43,13 @@ object ApplyHyperspace
private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean]

override def apply(plan: LogicalPlan): LogicalPlan = {
if (disableForIndexMaintenance.get) {
if (HyperspaceConf.applyHyperspaceBypass(spark)) {
return plan
}

if (disableForIndexMaintenance.get) {
return plan
}
val indexManager = Hyperspace
.getContext(spark)
.indexCollectionManager
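
For illustration only (not part of this commit): a minimal sketch of how the new bypass flag interacts with ApplyHyperspace. The flag name comes from IndexConstants above; the data path and query are hypothetical.

```scala
// Temporarily skip index application even though Hyperspace is enabled,
// e.g. to compare a plan against the non-indexed baseline.
spark.conf.set("spark.hyperspace.applyhyperspace.bypass", "true")
spark.read.parquet("/data/dept").filter("deptId == '30'").explain() // hypothetical path

// Restore normal behavior so ApplyHyperspace can rewrite plans again.
spark.conf.set("spark.hyperspace.applyhyperspace.bypass", "false")
```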
10 changes: 10 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -25,6 +25,16 @@ import com.microsoft.hyperspace.index.IndexConstants
* Helper class to extract Hyperspace-related configs from SparkSession.
*/
object HyperspaceConf {

/**
* Returns true if applying Hyperspace indexes should be bypassed, even when Hyperspace is enabled.
*/
def applyHyperspaceBypass(spark: SparkSession): Boolean = {
spark.conf
.get(IndexConstants.APPLY_HYPERSPACE_BYPASS, IndexConstants.APPLY_HYPERSPACE_BYPASS_DEFAULT)
.toBoolean
}

def hybridScanEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
199 changes: 199 additions & 0 deletions src/test/scala/com/microsoft/hyperspace/HyperspaceExtensionTest.scala
@@ -0,0 +1,199 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}

import com.microsoft.hyperspace.index.{Content, FileIdTracker, HyperspaceSuite, IndexConfig, IndexConstants}
import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY
import com.microsoft.hyperspace.util.FileUtils

class HyperspaceExtensionTest extends HyperspaceSuite {
private val sampleDeptDataLocation = inTempDir("dept")
private val sampleEmpDataLocation = inTempDir("emp")

private val departments = Seq(
(10, "Accounting", "New York"),
(20, "Research", "Dallas"),
(30, "Sales", "Chicago"),
(40, "Operations", "Boston"))

private val employees = Seq(
(7369, "SMITH", 20),
(7499, "ALLEN", 30),
(7521, "WARD", 30),
(7566, "JONES", 20),
(7698, "BLAKE", 30),
(7782, "CLARK", 10),
(7788, "SCOTT", 20),
(7839, "KING", 10),
(7844, "TURNER", 30),
(7876, "ADAMS", 20),
(7900, "JAMES", 30),
(7934, "MILLER", 10),
(7902, "FORD", 20),
(7654, "MARTIN", 30))

override protected lazy val spark: SparkSession = SparkSession
.builder()
.master(s"local[$numParallelism]")
.config(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger")
.config("delta.log.cacheSize", "3")
.config("spark.databricks.delta.snapshotPartitions", "2")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config(
"spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension," +
"com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
.config("spark.sql.shuffle.partitions", "5")
.config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
.config("spark.ui.enabled", "false")
.config("spark.ui.showConsoleProgress", "false")
.appName(suiteName)
.getOrCreate()

override def beforeAll(): Unit = {
super.beforeAll()

val sparkSession = spark
import sparkSession.implicits._
FileUtils.delete(new Path(sampleDeptDataLocation))
FileUtils.delete(new Path(sampleEmpDataLocation))

departments
.toDF("deptId", "deptName", "location")
.write
.mode("overwrite")
.parquet(sampleDeptDataLocation)

employees
.toDF("empId", "empName", "deptId")
.write
.mode("overwrite")
.parquet(sampleEmpDataLocation)
}

override def afterAll(): Unit = {
FileUtils.delete(new Path(sampleDeptDataLocation))
FileUtils.delete(new Path(sampleEmpDataLocation))
super.afterAll()
}

test("Verify ApplyHyperspace is used with hyperspace extension session") {
MockEventLogger.reset()

val deptDF = spark.read.parquet(sampleDeptDataLocation)
val empDF = spark.read.parquet(sampleEmpDataLocation)

val deptIndexConfig = IndexConfig("deptIndex", Seq("deptId"), Seq("deptName"))
val empIndexConfig = IndexConfig("empIndex", Seq("deptId"), Seq("empName"))

// Create Hyperspace indexes.
val hs = new Hyperspace(spark)
hs.createIndex(deptDF, deptIndexConfig)
hs.createIndex(empDF, empIndexConfig)

// Make sure both new indexes are available.
assert(Hyperspace.getContext(spark).indexCollectionManager.indexes.count == 2)

def filterQuery(): DataFrame = deptDF.filter("deptId == '30'").select("deptId", "deptName")

verifyIndexUsage(filterQuery, getIndexFilesPath(deptIndexConfig.indexName))

def eqJoinQuery(): DataFrame =
empDF
.join(deptDF, empDF("deptId") === deptDF("deptId"))
.select(empDF("empName"), deptDF("deptName"))

verifyIndexUsage(
eqJoinQuery,
getIndexFilesPath(deptIndexConfig.indexName) ++ getIndexFilesPath(empIndexConfig.indexName))

}

/**
* Verify that the query plan has the expected rootPaths.
*
* @param optimizedPlan the optimized query plan.
* @param expectedPaths the expected paths in the query plan.
*/
private def verifyQueryPlanHasExpectedRootPaths(
optimizedPlan: LogicalPlan,
expectedPaths: Seq[Path]): Unit = {
assert(getAllRootPaths(optimizedPlan).sortBy(_.getName) === expectedPaths.sortBy(_.getName))
}

/**
* Get all rootPaths from a query plan.
*
* @param optimizedPlan the optimized query plan.
* @return a sequence of [[Path]].
*/
private def getAllRootPaths(optimizedPlan: LogicalPlan): Seq[Path] = {
optimizedPlan.collect {
case LogicalRelation(
HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
_,
_,
_) =>
location.rootPaths
}.flatten
}

private def getIndexFilesPath(indexName: String, versions: Seq[Int] = Seq(0)): Seq[Path] = {
versions.flatMap { v =>
Content
.fromDirectory(
new Path(systemPath, s"$indexName/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$v"),
new FileIdTracker,
new Configuration)
.files
}
}

/**
* Gets the sorted rows from the given dataframe to make it easy to compare with
* another dataframe.
*
* @param df dataframe to collect rows from.
* @return sorted rows.
*/
private def getSortedRows(df: DataFrame): Array[Row] = {
df.orderBy(df.columns.head, df.columns.tail: _*).collect()
}

private def verifyIndexUsage(f: () => DataFrame, expectedRootPaths: Seq[Path]): Unit = {
spark.disableHyperspace()
val dfWithHyperspaceDisabled = f()
val schemaWithHyperspaceDisabled = dfWithHyperspaceDisabled.schema
val sortedRowsWithHyperspaceDisabled = getSortedRows(dfWithHyperspaceDisabled)

spark.enableHyperspace()
val dfWithHyperspaceEnabled = f()

verifyQueryPlanHasExpectedRootPaths(
dfWithHyperspaceEnabled.queryExecution.optimizedPlan,
expectedRootPaths)

assert(schemaWithHyperspaceDisabled.equals(dfWithHyperspaceEnabled.schema))
assert(sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfWithHyperspaceEnabled)))
}
}
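
To try the new suite locally, a command along these lines should work, assuming the project's standard sbt build (the exact invocation is an assumption, not taken from this commit):

```
sbt "testOnly com.microsoft.hyperspace.HyperspaceExtensionTest"
```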
