Add DV table plan transformer trait to prune the deleted rows from scan output #1560

Closed
wants to merge 3 commits into from
@@ -52,7 +52,8 @@ trait DeltaColumnMappingBase extends DeltaLogging {
protected val DELTA_INTERNAL_COLUMNS: Set[String] =
(CDCReader.CDC_COLUMNS_IN_DATA ++ Seq(
CDCReader.CDC_COMMIT_VERSION,
CDCReader.CDC_COMMIT_TIMESTAMP)
CDCReader.CDC_COMMIT_TIMESTAMP,
DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME)
).map(_.toLowerCase(Locale.ROOT)).toSet

val supportedModes: Set[DeltaColumnMappingMode] =
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -395,6 +395,16 @@ trait DeltaConfigsBase extends DeltaLogging {
helpMessage = "needs to be a boolean.",
minimumProtocolVersion = Some(AppendOnlyTableFeature.minProtocolVersion))

/**
* Whether commands modifying this Delta table are allowed to create new deletion vectors.
*/
val ENABLE_DELETION_VECTORS_CREATION = buildConfig[Boolean](
key = "enableDeletionVectors",
defaultValue = "false",
fromString = _.toBoolean,
validationFunction = _ => true,
helpMessage = "needs to be a boolean.",
minimumProtocolVersion = Some(DeletionVectorsTableFeature.minProtocolVersion))
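As a point of reference, a user would opt a table into DV creation through the corresponding table property. A hedged illustration (the table name is made up, and the key assumes the usual delta. prefix that buildConfig adds to table property keys):

// Editor's illustration, not part of this diff.
spark.sql(
  "ALTER TABLE events SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")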

/**
* Whether this table will automatically optimize the layout of files during writes.
@@ -16,6 +16,8 @@

package org.apache.spark.sql.delta

import java.net.URI

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

@@ -49,12 +51,21 @@ class DeltaParquetFileFormat(
val isSplittable: Boolean = true,
val disablePushDowns: Boolean = false,
val tablePath: Option[String] = None,
val broadcastDvMap: Option[Broadcast[Map[String, DeletionVectorDescriptor]]] = None,
val broadcastDvMap: Option[Broadcast[Map[URI, DeletionVectorDescriptor]]] = None,
val broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None)
extends ParquetFileFormat {
// Validate either we have all arguments for DV enabled read or none of them.
require(!(broadcastHadoopConf.isDefined ^ broadcastDvMap.isDefined ^ tablePath.isDefined ^
!isSplittable ^ disablePushDowns))
if (broadcastHadoopConf.isDefined) {
require(
broadcastHadoopConf.isDefined && broadcastDvMap.isDefined &&
tablePath.isDefined && !isSplittable && disablePushDowns,
"Wrong arguments for Delta table scan with deletion vectors")
} else {
require(
broadcastHadoopConf.isEmpty && broadcastDvMap.isEmpty &&
tablePath.isEmpty && isSplittable && !disablePushDowns,
"Wrong arguments for Delta table scan with no deletion vectors")
}

val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
val referenceSchema: StructType = metadata.schema
@@ -147,7 +158,7 @@ class DeltaParquetFileFormat(

def copyWithDVInfo(
tablePath: String,
broadcastDvMap: Broadcast[Map[String, DeletionVectorDescriptor]],
broadcastDvMap: Broadcast[Map[URI, DeletionVectorDescriptor]],
broadcastHadoopConf: Broadcast[SerializableConfiguration]): DeltaParquetFileFormat = {
new DeltaParquetFileFormat(
metadata,
@@ -169,10 +180,10 @@
isRowDeletedColumnIdx: Int,
useOffHeapBuffers: Boolean): Iterator[Object] = {
val filePath = partitionedFile.filePath
val absolutePath = new Path(filePath).toString
val pathUri = new Path(filePath).toUri

// Fetch the DV descriptor from the broadcast map and create a row index filter
val dvDescriptor = broadcastDvMap.get.value.get(absolutePath)
val dvDescriptor = broadcastDvMap.get.value.get(pathUri)
val rowIndexFilter = DeletedRowsMarkingFilter.createInstance(
dvDescriptor.getOrElse(DeletionVectorDescriptor.EMPTY),
broadcastHadoopConf.get.value.value,
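As an aside, the row-index filter contract is easiest to see on a toy example. The sketch below is an editor's simplification rather than the DeletedRowsMarkingFilter implementation: given the deleted row indices of a file, it fills one byte per row with 0 (keep) or non-zero (skip).

// Editor's sketch only; the real filter reads the deletion vector bitmap and writes
// into a columnar vector rather than returning an Array.
case class ToyRowIndexFilter(deletedRowIndices: Set[Long]) {
  val KEEP_ROW: Byte = 0
  val SKIP_ROW: Byte = 1
  // Fill one byte per row of a batch that starts at `startRowIndex`.
  def materialize(startRowIndex: Long, batchSize: Int): Array[Byte] =
    Array.tabulate(batchSize) { i =>
      if (deletedRowIndices.contains(startRowIndex + i)) SKIP_ROW else KEEP_ROW
    }
}

// Rows 1 and 3 of a 5-row batch are deleted, so they get the skip marker.
assert(ToyRowIndexFilter(Set(1L, 3L)).materialize(0L, 5).toSeq == Seq[Byte](0, 1, 0, 1, 0))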
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala
@@ -47,6 +47,9 @@ object DeltaUDF {
def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f))

def booleanFromByte(x: Byte => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromByteTemplate, x, udf(x))

private lazy val stringFromStringTemplate =
udf[String, String](identity).asInstanceOf[SparkUserDefinedFunction]

@@ -64,6 +67,9 @@
private lazy val booleanFromMapTemplate =
udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromByteTemplate =
udf((_: Byte) => true).asInstanceOf[SparkUserDefinedFunction]

/**
* Return a `UserDefinedFunction` for the given `f` from `template` if
* `INTERNAL_UDF_OPTIMIZATION_ENABLED` is enabled. Otherwise, `orElse` will be called to create a
@@ -0,0 +1,182 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.net.URI

import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

/**
* Plan transformer to inject a filter that removes the rows marked as deleted according to
* deletion vectors. For tables with no deletion vectors, this transformation has no effect.
*
* It modifies the plan for tables with deletion vectors as follows:
* Before rule: <Parent Node> -> Delta Scan (key, value).
*   - Here we are reading the `key` and `value` columns from the Delta table.
* After rule:
*   <Parent Node> ->
*     Project(key, value) ->
*       Filter (udf(__skip_row == 0)) ->
*         Delta Scan (key, value, __skip_row)
*   - Here we insert a new column `__skip_row` in the Delta scan. This value is populated by the
*     Parquet reader using the DV corresponding to the Parquet file read
*     (see [[DeltaParquetFileFormat]]), and it contains 0 if we want to keep the row.
*     The scan also disables Parquet file splitting and filter pushdowns, because in order to
*     populate `__skip_row` we need to read the rows in a file consecutively to compute the
*     row index. This is a cost we need to pay until we upgrade to the latest Apache Spark,
*     which contains Parquet reader changes that generate the row index automatically
*     irrespective of file splitting and filter pushdowns.
*   - The scan also carries a broadcast variable with the Parquet file -> DV file map.
*     The Parquet reader uses this map to find the DV file corresponding to each data file.
*   - The Filter keeps only the rows where `__skip_row` equals 0, i.e. rows that are not
*     marked as deleted.
*   - Finally, a Project on top keeps the plan node output the same as before the rule
*     was applied.
*/
trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = {
transformSubqueryExpressions(plan) {
case ScanWithDeletionVectors(dvScan) => dvScan
}
}
}
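A rough sketch of how a trait like this could be wired into an optimizer rule; the rule object below is illustrative and not part of this change:

// Editor's illustration: apply the DV preprocessing to every incoming plan.
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object PruneDeletedRowsRule extends Rule[LogicalPlan] with PreprocessTableWithDVs {
  override def apply(plan: LogicalPlan): LogicalPlan = preprocessTablesWithDVs(plan)
}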

object ScanWithDeletionVectors {
def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
case scan @ LogicalRelation(
relation @ HadoopFsRelation(
index: TahoeFileIndex, _, _, _, format: DeltaParquetFileFormat, _), _, _, _) =>
dvEnabledScanFor(scan, relation, format, index)
case _ => None
}

def dvEnabledScanFor(
scan: LogicalRelation,
hadoopRelation: HadoopFsRelation,
fileFormat: DeltaParquetFileFormat,
index: TahoeFileIndex): Option[LogicalPlan] = {
// If the table has no DVs enabled, no change needed
if (!deletionVectorsReadable(index.protocol, index.metadata)) return None

require(!index.isInstanceOf[TahoeLogFileIndex],
"Cannot work with a non-pinned table snapshot of the TahoeFileIndex")

// See if the relation is already modified to include DV reads as part of
// a previous invocation of this rule on this table
if (fileFormat.hasDeletionVectorMap()) return None

// See if any files actually have a DV
val spark = SparkSession.getActiveSession.get
val filePathToDVBroadcastMap = createBroadcastDVMap(spark, index)
if (filePathToDVBroadcastMap.value.isEmpty) return None

// Get the list of columns in the output of the `LogicalRelation` we are
// trying to modify. At the end of the plan, we need to return a
// `LogicalRelation` that has the same output as this `LogicalRelation`
val planOutput = scan.output

val newScan = createScanWithSkipRowColumn(
spark, scan, fileFormat, index, filePathToDVBroadcastMap, hadoopRelation)

// On top of the scan add a filter that filters out the rows which have
// skip row column value non-zero
val rowIndexFilter = createRowIndexFilterNode(newScan)

// Now add a project on top of the row index filter node to
// remove the skip row column
Some(Project(planOutput, rowIndexFilter))
}

/**
* Helper method that creates a new `LogicalRelation` for existing scan that outputs
* an extra column which indicates whether the row needs to be skipped or not.
*/
private def createScanWithSkipRowColumn(
spark: SparkSession,
inputScan: LogicalRelation,
fileFormat: DeltaParquetFileFormat,
tahoeFileIndex: TahoeFileIndex,
filePathToDVBroadcastMap: Broadcast[Map[URI, DeletionVectorDescriptor]],
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
// Create a new `LogicalRelation` that has a modified `DeltaFileFormat` and an output with an
// extra column indicating whether to skip the row or not.

// Add a column for SKIP_ROW to the base output. A value of 0 means the row needs to be kept;
// any other value means the row needs to be skipped.
val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
val newScanOutput = inputScan.output :+
AttributeReference(skipRowField.name, skipRowField.dataType)()
val newScanSchema = StructType(inputScan.schema).add(skipRowField)

val hadoopConfBroadcast = spark.sparkContext.broadcast(
new SerializableConfiguration(tahoeFileIndex.deltaLog.newDeltaHadoopConf()))

val newFileFormat = fileFormat.copyWithDVInfo(
tahoeFileIndex.path.toString, filePathToDVBroadcastMap, hadoopConfBroadcast)
val newRelation = hadoopFsRelation.copy(
fileFormat = newFileFormat,
dataSchema = newScanSchema)(hadoopFsRelation.sparkSession)

// Create a new scan LogicalRelation
inputScan.copy(relation = newRelation, output = newScanOutput)
}

private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = {
val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME)
require(skipRowColumnRefs.size == 1,
s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
val skipRowColumnRef = skipRowColumnRefs.head

val keepRow = DeltaUDF.booleanFromByte(_ == RowIndexFilter.KEEP_ROW_VALUE)
.asNondeterministic() // To avoid constant folding the filter based on stats.

val filterExp = keepRow(new Column(skipRowColumnRef)).expr
Filter(filterExp, newScan)
}
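The keep-row predicate can be exercised on a plain DataFrame to see the intended semantics. The demo below is editor-provided and standalone (a toy __skip_row column, not the Delta scan path); marking the UDF nondeterministic mirrors the code above, which does so to keep the filter from being constant-folded away.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val demoSpark = SparkSession.builder().master("local[1]").appName("skip-row-demo").getOrCreate()
import demoSpark.implicits._

// 0 marks a kept row; any other value marks a deleted row.
val keepRowDemo = udf((b: Byte) => b == 0).asNondeterministic()
val rows = Seq(("a", 0.toByte), ("b", 1.toByte), ("c", 0.toByte)).toDF("key", "__skip_row")

// Keeps "a" and "c"; "b" is dropped because its __skip_row value is non-zero.
rows.filter(keepRowDemo($"__skip_row")).select("key").show()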

private def createBroadcastDVMap(
spark: SparkSession,
tahoeFileIndex: TahoeFileIndex): Broadcast[Map[URI, DeletionVectorDescriptor]] = {
// Given there is no way to find the final filters, just select all files in the
// file index and create the DV map.
val filesWithDVs =
tahoeFileIndex.matchingFiles(Seq(TrueLiteral), Seq(TrueLiteral))
.filter(_.deletionVector != null)
val filePathToDVMap = filesWithDVs
.map(x =>
absolutePath(tahoeFileIndex.path.toString, x.path).toUri -> x.deletionVector)
.toMap
spark.sparkContext.broadcast(filePathToDVMap)
}
}
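For intuition about what gets broadcast, here is an editor's toy version of the map construction; the table root, file names, and descriptor fields are invented, but the keying by absolute file URI matches the code above.

import java.net.URI
import org.apache.hadoop.fs.Path

// Toy stand-in for DeletionVectorDescriptor, for illustration only.
case class ToyDvDescriptor(storageId: String, cardinality: Long)

val tableRoot = "s3://bucket/tables/events"
val filesWithDvs = Seq(
  "part-00000.parquet" -> ToyDvDescriptor("dv-abc", 2L),
  "part-00007.parquet" -> ToyDvDescriptor("dv-def", 10L))

// Key by the absolute file URI so lookups from PartitionedFile paths resolve consistently.
val dvMap: Map[URI, ToyDvDescriptor] = filesWithDvs.map { case (relativePath, dv) =>
  new Path(tableRoot, relativePath).toUri -> dv
}.toMap

assert(dvMap.contains(new Path(tableRoot, "part-00000.parquet").toUri))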
@@ -137,7 +137,8 @@ class Snapshot(
col("add.modificationTime"),
col("add.dataChange"),
col(ADD_STATS_TO_USE_COL_NAME).as("stats"),
col("add.tags")
col("add.tags"),
col("add.deletionVector")
)))
.withColumn("remove", when(
col("remove.path").isNotNull,
@@ -0,0 +1,61 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery, SupportsSubquery}

/**
* Trait to allow processing a special transformation of [[SubqueryExpression]]
* instances in a query plan.
*/
trait SubqueryTransformerHelper {

/**
* Transform all nodes matched by the rule in the query plan rooted at the given `plan`.
* It traverses the tree starting from the leaves; whenever a [[SubqueryExpression]] is
* encountered, the given [[rule]] is applied to the subquery plan contained in that
* [[SubqueryExpression]], starting from the subquery plan's root down to its leaves.
*
* This is slightly different behavior compared to [[QueryPlan.transformUpWithSubqueries]]
* or [[QueryPlan.transformDownWithSubqueries]].
*
* It requires that the given plan has already gone through [[OptimizeSubqueries]], so that the
* root node denoting a subquery has been removed and optimized appropriately.
*/
def transformSubqueryExpressions(plan: LogicalPlan)
(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
require(!isSubqueryRoot(plan))
transformSubqueries(plan, rule) transform (rule)
}

/** Is the given plan a subquery root? */
def isSubqueryRoot(plan: LogicalPlan): Boolean = {
plan.isInstanceOf[Subquery] || plan.isInstanceOf[SupportsSubquery]
}

private def transformSubqueries(
plan: LogicalPlan,
rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
import org.apache.spark.sql.delta.implicits._

plan transformAllExpressionsUp {
case subquery: SubqueryExpression =>
subquery.withNewPlan(transformSubqueryExpressions(subquery.plan)(rule))
}
}
}
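A usage sketch for this helper (the object name and the rewrite it performs are illustrative only): mix in the trait and supply a partial function, which is then applied to the outer plan and to the plan inside every subquery expression.

import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// Editor's illustration: drop filters whose condition is literally true, wherever they occur.
object DropTrivialFilters extends SubqueryTransformerHelper {
  def apply(plan: LogicalPlan): LogicalPlan =
    transformSubqueryExpressions(plan) {
      case Filter(TrueLiteral, child) => child
    }
}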
@@ -196,7 +196,8 @@ object TableFeature {
IdentityColumnsTableFeature,
GeneratedColumnsTableFeature,
InvariantsTableFeature,
ColumnMappingTableFeature)
ColumnMappingTableFeature,
DeletionVectorsTableFeature)
if (DeltaUtils.isTesting) {
features ++= Set(
TestLegacyWriterFeature,
@@ -295,6 +296,16 @@ object IdentityColumnsTableFeature
}
}

object DeletionVectorsTableFeature
extends ReaderWriterFeature(name = "deletionVectors")
with FeatureAutomaticallyEnabledByMetadata {
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata)
}
}

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.