
Commit 8c988ab
Update
longvu-db committed Apr 15, 2024
1 parent e1063a1 commit 8c988ab
Showing 4 changed files with 97 additions and 112 deletions.
File 1 of 4: DeltaParquetFileFormat
@@ -16,8 +16,6 @@

package org.apache.spark.sql.delta

import java.net.URI

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

@@ -35,13 +33,14 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.types.{ByteType, IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow, ColumnVector}
import org.apache.spark.util.SerializableConfiguration

@@ -59,11 +58,10 @@ case class DeltaParquetFileFormat(
isSplittable: Boolean = true,
disablePushDowns: Boolean = false,
tablePath: Option[String] = None,
broadcastDvMap: Option[Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]]] = None,
broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None)
extends ParquetFileFormat {
// Validate that we either have all the arguments for a DV-enabled read or none of them.
if (hasDeletionVectorMap) {
if (hasBroadcastHadoopConf) {
require(tablePath.isDefined && !isSplittable && disablePushDowns,
"Wrong arguments for Delta table scan with deletion vectors")
}
@@ -104,7 +102,7 @@ case class DeltaParquetFileFormat(
override def isSplitable(
sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = isSplittable

def hasDeletionVectorMap: Boolean = broadcastDvMap.isDefined && broadcastHadoopConf.isDefined
def hasBroadcastHadoopConf: Boolean = broadcastHadoopConf.isDefined

/**
* We sometimes need to replace FileFormat within LogicalPlans, so we have to override
@@ -164,7 +162,7 @@ case class DeltaParquetFileFormat(
require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown")
}

if (hasDeletionVectorMap && isRowDeletedColumn.isEmpty) {
if (hasBroadcastHadoopConf && isRowDeletedColumn.isEmpty) {
throw new IllegalArgumentException(
s"Expected a column $IS_ROW_DELETED_COLUMN_NAME in the schema")
}
@@ -214,7 +212,7 @@ case class DeltaParquetFileFormat(
// we can then use fewer rows per rowgroup. Also, 2b+ rows in a single rowgroup is
// not a common use case.
super.metadataSchemaFields ++ rowTrackingFields
}
} ++ Some(FILE_ROW_INDEX_FILTER_ID_ENCODED_FIELD) ++ Some(FILE_ROW_INDEX_FILTER_TYPE_FIELD)
}

override def prepareWrite(
@@ -253,20 +251,36 @@ case class DeltaParquetFileFormat(
s"for file '${file.filePath}'")
})
}
val extractRowIndexFilterIdEncoded: PartitionedFile => Any = { file =>
file.otherConstantMetadataColumnValues
.getOrElse(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED, {
throw new IllegalStateException(
s"Missing ${DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED} value " +
s"for file '${file.filePath}'")
})
}
val extractRowIndexFilterType: PartitionedFile => Any = { file =>
file.otherConstantMetadataColumnValues
.getOrElse(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE, {
throw new IllegalStateException(
s"Missing ${DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE} value " +
s"for file '${file.filePath}'")
})
}
super.fileConstantMetadataExtractors
.updated(RowId.BASE_ROW_ID, extractBaseRowId)
.updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion)
.updated(FILE_ROW_INDEX_FILTER_ID_ENCODED, extractRowIndexFilterIdEncoded)
.updated(FILE_ROW_INDEX_FILTER_TYPE, extractRowIndexFilterType)
}

def copyWithDVInfo(
tablePath: String,
broadcastDvMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]],
broadcastHadoopConf: Broadcast[SerializableConfiguration]): DeltaParquetFileFormat = {
this.copy(
isSplittable = false,
disablePushDowns = true,
tablePath = Some(tablePath),
broadcastDvMap = Some(broadcastDvMap),
broadcastHadoopConf = Some(broadcastHadoopConf))
}
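
For context, the two new extractors above follow Spark's general pattern for per-file constant metadata columns: a FileSourceConstantMetadataStructField declares the column in the metadata schema, and a matching entry in fileConstantMetadataExtractors pulls the per-file value out of PartitionedFile.otherConstantMetadataColumnValues. The sketch below illustrates that pattern with a hypothetical column named file_ingest_batch; the declaration and extractor mirror the usage in this diff, but the column itself is not part of the commit.

    import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
    import org.apache.spark.sql.execution.datasources.PartitionedFile
    import org.apache.spark.sql.types.StringType

    // Hypothetical per-file constant metadata column, declared the same way as
    // FILE_ROW_INDEX_FILTER_ID_ENCODED_FIELD in this commit.
    val FILE_INGEST_BATCH = "file_ingest_batch"
    val FILE_INGEST_BATCH_FIELD =
      FileSourceConstantMetadataStructField(FILE_INGEST_BATCH, StringType)

    // Matching extractor: read the value the file index attached to this file,
    // failing loudly if it is missing (same shape as extractRowIndexFilterIdEncoded).
    val extractIngestBatch: PartitionedFile => Any = { file =>
      file.otherConstantMetadataColumnValues.getOrElse(FILE_INGEST_BATCH,
        throw new IllegalStateException(
          s"Missing $FILE_INGEST_BATCH value for file '${file.filePath}'"))
    }

The per-file value itself is supplied by the file index (see the TahoeFileIndex change further down), which is exactly what this commit does for the two DV-related columns.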

@@ -283,27 +297,31 @@ case class DeltaParquetFileFormat(
isRowDeletedColumn: Option[ColumnMetadata],
rowIndexColumn: Option[ColumnMetadata],
useOffHeapBuffers: Boolean): Iterator[Object] = {
val pathUri = partitionedFile.pathUri

val rowIndexFilter = isRowDeletedColumn.map { col =>
// Fetch the DV descriptor from the broadcast map and create a row index filter
broadcastDvMap.get.value
.get(pathUri)
.map { case DeletionVectorDescriptorWithFilterType(dvDescriptor, filterType) =>
filterType match {
case i if i == RowIndexFilterType.IF_CONTAINED =>
DropMarkedRowsFilter.createInstance(
dvDescriptor,
broadcastHadoopConf.get.value.value,
tablePath.map(new Path(_)))
case i if i == RowIndexFilterType.IF_NOT_CONTAINED =>
KeepMarkedRowsFilter.createInstance(
dvDescriptor,
broadcastHadoopConf.get.value.value,
tablePath.map(new Path(_)))
}
val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
.get(FILE_ROW_INDEX_FILTER_ID_ENCODED)
val filterTypeOpt = partitionedFile.otherConstantMetadataColumnValues
.get(FILE_ROW_INDEX_FILTER_TYPE)
if (dvDescriptorOpt.isDefined && filterTypeOpt.isDefined) {
val rowIndexFilter = filterTypeOpt.get match {
case RowIndexFilterType.IF_CONTAINED => DropMarkedRowsFilter
case RowIndexFilterType.IF_NOT_CONTAINED => KeepMarkedRowsFilter
case unexpectedFilterType => throw new IllegalStateException(
s"Unexpected row index filter type: ${unexpectedFilterType}")
}
.getOrElse(KeepAllRowsFilter)
rowIndexFilter.createInstance(
DeletionVectorDescriptor.fromJson(dvDescriptorOpt.get.asInstanceOf[String]),
broadcastHadoopConf.get.value.value,
tablePath.map(new Path(_)))
} else if (dvDescriptorOpt.isDefined || filterTypeOpt.isDefined) {
throw new IllegalStateException(
s"${FILE_ROW_INDEX_FILTER_ID_ENCODED} and ${FILE_ROW_INDEX_FILTER_TYPE} " +
"must either both be set or both be unset")
} else {
KeepAllRowsFilter
}
}

val metadataColumns = Seq(isRowDeletedColumn, rowIndexColumn).filter(_.nonEmpty).map(_.get)
@@ -399,6 +417,18 @@ object DeltaParquetFileFormat {
val ROW_INDEX_COLUMN_NAME = "__delta_internal_row_index"
val ROW_INDEX_STRUCT_FIELD = StructField(ROW_INDEX_COLUMN_NAME, LongType)

/** The name of the metadata column that contains the encoded row index filter identifier. */
val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"
/** The encoded row index filter identifier column. */
val FILE_ROW_INDEX_FILTER_ID_ENCODED_FIELD = FileSourceConstantMetadataStructField(
FILE_ROW_INDEX_FILTER_ID_ENCODED, StringType)

/** The name of the metadata column that contains the row index filter type. */
val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"
/** The row index filter type column. */
val FILE_ROW_INDEX_FILTER_TYPE_FIELD = FileSourceConstantMetadataStructField(
FILE_ROW_INDEX_FILTER_TYPE, IntegerType)

/** Utility method to create a new writable vector */
private def newVector(
useOffHeapBuffers: Boolean, size: Int, dataType: StructField): WritableColumnVector = {
@@ -465,10 +495,4 @@

/** Helper class to encapsulate column info */
case class ColumnMetadata(index: Int, structField: StructField)

/** Helper class that encapsulate an [[RowIndexFilterType]]. */
case class DeletionVectorDescriptorWithFilterType(
descriptor: DeletionVectorDescriptor,
filterType: RowIndexFilterType) {
}
}
File 2 of 4: ScanWithDeletionVectors
@@ -54,8 +54,6 @@ import org.apache.spark.util.SerializableConfiguration
* to generate the row index. This is a cost we need to pay until we upgrade to the latest
* Apache Spark which contains Parquet reader changes that automatically generate the
* row_index irrespective of the file splitting and filter pushdowns.
* - The scan created also contains a broadcast variable of Parquet File -> DV File map.
* The Parquet reader created uses this map to find the DV file corresponding to the data file.
* - The Filter added on top keeps only rows whose __skip_row value is 0 (i.e. rows not deleted)
* - And at the end we have a Project to keep the plan node output same as before the rule is
* applied.
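
At a high level, the rewrite described in the comment above behaves like the DataFrame-level sketch below. This is only an approximation for illustration: the real rule rewrites the Catalyst plan directly (a Filter over the modified scan followed by a Project back to the original output), and dfWithSkipRow and originalColumns are assumed inputs rather than names from this codebase.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Approximate effect of the rule: keep only rows whose __skip_row value is 0
    // (i.e. rows not marked as deleted), then project back to the original columns
    // so the internal __skip_row column does not leak into the output.
    def dropDeletedRows(dfWithSkipRow: DataFrame, originalColumns: Seq[String]): DataFrame =
      dfWithSkipRow
        .filter(col("__skip_row") === 0)
        .select(originalColumns.map(col): _*)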
@@ -93,20 +91,22 @@ object ScanWithDeletionVectors {

// See if the relation is already modified to include DV reads as part of
// a previous invocation of this rule on this table
if (fileFormat.hasDeletionVectorMap) return None
if (fileFormat.hasBroadcastHadoopConf) return None

// See if any files actually have a DV
val spark = SparkSession.getActiveSession.get
val filePathToDVBroadcastMap = createBroadcastDVMap(spark, index)
if (filePathToDVBroadcastMap.value.isEmpty) return None
val filesWithDVs = index
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
.filter(_.deletionVector != null)
if (filesWithDVs.isEmpty) return None

// Get the list of columns in the output of the `LogicalRelation` we are
// trying to modify. At the end of the plan, we need to return a
// `LogicalRelation` that has the same output as this `LogicalRelation`
val planOutput = scan.output

val newScan = createScanWithSkipRowColumn(
spark, scan, fileFormat, index, filePathToDVBroadcastMap, hadoopRelation)
spark, scan, fileFormat, index, hadoopRelation)

// On top of the scan add a filter that filters out the rows which have
// skip row column value non-zero
@@ -125,7 +125,6 @@
inputScan: LogicalRelation,
fileFormat: DeltaParquetFileFormat,
tahoeFileIndex: TahoeFileIndex,
filePathToDVBroadcastMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]],
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
// Create a new `LogicalRelation` that has modified `DeltaFileFormat` and output with an extra
// column to indicate whether to skip the row or not
@@ -145,7 +144,7 @@
new SerializableConfiguration(tahoeFileIndex.deltaLog.newDeltaHadoopConf()))

val newFileFormat = fileFormat.copyWithDVInfo(
tahoeFileIndex.path.toString, filePathToDVBroadcastMap, hadoopConfBroadcast)
tahoeFileIndex.path.toString, hadoopConfBroadcast)
val newRelation = hadoopFsRelation.copy(
fileFormat = newFileFormat,
dataSchema = newDataSchema)(hadoopFsRelation.sparkSession)
@@ -166,33 +165,4 @@
val filterExp = keepRow(new Column(skipRowColumnRef)).expr
Filter(filterExp, newScan)
}

private def createBroadcastDVMap(
spark: SparkSession,
tahoeFileIndex: TahoeFileIndex)
: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]] = {
val filterTypes = tahoeFileIndex.rowIndexFilters.getOrElse(Map.empty)

// Given there is no way to find the final filters, just select all files in the
// file index and create the DV map.
val filesWithDVs = tahoeFileIndex
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
.filter(_.deletionVector != null)
// Attach filter types to FileActions, so that later [[DeltaParquetFileFormat]] could pick it up
// to decide which kind of rows should be filtered out. This info is necessary for reading CDC
// rows that have been deleted (marked in DV), in which case marked rows must be kept rather
// than filtered out. In such a case, the `filterTypes` map will be populated by [[CDCReader]]
// to indicate IF_NOT_CONTAINED filter should be used. In other cases, `filterTypes` will be
// empty, so we generate IF_CONTAINED as the default DV behavior.
val filePathToDVMap = filesWithDVs.map { addFile =>
val key = absolutePath(tahoeFileIndex.path.toString, addFile.path).toUri
val filterType =
filterTypes.getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED)
val value =
DeletionVectorDescriptorWithFilterType(addFile.deletionVector, filterType)
key -> value
}.toMap

spark.sparkContext.broadcast(filePathToDVMap)
}
}
File 3 of 4: TahoeFileIndex
@@ -22,13 +22,14 @@ import java.util.Objects

import scala.collection.mutable
import org.apache.spark.sql.delta.RowIndexFilterType
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, NoMapping, Snapshot, SnapshotDescriptor}
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaParquetFileFormat, Snapshot, SnapshotDescriptor}
import org.apache.spark.sql.delta.DefaultRowCommitVersion
import org.apache.spark.sql.delta.RowId
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}
import org.apache.spark.sql.delta.implicits._
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path

@@ -126,6 +127,19 @@ abstract class TahoeFileIndex(
addFile.defaultRowCommitVersion.foreach(defaultRowCommitVersion =>
metadata.put(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, defaultRowCommitVersion))

// Attach the DV descriptor and the row index filter type to the file's constant metadata, so
// that [[DeltaParquetFileFormat]] can later pick them up to decide which rows should be
// filtered out. This is necessary for reading CDC rows that have been deleted (marked in the
// DV), in which case the marked rows must be kept rather than filtered out. In that case the
// `rowIndexFilters` map is populated by [[CDCReader]] to indicate that the IF_NOT_CONTAINED
// filter should be used. Otherwise `rowIndexFilters` is empty and we fall back to
// IF_CONTAINED, the default DV behavior.
if (addFile.deletionVector != null) {
metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED,
JsonUtils.toJson(addFile.deletionVector))
val filterType = rowIndexFilters.getOrElse(Map.empty)
.getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED)
metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE, filterType)
}
FileStatusWithMetadata(fs, metadata.toMap)
}
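
Putting the two sides together: TahoeFileIndex attaches the JSON-encoded DV descriptor and the row index filter type as per-file constant metadata, and DeltaParquetFileFormat reads them back, requiring that the two values appear together. The plain-Scala sketch below models that contract independently of Spark and Delta internals; the key names match the commit, but FilterChoice and the integer encoding of the filter type are simplified stand-ins for the real RowIndexFilter implementations and the RowIndexFilterType enum.

    object RowIndexFilterContract {
      val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"
      val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"

      // Stand-ins for DropMarkedRowsFilter / KeepMarkedRowsFilter / KeepAllRowsFilter.
      sealed trait FilterChoice
      case object DropMarkedRows extends FilterChoice // IF_CONTAINED: regular scan, drop deleted rows
      case object KeepMarkedRows extends FilterChoice // IF_NOT_CONTAINED: CDC read of deleted rows
      case object KeepAllRows extends FilterChoice    // file has no deletion vector

      // Write side (mirrors TahoeFileIndex): attach both values, or neither.
      def attach(dvJson: Option[String], isCdcDeleteRead: Boolean): Map[String, Any] =
        dvJson match {
          case Some(json) =>
            val filterType = if (isCdcDeleteRead) 1 else 0 // 1 ~ IF_NOT_CONTAINED, 0 ~ IF_CONTAINED
            Map(FILE_ROW_INDEX_FILTER_ID_ENCODED -> json,
                FILE_ROW_INDEX_FILTER_TYPE -> filterType)
          case None => Map.empty
        }

      // Read side (mirrors DeltaParquetFileFormat): both keys must be present, or neither.
      def resolve(fileMetadata: Map[String, Any]): FilterChoice =
        (fileMetadata.get(FILE_ROW_INDEX_FILTER_ID_ENCODED),
         fileMetadata.get(FILE_ROW_INDEX_FILTER_TYPE)) match {
          case (Some(_), Some(0)) => DropMarkedRows
          case (Some(_), Some(1)) => KeepMarkedRows
          case (None, None)       => KeepAllRows
          case _ => throw new IllegalStateException(
            s"$FILE_ROW_INDEX_FILTER_ID_ENCODED and $FILE_ROW_INDEX_FILTER_TYPE " +
              "must either both be set or both be unset")
        }
    }

For example, resolve(attach(Some(dvJson), isCdcDeleteRead = false)) yields DropMarkedRows for any serialized descriptor dvJson, while resolve(Map.empty) yields KeepAllRows.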

File 4 of 4: (diff not loaded)
