Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Add spark session extension for Hyperspace (#504)
Browse files Browse the repository at this point in the history
  • Loading branch information
paryoja authored Nov 15, 2021
1 parent 2f8d32b commit d8c4b79
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.ApplyHyperspace

/**
* An extension for Spark SQL to activate Hyperspace.
*
* Example to run a `spark-submit` with Hyperspace enabled:
* {{{
* spark-submit -c spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension
* }}}
*
* Example to create a `SparkSession` with Hyperspace enabled:
* {{{
* val spark = SparkSession
* .builder()
* .appName("...")
* .master("...")
* .config("spark.sql.extensions", "com.microsoft.hyperspace.HyperspaceSparkSessionExtension")
* .getOrCreate()
* }}}
*/
class HyperspaceSparkSessionExtension extends (SparkSessionExtensions => Unit) {

/**
* If HyperspaceRule is injected directly to OptimizerRule with HyperspaceExtension,
* the order of applying rule is different from without HyperspaceExtension
* (i.e., explicitly calling enableHyperspace). To make behavior consistently,
* current implementation of HyperspaceExtension uses a trick to call enableHyperspace
* before rule is applied. Since the interface of injectOptimizerRule should return rule builder,
* it returns a dummy rule that does nothing. It may increase overhead slightly
* because enableHyperspace is called once for each evaluation of spark plan.
*/
private case object DummyRule extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan
}
}

override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectOptimizerRule { sparkSession =>
// Enable Hyperspace to leverage indexes.
sparkSession.addOptimizationsIfNeeded()
// Return a dummy rule to fit in interface of injectOptimizerRule
DummyRule
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package com.microsoft.hyperspace.index
import org.apache.spark.sql.internal.SQLConf

object IndexConstants {
// If it is set as false, Hyperspace will not be applied.
val HYPERSPACE_APPLY_ENABLED = "spark.hyperspace.apply.enabled"
val HYPERSPACE_APPLY_ENABLED_DEFAULT = "true"

val INDEXES_DIR = "indexes"

// Config used for setting the system path, which is considered as a "root" path for Hyperspace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.telemetry.HyperspaceEventLogging
import com.microsoft.hyperspace.util.HyperspaceConf

/**
* Transform the given plan to use Hyperspace indexes.
Expand All @@ -42,7 +43,7 @@ object ApplyHyperspace
private[hyperspace] val disableForIndexMaintenance = new ThreadLocal[Boolean]

override def apply(plan: LogicalPlan): LogicalPlan = {
if (disableForIndexMaintenance.get) {
if (!HyperspaceConf.hyperspaceApplyEnabled(spark) || disableForIndexMaintenance.get) {
return plan
}

Expand Down
53 changes: 39 additions & 14 deletions src/main/scala/com/microsoft/hyperspace/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package com.microsoft

import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.HyperspaceSparkSessionExtension
import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.util.HyperspaceConf

package object hyperspace {

Expand All @@ -29,42 +31,65 @@ package object hyperspace {
implicit class Implicits(sparkSession: SparkSession) {

/**
* Plug in Hyperspace-specific rules.
* Enable Hyperspace indexes.
*
* Plug in Hyperspace-specific rules and set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as true.
*
* @return a spark session that contains Hyperspace-specific rules.
*/
def enableHyperspace(): SparkSession = {
disableHyperspace
sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
ApplyHyperspace :: Nil
sparkSession.sessionState.experimentalMethods.extraStrategies ++=
BucketUnionStrategy :: Nil
HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, true)
addOptimizationsIfNeeded()
sparkSession
}

/**
* Plug out Hyperspace-specific rules.
* Disable Hyperspace indexes.
*
* Set `IndexConstants.HYPERSPACE_APPLY_ENABLED` as false
* to stop applying Hyperspace indexes.
*
* @return a spark session that does not contain Hyperspace-specific rules.
* @return a spark session that `IndexConstants.HYPERSPACE_APPLY_ENABLED` is set as false.
*/
def disableHyperspace(): SparkSession = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations =
experimentalMethods.extraOptimizations.filterNot(ApplyHyperspace.equals)
experimentalMethods.extraStrategies =
experimentalMethods.extraStrategies.filterNot(BucketUnionStrategy.equals)
HyperspaceConf.setHyperspaceApplyEnabled(sparkSession, false)
sparkSession
}

/**
* Checks if Hyperspace is enabled or not.
*
* Note that Hyperspace is enabled when:
* 1) `ApplyHyperspace` exists in extraOptimization
* 2) `BucketUnionStrate` exists in extraStrategies and
* 3) `IndexConstants.HYPERSPACE_APPLY_ENABLED` is true.
*
* @return true if Hyperspace is enabled or false otherwise.
*/
def isHyperspaceEnabled(): Boolean = {
val experimentalMethods = sparkSession.sessionState.experimentalMethods
experimentalMethods.extraOptimizations.contains(ApplyHyperspace) &&
experimentalMethods.extraStrategies.contains(BucketUnionStrategy)
experimentalMethods.extraStrategies.contains(BucketUnionStrategy) &&
HyperspaceConf.hyperspaceApplyEnabled(sparkSession)
}

/**
* Add ApplyHyperspace and BucketUnionStrategy into extraOptimization
* and extraStrategies, respectively, to make Spark can use Hyperspace.
*
* @param sparkSession Spark session that will use Hyperspace
*/
private[hyperspace] def addOptimizationsIfNeeded(): Unit = {
if (!sparkSession.sessionState.experimentalMethods.extraOptimizations.contains(
ApplyHyperspace)) {
sparkSession.sessionState.experimentalMethods.extraOptimizations ++=
ApplyHyperspace :: Nil
}
if (!sparkSession.sessionState.experimentalMethods.extraStrategies.contains(
BucketUnionStrategy)) {
sparkSession.sessionState.experimentalMethods.extraStrategies ++=
BucketUnionStrategy :: Nil
}
}
}
}
16 changes: 16 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ import com.microsoft.hyperspace.index.IndexConstants
* Helper class to extract Hyperspace-related configs from SparkSession.
*/
object HyperspaceConf {

/**
* Returns the config value whether hyperspace is enabled or not.
*/
def hyperspaceApplyEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.HYPERSPACE_APPLY_ENABLED,
IndexConstants.HYPERSPACE_APPLY_ENABLED_DEFAULT)
.toBoolean
}

def setHyperspaceApplyEnabled(spark: SparkSession, apply: Boolean): Unit = {
spark.conf.set(IndexConstants.HYPERSPACE_APPLY_ENABLED, apply.toString)
}

def hybridScanEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
Expand Down
Loading

0 comments on commit d8c4b79

Please sign in to comment.