This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Refactoring for an extensible Index API: Part 2 #474

Merged · 1 commit · Jun 28, 2021
@@ -72,8 +72,14 @@ class RefreshIncrementalAction(
     } else {
       None
     }
-    updatedIndex = Some(previousIndexLogEntry.derivedDataset
-      .refreshIncremental(this, appendedSourceData, deletedFiles, previousIndexLogEntry.content))
+    val (updatedIndex, updateMode) =
+      previousIndexLogEntry.derivedDataset.refreshIncremental(
+        this,
+        appendedSourceData,
+        deletedFiles,
+        previousIndexLogEntry.content)
+    updatedIndexOpt = Some(updatedIndex)
+    updateModeOpt = Some(updateMode)
   }
 
   /**
@@ -95,7 +101,8 @@ class RefreshIncrementalAction(
     }
   }
 
-  private var updatedIndex: Option[Index] = None
+  private var updatedIndexOpt: Option[Index] = None
+  private var updateModeOpt: Option[Index.UpdateMode] = None
 
   /**
    * Create a log entry with all source data files, and all required index content. This contains
@@ -106,15 +113,11 @@ class RefreshIncrementalAction(
    * @return Refreshed index log entry.
    */
   override def logEntry: LogEntry = {
-    val index = updatedIndex.getOrElse(previousIndexLogEntry.derivedDataset)
+    val index = updatedIndexOpt.getOrElse(previousIndexLogEntry.derivedDataset)
     val entry =
       getIndexLogEntry(spark, df, previousIndexLogEntry.name, index, indexDataPath, endId)
 
-    // If there is no deleted files, there are index data files only for appended data in this
-    // version and we need to add the index data files of previous index version.
-    // Otherwise, as previous index data is rewritten in this version while excluding
-    // indexed rows from deleted files, all necessary index data files exist in this version.
-    if (deletedFiles.isEmpty) {
+    if (updateModeOpt.contains(Index.UpdateMode.Merge)) {
       // Merge new index files with old index files.
       val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
       entry.copy(content = mergedContent)
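
The net effect on logEntry can be summarized as the following sketch, which only restates the diff above using the same names (entry and previousIndexLogEntry are assumed to be in scope):

// Sketch of the resulting logEntry behaviour after this change.
if (updateModeOpt.contains(Index.UpdateMode.Merge)) {
  // Only appended data was indexed in this version, so the previous
  // version's index files are carried over by merging the content trees.
  val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
  entry.copy(content = mergedContent)
} else {
  // The previous index data was rewritten without deleted rows, so this
  // version already contains every required index file.
  entry
}
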
40 changes: 14 additions & 26 deletions src/main/scala/com/microsoft/hyperspace/index/CoveringIndex.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.functions.{col, input_file_name}
 import org.apache.spark.sql.hyperspace.utils.StructTypeUtils
 import org.apache.spark.sql.types.StructType
 
-import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.util.ResolverUtils
 import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
@@ -85,7 +84,7 @@ case class CoveringIndex(
       ctx: IndexerContext,
       appendedSourceData: Option[DataFrame],
       deletedSourceDataFiles: Seq[FileInfo],
-      indexContent: Content): CoveringIndex = {
+      indexContent: Content): (CoveringIndex, Index.UpdateMode) = {
     val updatedIndex = if (appendedSourceData.nonEmpty) {
       val (indexData, resolvedIndexedColumns, resolvedIncludedColumns) =
         CoveringIndex.createIndexData(
@@ -123,7 +122,17 @@ case class CoveringIndex(
         indexedColumns,
         writeMode)
     }
-    updatedIndex
+
+    // If there is no deleted files, there are index data files only for appended data in this
+    // version and we need to add the index data files of previous index version.
+    // Otherwise, as previous index data is rewritten in this version while excluding
+    // indexed rows from deleted files, all necessary index data files exist in this version.
+    val updatedMode = if (deletedSourceDataFiles.isEmpty) {
+      Index.UpdateMode.Merge
+    } else {
+      Index.UpdateMode.Overwrite
+    }
+    (updatedIndex, updatedMode)
   }
 
   override def refreshFull(
@@ -221,8 +230,8 @@ object CoveringIndex {
       includedColumns: Seq[String],
       hasLineageColumn: Boolean): (DataFrame, Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
     val spark = ctx.spark
-    val (resolvedIndexedColumns, resolvedIncludedColumns) =
-      resolveConfig(sourceData, indexedColumns, includedColumns)
+    val resolvedIndexedColumns = IndexUtils.resolveColumns(sourceData, indexedColumns)
+    val resolvedIncludedColumns = IndexUtils.resolveColumns(sourceData, includedColumns)
     val projectColumns = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.toColumn)
 
     val indexData =
@@ -267,25 +276,4 @@ object CoveringIndex {
 
     (indexData, resolvedIndexedColumns, resolvedIncludedColumns)
   }
-
-  private def resolveConfig(
-      df: DataFrame,
-      indexedColumns: Seq[String],
-      includedColumns: Seq[String]): (Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
-    val spark = df.sparkSession
-    val plan = df.queryExecution.analyzed
-    val resolvedIndexedColumns = ResolverUtils.resolve(spark, indexedColumns, plan)
-    val resolvedIncludedColumns = ResolverUtils.resolve(spark, includedColumns, plan)
-
-    (resolvedIndexedColumns, resolvedIncludedColumns) match {
-      case (Some(indexed), Some(included)) => (indexed, included)
-      case _ =>
-        val unresolvedColumns = (indexedColumns ++ includedColumns)
-          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
-          .collect { case (c, r) if r.isEmpty => c }
-        throw HyperspaceException(
-          s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
-            s"from available source columns:\n${df.schema.treeString}")
-    }
-  }
 }
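
With CoveringIndex now returning the update mode, the merge-vs-overwrite decision lives in the index implementation rather than in the refresh action. A minimal caller sketch, assuming a CoveringIndex instance named coveringIndex plus ctx, appendedSourceData, deletedFiles and previousContent already in scope:

// Sketch: consuming the pair returned by the new refreshIncremental.
val (refreshedIndex, mode) =
  coveringIndex.refreshIncremental(ctx, appendedSourceData, deletedFiles, previousContent)

// refreshedIndex carries any metadata updates for the new log entry.
mode match {
  case Index.UpdateMode.Merge =>
    // Only appended data was indexed in this version; the previous version's
    // index files are still needed and get merged into the new log entry.
    println("Refreshed index data will be merged with the previous version")
  case Index.UpdateMode.Overwrite =>
    // Previous index data was rewritten without rows from deleted files;
    // this version's files fully replace the old ones.
    println("Refreshed index data will overwrite the previous version")
}
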
29 changes: 26 additions & 3 deletions src/main/scala/com/microsoft/hyperspace/index/Index.scala
@@ -118,14 +118,17 @@ trait Index {
    * @param deletedSourceDataFiles Source data files deleted from the source
    *                               since the creation of this index
    * @param indexContent Unrefreshed index data files
-   * @return Updated index resulting from the indexing operation, or this
-   *         index if no update is needed
+   * @return Pair of (updated index, update mode) where the first value is an
+   *         updated index resulting from the indexing operation or this index
+   *         if no update is needed, and the second value is whether the
+   *         updated index data needs to be merged to the existing index data
+   *         or the existing index data should be overwritten
    */
   def refreshIncremental(
       ctx: IndexerContext,
       appendedSourceData: Option[DataFrame],
       deletedSourceDataFiles: Seq[FileInfo],
-      indexContent: Content): Index
+      indexContent: Content): (Index, Index.UpdateMode)
 
   /**
    * Indexes the source data and returns an updated index and index data.
@@ -140,4 +143,24 @@ trait Index {
    *         index if no update is needed
    */
   def refreshFull(ctx: IndexerContext, sourceData: DataFrame): (Index, DataFrame)
+
+  /**
+   * Returns true if and only if this index equals to that.
+   *
+   * Indexes only differing in their [[properties]] should be considered equal.
+   */
+  def equals(that: Any): Boolean
+
+  /**
+   * Returns the hash code for this index.
+   */
+  def hashCode: Int
 }
+
+object Index {
+  sealed trait UpdateMode
+  object UpdateMode {
+    case object Merge extends UpdateMode
+    case object Overwrite extends UpdateMode
+  }
+}
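
Because UpdateMode is a sealed trait, matches over it are exhaustiveness-checked, which is the point of modelling the decision as a type instead of a Boolean flag. An illustrative helper, not part of the PR:

// Illustrative only: a helper over the sealed UpdateMode hierarchy; the match
// is exhaustiveness-checked, so a future mode would surface as a compiler warning.
def describe(mode: Index.UpdateMode): String = mode match {
  case Index.UpdateMode.Merge =>
    "index files from this refresh complement the previous version's files"
  case Index.UpdateMode.Overwrite =>
    "index files from this refresh fully replace the previous version's files"
}
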
23 changes: 23 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/IndexUtils.scala
@@ -16,6 +16,12 @@
 
 package com.microsoft.hyperspace.index
 
+import org.apache.spark.sql.DataFrame
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.util.ResolverUtils
+import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn
+
 object IndexUtils {
 
   /**
@@ -32,4 +38,21 @@ object IndexUtils {
       .getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT)
       .toBoolean
   }
+
+  def resolveColumns(df: DataFrame, columns: Seq[String]): Seq[ResolvedColumn] = {
+    val spark = df.sparkSession
+    val plan = df.queryExecution.analyzed
+    val resolvedColumns = ResolverUtils.resolve(spark, columns, plan)
+
+    resolvedColumns match {
+      case Some(cs) => cs
+      case _ =>
+        val unresolvedColumns = columns
+          .map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
+          .collect { case (c, r) if r.isEmpty => c }
+        throw HyperspaceException(
+          s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
+            s"from available source columns:\n${df.schema.treeString}")
+    }
+  }
 }
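
resolveColumns resolves one column list per call, so unresolved names are reported per call site. A usage sketch, assuming a SparkSession named spark and a source with columns Date and RGUID at a hypothetical path:

import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.index.IndexUtils

val spark = SparkSession.builder.getOrCreate()
val df = spark.read.parquet("/data/source") // hypothetical source path

// Each call resolves its own column list against the analyzed plan of df.
val indexedCols = IndexUtils.resolveColumns(df, Seq("RGUID"))
val includedCols = IndexUtils.resolveColumns(df, Seq("Date"))

// Under spark.sql.caseSensitive=true, an unresolvable name such as "dATE"
// throws HyperspaceException listing only the columns from that call,
// which is what the updated CreateActionTest below asserts.
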
@@ -117,15 +117,34 @@ class CreateActionTest extends HyperspaceSuite with SQLHelper {
       ex.getMessage.contains(s"Another Index with name ${indexConfig.indexName} already exists"))
   }
 
-  test("op() fails if index config is of wrong case and spark is case-sensitive") {
+  test("op() fails if indexed column is of wrong case and spark is case-sensitive") {
     when(mockLogManager.getLatestLog()).thenReturn(Some(TestLogEntry(ACTIVE)))
-    val indexConfig = IndexConfig("index1", Seq("rgUID"), Seq("dATE"))
+    val indexConfig = IndexConfig("index1", Seq("rgUID"), Seq("Date"))
     val action = new CreateAction(spark, df, indexConfig, mockLogManager, mockDataManager)
     withSQLConf("spark.sql.caseSensitive" -> "true") {
       val ex = intercept[HyperspaceException](action.op())
       assert(
         ex.getMessage.contains(
-          "Columns 'rgUID,dATE' could not be resolved from available " +
+          "Columns 'rgUID' could not be resolved from available " +
            "source columns:\n" +
            "root\n " +
            "|-- Date: string (nullable = true)\n " +
+           "|-- RGUID: string (nullable = true)\n " +
+           "|-- Query: string (nullable = true)\n " +
+           "|-- imprs: integer (nullable = true)\n " +
+           "|-- clicks: integer (nullable = true)"))
+    }
+  }
+
+  test("op() fails if included config is of wrong case and spark is case-sensitive") {
+    when(mockLogManager.getLatestLog()).thenReturn(Some(TestLogEntry(ACTIVE)))
+    val indexConfig = IndexConfig("index1", Seq("RGUID"), Seq("dATE"))
+    val action = new CreateAction(spark, df, indexConfig, mockLogManager, mockDataManager)
+    withSQLConf("spark.sql.caseSensitive" -> "true") {
+      val ex = intercept[HyperspaceException](action.op())
+      assert(
+        ex.getMessage.contains(
+          "Columns 'dATE' could not be resolved from available " +
+            "source columns:\n" +
+            "root\n " +
+            "|-- Date: string (nullable = true)\n " +