From be7183bef85feaebfc928d5f291c5a90246cde87 Mon Sep 17 00:00:00 2001
From: Thang Long Vu <107926660+longvu-db@users.noreply.github.com>
Date: Tue, 23 Apr 2024 17:09:09 +0200
Subject: [PATCH] [Spark] DV Reads Performance Improvement in Delta by removing Broadcasting DV Information (#2888)

#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Previously, we relied on an [expensive broadcast of DV files](https://github.com/delta-io/delta/pull/1542) to pass the DV files to their associated Parquet files. Since Spark 3.5 supports [adding custom metadata to files](https://github.com/apache/spark/pull/40677), we can now pass the DV descriptor through the custom file metadata field instead, which is expected to improve the performance of DV reads in Delta.

## How was this patch tested?

Adjusted the existing UTs that cover our changes.

## Does this PR introduce _any_ user-facing changes?

No.

---
 .../sql/delta/DeltaParquetFileFormat.scala | 89 ++++----
 .../sql/delta/PreprocessTableWithDVs.scala | 62 +----
 .../sql/delta/files/TahoeFileIndex.scala | 14 +-
 .../delta/DeltaParquetFileFormatSuite.scala | 214 ++++++++----------
 4 files changed, 167 insertions(+), 212 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala index f97c2ec0d7e..a4a27d54c97 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala @@ -16,8 +16,6 @@ package org.apache.spark.sql.delta -import java.net.URI - import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -32,9 +30,9 @@ import org.apache.hadoop.mapreduce.Job import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath import org.apache.spark.sql.execution.datasources.OutputWriterFactory import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -42,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StructField, StructType} +import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StringType, StructField, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow, ColumnVector} import org.apache.spark.util.SerializableConfiguration @@ -59,13 +57,11 @@ case class DeltaParquetFileFormat( nullableRowTrackingFields: Boolean = false, isSplittable: Boolean = true, disablePushDowns: Boolean = false, - tablePath: Option[String] = None, - broadcastDvMap: Option[Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]]] = None, - broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None) + tablePath: Option[String] = None) extends ParquetFileFormat { // Validate either we have all arguments for DV enabled read or none of
them. - if (hasDeletionVectorMap) { - require(tablePath.isDefined && !isSplittable && disablePushDowns, + if (hasTablePath) { + require(!isSplittable && disablePushDowns, "Wrong arguments for Delta table scan with deletion vectors") } @@ -124,7 +120,7 @@ case class DeltaParquetFileFormat( override def isSplitable( sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = isSplittable - def hasDeletionVectorMap: Boolean = broadcastDvMap.isDefined && broadcastHadoopConf.isDefined + def hasTablePath: Boolean = tablePath.isDefined /** * We sometimes need to replace FileFormat within LogicalPlans, so we have to override @@ -182,11 +178,13 @@ case class DeltaParquetFileFormat( require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown") } - if (hasDeletionVectorMap && isRowDeletedColumn.isEmpty) { + if (hasTablePath && isRowDeletedColumn.isEmpty) { throw new IllegalArgumentException( s"Expected a column $IS_ROW_DELETED_COLUMN_NAME in the schema") } + val serializableHadoopConf = new SerializableConfiguration(hadoopConf) + val useOffHeapBuffers = sparkSession.sessionState.conf.offHeapColumnVectorEnabled (partitionedFile: PartitionedFile) => { val rowIteratorFromParquet = parquetDataReader(partitionedFile) @@ -196,8 +194,9 @@ case class DeltaParquetFileFormat( partitionedFile, rowIteratorFromParquet, isRowDeletedColumn, - useOffHeapBuffers = useOffHeapBuffers, - rowIndexColumn = rowIndexColumn) + rowIndexColumn, + useOffHeapBuffers, + serializableHadoopConf) iterToReturn.asInstanceOf[Iterator[InternalRow]] } catch { case NonFatal(e) => @@ -276,16 +275,11 @@ case class DeltaParquetFileFormat( .updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion) } - def copyWithDVInfo( - tablePath: String, - broadcastDvMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]], - broadcastHadoopConf: Broadcast[SerializableConfiguration]): DeltaParquetFileFormat = { + def disableSplittingAndPushdown(tablePath: String): DeltaParquetFileFormat = { this.copy( isSplittable = false, disablePushDowns = true, - tablePath = Some(tablePath), - broadcastDvMap = Some(broadcastDvMap), - broadcastHadoopConf = Some(broadcastHadoopConf)) + tablePath = Some(tablePath)) } /** @@ -300,28 +294,32 @@ case class DeltaParquetFileFormat( iterator: Iterator[Object], isRowDeletedColumn: Option[ColumnMetadata], rowIndexColumn: Option[ColumnMetadata], - useOffHeapBuffers: Boolean): Iterator[Object] = { - val pathUri = partitionedFile.pathUri - + useOffHeapBuffers: Boolean, + serializableHadoopConf: SerializableConfiguration): Iterator[Object] = { val rowIndexFilter = isRowDeletedColumn.map { col => // Fetch the DV descriptor from the broadcast map and create a row index filter - broadcastDvMap.get.value - .get(pathUri) - .map { case DeletionVectorDescriptorWithFilterType(dvDescriptor, filterType) => - filterType match { - case i if i == RowIndexFilterType.IF_CONTAINED => - DropMarkedRowsFilter.createInstance( - dvDescriptor, - broadcastHadoopConf.get.value.value, - tablePath.map(new Path(_))) - case i if i == RowIndexFilterType.IF_NOT_CONTAINED => - KeepMarkedRowsFilter.createInstance( - dvDescriptor, - broadcastHadoopConf.get.value.value, - tablePath.map(new Path(_))) - } + val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues + .get(FILE_ROW_INDEX_FILTER_ID_ENCODED) + val filterTypeOpt = partitionedFile.otherConstantMetadataColumnValues + .get(FILE_ROW_INDEX_FILTER_TYPE) + if (dvDescriptorOpt.isDefined && 
filterTypeOpt.isDefined) { + val rowIndexFilter = filterTypeOpt.get match { + case RowIndexFilterType.IF_CONTAINED => DropMarkedRowsFilter + case RowIndexFilterType.IF_NOT_CONTAINED => KeepMarkedRowsFilter + case unexpectedFilterType => throw new IllegalStateException( + s"Unexpected row index filter type: ${unexpectedFilterType}") } - .getOrElse(KeepAllRowsFilter) + rowIndexFilter.createInstance( + DeletionVectorDescriptor.fromJson(dvDescriptorOpt.get.asInstanceOf[String]), + serializableHadoopConf.value, + tablePath.map(new Path(_))) + } else if (dvDescriptorOpt.isDefined || filterTypeOpt.isDefined) { + throw new IllegalStateException( + s"Both ${FILE_ROW_INDEX_FILTER_ID_ENCODED} and ${FILE_ROW_INDEX_FILTER_TYPE} " + + "should either both have values or no values at all.") + } else { + KeepAllRowsFilter + } } val metadataColumns = Seq(isRowDeletedColumn, rowIndexColumn).filter(_.nonEmpty).map(_.get) @@ -417,6 +415,14 @@ object DeltaParquetFileFormat { val ROW_INDEX_COLUMN_NAME = "__delta_internal_row_index" val ROW_INDEX_STRUCT_FIELD = StructField(ROW_INDEX_COLUMN_NAME, LongType) + /** The key to the encoded row index filter identifier value of the + * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */ + val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded" + + /** The key to the row index filter type value of the + * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */ + val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type" + /** Utility method to create a new writable vector */ private def newVector( useOffHeapBuffers: Boolean, size: Int, dataType: StructField): WritableColumnVector = { @@ -484,11 +490,6 @@ object DeltaParquetFileFormat { /** Helper class to encapsulate column info */ case class ColumnMetadata(index: Int, structField: StructField) - /** Helper class that encapsulate an [[RowIndexFilterType]]. */ - case class DeletionVectorDescriptorWithFilterType( - descriptor: DeletionVectorDescriptor, - filterType: RowIndexFilterType) - /** * Translates the filter to use physical column names instead of logical column names. 
* This is needed when the column mapping mode is set to `NameMapping` or `IdMapping` diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala index 55e40c7ac5f..6b5d0602a6e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala @@ -16,23 +16,16 @@ package org.apache.spark.sql.delta -import java.net.URI - -import org.apache.spark.sql.delta.{RowIndexFilter, RowIndexFilterType} +import org.apache.spark.sql.delta.RowIndexFilter import org.apache.spark.sql.delta.DeltaParquetFileFormat._ import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} -import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} -import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration /** * Plan transformer to inject a filter that removes the rows marked as deleted according to @@ -54,8 +47,6 @@ import org.apache.spark.util.SerializableConfiguration * to generate the row index. This is a cost we need to pay until we upgrade to latest * Apache Spark which contains Parquet reader changes that automatically generate the * row_index irrespective of the file splitting and filter pushdowns. - * - The scan created also contains a broadcast variable of Parquet File -> DV File map. - * The Parquet reader created uses this map to find the DV file corresponding to the data file. * - Filter created filters out rows with __skip_row equals to 0 * - And at the end we have a Project to keep the plan node output same as before the rule is * applied. @@ -93,20 +84,20 @@ object ScanWithDeletionVectors { // See if the relation is already modified to include DV reads as part of // a previous invocation of this rule on this table - if (fileFormat.hasDeletionVectorMap) return None + if (fileFormat.hasTablePath) return None // See if any files actually have a DV - val spark = SparkSession.getActiveSession.get - val filePathToDVBroadcastMap = createBroadcastDVMap(spark, index) - if (filePathToDVBroadcastMap.value.isEmpty) return None + val filesWithDVs = index + .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral)) + .filter(_.deletionVector != null) + if (filesWithDVs.isEmpty) return None // Get the list of columns in the output of the `LogicalRelation` we are // trying to modify. 
At the end of the plan, we need to return a // `LogicalRelation` that has the same output as this `LogicalRelation` val planOutput = scan.output - val newScan = createScanWithSkipRowColumn( - spark, scan, fileFormat, index, filePathToDVBroadcastMap, hadoopRelation) + val newScan = createScanWithSkipRowColumn(scan, fileFormat, index, hadoopRelation) // On top of the scan add a filter that filters out the rows which have // skip row column value non-zero @@ -121,11 +112,9 @@ object ScanWithDeletionVectors { * an extra column which indicates whether the row needs to be skipped or not. */ private def createScanWithSkipRowColumn( - spark: SparkSession, inputScan: LogicalRelation, fileFormat: DeltaParquetFileFormat, tahoeFileIndex: TahoeFileIndex, - filePathToDVBroadcastMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]], hadoopFsRelation: HadoopFsRelation): LogicalRelation = { // Create a new `LogicalRelation` that has modified `DeltaFileFormat` and output with an extra // column to indicate whether to skip the row or not @@ -141,11 +130,7 @@ object ScanWithDeletionVectors { // operator after the data is read from the underlying file reader. val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) - val hadoopConfBroadcast = spark.sparkContext.broadcast( - new SerializableConfiguration(tahoeFileIndex.deltaLog.newDeltaHadoopConf())) - - val newFileFormat = fileFormat.copyWithDVInfo( - tahoeFileIndex.path.toString, filePathToDVBroadcastMap, hadoopConfBroadcast) + val newFileFormat = fileFormat.disableSplittingAndPushdown(tahoeFileIndex.path.toString) val newRelation = hadoopFsRelation.copy( fileFormat = newFileFormat, dataSchema = newDataSchema)(hadoopFsRelation.sparkSession) @@ -166,33 +151,4 @@ object ScanWithDeletionVectors { val filterExp = keepRow(new Column(skipRowColumnRef)).expr Filter(filterExp, newScan) } - - private def createBroadcastDVMap( - spark: SparkSession, - tahoeFileIndex: TahoeFileIndex) - : Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]] = { - val filterTypes = tahoeFileIndex.rowIndexFilters.getOrElse(Map.empty) - - // Given there is no way to find the final filters, just select all files in the - // file index and create the DV map. - val filesWithDVs = tahoeFileIndex - .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral)) - .filter(_.deletionVector != null) - // Attach filter types to FileActions, so that later [[DeltaParquetFileFormat]] could pick it up - // to decide which kind of rows should be filtered out. This info is necessary for reading CDC - // rows that have been deleted (marked in DV), in which case marked rows must be kept rather - // than filtered out. In such a case, the `filterTypes` map will be populated by [[CDCReader]] - // to indicate IF_NOT_CONTAINED filter should be used. In other cases, `filterTypes` will be - // empty, so we generate IF_CONTAINED as the default DV behavior. 
- val filePathToDVMap = filesWithDVs.map { addFile => - val key = absolutePath(tahoeFileIndex.path.toString, addFile.path).toUri - val filterType = - filterTypes.getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED) - val value = - DeletionVectorDescriptorWithFilterType(addFile.deletionVector, filterType) - key -> value - }.toMap - - spark.sparkContext.broadcast(filePathToDVMap) - } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala index 827b0f110af..aafd4e8b9ff 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala @@ -22,13 +22,14 @@ import java.util.Objects import scala.collection.mutable import org.apache.spark.sql.delta.RowIndexFilterType -import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, NoMapping, Snapshot, SnapshotDescriptor} +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaParquetFileFormat, Snapshot, SnapshotDescriptor} import org.apache.spark.sql.delta.DefaultRowCommitVersion import org.apache.spark.sql.delta.RowId import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol} import org.apache.spark.sql.delta.implicits._ import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.util.JsonUtils import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.Path @@ -126,6 +127,17 @@ abstract class TahoeFileIndex( addFile.defaultRowCommitVersion.foreach(defaultRowCommitVersion => metadata.put(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, defaultRowCommitVersion)) + if (addFile.deletionVector != null) { + metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED, + JsonUtils.toJson(addFile.deletionVector)) + + // Set the filter type to IF_CONTAINED by default to let [[DeltaParquetFileFormat]] filter + // out rows unless a filter type was explicitly provided in rowIndexFilters. This can happen + // e.g. when reading CDC data to keep deleted rows instead of filtering them out. 
+ val filterType = rowIndexFilters.getOrElse(Map.empty) + .getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED) + metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE, filterType) + } FileStatusWithMetadata(fs, metadata.toMap) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala index 6a90293d223..869073c7588 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala @@ -16,15 +16,11 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.delta.DeltaParquetFileFormat.DeletionVectorDescriptorWithFilterType import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN -import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor -import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat} +import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore -import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore.pathToEscapedString import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaTestImplicits._ -import org.apache.spark.sql.delta.util.PathWithFileSystem import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter @@ -33,10 +29,11 @@ import org.apache.parquet.hadoop.ParquetFileReader import org.apache.spark.sql.{DataFrame, Dataset, QueryTest} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.{SerializableConfiguration, Utils} class DeltaParquetFileFormatSuite extends QueryTest - with SharedSparkSession with DeltaSQLCommandTest { + with SharedSparkSession + with DeletionVectorsTestUtils + with DeltaSQLCommandTest { import testImplicits._ // Read with deletion vectors has separate code paths based on vectorized Parquet @@ -44,107 +41,112 @@ class DeltaParquetFileFormatSuite extends QueryTest for { readIsRowDeletedCol <- BOOLEAN_DOMAIN readRowIndexCol <- BOOLEAN_DOMAIN - rowIndexFilterType <- Seq(RowIndexFilterType.IF_CONTAINED, RowIndexFilterType.IF_NOT_CONTAINED) - // this isn't need to be tested as it is same as regular reading path without DVs. - if readIsRowDeletedCol || readRowIndexCol + enableDVs <- BOOLEAN_DOMAIN + if (enableDVs && readIsRowDeletedCol) || !enableDVs } { testWithBothParquetReaders( - "read DV metadata columns: " + + s"isDeletionVectorsEnabled=$enableDVs, read DV metadata columns: " + s"with isRowDeletedCol=$readIsRowDeletedCol, " + - s"with rowIndexCol=$readRowIndexCol, " + - s"with rowIndexFilterType=$rowIndexFilterType") { - withTempDir { tempDir => - val tablePath = tempDir.toString - - // Generate a table with one parquet file containing multiple row groups. - generateData(tablePath) - - val deltaLog = DeltaLog.forTable(spark, tempDir) - val metadata = deltaLog.snapshot.metadata + s"with rowIndexCol=$readRowIndexCol") { + withSQLConf(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> + enableDVs.toString) { + withTempDir { tempDir => + val tablePath = tempDir.toString + + // Generate a table with one parquet file containing multiple row groups. 
+ generateData(tablePath) + + val deltaLog = DeltaLog.forTable(spark, tempDir) + val metadata = deltaLog.snapshot.metadata + + // Add additional field that has the deleted row flag to existing data schema + var readingSchema = metadata.schema + if (readIsRowDeletedCol) { + readingSchema = readingSchema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD) + } + if (readRowIndexCol) { + readingSchema = readingSchema.add(DeltaParquetFileFormat.ROW_INDEX_STRUCT_FIELD) + } - // Add additional field that has the deleted row flag to existing data schema - var readingSchema = metadata.schema - if (readIsRowDeletedCol) { - readingSchema = readingSchema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD) - } - if (readRowIndexCol) { - readingSchema = readingSchema.add(DeltaParquetFileFormat.ROW_INDEX_STRUCT_FIELD) - } + // Fetch the only file in the DeltaLog snapshot + val addFile = deltaLog.snapshot.allFiles.collect()(0) - // Fetch the only file in the DeltaLog snapshot - val addFile = deltaLog.snapshot.allFiles.collect()(0) - val addFilePath = new Path(tempDir.toString, addFile.path) - assertParquetHasMultipleRowGroups(addFilePath) - - val dv = generateDV(tablePath, 0, 200, 300, 756, 10352, 19999) - - val fs = addFilePath.getFileSystem(hadoopConf) - val broadcastDvMap = spark.sparkContext.broadcast( - Map(fs.getFileStatus(addFilePath).getPath().toUri -> - DeletionVectorDescriptorWithFilterType(dv, rowIndexFilterType)) - ) - - val broadcastHadoopConf = spark.sparkContext.broadcast( - new SerializableConfiguration(hadoopConf)) - - val deltaParquetFormat = new DeltaParquetFileFormat( - deltaLog.snapshot.protocol, - metadata, - nullableRowTrackingFields = false, - isSplittable = false, - disablePushDowns = true, - Some(tablePath), - if (readIsRowDeletedCol) Some(broadcastDvMap) else None, - if (readIsRowDeletedCol) Some(broadcastHadoopConf) else None) - - val fileIndex = DeltaLogFileIndex(deltaParquetFormat, fs, addFilePath :: Nil) - - val relation = HadoopFsRelation( - fileIndex, - fileIndex.partitionSchema, - readingSchema, - bucketSpec = None, - deltaParquetFormat, - options = Map.empty)(spark) - val plan = LogicalRelation(relation) - - if (readIsRowDeletedCol) { - // Select some rows that are deleted and some rows not deleted - // Deleted row `value`: 0, 200, 300, 756, 10352, 19999 - // Not deleted row `value`: 7, 900 - val (deletedColumnValue, notDeletedColumnValue) = rowIndexFilterType match { - case RowIndexFilterType.IF_CONTAINED => (1, 0) - case RowIndexFilterType.IF_NOT_CONTAINED => (0, 1) - case _ => (-1, -1) // Invalid, expecting the test to fail. 
+ if (enableDVs) { + removeRowsFromFile(deltaLog, addFile, Seq(0, 200, 300, 756, 10352, 19999)) } - checkDatasetUnorderly( - Dataset.ofRows(spark, plan) - .filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)") - .select("value", DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME) - .as[(Int, Int)], - (0, deletedColumnValue), - (7, notDeletedColumnValue), - (200, deletedColumnValue), - (300, deletedColumnValue), - (756, deletedColumnValue), - (900, notDeletedColumnValue), - (10352, deletedColumnValue), - (19999, deletedColumnValue)) - } - if (readRowIndexCol) { - def rowIndexes(df: DataFrame): Set[Long] = { - val colIndex = if (readIsRowDeletedCol) 2 else 1 - df.collect().map(_.getLong(colIndex)).toSet + val addFilePath = new Path(tempDir.toString, addFile.path) + assertParquetHasMultipleRowGroups(addFilePath) + + val deltaParquetFormat = new DeltaParquetFileFormat( + deltaLog.snapshot.protocol, + metadata, + nullableRowTrackingFields = false, + isSplittable = false, + disablePushDowns = true, + if (enableDVs) Some(tablePath) else None) + + val fileIndex = TahoeLogFileIndex(spark, deltaLog) + + val relation = HadoopFsRelation( + fileIndex, + fileIndex.partitionSchema, + readingSchema, + bucketSpec = None, + deltaParquetFormat, + options = Map.empty)(spark) + val plan = LogicalRelation(relation) + + if (readIsRowDeletedCol) { + val (deletedColumnValue, notDeletedColumnValue) = (1, 0) + if (enableDVs) { + // Select some rows that are deleted and some rows not deleted + // Deleted row `value`: 0, 200, 300, 756, 10352, 19999 + // Not deleted row `value`: 7, 900 + checkDatasetUnorderly( + Dataset.ofRows(spark, plan) + .filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)") + .select("value", DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME) + .as[(Int, Int)], + (0, deletedColumnValue), + (7, notDeletedColumnValue), + (200, deletedColumnValue), + (300, deletedColumnValue), + (756, deletedColumnValue), + (900, notDeletedColumnValue), + (10352, deletedColumnValue), + (19999, deletedColumnValue)) + } else { + checkDatasetUnorderly( + Dataset.ofRows(spark, plan) + .filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)") + .select("value", DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME) + .as[(Int, Int)], + (0, notDeletedColumnValue), + (7, notDeletedColumnValue), + (200, notDeletedColumnValue), + (300, notDeletedColumnValue), + (756, notDeletedColumnValue), + (900, notDeletedColumnValue), + (10352, notDeletedColumnValue), + (19999, notDeletedColumnValue)) + } } - val df = Dataset.ofRows(spark, plan) - assert(rowIndexes(df) === Seq.range(0, 20000).toSet) + if (readRowIndexCol) { + def rowIndexes(df: DataFrame): Set[Long] = { + val colIndex = if (readIsRowDeletedCol) 2 else 1 + df.collect().map(_.getLong(colIndex)).toSet + } + + val df = Dataset.ofRows(spark, plan) + assert(rowIndexes(df) === Seq.range(0, 20000).toSet) - assert( - rowIndexes( - df.filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)")) === - Seq(0, 7, 200, 300, 756, 900, 10352, 19999).toSet) + assert( + rowIndexes( + df.filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)")) === + Seq(0, 7, 200, 300, 756, 900, 10352, 19999).toSet) + } } } } @@ -190,22 +192,6 @@ class DeltaParquetFileFormatSuite extends QueryTest hadoopConf().set("dfs.block.size", (1024 * 20).toString) } - /** Utility method that generates deletion vector based on the given row indexes */ - private def generateDV(tablePath: String, rowIndexes: Long *): DeletionVectorDescriptor = { - val bitmap = RoaringBitmapArray(rowIndexes: _*) - val 
tableWithFS = PathWithFileSystem.withConf(new Path(tablePath), hadoopConf) - val dvPath = dvStore.generateUniqueNameInTable(tableWithFS) - val serializedBitmap = bitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable) - val dvRange = Utils.tryWithResource(dvStore.createWriter(dvPath)) { writer => - writer.write(serializedBitmap) - } - DeletionVectorDescriptor.onDiskWithAbsolutePath( - pathToEscapedString(dvPath.makeQualified().path), - dvRange.length, - rowIndexes.size, - Some(dvRange.offset)) - } - private def assertParquetHasMultipleRowGroups(filePath: Path): Unit = { val parquetMetadata = ParquetFileReader.readFooter( hadoopConf,
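
For illustration, a minimal, self-contained Scala sketch of the per-file metadata handoff this change switches to. On the scan side, `TahoeFileIndex` attaches the serialized `DeletionVectorDescriptor` and a row-index filter type to each file's constant-metadata map (keyed by `row_index_filter_id_encoded` and `row_index_filter_type`); on the read side, `DeltaParquetFileFormat` reads those two keys back from `PartitionedFile.otherConstantMetadataColumnValues` and rebuilds the row index filter, falling back to keeping all rows when neither key is present. Everything in the sketch apart from those two key names is an invented stand-in, not Delta or Spark API.

```scala
object DvMetadataHandoffSketch {
  // Metadata keys mirroring DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
  // and FILE_ROW_INDEX_FILTER_TYPE in this patch.
  val FilterIdKey: String = "row_index_filter_id_encoded"
  val FilterTypeKey: String = "row_index_filter_type"

  // Stand-in for RowIndexFilterType: IF_CONTAINED drops the rows marked in the DV,
  // IF_NOT_CONTAINED keeps only the marked rows (the CDC read path).
  sealed trait FilterType
  case object IfContained extends FilterType
  case object IfNotContained extends FilterType

  // Producer side (what TahoeFileIndex does for each AddFile with a deletion vector):
  // put the serialized DV descriptor and the filter type into the file's metadata map.
  def buildFileMetadata(dvJson: Option[String], filterType: FilterType): Map[String, Any] =
    dvJson
      .map(json => Map[String, Any](FilterIdKey -> json, FilterTypeKey -> filterType))
      .getOrElse(Map.empty[String, Any])

  // Consumer side (what DeltaParquetFileFormat does for each PartitionedFile):
  // rebuild the row-index filter from the two keys, defaulting to "keep all rows".
  // A string is returned here purely to make the chosen filter visible in the demo.
  def resolveRowIndexFilter(fileMetadata: Map[String, Any]): String =
    (fileMetadata.get(FilterIdKey), fileMetadata.get(FilterTypeKey)) match {
      case (Some(json: String), Some(IfContained))    => s"DropMarkedRowsFilter($json)"
      case (Some(json: String), Some(IfNotContained)) => s"KeepMarkedRowsFilter($json)"
      case (None, None)                               => "KeepAllRowsFilter"
      case _ =>
        throw new IllegalStateException(
          "The DV descriptor and the filter type must be set together or not at all")
    }

  def main(args: Array[String]): Unit = {
    // A made-up descriptor payload; the real one is a DeletionVectorDescriptor serialized as JSON.
    val withDv = buildFileMetadata(Some("""{"storageType":"u","pathOrInlineDv":"..."}"""), IfContained)
    println(resolveRowIndexFilter(withDv))    // DropMarkedRowsFilter({...})
    println(resolveRowIndexFilter(Map.empty)) // KeepAllRowsFilter
  }
}
```

With this handoff, each task obtains the DV information directly from its `PartitionedFile` metadata, so the driver no longer needs to broadcast a Parquet-file-to-DV map or a serialized Hadoop configuration alongside the scan.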