From 8c988ab8e453944a289dddfa9c3f86ccec7ad125 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Mon, 15 Apr 2024 15:44:12 +0200
Subject: [PATCH] Pass DV descriptors and row index filter types to
 DeltaParquetFileFormat as per-file constant metadata instead of a broadcast map

---
 .../sql/delta/DeltaParquetFileFormat.scala    | 90 ++++++++++++-------
 .../sql/delta/PreprocessTableWithDVs.scala    | 44 ++-------
 .../sql/delta/files/TahoeFileIndex.scala      | 16 +++-
 .../delta/DeltaParquetFileFormatSuite.scala   | 59 ++++--------
 4 files changed, 97 insertions(+), 112 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
index b170d329062..3ad95d20f38 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
@@ -16,8 +16,6 @@
 
 package org.apache.spark.sql.delta
 
-import java.net.URI
-
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
@@ -35,13 +33,14 @@
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
 import org.apache.spark.sql.execution.datasources.OutputWriterFactory
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types.{ByteType, IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow, ColumnVector}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -59,11 +58,10 @@ case class DeltaParquetFileFormat(
     isSplittable: Boolean = true,
     disablePushDowns: Boolean = false,
     tablePath: Option[String] = None,
-    broadcastDvMap: Option[Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]]] = None,
     broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None)
   extends ParquetFileFormat {
   // Validate either we have all arguments for DV enabled read or none of them.
-  if (hasDeletionVectorMap) {
+  if (hasBroadcastHadoopConf) {
     require(tablePath.isDefined && !isSplittable && disablePushDowns,
       "Wrong arguments for Delta table scan with deletion vectors")
   }
@@ -104,7 +102,7 @@ case class DeltaParquetFileFormat(
   override def isSplitable(
       sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = isSplittable
 
-  def hasDeletionVectorMap: Boolean = broadcastDvMap.isDefined && broadcastHadoopConf.isDefined
+  def hasBroadcastHadoopConf: Boolean = broadcastHadoopConf.isDefined
 
   /**
    * We sometimes need to replace FileFormat within LogicalPlans, so we have to override
@@ -164,7 +162,7 @@ case class DeltaParquetFileFormat(
       require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown")
     }
 
-    if (hasDeletionVectorMap && isRowDeletedColumn.isEmpty) {
+    if (hasBroadcastHadoopConf && isRowDeletedColumn.isEmpty) {
       throw new IllegalArgumentException(
         s"Expected a column $IS_ROW_DELETED_COLUMN_NAME in the schema")
     }
@@ -214,7 +212,7 @@ case class DeltaParquetFileFormat(
       // we can then use less rows per rowgroup. Also, 2b+ rows in a single rowgroup is
       // not a common use case.
       super.metadataSchemaFields ++ rowTrackingFields
-    }
+    } ++ Some(FILE_ROW_INDEX_FILTER_ID_ENCODED_FIELD) ++ Some(FILE_ROW_INDEX_FILTER_TYPE_FIELD)
   }
 
   override def prepareWrite(
@@ -253,20 +251,36 @@ case class DeltaParquetFileFormat(
             s"for file '${file.filePath}'")
         })
     }
+    val extractRowIndexFilterIdEncoded: PartitionedFile => Any = { file =>
+      file.otherConstantMetadataColumnValues
+        .getOrElse(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED, {
+          throw new IllegalStateException(
+            s"Missing ${DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED} value " +
+              s"for file '${file.filePath}'")
+        })
+    }
+    val extractRowIndexFilterType: PartitionedFile => Any = { file =>
+      file.otherConstantMetadataColumnValues
+        .getOrElse(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE, {
+          throw new IllegalStateException(
+            s"Missing ${DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE} value " +
+              s"for file '${file.filePath}'")
+        })
+    }
     super.fileConstantMetadataExtractors
       .updated(RowId.BASE_ROW_ID, extractBaseRowId)
       .updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion)
+      .updated(FILE_ROW_INDEX_FILTER_ID_ENCODED, extractRowIndexFilterIdEncoded)
+      .updated(FILE_ROW_INDEX_FILTER_TYPE, extractRowIndexFilterType)
   }
 
   def copyWithDVInfo(
       tablePath: String,
-      broadcastDvMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]],
       broadcastHadoopConf: Broadcast[SerializableConfiguration]): DeltaParquetFileFormat = {
     this.copy(
       isSplittable = false,
       disablePushDowns = true,
       tablePath = Some(tablePath),
-      broadcastDvMap = Some(broadcastDvMap),
       broadcastHadoopConf = Some(broadcastHadoopConf))
   }
 
@@ -283,27 +297,31 @@ case class DeltaParquetFileFormat(
       isRowDeletedColumn: Option[ColumnMetadata],
       rowIndexColumn: Option[ColumnMetadata],
       useOffHeapBuffers: Boolean): Iterator[Object] = {
-    val pathUri = partitionedFile.pathUri
 
     val rowIndexFilter = isRowDeletedColumn.map { col =>
       // Fetch the DV descriptor from the broadcast map and create a row index filter
-      broadcastDvMap.get.value
-        .get(pathUri)
-        .map { case DeletionVectorDescriptorWithFilterType(dvDescriptor, filterType) =>
-          filterType match {
-            case i if i == RowIndexFilterType.IF_CONTAINED =>
-              DropMarkedRowsFilter.createInstance(
-                dvDescriptor,
-                broadcastHadoopConf.get.value.value,
-                tablePath.map(new Path(_)))
-            case i if i == RowIndexFilterType.IF_NOT_CONTAINED =>
-              KeepMarkedRowsFilter.createInstance(
-                dvDescriptor,
-                broadcastHadoopConf.get.value.value,
-                tablePath.map(new Path(_)))
-          }
+      val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
+        .get(FILE_ROW_INDEX_FILTER_ID_ENCODED)
+      val filterTypeOpt = partitionedFile.otherConstantMetadataColumnValues
+        .get(FILE_ROW_INDEX_FILTER_TYPE)
+      if (dvDescriptorOpt.isDefined && filterTypeOpt.isDefined) {
+        val rowIndexFilter = filterTypeOpt.get match {
+          case RowIndexFilterType.IF_CONTAINED => DropMarkedRowsFilter
+          case RowIndexFilterType.IF_NOT_CONTAINED => KeepMarkedRowsFilter
+          case unexpectedFilterType => throw new IllegalStateException(
+            s"Unexpected row index filter type: ${unexpectedFilterType}")
         }
-        .getOrElse(KeepAllRowsFilter)
+        rowIndexFilter.createInstance(
+          DeletionVectorDescriptor.fromJson(dvDescriptorOpt.get.asInstanceOf[String]),
+          broadcastHadoopConf.get.value.value,
+          tablePath.map(new Path(_)))
+      } else if (dvDescriptorOpt.isDefined || filterTypeOpt.isDefined) {
+        throw new IllegalStateException(
+          s"${FILE_ROW_INDEX_FILTER_ID_ENCODED} and ${FILE_ROW_INDEX_FILTER_TYPE} " +
+            "must either both be set or both be unset")
+      } else {
+        KeepAllRowsFilter
+      }
     }
 
     val metadataColumns = Seq(isRowDeletedColumn, rowIndexColumn).filter(_.nonEmpty).map(_.get)
@@ -399,6 +417,18 @@ object DeltaParquetFileFormat {
   val ROW_INDEX_COLUMN_NAME = "__delta_internal_row_index"
   val ROW_INDEX_STRUCT_FIELD = StructField(ROW_INDEX_COLUMN_NAME, LongType)
 
+  /** The name of the metadata column that contains the encoded row index filter identifier. */
+  val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"
+  /** The encoded row index filter identifier column. */
+  val FILE_ROW_INDEX_FILTER_ID_ENCODED_FIELD = FileSourceConstantMetadataStructField(
+    FILE_ROW_INDEX_FILTER_ID_ENCODED, StringType)
+
+  /** The name of the metadata column that contains the row index filter type. */
+  val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"
+  /** The row index filter type column. */
+  val FILE_ROW_INDEX_FILTER_TYPE_FIELD = FileSourceConstantMetadataStructField(
+    FILE_ROW_INDEX_FILTER_TYPE, IntegerType)
+
   /** Utility method to create a new writable vector */
   private def newVector(
       useOffHeapBuffers: Boolean, size: Int, dataType: StructField): WritableColumnVector = {
@@ -465,10 +495,4 @@ object DeltaParquetFileFormat {
 
   /** Helper class to encapsulate column info */
   case class ColumnMetadata(index: Int, structField: StructField)
-
-  /** Helper class that encapsulate an [[RowIndexFilterType]]. */
-  case class DeletionVectorDescriptorWithFilterType(
-      descriptor: DeletionVectorDescriptor,
-      filterType: RowIndexFilterType) {
-  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
index 55e40c7ac5f..6cac372821e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
@@ -54,8 +54,6 @@ import org.apache.spark.util.SerializableConfiguration
  *   to generate the row index. This is a cost we need to pay until we upgrade to latest
  *   Apache Spark which contains Parquet reader changes that automatically generate the
  *   row_index irrespective of the file splitting and filter pushdowns.
- * - The scan created also contains a broadcast variable of Parquet File -> DV File map.
- *   The Parquet reader created uses this map to find the DV file corresponding to the data file.
  * - Filter created filters out rows with __skip_row equals to 0
  * - And at the end we have a Project to keep the plan node output same as before the rule is
  *   applied.
@@ -93,12 +91,14 @@ object ScanWithDeletionVectors {
     // See if the relation is already modified to include DV reads as part of
     // a previous invocation of this rule on this table
-    if (fileFormat.hasDeletionVectorMap) return None
+    if (fileFormat.hasBroadcastHadoopConf) return None
 
     // See if any files actually have a DV
     val spark = SparkSession.getActiveSession.get
-    val filePathToDVBroadcastMap = createBroadcastDVMap(spark, index)
-    if (filePathToDVBroadcastMap.value.isEmpty) return None
+    val filesWithDVs = index
+      .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
+      .filter(_.deletionVector != null)
+    if (filesWithDVs.isEmpty) return None
 
     // Get the list of columns in the output of the `LogicalRelation` we are
     // trying to modify. At the end of the plan, we need to return a
@@ -106,7 +106,7 @@ object ScanWithDeletionVectors {
     val planOutput = scan.output
 
     val newScan = createScanWithSkipRowColumn(
-      spark, scan, fileFormat, index, filePathToDVBroadcastMap, hadoopRelation)
+      spark, scan, fileFormat, index, hadoopRelation)
 
     // On top of the scan add a filter that filters out the rows which have
     // skip row column value non-zero
@@ -125,7 +125,6 @@
       inputScan: LogicalRelation,
       fileFormat: DeltaParquetFileFormat,
       tahoeFileIndex: TahoeFileIndex,
-      filePathToDVBroadcastMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]],
       hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
     // Create a new `LogicalRelation` that has modified `DeltaFileFormat` and output with an extra
     // column to indicate whether to skip the row or not
@@ -145,7 +144,7 @@ object ScanWithDeletionVectors {
       new SerializableConfiguration(tahoeFileIndex.deltaLog.newDeltaHadoopConf()))
 
     val newFileFormat = fileFormat.copyWithDVInfo(
-      tahoeFileIndex.path.toString, filePathToDVBroadcastMap, hadoopConfBroadcast)
+      tahoeFileIndex.path.toString, hadoopConfBroadcast)
     val newRelation = hadoopFsRelation.copy(
       fileFormat = newFileFormat,
       dataSchema = newDataSchema)(hadoopFsRelation.sparkSession)
@@ -166,33 +165,4 @@ object ScanWithDeletionVectors {
     val filterExp = keepRow(new Column(skipRowColumnRef)).expr
     Filter(filterExp, newScan)
   }
-
-  private def createBroadcastDVMap(
-      spark: SparkSession,
-      tahoeFileIndex: TahoeFileIndex)
-    : Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]] = {
-    val filterTypes = tahoeFileIndex.rowIndexFilters.getOrElse(Map.empty)
-
-    // Given there is no way to find the final filters, just select all files in the
-    // file index and create the DV map.
-    val filesWithDVs = tahoeFileIndex
-      .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
-      .filter(_.deletionVector != null)
-    // Attach filter types to FileActions, so that later [[DeltaParquetFileFormat]] could pick it up
-    // to decide which kind of rows should be filtered out. This info is necessary for reading CDC
-    // rows that have been deleted (marked in DV), in which case marked rows must be kept rather
-    // than filtered out. In such a case, the `filterTypes` map will be populated by [[CDCReader]]
-    // to indicate IF_NOT_CONTAINED filter should be used. In other cases, `filterTypes` will be
-    // empty, so we generate IF_CONTAINED as the default DV behavior.
-    val filePathToDVMap = filesWithDVs.map { addFile =>
-      val key = absolutePath(tahoeFileIndex.path.toString, addFile.path).toUri
-      val filterType =
-        filterTypes.getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED)
-      val value =
-        DeletionVectorDescriptorWithFilterType(addFile.deletionVector, filterType)
-      key -> value
-    }.toMap
-
-    spark.sparkContext.broadcast(filePathToDVMap)
-  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
index 827b0f110af..a74415fde9e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
@@ -22,13 +22,14 @@ import java.util.Objects
 
 import scala.collection.mutable
 
 import org.apache.spark.sql.delta.RowIndexFilterType
-import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, NoMapping, Snapshot, SnapshotDescriptor}
+import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaParquetFileFormat, Snapshot, SnapshotDescriptor}
 import org.apache.spark.sql.delta.DefaultRowCommitVersion
 import org.apache.spark.sql.delta.RowId
 import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}
 import org.apache.spark.sql.delta.implicits._
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.fs.Path
@@ -126,6 +127,19 @@ abstract class TahoeFileIndex(
     addFile.defaultRowCommitVersion.foreach(defaultRowCommitVersion =>
       metadata.put(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, defaultRowCommitVersion))
+    // Attach filter types to FileActions, so that [[DeltaParquetFileFormat]] can later pick them up
+    // to decide which kind of rows should be filtered out. This info is necessary for reading CDC
+    // rows that have been deleted (marked in DV), in which case marked rows must be kept rather
+    // than filtered out. In such a case, the `filterTypes` map will be populated by [[CDCReader]]
+    // to indicate that the IF_NOT_CONTAINED filter should be used.
+    // In other cases, `filterTypes` will be empty, so we generate IF_CONTAINED as the default DV behavior.
+    if (addFile.deletionVector != null) {
+      metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED,
+        JsonUtils.toJson(addFile.deletionVector))
+      val filterType = rowIndexFilters.getOrElse(Map.empty)
+        .getOrElse(addFile.path, RowIndexFilterType.IF_CONTAINED)
+      metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE, filterType)
+    }
 
     FileStatusWithMetadata(fs, metadata.toMap)
   }
 
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala
index 6a90293d223..5bd05590d8f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala
@@ -16,15 +16,11 @@
 
 package org.apache.spark.sql.delta
 
-import org.apache.spark.sql.delta.DeltaParquetFileFormat.DeletionVectorDescriptorWithFilterType
 import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
-import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
-import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat}
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
 import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
-import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore.pathToEscapedString
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.PathWithFileSystem
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter
@@ -33,26 +29,31 @@
 import org.apache.parquet.hadoop.ParquetFileReader
 
 import org.apache.spark.sql.{DataFrame, Dataset, QueryTest}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
 class DeltaParquetFileFormatSuite extends QueryTest
-  with SharedSparkSession with DeltaSQLCommandTest {
+  with SharedSparkSession
+  with DeletionVectorsTestUtils
+  with DeltaSQLCommandTest {
   import testImplicits._
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    enableDeletionVectorsInNewTables(spark.conf)
+  }
+
   // Read with deletion vectors has separate code paths based on vectorized Parquet
   // reader is enabled or not. Test both the combinations
   for {
     readIsRowDeletedCol <- BOOLEAN_DOMAIN
     readRowIndexCol <- BOOLEAN_DOMAIN
-    rowIndexFilterType <- Seq(RowIndexFilterType.IF_CONTAINED, RowIndexFilterType.IF_NOT_CONTAINED)
     // this doesn't need to be tested as it is the same as the regular reading path without DVs.
     if readIsRowDeletedCol || readRowIndexCol
   } {
     testWithBothParquetReaders(
       "read DV metadata columns: " +
         s"with isRowDeletedCol=$readIsRowDeletedCol, " +
-        s"with rowIndexCol=$readRowIndexCol, " +
-        s"with rowIndexFilterType=$rowIndexFilterType") {
+        s"with rowIndexCol=$readRowIndexCol") {
       withTempDir { tempDir =>
         val tablePath = tempDir.toString
@@ -73,16 +74,13 @@ class DeltaParquetFileFormatSuite extends QueryTest
         // Fetch the only file in the DeltaLog snapshot
         val addFile = deltaLog.snapshot.allFiles.collect()(0)
-        val addFilePath = new Path(tempDir.toString, addFile.path)
-        assertParquetHasMultipleRowGroups(addFilePath)
 
-        val dv = generateDV(tablePath, 0, 200, 300, 756, 10352, 19999)
+        if (readIsRowDeletedCol) {
+          removeRowsFromFile(deltaLog, addFile, Seq(0, 200, 300, 756, 10352, 19999))
+        }
 
-        val fs = addFilePath.getFileSystem(hadoopConf)
-        val broadcastDvMap = spark.sparkContext.broadcast(
-          Map(fs.getFileStatus(addFilePath).getPath().toUri ->
-            DeletionVectorDescriptorWithFilterType(dv, rowIndexFilterType))
-        )
+        val addFilePath = new Path(tempDir.toString, addFile.path)
+        assertParquetHasMultipleRowGroups(addFilePath)
 
         val broadcastHadoopConf = spark.sparkContext.broadcast(
           new SerializableConfiguration(hadoopConf))
@@ -94,10 +92,9 @@ class DeltaParquetFileFormatSuite extends QueryTest
           isSplittable = false,
           disablePushDowns = true,
           Some(tablePath),
-          if (readIsRowDeletedCol) Some(broadcastDvMap) else None,
           if (readIsRowDeletedCol) Some(broadcastHadoopConf) else None)
 
-        val fileIndex = DeltaLogFileIndex(deltaParquetFormat, fs, addFilePath :: Nil)
+        val fileIndex = TahoeLogFileIndex(spark, deltaLog)
 
         val relation = HadoopFsRelation(
           fileIndex,
@@ -112,11 +109,7 @@ class DeltaParquetFileFormatSuite extends QueryTest
         // Select some rows that are deleted and some rows not deleted
         // Deleted row `value`: 0, 200, 300, 756, 10352, 19999
         // Not deleted row `value`: 7, 900
-        val (deletedColumnValue, notDeletedColumnValue) = rowIndexFilterType match {
-          case RowIndexFilterType.IF_CONTAINED => (1, 0)
-          case RowIndexFilterType.IF_NOT_CONTAINED => (0, 1)
-          case _ => (-1, -1) // Invalid, expecting the test to fail.
-        }
+        val (deletedColumnValue, notDeletedColumnValue) = (1, 0)
         checkDatasetUnorderly(
           Dataset.ofRows(spark, plan)
             .filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)")
@@ -190,22 +183,6 @@ class DeltaParquetFileFormatSuite extends QueryTest
     hadoopConf().set("dfs.block.size", (1024 * 20).toString)
   }
 
-  /** Utility method that generates deletion vector based on the given row indexes */
-  private def generateDV(tablePath: String, rowIndexes: Long *): DeletionVectorDescriptor = {
-    val bitmap = RoaringBitmapArray(rowIndexes: _*)
-    val tableWithFS = PathWithFileSystem.withConf(new Path(tablePath), hadoopConf)
-    val dvPath = dvStore.generateUniqueNameInTable(tableWithFS)
-    val serializedBitmap = bitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
-    val dvRange = Utils.tryWithResource(dvStore.createWriter(dvPath)) { writer =>
-      writer.write(serializedBitmap)
-    }
-    DeletionVectorDescriptor.onDiskWithAbsolutePath(
-      pathToEscapedString(dvPath.makeQualified().path),
-      dvRange.length,
-      rowIndexes.size,
-      Some(dvRange.offset))
-  }
-
   private def assertParquetHasMultipleRowGroups(filePath: Path): Unit = {
     val parquetMetadata = ParquetFileReader.readFooter(
       hadoopConf,
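      // Illustrative sketch (not part of the patch): with this change, the DV descriptor and the
      // row index filter type travel as per-file constant metadata rather than through a broadcast
      // map. The calls below mirror the patch itself; the surrounding wiring (an `addFile` with a
      // non-null deletionVector, a mutable `metadata` map, and a `partitionedFile`) is assumed.
      //
      //   // Producer side, when the file index builds the per-file metadata:
      //   metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED,
      //     JsonUtils.toJson(addFile.deletionVector))
      //   metadata.put(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE,
      //     RowIndexFilterType.IF_CONTAINED)
      //
      //   // Consumer side, inside the Parquet reader:
      //   val dvDescriptor = DeletionVectorDescriptor.fromJson(
      //     partitionedFile.otherConstantMetadataColumnValues(
      //       DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED).asInstanceOf[String])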