From a4f5128cfaa04275a432836268758094fafe098e Mon Sep 17 00:00:00 2001 From: Chungmin Lee Date: Wed, 28 Jul 2021 19:19:21 +0900 Subject: [PATCH] Data Skipping Index Part 3-2: Rule --- build.sbt | 3 + .../microsoft/hyperspace/shim/ScalaUDF.scala | 0 .../com/microsoft/hyperspace/Hyperspace.scala | 11 +- .../hyperspace/index/IndexLogEntryTags.scala | 14 + .../dataskipping/DataSkippingIndex.scala | 107 ++++- .../rule/ApplyDataSkippingIndex.scala | 113 ++++++ .../rule/FilterConditionFilter.scala | 63 +++ .../rule/FilterPlanNodeFilter.scala | 40 ++ .../dataskipping/rule/FilterRankFilter.scala | 33 ++ .../dataskipping/sketch/MinMaxSketch.scala | 57 ++- .../index/dataskipping/sketch/Sketch.scala | 27 +- .../util/DataSkippingFileIndex.scala | 73 ++++ .../dataskipping/util/ExpressionUtils.scala | 49 +++ .../index/plananalysis/FilterReason.scala | 7 + .../index/plananalysis/PlanAnalyzer.scala | 9 +- .../plans/logical/IndexHadoopFsRelation.scala | 18 + .../index/rules/ApplyHyperspace.scala | 9 + .../rules/ScoreBasedIndexPlanOptimizer.scala | 4 +- .../default/DefaultFileBasedRelation.scala | 5 + .../sources/iceberg/IcebergRelation.scala | 3 + .../hyperspace/index/sources/interfaces.scala | 7 +- .../DataSkippingIndexIntegrationTest.scala | 279 ++++++++++++- .../dataskipping/DataSkippingSuite.scala | 72 ++++ .../rule/ApplyDataSkippingIndexTest.scala | 371 ++++++++++++++++++ .../rule/FilterConditionFilterTest.scala | 85 ++++ .../rule/FilterPlanNodeFilterTest.scala | 51 +++ .../sketch/MinMaxSketchTest.scala | 290 ++++++++++++++ 27 files changed, 1780 insertions(+), 20 deletions(-) create mode 100644 src/main/scala-spark2/com/microsoft/hyperspace/shim/ScalaUDF.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndex.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilter.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilter.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterRankFilter.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/DataSkippingFileIndex.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilterTest.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilterTest.scala diff --git a/build.sbt b/build.sbt index ee3bb6b3f..ac0fafde7 100644 --- a/build.sbt +++ b/build.sbt @@ -127,6 +127,9 @@ ThisBuild / Test / fork := true ThisBuild / Test / javaOptions += "-Xmx1024m" +// Needed to test both non-codegen and codegen parts of expressions +ThisBuild / Test / envVars += "SPARK_TESTING" -> "1" + ThisBuild / coverageExcludedPackages := "com\\.fasterxml.*;com\\.microsoft\\.hyperspace\\.shim" /** diff --git a/src/main/scala-spark2/com/microsoft/hyperspace/shim/ScalaUDF.scala b/src/main/scala-spark2/com/microsoft/hyperspace/shim/ScalaUDF.scala new file mode 100644 index 000000000..e69de29bb diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index a6c883f88..c55191744 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} 
import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} import com.microsoft.hyperspace.index.plananalysis.{CandidateIndexAnalyzer, PlanAnalyzer} -import com.microsoft.hyperspace.index.rules.ApplyHyperspace +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.withHyperspaceRuleDisabled import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager class Hyperspace(spark: SparkSession) { @@ -189,15 +189,6 @@ class Hyperspace(spark: SparkSession) { } } } - - private def withHyperspaceRuleDisabled(f: => Unit): Unit = { - try { - ApplyHyperspace.disableForIndexMaintenance.set(true) - f - } finally { - ApplyHyperspace.disableForIndexMaintenance.set(false) - } - } } object Hyperspace extends ActiveSparkSession { diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index ec9e071bb..ebff8063f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -16,6 +16,7 @@ package com.microsoft.hyperspace.index +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.InMemoryFileIndex import com.microsoft.hyperspace.index.plananalysis.FilterReason @@ -68,4 +69,17 @@ object IndexLogEntryTags { // If it's enabled, FILTER_REASONS and APPLIED_INDEX_RULES info will be tagged. val INDEX_PLAN_ANALYSIS_ENABLED: IndexLogEntryTag[Boolean] = IndexLogEntryTag[Boolean]("indexPlanAnalysisEnabled") + + // DATASKIPPING_INDEX_PREDICATE stores the index predicate translated + // from the plan's filter or join condition. + val DATASKIPPING_INDEX_PREDICATE: IndexLogEntryTag[Option[Expression]] = + IndexLogEntryTag[Option[Expression]]("dataskippingIndexPredicate") + + // DATASKIPPING_INDEX_FILEINDEX stores InMemoryFileIndex for the index data. + val DATASKIPPING_INDEX_FILEINDEX: IndexLogEntryTag[InMemoryFileIndex] = + IndexLogEntryTag[InMemoryFileIndex]("dataskippingIndexRelation") + + // DATASKIPPING_SOURCE_FILEINDEX stores InMemoryFileIndex for the source data.
+ val DATASKIPPING_SOURCE_FILEINDEX: IndexLogEntryTag[InMemoryFileIndex] = + IndexLogEntryTag[InMemoryFileIndex]("dataskippingSourceRelation") } diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala index ced7b7624..c4fbe08c4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndex.scala @@ -16,8 +16,13 @@ package com.microsoft.hyperspace.index.dataskipping -import org.apache.spark.sql.{Column, DataFrame, SaveMode} -import org.apache.spark.sql.functions.{input_file_name, min, spark_partition_id} +import scala.collection.mutable + +import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, NamedExpression, Or} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.functions.input_file_name import org.apache.spark.sql.hyperspace.utils.StructTypeUtils import org.apache.spark.sql.types.StructType @@ -121,6 +126,60 @@ case class DataSkippingIndex( override def hashCode: Int = sketches.map(_.hashCode).sum + /** + * Translate the given filter/join condition for the source data to a + * predicate that can be used to filter out unnecessary source data files when + * applied to index data. + * + * For example, a filter condition "A = 1" can be translated into an index + * predicate "Min_A <= 1 && Max_A >= 1" to filter out files which cannot satisfy + * the condition for any rows in the file. + */ + def translateFilterCondition( + spark: SparkSession, + condition: Expression, + source: LogicalPlan): Option[Expression] = { + val resolvedExprs = + ExpressionUtils.getResolvedExprs(spark, sketches, source).getOrElse { return None } + val predMap = buildPredicateMap(spark, condition, source, resolvedExprs) + + // Create a single index predicate for a single source predicate node, + // by combining individual index predicates with And. + // True is returned if there are no index predicates for the source predicate node. + def toIndexPred(sourcePred: Expression): Expression = { + predMap.get(sourcePred).map(_.reduceLeft(And)).getOrElse(Literal.TrueLiteral) + } + + // Compose an index predicate visiting the source predicate tree recursively. + def composeIndexPred(sourcePred: Expression): Expression = + sourcePred match { + case and: And => And(toIndexPred(and), and.mapChildren(composeIndexPred)) + case or: Or => And(toIndexPred(or), or.mapChildren(composeIndexPred)) + case leaf => toIndexPred(leaf) + } + + val indexPredicate = composeIndexPred(condition) + + // Apply constant folding to get the final predicate. + // This is a trimmed down version of the BooleanSimplification rule. + // It's just enough to determine whether the index is applicable or not. + val optimizePredicate: PartialFunction[Expression, Expression] = { + case And(Literal.TrueLiteral, right) => right + case And(left, Literal.TrueLiteral) => left + case And(a, And(b, c)) if a.deterministic && a == b => And(b, c) + case Or(t @ Literal.TrueLiteral, right) => t + case Or(left, t @ Literal.TrueLiteral) => t + } + val optimizedIndexPredicate = indexPredicate.transformUp(optimizePredicate) + + // Return None if the index predicate is True - meaning no conversion can be done. 
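+    // For example, with a single MinMaxSketch("A") (writing Min_A/Max_A informally
+    // for its sketch columns and omitting the null checks), "A = 1 AND B = 2" folds
+    // down to "Min_A <= 1 AND Max_A >= 1" and the index remains applicable, whereas
+    // "A = 1 OR B = 2" folds to true, because the untranslatable "B = 2" branch makes
+    // the whole Or trivially true.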
+ if (optimizedIndexPredicate == Literal.TrueLiteral) { + None + } else { + Some(optimizedIndexPredicate) + } + } + private def writeImpl(ctx: IndexerContext, indexData: DataFrame, writeMode: SaveMode): Unit = { // require instead of assert, as the condition can potentially be broken by // code which is external to dataskipping. @@ -142,6 +201,50 @@ case class DataSkippingIndex( repartitionedIndexData.write.mode(writeMode).parquet(ctx.indexDataPath.toString) indexData.unpersist() } + + /** + * Collects index predicates for each node in the source predicate. + */ + private def buildPredicateMap( + spark: SparkSession, + predicate: Expression, + source: LogicalPlan, + resolvedExprs: Map[Sketch, Seq[Expression]]) + : mutable.Map[Expression, mutable.Buffer[Expression]] = { + val predMap = mutable.Map[Expression, mutable.Buffer[Expression]]() + val sketchesWithIndex = sketches.zipWithIndex + predicate.foreachUp { sourcePred => + val indexPreds = sketchesWithIndex.flatMap { + case (sketch, idx) => + sketch.convertPredicate( + sourcePred, + aggrNames(idx).map(UnresolvedAttribute.quoted(_)), + source.output.map(attr => attr.exprId -> attr.name).toMap, + resolvedExprs(sketch)) + } + if (indexPreds.nonEmpty) { + predMap.getOrElseUpdate(sourcePred, mutable.Buffer.empty) ++= indexPreds + } + } + predMap + } + + private def aggrNames(i: Int): Seq[String] = { + aggregateFunctions + .slice(sketchOffsets(i), sketchOffsets(i + 1)) + .map(_.expr.asInstanceOf[NamedExpression].name) + } + + /** + * Sketch offsets are used to map each sketch to its corresponding columns + * in the dataframe. + */ + @transient + private lazy val sketchOffsets: Seq[Int] = + sketches.map(_.aggregateFunctions.length).scanLeft(0)(_ + _) + + @transient + private lazy val aggregateFunctions = DataSkippingIndex.getNamedAggregateFunctions(sketches) } object DataSkippingIndex { diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndex.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndex.scala new file mode 100644 index 000000000..7dd26b682 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndex.scala @@ -0,0 +1,113 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.rule + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, HadoopFsRelation, InMemoryFileIndex, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.hyperspace.utils.logicalPlanToDataFrame +import org.apache.spark.sql.types.StructType + +import com.microsoft.hyperspace.index.IndexLogEntryTags +import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndex +import com.microsoft.hyperspace.index.dataskipping.util.DataSkippingFileIndex +import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation +import com.microsoft.hyperspace.index.rules.{ExtractRelation, HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter} +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToSelectedIndexMap + +object ApplyDataSkippingIndex extends HyperspaceRule { + protected override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] = + IndexTypeFilter[DataSkippingIndex]() :: FilterPlanNodeFilter :: FilterConditionFilter :: Nil + + protected override val indexRanker: IndexRankFilter = FilterRankFilter + + override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = { + if (indexes.isEmpty) { + return plan + } + plan match { + case filter @ Filter(_, ExtractRelation(relation)) => + val indexLogEntry = indexes(relation.plan) + val indexDataPred = indexLogEntry + .getTagValue(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE) + .get + .getOrElse { return plan } + val indexDataSchema = indexLogEntry.derivedDataset.asInstanceOf[DataSkippingIndex].schema + val fileStatusCache = FileStatusCache.getOrCreate(spark) + val indexDataLoc = + indexLogEntry.withCachedTag(IndexLogEntryTags.DATASKIPPING_INDEX_FILEINDEX) { + new InMemoryFileIndex( + spark, + indexLogEntry.content.files, + Map.empty, + Some(indexDataSchema), + fileStatusCache) + } + val indexDataRel = LogicalRelation( + new HadoopFsRelation( + indexDataLoc, + StructType(Nil), + indexDataSchema, + None, + new ParquetFileFormat, + Map.empty)(spark)) + val indexData = logicalPlanToDataFrame(spark, indexDataRel) + val originalFileIndex = relation.plan match { + case LogicalRelation(HadoopFsRelation(location, _, _, _, _, _), _, _, _) => location + case _ => + indexLogEntry.withCachedTag( + relation.plan, + IndexLogEntryTags.DATASKIPPING_SOURCE_FILEINDEX) { + new InMemoryFileIndex( + spark, + relation.rootPaths, + relation.partitionBasePath + .map(PartitioningAwareFileIndex.BASE_PATH_PARAM -> _) + .toMap, + Some(relation.schema), + fileStatusCache) + } + } + val dataSkippingFileIndex = new DataSkippingFileIndex( + spark, + indexData, + indexDataPred, + indexLogEntry.fileIdTracker, + originalFileIndex) + val newFsRelation = IndexHadoopFsRelation( + relation.createHadoopFsRelation( + dataSkippingFileIndex, + relation.schema, + relation.options), + spark, + indexLogEntry) + val output = relation.output.map(_.asInstanceOf[AttributeReference]) + filter.copy(child = relation.createLogicalRelation(newFsRelation, output)) + case _ => plan + } + } + + override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = { + if (indexes.isEmpty) { + return 0 + } + // Return the lowest score so that covering indexes take precedence over + // data skipping indexes. 
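+    // (FilterIndexRule and JoinIndexRule report higher scores, so ScoreBasedIndexPlanOptimizer
+    // picks a covering index when both kinds of indexes are applicable.)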
+ 1 + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilter.scala new file mode 100644 index 000000000..385384ef2 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilter.scala @@ -0,0 +1,63 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.rule + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} + +import com.microsoft.hyperspace.index.IndexLogEntryTags +import com.microsoft.hyperspace.index.dataskipping.DataSkippingIndex +import com.microsoft.hyperspace.index.plananalysis.FilterReasons +import com.microsoft.hyperspace.index.rules.{ExtractRelation, QueryPlanIndexFilter} +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap + +/** + * FilterConditionFilter filters indexes out if + * 1) an index cannot be applied to the filter condition. + */ +object FilterConditionFilter extends QueryPlanIndexFilter { + override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { + if (candidateIndexes.isEmpty) { + return Map.empty + } + plan match { + case Filter(condition: Expression, ExtractRelation(relation)) => + val applicableIndexes = candidateIndexes(relation.plan).flatMap { indexLogEntry => + val indexDataPredOpt = + indexLogEntry.withCachedTag(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE) { + val index = indexLogEntry.derivedDataset.asInstanceOf[DataSkippingIndex] + index.translateFilterCondition(spark, condition, relation.plan) + } + if (withFilterReasonTag( + plan, + indexLogEntry, + FilterReasons.IneligibleFilterCondition(condition.sql))( + indexDataPredOpt.nonEmpty)) { + Some(indexLogEntry) + } else { + None + } + } + if (applicableIndexes.nonEmpty) { + Map(relation.plan -> applicableIndexes) + } else { + Map.empty + } + case _ => Map.empty + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilter.scala new file mode 100644 index 000000000..4a465a93d --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilter.scala @@ -0,0 +1,40 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.rule + +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} + +import com.microsoft.hyperspace.index.rules.{ExtractRelation, QueryPlanIndexFilter, RuleUtils} +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap + +/** + * FilterPlanNodeFilter filters indexes out if + * 1) the given plan is not an eligible filter plan node. + * 2) the source plan of the index is not part of the filter plan. + */ +object FilterPlanNodeFilter extends QueryPlanIndexFilter { + override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = { + if (candidateIndexes.isEmpty) { + return Map.empty + } + plan match { + case Filter(_, ExtractRelation(relation)) if !RuleUtils.isIndexApplied(relation) => + candidateIndexes.filterKeys(relation.plan.equals) + case _ => Map.empty + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterRankFilter.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterRankFilter.scala new file mode 100644 index 000000000..44acda8e3 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterRankFilter.scala @@ -0,0 +1,33 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.rule + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap} +import com.microsoft.hyperspace.index.rules.IndexRankFilter + +object FilterRankFilter extends IndexRankFilter { + override def apply( + plan: LogicalPlan, + applicableIndexes: PlanToIndexesMap): PlanToSelectedIndexMap = { + // TODO: Multiple data skipping indexes can be applied to the same plan node, + // although the effectiveness decreases as more indexes are applied. + // The framework should be updated to allow multiple indexes.
+ applicableIndexes.collect { case (plan, indexes) if indexes.nonEmpty => plan -> indexes.head } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketch.scala index 474069dc5..dbe95a5b8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketch.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketch.scala @@ -16,9 +16,12 @@ package com.microsoft.hyperspace.index.dataskipping.sketch -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{Max, Min} -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} +import org.apache.spark.sql.types.{ArrayType, DataType} + +import com.microsoft.hyperspace.index.dataskipping.util._ /** * Sketch based on minimum and maximum values for a given expression. @@ -40,4 +43,54 @@ case class MinMaxSketch(override val expr: String, override val dataType: Option override def aggregateFunctions: Seq[Expression] = Min(parsedExpr).toAggregateExpression() :: Max(parsedExpr).toAggregateExpression() :: Nil + + override def convertPredicate( + predicate: Expression, + sketchValues: Seq[Expression], + nameMap: Map[ExprId, String], + resolvedExprs: Seq[Expression]): Option[Expression] = { + val min = sketchValues(0) + val max = sketchValues(1) + // TODO: Add third sketch value "hasNull" of type bool + // true if the expr can be null in the file, false otherwise + // to optimize IsNull (can skip files with hasNull = false) + // This can also be done as a separate sketch, e.g.
HasNullSketch + // Should evaluate which way is better + val resolvedExpr = resolvedExprs.head + val dataType = resolvedExpr.dataType + val exprMatcher = NormalizedExprMatcher(resolvedExpr, nameMap) + val ExprIsTrue = IsTrueExtractor(exprMatcher) + val ExprIsFalse = IsFalseExtractor(exprMatcher) + val ExprIsNotNull = IsNotNullExtractor(exprMatcher) + val ExprEqualTo = EqualToExtractor(exprMatcher) + val ExprLessThan = LessThanExtractor(exprMatcher) + val ExprLessThanOrEqualTo = LessThanOrEqualToExtractor(exprMatcher) + val ExprGreaterThan = GreaterThanExtractor(exprMatcher) + val ExprGreaterThanOrEqualTo = GreaterThanOrEqualToExtractor(exprMatcher) + val ExprIn = InExtractor(exprMatcher) + val ExprInSet = InSetExtractor(exprMatcher) + Option(predicate) + .collect { + case ExprIsTrue() => max + case ExprIsFalse() => Not(min) + case ExprIsNotNull() => Literal(true) + case ExprEqualTo(v) => And(LessThanOrEqual(min, v), GreaterThanOrEqual(max, v)) + case ExprLessThan(v) => LessThan(min, v) + case ExprLessThanOrEqualTo(v) => LessThanOrEqual(min, v) + case ExprGreaterThan(v) => GreaterThan(max, v) + case ExprGreaterThanOrEqualTo(v) => GreaterThanOrEqual(max, v) + case ExprIn(vs) => + vs.map(v => And(LessThanOrEqual(min, v), GreaterThanOrEqual(max, v))).reduceLeft(Or) + case ExprInSet(vs) => + val sortedValues = Literal( + ArrayData.toArrayData( + ArrayUtils.toArray( + vs.filter(_ != null).toArray.sorted(TypeUtils.getInterpretedOrdering(dataType)), + dataType)), + ArrayType(dataType, containsNull = false)) + LessThanOrEqual(ElementAt(sortedValues, SortedArrayLowerBound(sortedValues, min)), max) + // TODO: StartsWith, Like with constant prefix + } + .map(p => And(And(IsNotNull(min), IsNotNull(max)), p)) + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/Sketch.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/Sketch.scala index 505dc4b27..d7b5b8a39 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/Sketch.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/sketch/Sketch.scala @@ -17,7 +17,7 @@ package com.microsoft.hyperspace.index.dataskipping.sketch import com.fasterxml.jackson.annotation.JsonTypeInfo -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId} import org.apache.spark.sql.types.DataType /** @@ -75,4 +75,29 @@ trait Sketch { * Returns the hash code for this sketch. */ def hashCode: Int + + /** + * Converts the given predicate node for source data to an index predicate + * that can be used to filter out unnecessary source files when applied to + * index data. + * + * The returned predicate should evaluate to true for an index data row + * if the corresponding source data file cannot be excluded, and false if + * the source data file can be safely skipped. + * + * The implementation should consider the given predicate as a single node, + * not a tree that must be traversed recursively, because that part is + * handled by the framework.
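+ *
+ * For example, MinMaxSketch("A") converts the single node "A < 10" into
+ * roughly "Min_A < 10" over its sketch values (together with null checks
+ * on the sketch values) and returns None for nodes it cannot handle.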
+ * + * @param predicate Source predicate node + * @param sketchValues Sketch value references in index data + * @param nameMap Map used to match attribute names + * @param resolvedExprs Used to match expressions in predicates + * @return Converted predicate for index data + */ + def convertPredicate( + predicate: Expression, + sketchValues: Seq[Expression], + nameMap: Map[ExprId, String], + resolvedExprs: Seq[Expression]): Option[Expression] } diff --git a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/DataSkippingFileIndex.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/DataSkippingFileIndex.scala new file mode 100644 index 000000000..ff41b5701 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/DataSkippingFileIndex.scala @@ -0,0 +1,73 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.util + +import org.apache.hadoop.fs.Path +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Column, DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.functions.isnull +import org.apache.spark.sql.types.StructType + +import com.microsoft.hyperspace.index.{FileIdTracker, IndexConstants} + +class DataSkippingFileIndex( + sparkSession: SparkSession, + indexData: DataFrame, + private[dataskipping] val indexDataPred: Expression, // exposed for test + fileIdTracker: FileIdTracker, + baseFileIndex: FileIndex) + extends FileIndex + with Logging { + + override def listFiles( + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + val partitions = baseFileIndex.listFiles(partitionFilters, dataFilters) + import sparkSession.implicits._ + val pathCol = "__path" + val filesWithId = partitions + .flatMap(_.files.map(f => (f.getPath.toString, fileIdTracker.addFile(f)))) + .toDF(pathCol, IndexConstants.DATA_FILE_NAME_ID) + val selectedFiles = filesWithId + .hint("broadcast") + .join(indexData, Seq(IndexConstants.DATA_FILE_NAME_ID), "left") + .filter(isnull(indexData(IndexConstants.DATA_FILE_NAME_ID)) || new Column(indexDataPred)) + .select(pathCol) + .collect + .map(_.getString(0)) + .toSet + val selectedPartitions = partitions + .map(p => p.copy(files = p.files.filter(f => selectedFiles.contains(f.getPath.toString)))) + .filter(_.files.nonEmpty) + logDebug(s"selectedPartitions = $selectedPartitions") + selectedPartitions + } + + override def rootPaths: Seq[Path] = baseFileIndex.rootPaths + + override def inputFiles: Array[String] = baseFileIndex.inputFiles + + override def refresh(): Unit = baseFileIndex.refresh() + + override def sizeInBytes: Long = baseFileIndex.sizeInBytes + + override def partitionSchema: StructType = baseFileIndex.partitionSchema + + override def metadataOpsTimeNs: Option[Long] = baseFileIndex.metadataOpsTimeNs +} diff --git 
a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ExpressionUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ExpressionUtils.scala index ed009efc0..eb058ab2a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ExpressionUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/dataskipping/util/ExpressionUtils.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.types.DataType import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.index.IndexUtils import com.microsoft.hyperspace.index.dataskipping.sketch.Sketch +import com.microsoft.hyperspace.index.rules.ApplyHyperspace.withHyperspaceRuleDisabled object ExpressionUtils { @@ -159,4 +160,52 @@ object ExpressionUtils { } } } + + /** + * Returns sketch expressions that can be used to match indexed expressions + * and expressions in the filter condition. For example, when a user creates + * an index with MinMaxSketch("A"), we create an expression corresponding to + * "A" here, and later we try to match expression nodes in a filter condition, + * say, EqualTo(AttributeReference("A"), Literal(1)), to the expression for + * "A". + * + * We need this step as the filter/join conditions are given to us as a tree + * of expressions in Spark's optimizer, whereas the indexed expressions + * are provided and stored as strings. + */ + def getResolvedExprs( + spark: SparkSession, + sketches: Seq[Sketch], + source: LogicalPlan): Option[Map[Sketch, Seq[Expression]]] = { + val resolvedExprs = sketches.map { s => + val cond = PredicateWrapper(s.expressions.map { + case (expr, _) => spark.sessionState.sqlParser.parseExpression(expr) + }) + val filter = withHyperspaceRuleDisabled { + spark.sessionState.optimizer + .execute(spark.sessionState.analyzer.execute(Filter(cond, source))) + .asInstanceOf[Filter] + } + val resolved = filter.condition.asInstanceOf[PredicateWrapper].children.map(normalize) + if (!s.expressions.map(_._2.get).zip(resolved).forall { + case (dataType, resolvedExpr) => dataType == resolvedExpr.dataType + }) { + return None + } + s -> resolved + }.toMap + Some(resolvedExprs) + } + + // Used to preserve sketch expressions during optimization + private case class PredicateWrapper(override val children: Seq[Expression]) + extends Expression + with Predicate { + // $COVERAGE-OFF$ code never used + override def nullable: Boolean = false + override def eval(input: InternalRow): Any = throw new NotImplementedError + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + throw new NotImplementedError + // $COVERAGE-ON$ + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala index 88dbfd1df..519400207 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/FilterReason.scala @@ -148,4 +148,11 @@ object FilterReasons { override def verboseStr: String = s"Another candidate index is applied: $appliedIndex" } + + case class IneligibleFilterCondition(condition: String) extends FilterReason { + override final def codeStr: String = "INELIGIBLE_FILTER_CONDITION" + override val args = Seq("condition" -> condition) + override def verboseStr: String = + s"Ineligible filter condition: $condition" + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala
b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala index a9336bf96..81cbfa035 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plananalysis/PlanAnalyzer.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.hyperspace.utils.logicalPlanToDataFrame import com.microsoft.hyperspace.{HyperspaceException, Implicits} import com.microsoft.hyperspace.index.IndexConstants +import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation import com.microsoft.hyperspace.shim.ExtractFileSourceScanExecRelation /** @@ -212,7 +213,13 @@ object PlanAnalyzer { plan: SparkPlan, indexes: DataFrame, bufferStream: BufferStream): Unit = { - val usedIndexes = indexes.filter(indexes("indexLocation").isin(getPaths(plan): _*)) + val usedIndexNames = plan.collect { + case ExtractFileSourceScanExecRelation(rel: IndexHadoopFsRelation) => + rel.indexName + } + val usedIndexes = indexes.filter( + indexes("indexLocation").isin(getPaths(plan): _*) || + indexes("name").isin(usedIndexNames: _*)) usedIndexes.collect().foreach { row => bufferStream .write(row.getAs("name").toString) diff --git a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala index d46b25fb6..0936695b1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/plans/logical/IndexHadoopFsRelation.scala @@ -45,6 +45,24 @@ class IndexHadoopFsRelation( s"Hyperspace(Type: ${index.derivedDataset.kindAbbr}, " + s"Name: ${index.name}, LogVersion: ${index.id})" } + + def indexName: String = index.name + override def toString(): String = indexPlanStr } + +object IndexHadoopFsRelation { + def apply( + rel: HadoopFsRelation, + spark: SparkSession, + index: IndexLogEntry): IndexHadoopFsRelation = { + new IndexHadoopFsRelation( + rel.location, + rel.partitionSchema, + rel.dataSchema, + rel.bucketSpec, + rel.fileFormat, + rel.options)(spark, index) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala index df87202e0..a9ae97acd 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ApplyHyperspace.scala @@ -63,4 +63,13 @@ object ApplyHyperspace } } } + + def withHyperspaceRuleDisabled[T](f: => T): T = { + try { + disableForIndexMaintenance.set(true) + f + } finally { + disableForIndexMaintenance.set(false) + } + } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala index ed083e1cb..acba2fa33 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/ScoreBasedIndexPlanOptimizer.scala @@ -21,13 +21,15 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import com.microsoft.hyperspace.index.covering.{FilterIndexRule, JoinIndexRule} +import com.microsoft.hyperspace.index.dataskipping.rule.ApplyDataSkippingIndex import com.microsoft.hyperspace.index.rules.ApplyHyperspace.PlanToIndexesMap /** * Apply Hyperspace indexes based on the score of each index 
application. */ class ScoreBasedIndexPlanOptimizer { - private val rules: Seq[HyperspaceRule] = FilterIndexRule :: JoinIndexRule :: NoOpRule :: Nil + private val rules: Seq[HyperspaceRule] = + Seq(FilterIndexRule, JoinIndexRule, ApplyDataSkippingIndex, NoOpRule) // Map for memoization. The key is the logical plan before applying [[HyperspaceRule]]s // and its value is a pair of best transformed plan and its score. diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala index fe9b18b7b..ee5eeb1e0 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala @@ -60,6 +60,11 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe filesFromIndex(location) } + override def rootPaths: Seq[Path] = + plan.relation match { + case HadoopFsRelation(location, _, _, _, _, _) => location.rootPaths + } + /** * The partition schema of the current relation. */ diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala index 0c9d0e48d..1ed790bcc 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala @@ -73,6 +73,9 @@ class IcebergRelation( table.newScan().planFiles().iterator().asScala.toSeq.map(toFileStatus) } + override def rootPaths: Seq[Path] = + Seq(PathUtils.makeAbsolute(table.location(), spark.sessionState.newHadoopConf())) + /** * The optional partition base path of the current relation. */ diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 8b01adbf0..3c4ab0e0c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -16,7 +16,7 @@ package com.microsoft.hyperspace.index.sources -import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -75,6 +75,11 @@ trait FileBasedRelation extends SourceRelation { */ val allFiles: Seq[FileStatus] + /** + * Returns the root paths of the relation. + */ + def rootPaths: Seq[Path] + /** * FileInfo list for all source files that the current relation references to. 
*/ diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala index adbb456f2..cbe3cf950 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexIntegrationTest.scala @@ -16,14 +16,45 @@ package com.microsoft.hyperspace.index.dataskipping -import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.index.dataskipping.sketch.MinMaxSketch +import org.apache.hadoop.fs.Path +import org.apache.spark.SparkException +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types._ + +import com.microsoft.hyperspace._ +import com.microsoft.hyperspace.index.IndexConstants +import com.microsoft.hyperspace.index.covering.CoveringIndexConfig +import com.microsoft.hyperspace.index.dataskipping.sketch._ +import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation +import com.microsoft.hyperspace.shim.ExtractFileSourceScanExecRelation class DataSkippingIndexIntegrationTest extends DataSkippingSuite { import spark.implicits._ override val numParallelism: Int = 10 + test("MinMax index is applied for a filter query (EqualTo).") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + + test("Empty relation is returned if no files match the index predicate.") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = -1") + checkIndexApplied(query, 0) + } + + test("MinMax index is applied for a filter query (EqualTo) with expression.") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A + B"))) + def query: DataFrame = df.filter("A+B < 40") + checkIndexApplied(query, 2) + } + test("Non-deterministic expression is blocked.") { val df = createSourceData(spark.range(100).toDF("A")) val ex = intercept[HyperspaceException]( @@ -88,4 +119,248 @@ class DataSkippingIndexIntegrationTest extends DataSkippingSuite { "DataSkippingIndex does not support indexing an expression which does not " + "reference source columns: myfunc()")) } + + test("MinMax index is applied for a filter query (EqualTo) with UDF.") { + val df = createSourceData(spark.range(100).toDF("A")) + spark.udf.register("myfunc", (a: Int) => a * 2) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("myfunc(A)"))) + def query: DataFrame = df.filter("myfunc(A) = 10") + checkIndexApplied(query, 1) + } + + test("UDF matching is based on the name, not the actual lambda object.") { + val df = createSourceData(spark.range(100).toDF("A")) + spark.udf.register("myfunc", (a: Int) => a * 2) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("myfunc(A)"))) + // Register a new function with the same semantics. 
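+    // The sketch expression is matched by the registered function name, so re-registering
+    // "myfunc" with a different lambda instance does not prevent the index from being applied.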
+ spark.udf.register("myfunc", (a: Int) => 2 * a) + def query: DataFrame = df.filter("myfunc(A) = 10") + checkIndexApplied(query, 1) + } + + test("MinMax index is not applied for a filter query if it is not applicable.") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("B"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, numParallelism) + } + + test("MinMax index is not applied for a filter query if the filter condition is unsuitable.") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 1 or B = 2") + checkIndexApplied(query, numParallelism) + } + + test("MinMax index is not applied for a filter query if the filter condition is IsNull.") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A is null") + checkIndexApplied(query, numParallelism) + } + + test("Multiple indexes are applied to multiple filters.") { + val df = createSourceData(spark.range(100).toDF("A"), path = "TA") + val df2 = createSourceData(spark.range(100, 200).toDF("B"), path = "TB") + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + hs.createIndex(df2, DataSkippingIndexConfig("myind2", MinMaxSketch("B"))) + def query: DataFrame = df.filter("A = 10").union(df2.filter("B = 110")) + checkIndexApplied(query, 2) + } + + test("Single index is applied to multiple filters.") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 10").union(df.filter("A = 20")) + checkIndexApplied(query, 2) + } + + test("Single index is applied to a single filter.") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 10").union(df.filter("B = 120")) + checkIndexApplied(query, numParallelism + 1) + } + + test( + "DataSkippingIndex works correctly for CSV where the same source data files can be " + + "interpreted differently.") { + // String order: 1 < 10 < 2 + // Int order: 1 < 2 < 10 + createFile(dataPath("1.csv"), Seq("a", "1", "2", "10").mkString("\n").getBytes()) + createFile(dataPath("2.csv"), Seq("a", "3", "4", "5").mkString("\n").getBytes()) + val paths = Seq(dataPath("1.csv").toString, dataPath("2.csv").toString) + val dfString = spark.read.option("header", "true").csv(paths: _*) + assert(dfString.schema.head.dataType === StringType) + val dfInt = spark.read.option("header", "true").option("inferSchema", "true").csv(paths: _*) + assert(dfInt.schema.head.dataType === IntegerType) + + withIndex("myind") { + hs.createIndex(dfString, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + checkIndexApplied(dfString.filter("A = 3"), 2) + checkIndexApplied(dfString.filter("A = 10"), 2) + checkIndexApplied(dfString.filter("A = '3'"), 1) + checkIndexApplied(dfString.filter("A = '10'"), 1) + checkIndexApplied(dfInt.filter("A = 3"), 2) + checkIndexApplied(dfInt.filter("A = 10"), 2) + checkIndexApplied(dfInt.filter("A = '3'"), 2) + checkIndexApplied(dfInt.filter("A = '10'"), 2) + } + withIndex("myind") { + hs.createIndex(dfInt, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + 
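// The index is now built from the int-typed relation, so it is not applied to the dfString queries below (the sketch's recorded data type does not match the string column) and they scan both files, while the dfInt queries can skip files using the int min/max values. +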
checkIndexApplied(dfString.filter("A = 3"), 2) + checkIndexApplied(dfString.filter("A = 10"), 2) + checkIndexApplied(dfString.filter("A = '3'"), 2) + checkIndexApplied(dfString.filter("A = '10'"), 2) + checkIndexApplied(dfInt.filter("A = 3"), 2) + checkIndexApplied(dfInt.filter("A = 10"), 1) + checkIndexApplied(dfInt.filter("A = '3'"), 2) + checkIndexApplied(dfInt.filter("A = '10'"), 1) + } + } + + test("MinMax index is applied for a filter query (EqualTo) with selection.") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 1").select("B") + checkIndexApplied(query, 1) + } + + test("MinMax index can be refreshed (mode = incremental).") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + createSourceData(spark.range(100, 200).toDF("A"), saveMode = SaveMode.Append) + hs.refreshIndex("myind", "incremental") + def query: DataFrame = spark.read.parquet(dataPath().toString).filter("A = 1 OR A = 123") + checkIndexApplied(query, 2) + assert(numIndexDataFiles("myind") === 2) + } + + test("MinMax index can be refreshed (mode = full).") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + createSourceData(spark.range(100, 200).toDF("A"), saveMode = SaveMode.Append) + hs.refreshIndex("myind", "full") + def query: DataFrame = spark.read.parquet(dataPath().toString).filter("A = 1 OR A = 123") + checkIndexApplied(query, 2) + assert(numIndexDataFiles("myind") === 1) + } + + test( + "MinMax index can be applied without refresh when source files are added " + + "if hybrid scan is enabled.") { + withSQLConf( + IndexConstants.INDEX_HYBRID_SCAN_ENABLED -> "true", + IndexConstants.INDEX_HYBRID_SCAN_APPENDED_RATIO_THRESHOLD -> "1") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + createSourceData(spark.range(100, 200).toDF("A"), saveMode = SaveMode.Append) + def query: DataFrame = spark.read.parquet(dataPath().toString).filter("A = 1 OR A = 123") + checkIndexApplied(query, 11) + } + } + + test("Empty source data does not cause an error.") { + val df = createSourceData(spark.range(0).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + + test("Empty source data followed by refresh incremental works as expected.") { + val df = createSourceData(spark.range(0).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + createSourceData(spark.range(100).toDF("A"), saveMode = SaveMode.Append) + hs.refreshIndex("myind", "incremental") + def query: DataFrame = spark.read.parquet(dataPath().toString).filter("A = 1") + checkIndexApplied(query, 2) + } + + test("MinMax index can be optimized.") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + createSourceData(spark.range(100, 200).toDF("A"), saveMode = SaveMode.Append) + hs.refreshIndex("myind", "incremental") + assert(numIndexDataFiles("myind") === 2) + hs.optimizeIndex("myind") + assert(numIndexDataFiles("myind") === 1) + def query: DataFrame = spark.read.parquet(dataPath().toString).filter("A = 1 OR A = 123") + checkIndexApplied(query, 2) + } + + 
test("CoveringIndex is applied if both CoveringIndex and DataSkippingIndex are applicable.") { + val df = createSourceData(spark.range(100).toDF("A")) + hs.createIndex(df, DataSkippingIndexConfig("ds", MinMaxSketch("A"))) + hs.createIndex(df, CoveringIndexConfig("ci", Seq("A"), Nil)) + spark.enableHyperspace + def query: DataFrame = df.filter("A = 1 or A = 50") + val rel = query.queryExecution.optimizedPlan.collect { + case LogicalRelation(rel: IndexHadoopFsRelation, _, _, _) => rel + } + assert(rel.map(_.indexName) === Seq("ci")) + checkAnswer(query, Seq(1, 50).toDF("A")) + } + + test("DataSkippingIndex is applied if CoveringIndex is not applicable.") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + hs.createIndex(df, DataSkippingIndexConfig("ds", MinMaxSketch("A"))) + hs.createIndex(df, CoveringIndexConfig("ci", Seq("A"), Nil)) + spark.enableHyperspace + def query: DataFrame = df.filter("A = 1 or A = 50") + val rel = query.queryExecution.optimizedPlan.collect { + case LogicalRelation(rel: IndexHadoopFsRelation, _, _, _) => rel + } + assert(rel.map(_.indexName) === Seq("ds")) + checkAnswer(query, Seq((1, 2), (50, 100)).toDF("A", "B")) + } + + test("Both CoveringIndex and DataSkippnigIndex can be applied.") { + val df = createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")) + hs.createIndex(df, CoveringIndexConfig("ci", Seq("A"), Nil)) + hs.createIndex(df, DataSkippingIndexConfig("ds", MinMaxSketch("B"))) + spark.enableHyperspace + def query: DataFrame = df.filter("A = 1").select("A").union(df.filter("B = 100").select("A")) + val rel = query.queryExecution.optimizedPlan.collect { + case LogicalRelation(rel: IndexHadoopFsRelation, _, _, _) => rel + } + assert(rel.map(_.indexName).sorted === Seq("ci", "ds")) + checkAnswer(query, Seq(1, 50).toDF("A")) + } + + test("DataSkippingIndex works correctly with files having special characters in their name.") { + assume(!Path.WINDOWS) + val df = createSourceData(spark.range(100).toDF("A"), "table ,.;'`~!@#$%^&()_+|\"<>") + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + + test("DataSkippingIndex works correctly with catalog tables") { + withTable("T") { + spark.range(100).toDF("A").write.saveAsTable("T") + val df = spark.read.table("T") + hs.createIndex(df, DataSkippingIndexConfig("myind", MinMaxSketch("A"))) + def query: DataFrame = df.filter("A = 1") + checkIndexApplied(query, 1) + } + } + + def checkIndexApplied(query: => DataFrame, numExpectedFiles: Int): Unit = { + withClue(s"query = ${query.queryExecution.logical}numExpectedFiles = $numExpectedFiles\n") { + spark.disableHyperspace + val queryWithoutIndex = query + queryWithoutIndex.collect() + spark.enableHyperspace + val queryWithIndex = query + queryWithIndex.collect() + checkAnswer(queryWithIndex, queryWithoutIndex) + assert(numAccessedFiles(queryWithIndex) === numExpectedFiles) + } + } + + def numIndexDataFiles(name: String): Int = { + val manager = Hyperspace.getContext(spark).indexCollectionManager + val latestVersion = manager.getIndexVersions(name, Seq("ACTIVE")).max + manager.getIndex(name, latestVersion).get.content.files.length + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingSuite.scala index 61750670c..a0fb54331 100644 --- 
a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingSuite.scala @@ -21,6 +21,9 @@ import scala.collection.AbstractIterator import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, RemoteIterator} import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode, SparkSession} +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.internal.SQLConf import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index._ @@ -52,6 +55,7 @@ trait DataSkippingSuite extends QueryTest with HyperspaceSuite { after { FileUtils.delete(tempDir) + spark.catalog.clearCache() } def dataPath(path: String = "T"): Path = new Path(dataPathRoot, path) @@ -73,6 +77,24 @@ trait DataSkippingSuite extends QueryTest with HyperspaceSuite { } } + def createPartitionedSourceData( + originalData: DataFrame, + partitioningColumns: Seq[String], + path: String = "T", + saveMode: SaveMode = SaveMode.Overwrite, + appendedDataOnly: Boolean = false): DataFrame = { + val p = dataPath(path) + val oldFiles = listFiles(p).toSet + originalData.write.partitionBy(partitioningColumns: _*).mode(saveMode).parquet(p.toString) + updateFileIdTracker(p) + if (appendedDataOnly) { + val newFiles = listFiles(p).filterNot(oldFiles.contains) + spark.read.option("basePath", p.toString).parquet(newFiles.map(_.getPath.toString): _*) + } else { + spark.read.parquet(p.toString) + } + } + def updateFileIdTracker(path: Path): Unit = { listFiles(path).foreach(f => fileIdTracker.addFile(f)) } @@ -110,5 +132,55 @@ trait DataSkippingSuite extends QueryTest with HyperspaceSuite { fs.delete(path, true) } + def createFile(path: Path, data: Array[Byte]): Unit = { + val fs = path.getFileSystem(new Configuration) + val out = fs.create(path) + out.write(data) + out.close() + } + def isParquet: FileStatus => Boolean = _.getPath.getName.endsWith(".parquet") + + def withAndWithoutCodegen(testFun: => Unit): Unit = { + import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ + Seq(false, true).foreach { codegenEnabled => + withClue(s"codegenEnabled = $codegenEnabled") { + val mode = if (codegenEnabled) CODEGEN_ONLY else NO_CODEGEN + withSQLConf( + SQLConf.CODEGEN_FACTORY_MODE.key -> mode.toString, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) { + testFun + } + } + } + } + + def createIndexLogEntry(indexConfig: IndexConfigTrait, sourceData: DataFrame): IndexLogEntry = { + val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map()) + index.write(ctx, indexData) + IndexLogEntry( + indexConfig.indexName, + index, + Content.fromDirectory(indexDataPath, fileIdTracker, new Configuration), + Source( + SparkPlan(SparkPlan.Properties( + Seq(RelationUtils + .getRelation(spark, sourceData.queryExecution.optimizedPlan) + .createRelationMetadata(fileIdTracker)), + null, + null, + LogicalPlanFingerprint( + LogicalPlanFingerprint.Properties(Seq(Signature("sp", "sig"))))))), + Map.empty) + } + + def numAccessedFiles(df: DataFrame): Int = { + df.queryExecution.executedPlan.collect { + case scan: DataSourceScanExec => + scan.inputRDDs + .flatMap( + _.partitions.flatMap(_.asInstanceOf[FilePartition].files.map(_.filePath).toSet)) + .length + }.sum + } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala 
b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala new file mode 100644 index 000000000..35b5cbfeb --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/ApplyDataSkippingIndexTest.scala @@ -0,0 +1,371 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.rule + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.hyperspace.utils.logicalPlanToDataFrame + +import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.dataskipping._ +import com.microsoft.hyperspace.index.dataskipping.sketch._ +import com.microsoft.hyperspace.index.dataskipping.util._ + +class ApplyDataSkippingIndexTest extends DataSkippingSuite { + import spark.implicits._ + + override val numParallelism: Int = 10 + + test("applyIndex returns the unmodified plan if no index is given.") { + val sourceData = createSourceData(spark.range(100).toDF("A")) + val query = sourceData.filter("A = 1") + val plan = query.queryExecution.optimizedPlan + assert(ApplyDataSkippingIndex.applyIndex(plan, Map.empty) === plan) + } + + test("score returns 0 if no index is given.") { + val sourceData = createSourceData(spark.range(100).toDF("A")) + val query = sourceData.filter("A = 1") + val plan = query.queryExecution.optimizedPlan + assert(ApplyDataSkippingIndex.score(plan, Map.empty) === 0) + } + + case class SourceData(df: () => DataFrame, description: String) + + case class Param( + sourceData: SourceData, + filter: String, + sketches: Seq[Sketch], + numExpectedFiles: Int, + setup: Option[() => _]) + + object Param { + def apply( + sourceData: SourceData, + filter: String, + sketch: Sketch, + numExpectedFiles: Int): Param = { + Param(sourceData, filter, Seq(sketch), numExpectedFiles, None) + } + + def apply( + sourceData: SourceData, + filter: String, + sketches: Seq[Sketch], + numExpectedFiles: Int): Param = { + Param(sourceData, filter, sketches, numExpectedFiles, None) + } + + def apply( + sourceData: SourceData, + filter: String, + sketch: Sketch, + numExpectedFiles: Int, + setup: () => _): Param = { + Param(sourceData, filter, Seq(sketch), numExpectedFiles, Some(setup)) + } + + def apply( + sourceData: SourceData, + filter: String, + sketches: Seq[Sketch], + numExpectedFiles: Int, + setup: () => _): Param = { + Param(sourceData, filter, sketches, numExpectedFiles, Some(setup)) + } + } + + def dataI: SourceData = + SourceData(() => createSourceData(spark.range(100).toDF("A")), "source [A:Int]") + + def dataII: SourceData = + SourceData( + () => createSourceData(spark.range(100).selectExpr("id as A", "id * 2 as B")), + "source [A:Int, B:Int]") + + def dataIN: SourceData = + SourceData( + () => + createSourceData( + Seq[Integer](1, 2, null, null, null, null, 7, 8, 9, null, 11, 12, null, 14, null, null, + 
17, null, 19, 20).toDF("A")), + "source [A:Int] with nulls") + + def dataIIP: SourceData = + SourceData( + () => + createPartitionedSourceData( + spark.range(100).selectExpr("cast(id / 10 as int) as A", "id as B"), + Seq("A")), + "source [A:Int, B:Int] partitioned") + + def dataD: SourceData = + SourceData( + () => createSourceData(spark.range(100).map(_.toDouble).toDF("A")), + "source [A:Double]") + + def dataDS: SourceData = + SourceData( + () => + createSourceData( + Seq( + 0.0, + 1.0, + 1.5, + Double.NegativeInfinity, + Double.PositiveInfinity, + Double.NaN, + 3.14, + 2.718, + -1.1, + -0.0).toDF("A")), + "source [A:Double] small") + + def dataN2: SourceData = + SourceData( + () => + createSourceData( + spark.read.json(Seq( + """{"a": 1, "b": {"a": 0, "c": 2, "d": "x"}}""", + """{"a": 2, "b": {"a": 0, "c": 3, "d": "y"}}""", + """{"a": 3, "b": {"a": 1, "c": 4, "d": "x"}}""", + """{"a": 4, "b": {"a": 2, "c": null, "d": "x"}}""", + """{"a": 2, "b": {"a": 2, "c": 6, "d": "x"}}""", + """{"a": 2, "b": {"a": 1, "c": 7, "d": "x"}}""", + """{"b": {"c": 8, "d": "x"}}""", + """{"b": {"d": "y"}}""", + """{"a": 3}""", + """{"b": {"c": 11}}""").toDS)), + "source [A:Int, B:[A:Int, C:Int, D: String]]") + + def dataN3: SourceData = + SourceData( + () => + createSourceData( + spark.read.json(Seq( + """{"a": {"b": {"c": 1}}}""", + """{"a": {"b": {"c": 2}}}""", + """{"a": {"b": {"c": 3}}}""", + """{"a": {"b": {"c": null}}}""", + """{"a": {"b": {"c": 5}}}""", + """{"a": {"b": {"c": 6}}}""", + """{"a": {"b": {"c": 7}}}""", + """{"a": {"b": {"c": 8}}}""", + """{"a": null}""", + """{"a": {"b": {"c": 0}}}""").toDS)), + "source [A:[B:[C:Int]]]") + + def dataB: SourceData = + SourceData( + () => + createSourceData( + Seq( + Array[Byte](0, 0, 0, 0), + Array[Byte](0, 1, 0, 1), + Array[Byte](1, 2, 3, 4), + Array[Byte](5, 6, 7, 8), + Array[Byte](32, 32, 32, 32), + Array[Byte](64, 64, 64, 64), + Array[Byte](1, 1, 1, 1), + Array[Byte](-128, -128, -128, -128), + Array[Byte](127, 127, 127, 127), + Array[Byte](-1, 1, 0, 0)).toDF("A")), + "source [A:Binary]") + + def dataS: SourceData = + SourceData( + () => + createSourceData( + Seq( + "foo1", + "foo2000", + "foo3", + "foo4", + "foo5", + null, + "foo7", + "foo8", + "foo9", + "baar", + null) + .toDF("A")), + "source [A:String]") + + Seq( + Param(dataI, "A = 10", MinMaxSketch("A"), 1), + Param(dataI, "50 = a", MinMaxSketch("A"), 1), + Param(dataI, "A = -10", MinMaxSketch("a"), 0), + Param(dataI, "A = 5 + 5", MinMaxSketch("A"), 1), + Param(dataI, "A = 10 or A = 30", MinMaxSketch("A"), 2), + Param(dataI, "A is null", MinMaxSketch("A"), 10), + Param(dataI, "!(A is null)", MinMaxSketch("A"), 10), + Param(dataI, "A is not null", MinMaxSketch("A"), 10), + Param(dataI, "!(A is not null)", MinMaxSketch("A"), 10), + Param(dataI, "A <=> 10", MinMaxSketch("A"), 1), + Param(dataI, "10 <=> A", MinMaxSketch("A"), 1), + Param(dataI, "A <=> null", MinMaxSketch("A"), 10), + Param(dataI, "A <25", MinMaxSketch("A"), 3), + Param(dataI, "30>A", MinMaxSketch("A"), 3), + Param(dataI, "31 > A", MinMaxSketch("a"), 4), + Param(dataI, "A > 25", MinMaxSketch("a"), 8), + Param(dataI, "28 < A", MinMaxSketch("a"), 8), + Param(dataI, "29< A", MinMaxSketch("A"), 7), + Param(dataI, "A <= 25", MinMaxSketch("A"), 3), + Param(dataI, "29 >= A", MinMaxSketch("A"), 3), + Param(dataI, "30>=A", MinMaxSketch("A"), 4), + Param(dataI, "A >= 25", MinMaxSketch("A"), 8), + Param(dataI, "29 <= A", MinMaxSketch("A"), 8), + Param(dataI, "30 <= A", MinMaxSketch("A"), 7), + Param(dataI, "A != 1", MinMaxSketch("A"), 10), + 
Param(dataI, "not (A != 1 and A != 10)", MinMaxSketch("A"), 2), + Param(dataI, "!(!(A = 1))", MinMaxSketch("A"), 1), + Param(dataI, "!(A < 20)", MinMaxSketch("A"), 8), + Param(dataI, "not (A not in (1, 2, 3))", MinMaxSketch("A"), 1), + Param(dataS, "A < 'foo'", MinMaxSketch("A"), 1), + Param(dataI, "a = 10", MinMaxSketch("A"), 1), + Param(dataI, "A = 10", MinMaxSketch("a"), 1), + Param(dataI, "A in (1, 2, 3, 10)", MinMaxSketch("A"), 2), + Param(dataI, "A in (10,9,8,7,6,5,4,3,2,1,50,49,48,47,46,45)", MinMaxSketch("A"), 4), + Param(dataS, "A in ('foo1', 'foo5', 'foo9')", MinMaxSketch("A"), 3), + Param( + dataS, + "A in ('foo1','a','b','c','d','e','f','g','h','i','j','k')", + MinMaxSketch("A"), + 1), + Param(dataD, "A in (1,2,3,15,16,17)", MinMaxSketch("A"), 2), + Param(dataD, "A in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16)", MinMaxSketch("A"), 2), + Param(dataB, "A in (x'00000000', x'0001', x'0002', x'05060708')", MinMaxSketch("A"), 2), + Param( + dataB, + "A in (x'00',x'01',x'02',x'03',x'04',x'05',x'06',x'07',x'08',x'09',x'0a',x'20202020')", + MinMaxSketch("A"), + 1), + Param(dataI, "A BETWEEN 27 AND 51", MinMaxSketch("A"), 4), + Param(dataI, "IF(A=1,2,3)=2", MinMaxSketch("A"), 10), + Param(dataII, "A = 10 OR B = 50", Seq(MinMaxSketch("A"), MinMaxSketch("B")), 2), + Param(dataII, "A = 10 or B = 50", Seq(MinMaxSketch("A")), 10), + Param(dataII, "A = 10 and B = 20", MinMaxSketch("A"), 1), + Param(dataII, "a = 10 AND b = 20", Seq(MinMaxSketch("A"), MinMaxSketch("B")), 1), + Param(dataII, "A < 30 and B > 20", MinMaxSketch("A"), 3), + Param(dataII, "A < 30 and b > 40", Seq(MinMaxSketch("a"), MinMaxSketch("B")), 1), + Param(dataII, "A = 10 and B = 90", Seq(MinMaxSketch("A"), MinMaxSketch("B")), 0), + Param(dataIN, "A is not null", MinMaxSketch("A"), 7), + Param(dataIN, "!(A <=> null)", MinMaxSketch("A"), 7), + Param(dataIN, "A = 2", MinMaxSketch("A"), 1), + Param(dataIN, "A is null", MinMaxSketch("A"), 10), + Param(dataIIP, "B = 10", MinMaxSketch("B"), 1), + Param(dataIIP, "A = 5 and B = 20", MinMaxSketch("B"), 0), + Param(dataIIP, "A < 5 and B = 20", MinMaxSketch("B"), 1), + Param(dataN2, "B.C = 2", MinMaxSketch("B.C"), 1), + Param(dataN2, "B.c = 2", MinMaxSketch("b.C"), 1), + Param(dataN2, "b.c < 5", MinMaxSketch("b.c"), 3), + Param(dataN3, "A.B.C = 2", MinMaxSketch("a.B.C"), 1), + Param(dataDS, "A = 1.0", MinMaxSketch("A"), 1), + Param(dataDS, "A <= 1.5", MinMaxSketch("A"), 6), + Param(dataDS, "A >= 1.5", MinMaxSketch("A"), 5), + Param(dataD, "A in (1, 2, 3, 10)", MinMaxSketch("A"), 2), + Param(dataII, "A + B < 100", MinMaxSketch("a+b"), 4), + Param( + dataI, + "F(A) = 10", + MinMaxSketch("F(A)"), + 1, + () => spark.udf.register("F", (a: Int) => a * 2)), + Param( + dataI, + "is_less_than_23(A)", + MinMaxSketch("is_less_than_23(A)"), + 3, + () => spark.udf.register("is_less_than_23", (a: Int) => a < 23)), + Param( + dataI, + "!is_less_than_23(A)", + MinMaxSketch("is_less_than_23(A)"), + 8, + () => spark.udf.register("is_less_than_23", (a: Int) => a < 23)), + Param( + dataII, + "A < 50 and F(A,B) < 20", + Seq(MinMaxSketch("A"), MinMaxSketch("F(A,B)")), + 2, + () => spark.udf.register("F", (a: Int, b: Int) => b - a)), + Param( + dataI, + "f(a) < 30", + MinMaxSketch("F(a)"), + 2, + () => spark.udf.register("F", (a: Int) => a * 2)), + Param( + dataI, + "IF(A IS NULL,NULL,F(A))=2", + MinMaxSketch("A"), + 10, + () => spark.udf.register("F", (a: Int) => a * 2))).foreach { + case Param(sourceData, filter, sketches, numExpectedFiles, setup) => + test( + s"applyIndex works as expected for 
${sourceData.description}: " + + s"filter=[$filter], sketches=[${sketches.mkString(", ")}], " + + s"numExpectedFiles=[$numExpectedFiles]") { + val indexConfig = DataSkippingIndexConfig("ind1", sketches.head, sketches.tail: _*) + if (setup.nonEmpty) { + setup.get.apply() + } + testApplyIndex(sourceData.df(), filter, indexConfig, numExpectedFiles) + } + } + + def testApplyIndex( + sourceData: DataFrame, + filter: String, + indexConfig: DataSkippingIndexConfig, + numExpectedFiles: Int): Unit = { + val originalNumFiles = listFiles(dataPath()).filter(isParquet).length + val query = sourceData.filter(filter) + val plan = query.queryExecution.optimizedPlan + val indexLogEntry = createIndexLogEntry(indexConfig, sourceData) + val indexDataPred = indexLogEntry.derivedDataset + .asInstanceOf[DataSkippingIndex] + .translateFilterCondition( + spark, + plan.asInstanceOf[Filter].condition, + sourceData.queryExecution.optimizedPlan) + indexLogEntry.setTagValue(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE, indexDataPred) + val optimizedPlan = ApplyDataSkippingIndex.applyIndex( + plan, + Map(sourceData.queryExecution.optimizedPlan -> indexLogEntry)) + if (indexDataPred.isEmpty) { + assert(optimizedPlan === plan) + } else { + assert(optimizedPlan !== plan) + optimizedPlan match { + case Filter( + _, + LogicalRelation( + HadoopFsRelation(location: DataSkippingFileIndex, _, _, _, _, _), + _, + _, + _)) => + assert(location.indexDataPred === indexDataPred.get) + val optimizedDf = logicalPlanToDataFrame(spark, optimizedPlan) + checkAnswer(optimizedDf, query) + assert(numAccessedFiles(optimizedDf) === numExpectedFiles) + case _ => fail(s"unexpected optimizedPlan: $optimizedPlan") + } + } + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilterTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilterTest.scala new file mode 100644 index 000000000..042ea694e --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterConditionFilterTest.scala @@ -0,0 +1,85 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.dataskipping.rule + +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ + +import com.microsoft.hyperspace.index.IndexLogEntryTags +import com.microsoft.hyperspace.index.dataskipping._ +import com.microsoft.hyperspace.index.dataskipping.sketch._ +import com.microsoft.hyperspace.index.plananalysis.FilterReasons.IneligibleFilterCondition + +class FilterConditionFilterTest extends DataSkippingSuite { + test("apply returns an empty map if there are no candidate indexes.") { + val df = spark.range(10).toDF("A") + assert(FilterConditionFilter(df.queryExecution.optimizedPlan, Map.empty) === Map.empty) + } + + test("apply returns an empty map if the plan is not a filter.") { + val df = createSourceData(spark.range(10).toDF("A")) + val indexConfig = DataSkippingIndexConfig("myind", MinMaxSketch("A")) + val indexLogEntry = createIndexLogEntry(indexConfig, df) + val candidateIndexes = Map(df.queryExecution.optimizedPlan -> Seq(indexLogEntry)) + assert(FilterConditionFilter(df.queryExecution.optimizedPlan, candidateIndexes) === Map.empty) + } + + test("apply creates an index data predicate if the index can be applied to the plan.") { + val df = createSourceData(spark.range(10).toDF("A")) + val indexConfig = DataSkippingIndexConfig("myind", MinMaxSketch("A")) + val indexLogEntry = createIndexLogEntry(indexConfig, df) + val candidateIndexes = Map(df.queryExecution.optimizedPlan -> Seq(indexLogEntry)) + val plan = df.filter("A = 1").queryExecution.optimizedPlan + assert(FilterConditionFilter(plan, candidateIndexes) === candidateIndexes) + val indexDataPredOpt = + indexLogEntry.getTagValue(plan, IndexLogEntryTags.DATASKIPPING_INDEX_PREDICATE) + assert( + indexDataPredOpt === Some(Some(And( + And( + IsNotNull(UnresolvedAttribute("MinMax_A__0")), + IsNotNull(UnresolvedAttribute("MinMax_A__1"))), + And( + LessThanOrEqual(UnresolvedAttribute("MinMax_A__0"), Literal(1L)), + GreaterThanOrEqual(UnresolvedAttribute("MinMax_A__1"), Literal(1L))))))) + } + + test("apply returns an empty map if the filter condition is not suitable.") { + val df = createSourceData(spark.range(10).selectExpr("id as A", "id * 2 as B")) + val indexConfig = DataSkippingIndexConfig("myind", MinMaxSketch("A")) + val indexLogEntry = createIndexLogEntry(indexConfig, df) + indexLogEntry.setTagValue(IndexLogEntryTags.INDEX_PLAN_ANALYSIS_ENABLED, true) + val candidateIndexes = Map(df.queryExecution.optimizedPlan -> Seq(indexLogEntry)) + val plan = df.filter("B = 1").queryExecution.optimizedPlan + assert(FilterConditionFilter(plan, candidateIndexes) === Map.empty) + val reason = indexLogEntry.getTagValue(plan, IndexLogEntryTags.FILTER_REASONS) + assert(reason === Some(List(IneligibleFilterCondition("((`B` IS NOT NULL) AND (`B` = 1L))")))) + } + + test("apply returns only the applicable indexes when there are multiple candidate indexes.") { + val df = createSourceData(spark.range(10).selectExpr("id as A", "id * 2 as B")) + val indexConfig1 = DataSkippingIndexConfig("myind", MinMaxSketch("A")) + val indexConfig2 = DataSkippingIndexConfig("myind", MinMaxSketch("B")) + val indexLogEntry1 = createIndexLogEntry(indexConfig1, df) + val indexLogEntry2 = createIndexLogEntry(indexConfig2, df) + val candidateIndexes = + Map(df.queryExecution.optimizedPlan -> Seq(indexLogEntry1, indexLogEntry2)) + val plan = df.filter("A = 1").queryExecution.optimizedPlan + assert( + FilterConditionFilter(plan, candidateIndexes) === Map( + 
df.queryExecution.optimizedPlan -> Seq(indexLogEntry1))) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilterTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilterTest.scala new file mode 100644 index 000000000..a52b25449 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/rule/FilterPlanNodeFilterTest.scala @@ -0,0 +1,51 @@ +/* + * Copyright (2021) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.dataskipping.rule + +import com.microsoft.hyperspace.index.dataskipping._ +import com.microsoft.hyperspace.index.dataskipping.sketch._ + +class FilterPlanNodeFilterTest extends DataSkippingSuite { + test("apply returns an empty map if there are no candidate indexes.") { + val df = spark.range(10).toDF("A") + assert(FilterPlanNodeFilter(df.queryExecution.optimizedPlan, Map.empty) === Map.empty) + } + + test("apply returns an empty map if the plan is not a filter.") { + val df = createSourceData(spark.range(10).toDF("A")) + val indexConfig = DataSkippingIndexConfig("myind", MinMaxSketch("A")) + val indexLogEntry = createIndexLogEntry(indexConfig, df) + val candidateIndexes = Map(df.queryExecution.optimizedPlan -> Seq(indexLogEntry)) + val plan = df.groupBy("A").count().queryExecution.optimizedPlan + assert(FilterPlanNodeFilter(plan, candidateIndexes) === Map.empty) + } + + test("apply returns applicable indexes only.") { + val df1 = createSourceData(spark.range(10).toDF("A"), "T1") + val df2 = createSourceData(spark.range(10).toDF("A"), "T2") + val indexConfig = DataSkippingIndexConfig("myind", MinMaxSketch("A")) + val indexLogEntry1 = createIndexLogEntry(indexConfig, df1) + val indexLogEntry2 = createIndexLogEntry(indexConfig, df2) + val candidateIndexes = Map( + df1.queryExecution.optimizedPlan -> Seq(indexLogEntry1), + df2.queryExecution.optimizedPlan -> Seq(indexLogEntry2)) + val plan = df1.filter("A = 1").queryExecution.optimizedPlan + assert( + FilterPlanNodeFilter(plan, candidateIndexes) === Map( + df1.queryExecution.optimizedPlan -> Seq(indexLogEntry1))) + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketchTest.scala b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketchTest.scala index f3db70545..14dee07a6 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketchTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/dataskipping/sketch/MinMaxSketchTest.scala @@ -17,9 +17,13 @@ package com.microsoft.hyperspace.index.dataskipping.sketch import org.apache.spark.sql.{Column, QueryTest} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ import org.mockito.Mockito.mock import com.microsoft.hyperspace.index.HyperspaceSuite +import 
com.microsoft.hyperspace.index.dataskipping.util.ExpressionUtils class MinMaxSketchTest extends QueryTest with HyperspaceSuite { import spark.implicits._ @@ -63,4 +67,290 @@ class MinMaxSketchTest extends QueryTest with HyperspaceSuite { assert(MinMaxSketch("A").hashCode === MinMaxSketch("A").hashCode) assert(MinMaxSketch("A").hashCode !== MinMaxSketch("a").hashCode) } + + test("convertPredicate converts EqualTo(<attr>, <value>).") { + val sketch = MinMaxSketch("A") + val predicate = EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + And( + LessThanOrEqual(sketchValues(0), Literal(42)), + GreaterThanOrEqual(sketchValues(1), Literal(42))))) + assert(result === expected) + } + + test("convertPredicate converts EqualTo(<value>, <attr>).") { + val sketch = MinMaxSketch("A") + val predicate = EqualTo(Literal(42), AttributeReference("A", IntegerType)(ExprId(0))) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + And( + LessThanOrEqual(sketchValues(0), Literal(42)), + GreaterThanOrEqual(sketchValues(1), Literal(42))))) + assert(result === expected) + } + + test("convertPredicate converts EqualTo(<struct field access>, <value>).") { + val sketch = MinMaxSketch("A.C") + val structAccess = GetStructField( + AttributeReference("A", StructType(Seq(StructField("C", IntegerType))))(ExprId(0)), + 0) + val predicate = EqualTo(structAccess, Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(structAccess.transformUp { + case attr: AttributeReference => attr.withExprId(ExpressionUtils.nullExprId) + })) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + And( + LessThanOrEqual(sketchValues(0), Literal(42)), + GreaterThanOrEqual(sketchValues(1), Literal(42))))) + assert(result === expected) + } + + test("convertPredicate converts EqualTo(<nested struct field access>, <value>).") { + val sketch = MinMaxSketch("A.B.C") + val structAccess = GetStructField( + GetStructField( + AttributeReference( + "A", + StructType(Seq(StructField("B", StructType(Seq(StructField("C", IntegerType)))))))( + ExprId(0)), + 0), + 0) + val predicate = EqualTo(structAccess, Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(structAccess.transformUp { + case attr: AttributeReference => attr.withExprId(ExpressionUtils.nullExprId) + })) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + And( + LessThanOrEqual(sketchValues(0), Literal(42)), + GreaterThanOrEqual(sketchValues(1), Literal(42))))) + assert(result === expected) + } + + test("convertPredicate converts EqualTo(<attr>, <value>) - string 
type.") { + val sketch = MinMaxSketch("A") + val predicate = + EqualTo(AttributeReference("A", StringType)(ExprId(0)), Literal.create("hello", StringType)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + And( + LessThanOrEqual(sketchValues(0), Literal.create("hello", StringType)), + GreaterThanOrEqual(sketchValues(1), Literal.create("hello", StringType))))) + assert(result === expected) + } + + test("convertPredicate converts EqualTo(, ) - double type.") { + val sketch = MinMaxSketch("A") + val predicate = + EqualTo(AttributeReference("A", StringType)(ExprId(0)), Literal(3.14, DoubleType)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + And( + LessThanOrEqual(sketchValues(0), Literal(3.14, DoubleType)), + GreaterThanOrEqual(sketchValues(1), Literal(3.14, DoubleType))))) + assert(result === expected) + } + + test("convertPredicate converts LessThan.") { + val sketch = MinMaxSketch("A") + val predicate = LessThan(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + LessThan(sketchValues(0), Literal(42)))) + assert(result === expected) + } + + test("convertPredicate converts LessThan - string type.") { + val sketch = MinMaxSketch("A") + val predicate = LessThan( + AttributeReference("A", StringType)(ExprId(0)), + Literal.create("hello", StringType)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + LessThan(sketchValues(0), Literal.create("hello", StringType)))) + assert(result === expected) + } + + test("convertPredicate converts LessThanOrEqual.") { + val sketch = MinMaxSketch("A") + val predicate = LessThanOrEqual(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = + Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + LessThanOrEqual(sketchValues(0), Literal(42)))) + assert(result === expected) + } + + test("convertPredicate converts GreaterThan.") { + val sketch = MinMaxSketch("A") + val predicate = 
GreaterThan(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = + Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + GreaterThan(sketchValues(1), Literal(42)))) + assert(result === expected) + } + + test("convertPredicate converts GreaterThanOrEqual.") { + val sketch = MinMaxSketch("A") + val predicate = + GreaterThanOrEqual(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42)) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = + Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + GreaterThanOrEqual(sketchValues(1), Literal(42)))) + assert(result === expected) + } + + test("convertPredicate converts In.") { + val sketch = MinMaxSketch("A") + val predicate = + In(AttributeReference("A", IntegerType)(ExprId(0)), Seq(Literal(42), Literal(23))) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + Or( + And( + LessThanOrEqual(sketchValues(0), Literal(42)), + GreaterThanOrEqual(sketchValues(1), Literal(42))), + And( + LessThanOrEqual(sketchValues(0), Literal(23)), + GreaterThanOrEqual(sketchValues(1), Literal(23)))))) + assert(result === expected) + } + + test("convertPredicate converts In - string type.") { + val sketch = MinMaxSketch("A") + val predicate = + In( + AttributeReference("A", StringType)(ExprId(0)), + Seq(Literal.create("hello", StringType), Literal.create("world", StringType))) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", StringType)(ExpressionUtils.nullExprId))) + val expected = Some( + And( + And(IsNotNull(sketchValues(0)), IsNotNull(sketchValues(1))), + Or( + And( + LessThanOrEqual(sketchValues(0), Literal.create("hello", StringType)), + GreaterThanOrEqual(sketchValues(1), Literal.create("hello", StringType))), + And( + LessThanOrEqual(sketchValues(0), Literal.create("world", StringType)), + GreaterThanOrEqual(sketchValues(1), Literal.create("world", StringType)))))) + assert(result === expected) + } + + test("convertPredicate does not convert Not(EqualTo(<attr>, <value>)).") { + val sketch = MinMaxSketch("A") + val predicate = Not(EqualTo(AttributeReference("A", IntegerType)(ExprId(0)), Literal(42))) + val sketchValues = Seq(UnresolvedAttribute("min"), UnresolvedAttribute("max")) + val exprIdColMap = Map(ExprId(0) -> "A") + val result = sketch.convertPredicate( + predicate, + sketchValues, + exprIdColMap, + Seq(AttributeReference("A", IntegerType)(ExpressionUtils.nullExprId))) + val expected = None + assert(result === expected) + } }
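
For reference, the predicate shape that these convertPredicate tests (and FilterConditionFilterTest above) assert for a simple equality filter such as "A = 42" can be reproduced with a few Catalyst expressions: a file may match only if min <= value <= max, with null guards on both sketch columns. The sketch below is illustrative only and is not part of the patch; the MinMax_A__0/MinMax_A__1 column names follow the index data columns seen in FilterConditionFilterTest, and the object and method names are invented for the example.

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._

object MinMaxSkippingPredicateSketch {
  // Build the data-skipping predicate for "col = value": keep a file only if
  // min(col) <= value <= max(col), and require both sketch values to be non-null.
  def equalTo(minCol: String, maxCol: String, value: Literal): Expression = {
    val min = UnresolvedAttribute(minCol)
    val max = UnresolvedAttribute(maxCol)
    And(
      And(IsNotNull(min), IsNotNull(max)),
      And(LessThanOrEqual(min, value), GreaterThanOrEqual(max, value)))
  }

  def main(args: Array[String]): Unit = {
    // Prints the same expression tree that the tests above expect for "A = 42".
    println(equalTo("MinMax_A__0", "MinMax_A__1", Literal(42L)))
  }
}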