This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Data Skipping Index Part 3-2: Rule #482

Merged 10 commits on Aug 31, 2021
Changes from 4 commits
3 changes: 3 additions & 0 deletions build.sbt
@@ -127,6 +127,9 @@ ThisBuild / Test / fork := true

ThisBuild / Test / javaOptions += "-Xmx1024m"

// Needed to test both non-codegen and codegen parts of expressions
ThisBuild / Test / envVars += "SPARK_TESTING" -> "1"
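
The env var matters because, when Spark detects it is running under tests, the codegen factory mode can be pinned explicitly instead of always falling back. A minimal sketch of what this enables, assuming a local SparkSession and that `spark.sql.codegen.factoryMode` is honored in testing mode (both the config key and the behavior are assumptions about Spark internals, not part of this PR):

```scala
import org.apache.spark.sql.SparkSession

// Run the same expression through both evaluation paths; results should agree.
val spark = SparkSession.builder.master("local[1]").getOrCreate()
Seq("NO_CODEGEN", "CODEGEN_ONLY").foreach { mode =>
  spark.conf.set("spark.sql.codegen.factoryMode", mode)
  spark.range(10).selectExpr("id * 2").collect()
}
```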

ThisBuild / coverageExcludedPackages := "com\\.fasterxml.*;com\\.microsoft\\.hyperspace\\.shim"

/**
24 changes: 24 additions & 0 deletions src/main/scala-spark2/com/microsoft/hyperspace/shim/First.scala
@@ -0,0 +1,24 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.shim

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.First

object FirstNullSafe {
def apply(child: Expression): First = First(child, Literal(false))
}
24 changes: 24 additions & 0 deletions src/main/scala-spark3/com/microsoft/hyperspace/shim/First.scala
@@ -0,0 +1,24 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.shim

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.First

object FirstNullSafe {
def apply(child: Expression): First = First(child, false)
}
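
The two shims exist because First's constructor changed between Spark versions: Spark 2 takes the ignore-nulls flag as an Expression (Literal(false)), while Spark 3 takes a plain Boolean. A hypothetical usage sketch (the column name and the surrounding aggregation are illustrative, not from this PR):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import com.microsoft.hyperspace.shim.FirstNullSafe

// Build a "first value, keeping nulls" aggregate Column without caring
// which Spark major version is on the classpath.
val firstA: Column = new Column(FirstNullSafe(col("A").expr).toAggregateExpression())
// e.g. df.groupBy(input_file_name()).agg(firstA.as("First_A_0"), ...)
```
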
11 changes: 1 addition & 10 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer}
import com.microsoft.hyperspace.index.rules.ApplyHyperspace
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.withHyperspaceRuleDisabled
import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

class Hyperspace(spark: SparkSession) {
@@ -189,15 +189,6 @@ class Hyperspace(spark: SparkSession) {
}
}
}

private def withHyperspaceRuleDisabled(f: => Unit): Unit = {
try {
ApplyHyperspace.disableForIndexMaintenance.set(true)
f
} finally {
ApplyHyperspace.disableForIndexMaintenance.set(false)
}
}
}

object Hyperspace extends ActiveSparkSession {
src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala
@@ -16,7 +16,8 @@

package com.microsoft.hyperspace.index

import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, InMemoryFileIndex}

import com.microsoft.hyperspace.index.plananalysis.FilterReason

@@ -68,4 +69,17 @@ object IndexLogEntryTags {
// If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged.
val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] =
IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled")

// DATASKIPPING_INDEX_PREDICATE stores the index predicate translated
// from the plan's filter or join condition.
val DATASKIPPING_INDEX_PREDICATE: IndexLogEntryTag[Option[Expression]] =
IndexLogEntryTag[Option[Expression]]("dataskippingIndexPredicate")

// DATASKIPPING_INDEX_FILEINDEX stores InMemoryFileIndex for the index data.
val DATASKIPPING_INDEX_FILEINDEX: IndexLogEntryTag[InMemoryFileIndex] =
IndexLogEntryTag[InMemoryFileIndex]("dataskippingIndexRelation")

// DATASKIPPING_SOURCE_FILEINDEX stores FileIndex for the source data.
val DATASKIPPING_SOURCE_FILEINDEX: IndexLogEntryTag[FileIndex] =
IndexLogEntryTag[FileIndex]("dataskippingSourceRelation")
}
src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala
@@ -16,8 +16,15 @@

package com.microsoft.hyperspace.index.dataskipping

import org.apache.spark.sql.{Column, DataFrame, SaveMode}
import org.apache.spark.sql.functions.{input_file_name, min, spark_partition_id}
import scala.collection.mutable

import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, NamedExpression, Or}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index._
@@ -30,10 +37,12 @@ import com.microsoft.hyperspace.util.HyperspaceConf
* files in relations using sketches.
*
* @param sketches List of sketches for this index
* @param schema Index data schema
* @param properties Properties for this index; see [[Index.properties]] for details.
*/
case class DataSkippingIndex(
sketches: Seq[Sketch],
schema: StructType,
override val properties: Map[String, String] = Map.empty)
extends Index {
assert(sketches.nonEmpty, "At least one sketch is required.")
@@ -61,7 +70,8 @@
}

override def optimize(ctx: IndexerContext, indexDataFilesToOptimize: Seq[FileInfo]): Unit = {
val indexData = ctx.spark.read.parquet(indexDataFilesToOptimize.map(_.name): _*)
val indexData =
ctx.spark.read.schema(schema).parquet(indexDataFilesToOptimize.map(_.name): _*)
writeImpl(ctx, indexData, SaveMode.Overwrite)
}

@@ -71,7 +81,10 @@
deletedSourceDataFiles: Seq[FileInfo],
indexContent: Content): (Index, Index.UpdateMode) = {
if (appendedSourceData.nonEmpty) {
writeImpl(ctx, index(ctx, appendedSourceData.get), SaveMode.Overwrite)
writeImpl(
ctx,
DataSkippingIndex.createIndexData(ctx, sketches, appendedSourceData.get),
SaveMode.Overwrite)
}
if (deletedSourceDataFiles.nonEmpty) {
val spark = ctx.spark
@@ -98,23 +111,157 @@
override def refreshFull(
ctx: IndexerContext,
sourceData: DataFrame): (DataSkippingIndex, DataFrame) = {
val updatedIndex = copy(sketches = ExpressionUtils.resolve(ctx.spark, sketches, sourceData))
(updatedIndex, updatedIndex.index(ctx, sourceData))
val resolvedSketches = ExpressionUtils.resolve(ctx.spark, sketches, sourceData)
val indexData = DataSkippingIndex.createIndexData(ctx, resolvedSketches, sourceData)
val updatedIndex = copy(sketches = resolvedSketches, schema = indexData.schema)
(updatedIndex, indexData)
}

override def equals(that: Any): Boolean =
that match {
case DataSkippingIndex(thatSketches, _) => sketches.toSet == thatSketches.toSet
case DataSkippingIndex(thatSketches, thatSchema, _) =>
sketches.toSet == thatSketches.toSet && schema == thatSchema
case _ => false
}

override def hashCode: Int = sketches.map(_.hashCode).sum

/**
* Translate the given filter/join condition for the source data to a
* predicate that can be used to filter out unnecessary source data files when
* applied to index data.
*
* For example, a filter condition "A = 1" can be translated into an index
* predicate "Min_A <= 1 && Max_A >= 1" to filter out files which cannot satisfy
* the condition for any rows in the file.
*/
def translateFilterCondition(
spark: SparkSession,
condition: Expression,
source: LogicalPlan): Option[Expression] = {
val resolvedExprs =
ExpressionUtils.getResolvedExprs(spark, sketches, source).getOrElse { return None }
val predMap = buildPredicateMap(spark, condition, source, resolvedExprs)

// Create a single index predicate for a single source predicate node,
// by combining individual index predicates with And.
// True is returned if there are no index predicates for the source predicate node.
def toIndexPred(sourcePred: Expression): Expression = {
predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(Literal.TrueLiteral)
}

// Compose an index predicate visiting the source predicate tree recursively.
def composeIndexPred(sourcePred: Expression): Expression =
sourcePred match {
case and: And => And(toIndexPred(and), and.mapChildren(composeIndexPred))
case or: Or => And(toIndexPred(or), or.mapChildren(composeIndexPred))
case leaf => toIndexPred(leaf)
}

val indexPredicate = composeIndexPred(condition)

// Apply constant folding to get the final predicate.
// This is a trimmed down version of the BooleanSimplification rule.
// It's just enough to determine whether the index is applicable or not.
val optimizePredicate: PartialFunction[Expression, Expression] = {
case And(Literal.TrueLiteral, right) => right
case And(left, Literal.TrueLiteral) => left
case And(a, And(b, c)) if a.deterministic && a == b => And(b, c)
case Or(t @ Literal.TrueLiteral, right) => t
case Or(left, t @ Literal.TrueLiteral) => t
}
val optimizedIndexPredicate = indexPredicate.transformUp(optimizePredicate)

// Return None if the index predicate is True - meaning no conversion can be done.
if (optimizedIndexPredicate == Literal.TrueLiteral) {
None
} else {
Some(optimizedIndexPredicate)
}
}
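
A worked illustration of the composition and folding above, assuming a single min/max-style sketch on column A (the index column names Min_A_0 / Max_A_1 are purely illustrative):

```scala
// Source filter: A = 1 And B = 2   (only A is covered by a sketch)
//   composeIndexPred => And(true, And((Min_A_0 <= 1) And (Max_A_1 >= 1), true))
//   after folding    => (Min_A_0 <= 1) And (Max_A_1 >= 1)    => Some(...)
//
// Source filter: A = 1 Or B = 2
//   composeIndexPred => And(true, Or((Min_A_0 <= 1) And (Max_A_1 >= 1), true))
//   after folding    => true                                  => None
// An Or branch with no convertible predicate makes the whole condition
// unusable for skipping, so the rule conservatively gives up on the index.
```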

private def writeImpl(ctx: IndexerContext, indexData: DataFrame, writeMode: SaveMode): Unit = {
// require instead of assert, as the condition can potentially be broken by
// code which is external to dataskipping.
require(
indexData.schema.sameType(schema),
"Schema of the index data doesn't match the index schema: " +
s"index data schema = ${indexData.schema.toDDL}, index schema = ${schema.toDDL}")
indexData.cache()
indexData.count() // force cache
val indexDataSize = DataFrameUtils.getSizeInBytes(indexData)
val targetIndexDataFileSize = HyperspaceConf.DataSkipping.targetIndexDataFileSize(ctx.spark)
val numFiles = indexDataSize / targetIndexDataFileSize
if (!numFiles.isValidInt) {
throw HyperspaceException(
"Could not create index data files due to too many files: " +
s"indexDataSize=$indexDataSize, targetIndexDataFileSize=$targetIndexDataFileSize")
}
val repartitionedIndexData = indexData.repartition(math.max(1, numFiles.toInt))
repartitionedIndexData.write.mode(writeMode).parquet(ctx.indexDataPath.toString)
indexData.unpersist()
}
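
Illustrative numbers for the sizing logic in writeImpl (the sizes are assumptions, not Hyperspace defaults):

```scala
// Suppose the cached index data is ~1 GB and the target file size is 256 MiB.
val indexDataSize = 1000000000L            // from DataFrameUtils.getSizeInBytes
val targetIndexDataFileSize = 268435456L   // assumed config value
val numFiles = indexDataSize / targetIndexDataFileSize   // 3
val partitions = math.max(1, numFiles.toInt)             // index written as 3 files
// If numFiles does not fit in an Int (numFiles.isValidInt == false),
// writeImpl throws HyperspaceException instead of repartitioning.
```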

/**
* Collects index predicates for each node in the source predicate.
*/
private def buildPredicateMap(
spark: SparkSession,
predicate: Expression,
source: LogicalPlan,
resolvedExprs: Map[Sketch, Seq[Expression]])
: mutable.Map[Expression, mutable.Buffer[Expression]] = {
val predMap = mutable.Map[Expression, mutable.Buffer[Expression]]()
val sketchesWithIndex = sketches.zipWithIndex
predicate.foreachUp { sourcePred =>
val indexPreds = sketchesWithIndex.flatMap {
case (sketch, idx) =>
sketch.convertPredicate(
sourcePred,
resolvedExprs(sketch),
source.output.map(attr => attr.exprId -> attr.name).toMap,
aggrNames(idx).map(UnresolvedAttribute.quoted(_)))
}
if (indexPreds.nonEmpty) {
predMap.getOrElseUpdate(sourcePred, mutable.Buffer.empty) ++= indexPreds
}
}
predMap
}

private def aggrNames(i: Int): Seq[String] = {
aggregateFunctions
.slice(sketchOffsets(i), sketchOffsets(i + 1))
.map(_.expr.asInstanceOf[NamedExpression].name)
}

/**
* Sketch offsets are used to map each sketch to its corresponding columns
* in the dataframe.
*/
@transient
private lazy val sketchOffsets: Seq[Int] =
sketches.map(_.aggregateFunctions.length).scanLeft(0)(_ + _)
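
A small illustration of how the offsets line up sketches with their aggregate columns (the sketch counts are assumed):

```scala
// Two sketches with 2 and 1 aggregate functions respectively.
val lengths = Seq(2, 1)                   // sketches.map(_.aggregateFunctions.length)
val offsets = lengths.scanLeft(0)(_ + _)  // Seq(0, 2, 3)
// aggrNames(i) slices aggregateFunctions between offsets(i) and offsets(i + 1):
//   aggrNames(0) -> columns 0 and 1 (first sketch), aggrNames(1) -> column 2.
```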

@transient
private lazy val aggregateFunctions = DataSkippingIndex.getNamedAggregateFunctions(sketches)
}

object DataSkippingIndex {
// $COVERAGE-OFF$ https://github.com/scoverage/scalac-scoverage-plugin/issues/125
final val kind = "DataSkippingIndex"
final val kindAbbr = "DS"
// $COVERAGE-ON$

/**
* Creates index data for the given source data.
*/
def index(ctx: IndexerContext, sourceData: DataFrame): DataFrame = {
def createIndexData(
ctx: IndexerContext,
sketches: Seq[Sketch],
sourceData: DataFrame): DataFrame = {
val fileNameCol = "input_file_name"
val aggregateFunctions = getNamedAggregateFunctions(sketches)
val indexDataWithFileName = sourceData
.groupBy(input_file_name().as(fileNameCol))
.agg(aggregateFunctions.head, aggregateFunctions.tail: _*)
@@ -137,20 +284,15 @@
indexDataWithFileName.columns.filterNot(_ == fileNameCol).map(c => s"`$c`"): _*)
}
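
Roughly what createIndexData yields for a single min/max-style sketch over column A, before any renaming or file-name handling done elsewhere (column names and values are illustrative):

```scala
// sourceData.groupBy(input_file_name()).agg(...) gives one row per source
// data file and one column per sketch aggregate function, e.g.:
//
//   input_file_name       MinMax_A_0   MinMax_A_1
//   part-00000.parquet             1           42
//   part-00001.parquet             7          100
```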

private def writeImpl(ctx: IndexerContext, indexData: DataFrame, writeMode: SaveMode): Unit = {
indexData.cache()
indexData.count() // force cache
val indexDataSize = DataFrameUtils.getSizeInBytes(indexData)
val targetIndexDataFileSize = HyperspaceConf.DataSkipping.targetIndexDataFileSize(ctx.spark)
val numFiles = indexDataSize / targetIndexDataFileSize
if (!numFiles.isValidInt) {
throw HyperspaceException(
"Could not create index data files due to too many files: " +
s"indexDataSize=$indexDataSize, targetIndexDataFileSize=$targetIndexDataFileSize")
def getNamedAggregateFunctions(sketches: Seq[Sketch]): Seq[Column] = {
sketches.flatMap { s =>
val aggrs = s.aggregateFunctions
assert(aggrs.nonEmpty)
aggrs.zipWithIndex.map {
case (aggr, idx) =>
new Column(aggr).as(getNormalizeColumnName(s"${s}_$idx"))
}
}
val repartitionedIndexData = indexData.repartition(math.max(1, numFiles.toInt))
repartitionedIndexData.write.mode(writeMode).parquet(ctx.indexDataPath.toString)
indexData.unpersist()
}

/**
@@ -159,21 +301,4 @@
private def getNormalizeColumnName(name: String): String = {
name.replaceAll("[ ,;{}()\n\t=]", "_")
}
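
A quick check of the normalization, assuming a sketch whose toString is "MinMax(A)":

```scala
// Characters not allowed in Parquet column names are replaced with '_'.
"MinMax(A)_0".replaceAll("[ ,;{}()\n\t=]", "_")   // "MinMax_A__0"
```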

@transient
private lazy val aggregateFunctions = sketches.flatMap { s =>
val aggrs = s.aggregateFunctions
assert(aggrs.nonEmpty)
aggrs.zipWithIndex.map {
case (aggr, idx) =>
new Column(aggr).as(getNormalizeColumnName(s"${s}_$idx"))
}
}
}

object DataSkippingIndex {
// $COVERAGE-OFF$ https://github.com/scoverage/scalac-scoverage-plugin/issues/125
final val kind = "DataSkippingIndex"
final val kindAbbr = "DS"
// $COVERAGE-ON$
}