Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Data Skipping Index Part 3-2: Rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Chungmin Lee committed Aug 22, 2021
1 parent e131f2a commit 2c961b4
Show file tree
Hide file tree
Showing 35 changed files with 2,400 additions and 57 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ ThisBuild / Test / fork := true

ThisBuild / Test / javaOptions += "-Xmx1024m"

// Needed to test both non-codegen and codegen parts of expressions
ThisBuild / Test / envVars += "SPARK_TESTING" -> "1"

ThisBuild / coverageExcludedPackages := "com\\.fasterxml.*;com\\.microsoft\\.hyperspace\\.shim"

/**
Expand Down
24 changes: 24 additions & 0 deletions src/main/scala-spark2/com/microsoft/hyperspace/shim/First.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.shim

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.First

object FirstNullSafe {
def apply(child: Expression): First = First(child, Literal(false))
}
Empty file.
24 changes: 24 additions & 0 deletions src/main/scala-spark3/com/microsoft/hyperspace/shim/First.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.shim

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.First

object FirstNullSafe {
def apply(child: Expression): First = First(child, false)
}
11 changes: 1 addition & 10 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.withHyperspaceRuleDisabled
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

class Hyperspace(spark: SparkSession) {
Expand Down Expand Up @@ -189,15 +189,6 @@ class Hyperspace(spark: SparkSession) {
}
}
}

private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
try {
ApplyHyperspace.disableForIndexMaintenance.set(true)
f
} finally {
ApplyHyperspace.disableForIndexMaintenance.set(false)
}
}
}

object Hyperspace extends ActiveSparkSession {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

package com.microsoft.hyperspace.index

import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, InMemoryFileIndex}

import com.microsoft.hyperspace.index.plananalysis.FilterReason

Expand Down Expand Up @@ -68,4 +69,17 @@ object IndexLogEntryTags {
// If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged.
val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] =
IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled")

// DATASKIPPING_INDEX_DATA_PREDICATE stores the index predicate translated
// from the plan's filter or join condition.
val DATASKIPPING_INDEX_PREDICATE: IndexLogEntryTag[Option[Expression]] =
IndexLogEntryTag[Option[Expression]]("dataskippingIndexPredicate")

// DATASKIPPING_INDEX_FILEINDEX stores InMemoryFileIndex for the index data.
val DATASKIPPING_INDEX_FILEINDEX: IndexLogEntryTag[InMemoryFileIndex] =
IndexLogEntryTag[InMemoryFileIndex]("dataskippingIndexRelation")

// DATASKIPPING_INDEX_FILEINDEX stores InMemoryFileIndex for the source data.
val DATASKIPPING_SOURCE_FILEINDEX: IndexLogEntryTag[FileIndex] =
IndexLogEntryTag[FileIndex]("dataskippingSourceRelation")
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@

package com.microsoft.hyperspace.index.dataskipping

import org.apache.spark.sql.{Column, DataFrame, SaveMode}
import org.apache.spark.sql.functions.{input_file_name, min, spark_partition_id}
import scala.collection.mutable

import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, NamedExpression, Or}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -121,6 +126,60 @@ case class DataSkippingIndex(

override def hashCode: Int = sketches.map(_.hashCode).sum

/**
* Translate the given filter/join condition for the source data to a
* predicate that can be used to filter out unnecessary source data files when
* applied to index data.
*
* For example, a filter condition "A = 1" can be translated into an index
* predicate "Min_A <= 1 && Max_A >= 1" to filter out files which cannot satisfy
* the condition for any rows in the file.
*/
def translateFilterCondition(
spark: SparkSession,
condition: Expression,
source: LogicalPlan): Option[Expression] = {
val resolvedExprs =
ExpressionUtils.getResolvedExprs(spark, sketches, source).getOrElse { return None }
val predMap = buildPredicateMap(spark, condition, source, resolvedExprs)

// Create a single index predicate for a single source predicate node,
// by combining individual index predicates with And.
// True is returned if there are no index predicates for the source predicate node.
def toIndexPred(sourcePred: Expression): Expression = {
predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(Literal.TrueLiteral)
}

// Compose an index predicate visiting the source predicate tree recursively.
def composeIndexPred(sourcePred: Expression): Expression =
sourcePred match {
case and: And => And(toIndexPred(and), and.mapChildren(composeIndexPred))
case or: Or => And(toIndexPred(or), or.mapChildren(composeIndexPred))
case leaf => toIndexPred(leaf)
}

val indexPredicate = composeIndexPred(condition)

// Apply constant folding to get the final predicate.
// This is a trimmed down version of the BooleanSimplification rule.
// It's just enough to determine whether the index is applicable or not.
val optimizePredicate: PartialFunction[Expression, Expression] = {
case And(Literal.TrueLiteral, right) => right
case And(left, Literal.TrueLiteral) => left
case And(a, And(b, c)) if a.deterministic && a == b => And(b, c)
case Or(t @ Literal.TrueLiteral, right) => t
case Or(left, t @ Literal.TrueLiteral) => t
}
val optimizedIndexPredicate = indexPredicate.transformUp(optimizePredicate)

// Return None if the index predicate is True - meaning no conversion can be done.
if (optimizedIndexPredicate == Literal.TrueLiteral) {
None
} else {
Some(optimizedIndexPredicate)
}
}

private def writeImpl(ctx: IndexerContext, indexData: DataFrame, writeMode: SaveMode): Unit = {
// require instead of assert, as the condition can potentially be broken by
// code which is external to dataskipping.
Expand All @@ -142,6 +201,50 @@ case class DataSkippingIndex(
repartitionedIndexData.write.mode(writeMode).parquet(ctx.indexDataPath.toString)
indexData.unpersist()
}

/**
* Collects index predicates for each node in the source predicate.
*/
private def buildPredicateMap(
spark: SparkSession,
predicate: Expression,
source: LogicalPlan,
resolvedExprs: Map[Sketch, Seq[Expression]])
: mutable.Map[Expression, mutable.Buffer[Expression]] = {
val predMap = mutable.Map[Expression, mutable.Buffer[Expression]]()
val sketchesWithIndex = sketches.zipWithIndex
predicate.foreachUp { sourcePred =>
val indexPreds = sketchesWithIndex.flatMap {
case (sketch, idx) =>
sketch.convertPredicate(
sourcePred,
resolvedExprs(sketch),
source.output.map(attr => attr.exprId -> attr.name).toMap,
aggrNames(idx).map(UnresolvedAttribute.quoted(_)))
}
if (indexPreds.nonEmpty) {
predMap.getOrElseUpdate(sourcePred, mutable.Buffer.empty) ++= indexPreds
}
}
predMap
}

private def aggrNames(i: Int): Seq[String] = {
aggregateFunctions
.slice(sketchOffsets(i), sketchOffsets(i + 1))
.map(_.expr.asInstanceOf[NamedExpression].name)
}

/**
* Sketch offsets are used to map each sketch to its corresponding columns
* in the dataframe.
*/
@transient
private lazy val sketchOffsets: Seq[Int] =
sketches.map(_.aggregateFunctions.length).scanLeft(0)(_ + _)

@transient
private lazy val aggregateFunctions = DataSkippingIndex.getNamedAggregateFunctions(sketches)
}

object DataSkippingIndex {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package com.microsoft.hyperspace.index.dataskipping

import scala.collection.mutable

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SparkSession}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index.{IndexConfigTrait, IndexerContext}
import com.microsoft.hyperspace.index.dataskipping.sketch.Sketch
import com.microsoft.hyperspace.index.dataskipping.sketch.{PartitionSketch, Sketch}
import com.microsoft.hyperspace.index.dataskipping.util.ExpressionUtils

/**
Expand Down Expand Up @@ -59,12 +59,24 @@ case class DataSkippingIndexConfig(
sourceData: DataFrame,
properties: Map[String, String]): (DataSkippingIndex, DataFrame) = {
val resolvedSketches = ExpressionUtils.resolve(ctx.spark, sketches, sourceData)
checkDuplicateSketches(resolvedSketches)
val indexData = DataSkippingIndex.createIndexData(ctx, resolvedSketches, sourceData)
val index = DataSkippingIndex(resolvedSketches, indexData.schema, properties)
val partitionSketches = getPartitionSketches(ctx.spark, sourceData)
val finalSketches = partitionSketches ++ resolvedSketches
checkDuplicateSketches(finalSketches)
val indexData = DataSkippingIndex.createIndexData(ctx, finalSketches, sourceData)
val index = DataSkippingIndex(finalSketches, indexData.schema, properties)
(index, indexData)
}

private def getPartitionSketches(
spark: SparkSession,
sourceData: DataFrame): Seq[PartitionSketch] = {
val relation = Hyperspace
.getContext(spark)
.sourceProviderManager
.getRelation(sourceData.queryExecution.optimizedPlan)
relation.partitionSchema.map(f => PartitionSketch(f.name, Some(f.dataType)))
}

private def checkDuplicateSketches(sketches: Seq[Sketch]): Unit = {
val uniqueSketches = sketches.toSet
if (uniqueSketches.size != sketches.size) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.rule

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hyperspace.utils.logicalPlanToDataFrame
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndex
import com.microsoft.hyperspace.index.dataskipping.util.DataSkippingFileIndex
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.rules._
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToSelectedIndexMap

object ApplyDataSkippingIndex extends HyperspaceRule {
protected override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
IndexTypeFilter[DataSkippingIndex]() :: FilterPlanNodeFilter :: FilterConditionFilter :: Nil

protected override val indexRanker: IndexRankFilter = FilterRankFilter

override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = {
if (indexes.isEmpty) {
return plan
}
plan match {
case filter @ Filter(_, ExtractRelation(relation)) =>
val indexLogEntry = indexes(relation.plan)
val indexDataPred = indexLogEntry
.getTagValue(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE)
.get
.getOrElse { return plan }
val indexData = logicalPlanToDataFrame(spark, getIndexDataRelation(indexLogEntry))
val originalFileIndex = indexLogEntry.withCachedTag(
relation.plan,
IndexLogEntryTags.DATASKIPPING_SOURCE_FILEINDEX) {
relation.getOrCreateFileIndex(spark)
}
val dataSkippingFileIndex = new DataSkippingFileIndex(
spark,
indexLogEntry.fileIdTracker,
indexData,
indexDataPred,
originalFileIndex)
val newRelation = relation.createLogicalRelation(
IndexHadoopFsRelation(
relation.createHadoopFsRelation(
dataSkippingFileIndex,
relation.schema,
relation.options),
spark,
indexLogEntry),
relation.output.map(_.asInstanceOf[AttributeReference]))
filter.copy(child = newRelation)
case _ => plan
}
}

override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = {
if (indexes.isEmpty) {
return 0
}
// Return the lowest score so that covering indexes take precedence over
// data skipping indexes.
1
}

private def getIndexDataRelation(indexLogEntry: IndexLogEntry): LogicalRelation = {
val indexDataSchema = indexLogEntry.derivedDataset.asInstanceOf[DataSkippingIndex].schema
val indexDataLoc =
indexLogEntry.withCachedTag(IndexLogEntryTags.DATASKIPPING_INDEX_FILEINDEX) {
new InMemoryFileIndex(
spark,
indexLogEntry.content.files,
Map.empty,
Some(indexDataSchema),
FileStatusCache.getOrCreate(spark))
}
LogicalRelation(
new HadoopFsRelation(
indexDataLoc,
StructType(Nil),
indexDataSchema,
None,
new ParquetFileFormat,
Map.empty)(spark))
}
}
Loading

0 comments on commit 2c961b4

Please sign in to comment.