Data Skipping Index Part 3-2: Rule
Chungmin Lee committed Aug 22, 2021
1 parent e131f2a commit a4f5128
Showing 27 changed files with 1,780 additions and 20 deletions.
3 changes: 3 additions & 0 deletions build.sbt
@@ -127,6 +127,9 @@ ThisBuild / Test / fork := true

ThisBuild / Test / javaOptions += "-Xmx1024m"

// Needed to test both non-codegen and codegen parts of expressions
ThisBuild / Test / envVars += "SPARK_TESTING" -> "1"

ThisBuild / coverageExcludedPackages := "com\\.fasterxml.*;com\\.microsoft\\.hyperspace\\.shim"

/**
Empty file.
11 changes: 1 addition & 10 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.withHyperspaceRuleDisabled
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

class Hyperspace(spark: SparkSession) {
@@ -189,15 +189,6 @@ class Hyperspace(spark: SparkSession) {
}
}
}

private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
try {
ApplyHyperspace.disableForIndexMaintenance.set(true)
f
} finally {
ApplyHyperspace.disableForIndexMaintenance.set(false)
}
}
}

object Hyperspace extends ActiveSparkSession {
src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala
@@ -16,6 +16,7 @@

package com.microsoft.hyperspace.index

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

import com.microsoft.hyperspace.index.plananalysis.FilterReason
@@ -68,4 +69,17 @@ object IndexLogEntryTags {
// If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged.
val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] =
IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled")

// DATASKIPPING_INDEX_PREDICATE stores the index predicate translated
// from the plan's filter or join condition.
val DATASKIPPING_INDEX_PREDICATE: IndexLogEntryTag[Option[Expression]] =
IndexLogEntryTag[Option[Expression]]("dataskippingIndexPredicate")

// DATASKIPPING_INDEX_FILEINDEX stores InMemoryFileIndex for the index data.
val DATASKIPPING_INDEX_FILEINDEX: IndexLogEntryTag[InMemoryFileIndex] =
IndexLogEntryTag[InMemoryFileIndex]("dataskippingIndexRelation")

// DATASKIPPING_SOURCE_FILEINDEX stores InMemoryFileIndex for the source data.
val DATASKIPPING_SOURCE_FILEINDEX: IndexLogEntryTag[InMemoryFileIndex] =
IndexLogEntryTag[InMemoryFileIndex]("dataskippingSourceRelation")
}
src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala
@@ -16,8 +16,13 @@

package com.microsoft.hyperspace.index.dataskipping

import org.apache.spark.sql.{Column, DataFrame, SaveMode}
import org.apache.spark.sql.functions.{input_file_name, min, spark_partition_id}
import scala.collection.mutable

import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, NamedExpression, Or}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
import org.apache.spark.sql.types.StructType

@@ -121,6 +126,60 @@ case class DataSkippingIndex(

override def hashCode: Int = sketches.map(_.hashCode).sum

/**
* Translates the given filter/join condition on the source data into a
* predicate that can be used to filter out unnecessary source data files
* when applied to the index data.
*
* For example, a filter condition "A = 1" can be translated into an index
* predicate "Min_A <= 1 && Max_A >= 1" to filter out files that cannot
* satisfy the condition for any row in the file.
*/
def translateFilterCondition(
spark: SparkSession,
condition: Expression,
source: LogicalPlan): Option[Expression] = {
val resolvedExprs =
ExpressionUtils.getResolvedExprs(spark, sketches, source).getOrElse { return None }
val predMap = buildPredicateMap(spark, condition, source, resolvedExprs)

// Create a single index predicate for a single source predicate node
// by combining individual index predicates with And.
// True is returned if there are no index predicates for the source predicate node.
def toIndexPred(sourcePred: Expression): Expression = {
predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(Literal.TrueLiteral)
}

// Compose an index predicate visiting the source predicate tree recursively.
def composeIndexPred(sourcePred: Expression): Expression =
sourcePred match {
case and: And => And(toIndexPred(and), and.mapChildren(composeIndexPred))
case or: Or => And(toIndexPred(or), or.mapChildren(composeIndexPred))
case leaf => toIndexPred(leaf)
}
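
// Illustrative example: for a source condition "A = 1 Or B = 2" with
// min/max sketches on A and B, the Or node itself has no index predicate,
// so composeIndexPred yields
//   And(True, Or(Min_A <= 1 && Max_A >= 1, Min_B <= 2 && Max_B >= 2))
// and the constant folding below reduces it to the inner Or.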

val indexPredicate = composeIndexPred(condition)

// Apply constant folding to get the final predicate.
// This is a trimmed-down version of the BooleanSimplification rule.
// It's just enough to determine whether the index is applicable or not.
val optimizePredicate: PartialFunction[Expression, Expression] = {
case And(Literal.TrueLiteral, right) => right
case And(left, Literal.TrueLiteral) => left
case And(a, And(b, c)) if a.deterministic && a == b => And(b, c)
case Or(t @ Literal.TrueLiteral, right) => t
case Or(left, t @ Literal.TrueLiteral) => t
}
val optimizedIndexPredicate = indexPredicate.transformUp(optimizePredicate)

// Return None if the index predicate is True, meaning no conversion can be done.
if (optimizedIndexPredicate == Literal.TrueLiteral) {
None
} else {
Some(optimizedIndexPredicate)
}
}

private def writeImpl(ctx: IndexerContext, indexData: DataFrame, writeMode: SaveMode): Unit = {
// require instead of assert, as the condition can potentially be broken by
// code which is external to dataskipping.
@@ -142,6 +201,50 @@ case class DataSkippingIndex(
repartitionedIndexData.write.mode(writeMode).parquet(ctx.indexDataPath.toString)
indexData.unpersist()
}

/**
* Collects index predicates for each node in the source predicate.
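* For example, with a min/max sketch on column A, the entry for "A = 1"
* may hold the single predicate "Min_A <= 1 && Max_A >= 1" (illustrative
* column names, as in the example above).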
*/
private def buildPredicateMap(
spark: SparkSession,
predicate: Expression,
source: LogicalPlan,
resolvedExprs: Map[Sketch, Seq[Expression]])
: mutable.Map[Expression, mutable.Buffer[Expression]] = {
val predMap = mutable.Map[Expression, mutable.Buffer[Expression]]()
val sketchesWithIndex = sketches.zipWithIndex
predicate.foreachUp { sourcePred =>
val indexPreds = sketchesWithIndex.flatMap {
case (sketch, idx) =>
sketch.convertPredicate(
sourcePred,
aggrNames(idx).map(UnresolvedAttribute.quoted(_)),
source.output.map(attr => attr.exprId -> attr.name).toMap,
resolvedExprs(sketch))
}
if (indexPreds.nonEmpty) {
predMap.getOrElseUpdate(sourcePred, mutable.Buffer.empty) ++= indexPreds
}
}
predMap
}

private def aggrNames(i: Int): Seq[String] = {
aggregateFunctions
.slice(sketchOffsets(i), sketchOffsets(i + 1))
.map(_.expr.asInstanceOf[NamedExpression].name)
}

/**
* Sketch offsets are used to map each sketch to its corresponding columns
* in the dataframe.
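*
* For example, if the sketches have 2, 1, and 3 aggregate functions,
* sketchOffsets is Seq(0, 2, 3, 6), so aggrNames(1) names the index data
* columns in the range [2, 3).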
*/
@transient
private lazy val sketchOffsets: Seq[Int] =
sketches.map(_.aggregateFunctions.length).scanLeft(0)(_ + _)

@transient
private lazy val aggregateFunctions = DataSkippingIndex.getNamedAggregateFunctions(sketches)
}

object DataSkippingIndex {
src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndex.scala
@@ -0,0 +1,113 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.rule

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, HadoopFsRelation, InMemoryFileIndex, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hyperspace.utils.logicalPlanToDataFrame
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndex
import com.microsoft.hyperspace.index.dataskipping.util.DataSkippingFileIndex
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.rules.{ExtractRelation, HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToSelectedIndexMap

object ApplyDataSkippingIndex extends HyperspaceRule {
protected override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
IndexTypeFilter[DataSkippingIndex]() :: FilterPlanNodeFilter :: FilterConditionFilter :: Nil

protected override val indexRanker: IndexRankFilter = FilterRankFilter

override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = {
if (indexes.isEmpty) {
return plan
}
plan match {
case filter @ Filter(_, ExtractRelation(relation)) =>
val indexLogEntry = indexes(relation.plan)
val indexDataPred = indexLogEntry
.getTagValue(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE)
.get
.getOrElse { return plan }
val indexDataSchema = indexLogEntry.derivedDataset.asInstanceOf[DataSkippingIndex].schema
val fileStatusCache = FileStatusCache.getOrCreate(spark)
val indexDataLoc =
indexLogEntry.withCachedTag(IndexLogEntryTags.DATASKIPPING_INDEX_FILEINDEX) {
new InMemoryFileIndex(
spark,
indexLogEntry.content.files,
Map.empty,
Some(indexDataSchema),
fileStatusCache)
}
val indexDataRel = LogicalRelation(
new HadoopFsRelation(
indexDataLoc,
StructType(Nil),
indexDataSchema,
None,
new ParquetFileFormat,
Map.empty)(spark))
val indexData = logicalPlanToDataFrame(spark, indexDataRel)
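// Reuse the relation's existing FileIndex when the source is already a
// HadoopFsRelation; otherwise build an InMemoryFileIndex over the source
// files and cache it on the index log entry.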
val originalFileIndex = relation.plan match {
case LogicalRelation(HadoopFsRelation(location, _, _, _, _, _), _, _, _) => location
case _ =>
indexLogEntry.withCachedTag(
relation.plan,
IndexLogEntryTags.DATASKIPPING_SOURCE_FILEINDEX) {
new InMemoryFileIndex(
spark,
relation.rootPaths,
relation.partitionBasePath
.map(PartitioningAwareFileIndex.BASE_PATH_PARAM -> _)
.toMap,
Some(relation.schema),
fileStatusCache)
}
}
val dataSkippingFileIndex = new DataSkippingFileIndex(
spark,
indexData,
indexDataPred,
indexLogEntry.fileIdTracker,
originalFileIndex)
val newFsRelation = IndexHadoopFsRelation(
relation.createHadoopFsRelation(
dataSkippingFileIndex,
relation.schema,
relation.options),
spark,
indexLogEntry)
val output = relation.output.map(_.asInstanceOf[AttributeReference])
filter.copy(child = relation.createLogicalRelation(newFsRelation, output))
case _ => plan
}
}

override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = {
if (indexes.isEmpty) {
return 0
}
// Return the lowest score so that covering indexes take precedence over
// data skipping indexes.
1
}
}
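
For context, here is a minimal usage sketch of the rule end to end. It assumes the DataSkippingIndexConfig and MinMaxSketch APIs introduced in earlier parts of this series (exact package names are assumed) and hypothetical data paths.

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndexConfig
import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch

val hs = new Hyperspace(spark)
val df = spark.read.parquet("/data/source")

// Create a data skipping index backed by a min/max sketch on column A.
hs.createIndex(df, DataSkippingIndexConfig("dsIndex", MinMaxSketch("A")))

// Once Hyperspace is enabled, ApplyDataSkippingIndex can rewrite this
// filter to scan only files whose [Min_A, Max_A] range can contain 1.
spark.enableHyperspace()
spark.read.parquet("/data/source").filter("A = 1").show()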
src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilter.scala
@@ -0,0 +1,63 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.dataskipping.rule

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

import com.microsoft.hyperspace.index.IndexLogEntryTags
import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndex
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
import com.microsoft.hyperspace.index.rules.{ExtractRelation, QueryPlanIndexFilter}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap

/**
* FilterConditionFilter filters out indexes that cannot be applied to the
* filter condition. The translated index predicate is cached on the plan
* via DATASKIPPING_INDEX_PREDICATE so that ApplyDataSkippingIndex can
* reuse it without recomputing the translation.
*/
object FilterConditionFilter extends QueryPlanIndexFilter {
override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
if (candidateIndexes.isEmpty) {
return Map.empty
}
plan match {
case Filter(condition: Expression, ExtractRelation(relation)) =>
val applicableIndexes = candidateIndexes(relation.plan).flatMap { indexLogEntry =>
val indexDataPredOpt =
indexLogEntry.withCachedTag(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE) {
val index = indexLogEntry.derivedDataset.asInstanceOf[DataSkippingIndex]
index.translateFilterCondition(spark, condition, relation.plan)
}
if (withFilterReasonTag(
plan,
indexLogEntry,
FilterReasons.IneligibleFilterCondition(condition.sql))(
indexDataPredOpt.nonEmpty)) {
Some(indexLogEntry)
} else {
None
}
}
if (applicableIndexes.nonEmpty) {
Map(relation.plan -> applicableIndexes)
} else {
Map.empty
}
case _ => Map.empty
}
}
}