This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Case Sensitivity: Support Case Insensitive Index Column Names #78

Merged
merged 43 commits on Aug 4, 2020
Commits
43 commits
2290de0
initial commit
apoorvedave1 Jun 30, 2020
8193f52
added test cases for creation with different case
apoorvedave1 Jun 30, 2020
c4651a1
improve tests for case insensitive index creation
apoorvedave1 Jun 30, 2020
918a443
Filter rule now supports case-insensitive matching
apoorvedave1 Jun 30, 2020
c1bb94c
revert FilterRule to go for resolver based implementation
apoorvedave1 Jun 30, 2020
c890056
Update Rules to use spark resolver
apoorvedave1 Jul 1, 2020
ff63d61
Update JoinRule to use resolver for case-sensitivity
apoorvedave1 Jul 1, 2020
ed05730
code cleanup and refactoring
apoorvedave1 Jul 1, 2020
2f26efc
fix for join rule tests failures
apoorvedave1 Jul 1, 2020
902aa8b
improve tests for case insensitive index creation
apoorvedave1 Jun 30, 2020
6d9d620
Update Rules to use spark resolver
apoorvedave1 Jul 1, 2020
b733a03
Update JoinRule to use resolver for case-sensitivity
apoorvedave1 Jul 1, 2020
fdeaa25
code cleanup and refactoring
apoorvedave1 Jul 1, 2020
c9d2113
fix for join rule tests failures
apoorvedave1 Jul 1, 2020
469d432
Merge branch 'casesensitivity' of github.com:apoorvedave1/hyperspace-…
apoorvedave1 Jul 2, 2020
d0d5c35
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Jul 16, 2020
a0fbf77
review comments and remove isResolve methods
apoorvedave1 Jul 17, 2020
f8cee6d
replace iterable with Seq
apoorvedave1 Jul 17, 2020
2372ae9
Lots of major changes and bug fixes
apoorvedave1 Jul 17, 2020
ed77af3
minor variable name fix
apoorvedave1 Jul 20, 2020
b769c60
code cleanup to use canonicalized
apoorvedave1 Jul 20, 2020
919a241
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Jul 20, 2020
603008e
git commit -m merge errors fixed
apoorvedave1 Jul 20, 2020
b33daa9
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Jul 21, 2020
45d3f14
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Jul 21, 2020
a95a544
simplify code
apoorvedave1 Jul 22, 2020
090a3af
review comments
apoorvedave1 Jul 22, 2020
489908c
added comments
apoorvedave1 Jul 22, 2020
fca13ae
show unresolved columns in output during create
apoorvedave1 Jul 22, 2020
ee30227
review comment
apoorvedave1 Jul 23, 2020
ce72622
comments cleaned up
apoorvedave1 Jul 23, 2020
4b6c8ef
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Jul 23, 2020
a29b3d7
review comments
apoorvedave1 Jul 27, 2020
83a4aff
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Jul 27, 2020
7cc19de
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Aug 3, 2020
a602ee3
some review comments
apoorvedave1 Aug 3, 2020
5671e4a
more review comments
apoorvedave1 Aug 3, 2020
ef60e78
revert removed check
apoorvedave1 Aug 3, 2020
cab7582
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Aug 3, 2020
04c410e
review comments
apoorvedave1 Aug 4, 2020
12235d7
added missing test cases
apoorvedave1 Aug 4, 2020
b52b27d
Merge remote-tracking branch 'upstream/master' into casesensitivity
apoorvedave1 Aug 4, 2020
e895dcd
removed empty line
apoorvedave1 Aug 4, 2020
CreateAction.scala
@@ -25,7 +25,7 @@ import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, CREATING, DOESNOTEXIST}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.telemetry.{AppInfo, CreateActionEvent, HyperspaceEvent}
import com.microsoft.hyperspace.util.LogicalPlanUtils
import com.microsoft.hyperspace.util.{LogicalPlanUtils, ResolverUtils}

class CreateAction(
spark: SparkSession,
@@ -64,12 +64,11 @@ class CreateAction(
}
}

private def isValidIndexSchema(indexConfig: IndexConfig, schema: StructType): Boolean = {
val validColumnNames = schema.fieldNames
val indexedColumns = indexConfig.indexedColumns
val includedColumns = indexConfig.includedColumns
indexedColumns.forall(validColumnNames.contains) && includedColumns.forall(
validColumnNames.contains)
private def isValidIndexSchema(config: IndexConfig, schema: StructType): Boolean = {
// Resolve index config columns from available column names present in the schema.
ResolverUtils
.resolve(spark, config.indexedColumns ++ config.includedColumns, schema.fieldNames)
.isDefined
}

// TODO: The following should be protected, but RefreshAction is calling CreateAction.op().
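Note: the isValidIndexSchema change above delegates column matching to a ResolverUtils helper that is not shown in this diff. Below is a minimal sketch of what such a helper could look like, assuming it simply wraps Spark's session resolver (case-insensitive unless spark.sql.caseSensitive is set to true); the object name is hypothetical and the actual implementation in com.microsoft.hyperspace.util may differ.

import org.apache.spark.sql.SparkSession

// Hypothetical sketch of the ResolverUtils helper referenced in this PR.
object ResolverUtilsSketch {
  // Resolve a single requested column name against the available names. Returns the
  // available name (in its original casing) if the session resolver finds a match.
  def resolve(spark: SparkSession, requested: String, available: Seq[String]): Option[String] = {
    val resolver = spark.sessionState.conf.resolver
    available.find(resolver(_, requested))
  }

  // Resolve a list of requested names; None if any single name cannot be resolved.
  def resolve(
      spark: SparkSession,
      requested: Seq[String],
      available: Seq[String]): Option[Seq[String]] = {
    val maybeResolved = requested.map(name => resolve(spark, name, available))
    if (maybeResolved.forall(_.isDefined)) Some(maybeResolved.flatten) else None
  }
}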
CreateActionBase.scala
@@ -24,6 +24,7 @@ import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils
import com.microsoft.hyperspace.util.ResolverUtils

/**
* CreateActionBase provides functionality to write dataframe as covering index.
@@ -50,8 +51,10 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

val signatureProvider = LogicalPlanSignatureProvider.create()

// Resolve the passed column names with existing column names from the dataframe.
val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig)
val schema = {
val allColumns = indexConfig.indexedColumns ++ indexConfig.includedColumns
val allColumns = resolvedIndexedColumns ++ resolvedIncludedColumns
df.select(allColumns.head, allColumns.tail: _*).schema
}

@@ -77,7 +80,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
CoveringIndex(
CoveringIndex.Properties(
CoveringIndex.Properties
.Columns(indexConfig.indexedColumns, indexConfig.includedColumns),
.Columns(resolvedIndexedColumns, resolvedIncludedColumns),
IndexLogEntry.schemaString(schema),
numBuckets)),
Content(path.toString, Seq()),
@@ -104,20 +107,43 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
IndexConstants.INDEX_NUM_BUCKETS,
IndexConstants.INDEX_NUM_BUCKETS_DEFAULT.toString)
.toInt
val selectedColumns = indexConfig.indexedColumns ++ indexConfig.includedColumns

val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig)
val selectedColumns = resolvedIndexedColumns ++ resolvedIncludedColumns
val indexDataFrame = df.select(selectedColumns.head, selectedColumns.tail: _*)
val indexColNames = indexConfig.indexedColumns

// run job
val repartitionedIndexDataFrame =
indexDataFrame.repartition(numBuckets, indexColNames.map(df(_)): _*)
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(df(_)): _*)

// Save the index with the number of buckets specified.
repartitionedIndexDataFrame.write
.saveWithBuckets(
repartitionedIndexDataFrame,
indexDataPath.toString,
numBuckets,
indexColNames)
resolvedIndexedColumns)
}

private def resolveConfig(
df: DataFrame,
indexConfig: IndexConfig): (Seq[String], Seq[String]) = {
val spark = df.sparkSession
val dfColumnNames = df.schema.fieldNames
val indexedColumns = indexConfig.indexedColumns
val includedColumns = indexConfig.includedColumns
val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, dfColumnNames)
val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, dfColumnNames)

(resolvedIndexedColumns, resolvedIncludedColumns) match {
case (Some(indexed), Some(included)) => (indexed, included)
case _ =>
val unresolvedColumns = (indexedColumns ++ includedColumns)
.map(c => (c, ResolverUtils.resolve(spark, c, dfColumnNames)))
.collect { case c if c._2.isEmpty => c._1 }
throw HyperspaceException(
s"Columns $unresolvedColumns could not be resolved " +
s"from available source columns $dfColumnNames")
}
}
}
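To illustrate the behavior added by resolveConfig above, here is a hypothetical example; the DataFrame, column names, and index name are made up, and a SparkSession named spark is assumed to be in scope.

import com.microsoft.hyperspace.index.IndexConfig

// A source DataFrame with columns "Query" and "clicks"; the config uses different casing.
val df = spark.range(5).selectExpr("cast(id as string) as Query", "id as clicks")
val indexConfig =
  IndexConfig("myIndex", indexedColumns = Seq("query"), includedColumns = Seq("CLICKS"))

// With Spark's default case-insensitive resolver, resolveConfig(df, indexConfig) returns
// (Seq("Query"), Seq("clicks")) -- the casing of the source columns -- so the index data
// and the index metadata are written with the source column names.
// With spark.sql.caseSensitive=true, resolution fails and a HyperspaceException listing
// the unresolved columns ("query", "CLICKS") is thrown.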
FilterIndexRule.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType
import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index.IndexLogEntry
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.ResolverUtils

/**
* FilterIndex rule looks for opportunities in a logical plan to replace
@@ -137,8 +138,6 @@ object FilterIndexRule
* For a given relation, find all available indexes on it which fully cover given output and
* filter columns.
*
* TODO: This method is duplicated in FilterIndexRule and JoinIndexRule. Deduplicate.
*
* @param filter Filter node in the subplan that is being optimized.
* @param outputColumns List of output columns in subplan.
* @param filterColumns List of columns in filter predicate.
@@ -193,8 +192,8 @@ object FilterIndexRule
val allColumnsInIndex = indexedColumns ++ includedColumns

// TODO: Normalize predicates into CNF and incorporate more conditions.
filterColumns.contains(indexedColumns.head) &&
allColumnsInPlan.forall(allColumnsInIndex.contains)
ResolverUtils.resolve(spark, indexedColumns.head, filterColumns).isDefined &&
ResolverUtils.resolve(spark, allColumnsInPlan, allColumnsInIndex).isDefined
}

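A small, hypothetical illustration of what the resolver-based coverage check above accepts; the column names are made up, the ResolverUtilsSketch helper sketched earlier is reused, and a SparkSession named spark is assumed to be in scope.

// The filter predicate references "NAME" and the plan also projects "City", while the
// index was created with indexedColumns = Seq("name") and includedColumns = Seq("city").
val filterColumns = Seq("NAME")
val allColumnsInPlan = Seq("NAME", "City")
val indexedColumns = Seq("name")
val includedColumns = Seq("city")
val allColumnsInIndex = indexedColumns ++ includedColumns

// Before this change both checks used exact string containment and would reject the index.
// With the resolver-based checks the index is usable under the default case-insensitive
// configuration (and still rejected when spark.sql.caseSensitive=true).
val usable =
  ResolverUtilsSketch.resolve(spark, indexedColumns.head, filterColumns).isDefined &&
    ResolverUtilsSketch.resolve(spark, allColumnsInPlan, allColumnsInIndex).isDefined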
JoinIndexRule.scala
@@ -17,13 +17,14 @@
package com.microsoft.hyperspace.index.rules

import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, AttributeSet, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -33,6 +34,7 @@ import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.rankers.JoinIndexRanker
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
import com.microsoft.hyperspace.util.ResolverUtils._

/**
* Rule to optimize a join between two indexed dataframes.
@@ -234,9 +236,6 @@ object JoinIndexRule
* WHERE newT1.aliasCol = T2.b
* Here, aliasCol is not directly from the base relation T1
*
* TODO: add alias resolver for supporting aliases in join condition. Until then,
* make sure this scenario isn't supported
*
* 2. For each equality condition in the join predicate, one attribute must belong to the left
* subplan, and another from right subplan.
* E.g. A = B => A should come from left and B should come from right or vice versa.
@@ -287,35 +286,45 @@ object JoinIndexRule
l: LogicalPlan,
r: LogicalPlan,
condition: Expression): Boolean = {

val conditions = extractConditions(condition)
// Output attributes from base relations. Join condition attributes must belong to these
// attributes
val lBaseAttrs = l.collectLeaves().filter(_.isInstanceOf[LogicalRelation]).flatMap(_.output)
val rBaseAttrs = r.collectLeaves().filter(_.isInstanceOf[LogicalRelation]).flatMap(_.output)
// attributes. We work on canonicalized forms to make sure we support case-sensitivity.
val lBaseAttrs = l
.collectLeaves()
.filter(_.isInstanceOf[LogicalRelation])
.flatMap(_.output)
.map(_.canonicalized)
val rBaseAttrs = r
.collectLeaves()
.filter(_.isInstanceOf[LogicalRelation])
.flatMap(_.output)
.map(_.canonicalized)

def fromDifferentBaseRelations(c1: Expression, c2: Expression): Boolean = {
def contains(attributes: Seq[Attribute], column: Expression): Boolean = {
attributes.exists(_.semanticEquals(column))
}
(contains(lBaseAttrs, c1) && contains(rBaseAttrs, c2)) ||
(contains(lBaseAttrs, c2) && contains(rBaseAttrs, c1))
(lBaseAttrs.contains(c1) && rBaseAttrs.contains(c2)) ||
(lBaseAttrs.contains(c2) && rBaseAttrs.contains(c1))
}

// Map to maintain and check one-to-one relation between join condition attributes
// Map to maintain and check one-to-one relation between join condition attributes. For join
// condition attributes, we store their corresponding base relation attributes in the map.
// This ensures uniqueness of attributes in case of case-insensitive system. E.g. We want to
// store just one copy of column 'A' when join condition contains column 'A' as well as 'a'.
val attrMap = new mutable.HashMap[Expression, Expression]()

extractConditions(condition).forall {
conditions.forall {
case EqualTo(c1, c2) =>
// c1 and c2 should belong to l and r respectively, or r and l respectively.
if (!fromDifferentBaseRelations(c1, c2)) {
val (c1Canonicalized, c2Canonicalized) = (c1.canonicalized, c2.canonicalized)
// Check 1: c1 and c2 should belong to l and r respectively, or r and l respectively.
if (!fromDifferentBaseRelations(c1Canonicalized, c2Canonicalized)) {
return false
}
// The following validates that c1 is compared only against c2 and vice versa
if (attrMap.contains(c1) && attrMap.contains(c2)) {
attrMap(c1).equals(c2) && attrMap(c2).equals(c1)
} else if (!attrMap.contains(c1) && !attrMap.contains(c2)) {
attrMap.put(c1, c2)
attrMap.put(c2, c1)
// Check 2: c1 is compared only against c2 and vice versa.
if (attrMap.contains(c1Canonicalized) && attrMap.contains(c2Canonicalized)) {
attrMap(c1Canonicalized).equals(c2Canonicalized) &&
attrMap(c2Canonicalized).equals(c1Canonicalized)
} else if (!attrMap.contains(c1Canonicalized) && !attrMap.contains(c2Canonicalized)) {
attrMap.put(c1Canonicalized, c2Canonicalized)
attrMap.put(c2Canonicalized, c1Canonicalized)
true
} else {
false
@@ -340,17 +349,24 @@ object JoinIndexRule
joinCondition: Expression,
lIndexes: Seq[IndexLogEntry],
rIndexes: Seq[IndexLogEntry]): Option[(IndexLogEntry, IndexLogEntry)] = {
val lBaseAttrs =
left.collectLeaves().filter(_.isInstanceOf[LogicalRelation]).flatMap(_.output).map(_.name)
val rBaseAttrs =
right.collectLeaves().filter(_.isInstanceOf[LogicalRelation]).flatMap(_.output).map(_.name)

val lRequiredIndexedCols = requiredIndexedCols(left, joinCondition)
val rRequiredIndexedCols = requiredIndexedCols(right, joinCondition)
val lRMap = getLRColumnMapping(lRequiredIndexedCols, rRequiredIndexedCols, joinCondition)
// Map of left resolved columns with their corresponding right resolved
// columns from condition.
val lRMap = getLRColumnMapping(lBaseAttrs, rBaseAttrs, joinCondition)
val lRequiredIndexedCols = lRMap.keys.toSeq
val rRequiredIndexedCols = lRMap.values.toSeq

val lRequiredAllCols = allRequiredCols(left)
val rRequiredAllCols = allRequiredCols(right)
// All required columns resolved with base relation.
val lRequiredAllCols: Seq[String] = resolve(spark, allRequiredCols(left), lBaseAttrs).get
val rRequiredAllCols: Seq[String] = resolve(spark, allRequiredCols(right), rBaseAttrs).get

// Make sure required indexed columns are subset of all required columns for a subplan
require(lRequiredIndexedCols.forall(lRequiredAllCols.contains))
require(rRequiredIndexedCols.forall(rRequiredAllCols.contains))
require(resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined)
require(resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined)

val lUsable = getUsableIndexes(lIndexes, lRequiredIndexedCols, lRequiredAllCols)
val rUsable = getUsableIndexes(rIndexes, rRequiredIndexedCols, rRequiredAllCols)
@@ -359,27 +375,6 @@ object JoinIndexRule
compatibleIndexPairs.map(indexPairs => JoinIndexRanker.rank(indexPairs).head)
}

/**
* Returns list of column names which must be in the indexed columns config of a selected
* index.
*
* @param plan logical plan
* @param condition join condition used to pull out required indexed columns in the plan
* @return list of column names from this plan which must be part of indexed columns in a chosen
* index
*/
private def requiredIndexedCols(plan: LogicalPlan, condition: Expression): Seq[String] = {
val cleanedPlan = CleanupAliases(plan)
CleanupAliases
.trimNonTopLevelAliases(condition)
.references
.collect {
case column if cleanedPlan.references.contains(column) => column.name
}
.toSeq
.distinct
}

/**
* Returns list of column names which must be present in either the indexed or the included
* columns list of a selected index. For this, collect all columns referenced in the plan
@@ -435,28 +430,30 @@
*
* This mapping is used to find compatible indexes of T1 and T2.
*
* @param lRequiredIndexedCols required indexed columns from left plan
* @param rRequiredIndexedCols required indexed columns from right plan
* @param leftBaseAttrs required indexed columns from left plan
* @param rightBaseAttrs required indexed columns from right plan
* @param condition join condition which will be used to find the left-right column mapping
* @return Mapping of corresponding columns from left and right, depending on the join
* condition. The keys represent columns from left subplan. The values are columns from
* right subplan.
*/
private def getLRColumnMapping(
lRequiredIndexedCols: Seq[String],
rRequiredIndexedCols: Seq[String],
leftBaseAttrs: Seq[String],
rightBaseAttrs: Seq[String],
condition: Expression): Map[String, String] = {
extractConditions(condition).map {
case EqualTo(attr1: AttributeReference, attr2: AttributeReference)
if lRequiredIndexedCols.contains(attr1.name) && rRequiredIndexedCols.contains(
attr2.name) =>
(attr1.name, attr2.name)
case EqualTo(attr1: AttributeReference, attr2: AttributeReference)
if lRequiredIndexedCols.contains(attr2.name) && rRequiredIndexedCols.contains(
attr1.name) =>
(attr2.name, attr1.name)
case _ =>
throw new IllegalStateException("Unexpected exception while using join rule")
case EqualTo(attr1: AttributeReference, attr2: AttributeReference) =>
Try {
(resolve(spark, attr1.name, leftBaseAttrs).get,
resolve(spark, attr2.name, rightBaseAttrs).get)
}.getOrElse {
Try {
(resolve(spark, attr2.name, leftBaseAttrs).get,
resolve(spark, attr1.name, rightBaseAttrs).get)
}.getOrElse {
throw new IllegalStateException("Unexpected exception while using join rule")
}
}
}.toMap
}

@@ -478,9 +475,12 @@
/**
* Get usable indexes which satisfy indexed and included column requirements.
*
* Pre-requisite: the indexed and included columns required must be already resolved with their
* corresponding base relation columns at this point.
*
* @param indexes All available indexes for the logical plan
* @param requiredIndexCols required indexed columns
* @param allRequiredCols required included columns
* @param requiredIndexCols required indexed columns resolved with their base relation column.
* @param allRequiredCols required included columns resolved with their base relation column.
* @return Indexes which satisfy the indexed and covering column requirements from the logical
* plan and join condition
*/
@@ -490,6 +490,9 @@
allRequiredCols: Seq[String]): Seq[IndexLogEntry] = {
indexes.filter { idx =>
val allCols = idx.indexedColumns ++ idx.includedColumns

// All required index columns should match one-to-one with all indexed columns and
// vice-versa. All required columns must be present in the available index columns.
requiredIndexCols.toSet.equals(idx.indexedColumns.toSet) &&
allRequiredCols.forall(allCols.contains)
}
@@ -558,9 +561,6 @@ object JoinIndexRule
lIndex: IndexLogEntry,
rIndex: IndexLogEntry,
columnMapping: Map[String, String]): Boolean = {
require(columnMapping.keys.toSet.equals(lIndex.indexedColumns.toSet))
require(columnMapping.values.toSet.equals(rIndex.indexedColumns.toSet))

val requiredRightIndexedCols = lIndex.indexedColumns.map(columnMapping)
rIndex.indexedColumns.equals(requiredRightIndexedCols)
}
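The join-rule changes above key the one-to-one attribute map on canonicalized expressions so that two references that differ only in the casing of their name collapse to a single map entry. A hypothetical illustration of why canonicalization achieves this; the attribute names and expression id below are made up.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.types.IntegerType

// Two references to the same underlying column that differ only in the case of the name
// but share the expression id assigned during analysis.
val id = ExprId(42)
val upper = AttributeReference("A", IntegerType)(exprId = id)
val lower = AttributeReference("a", IntegerType)(exprId = id)

// The attributes themselves are not equal because their names differ ...
assert(upper != lower)
// ... but their canonicalized forms are: canonicalization drops the name and keeps the
// expression id, which is what the attrMap in the join-condition check above relies on.
assert(upper.canonicalized == lower.canonicalized)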