diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index ca2f1d93447..03cc1bb7e59 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -52,7 +52,8 @@ trait DeltaColumnMappingBase extends DeltaLogging { protected val DELTA_INTERNAL_COLUMNS: Set[String] = (CDCReader.CDC_COLUMNS_IN_DATA ++ Seq( CDCReader.CDC_COMMIT_VERSION, - CDCReader.CDC_COMMIT_TIMESTAMP) + CDCReader.CDC_COMMIT_TIMESTAMP, + DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME) ).map(_.toLowerCase(Locale.ROOT)).toSet val supportedModes: Set[DeltaColumnMappingMode] = diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala index 745acae4aa9..a5446ac981c 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala @@ -395,6 +395,16 @@ trait DeltaConfigsBase extends DeltaLogging { helpMessage = "needs to be a boolean.", minimumProtocolVersion = Some(AppendOnlyTableFeature.minProtocolVersion)) + /** + * Whether commands modifying this Delta table are allowed to create new deletion vectors. + */ + val ENABLE_DELETION_VECTORS_CREATION = buildConfig[Boolean]( + key = "enableDeletionVectors", + defaultValue = "false", + fromString = _.toBoolean, + validationFunction = _ => true, + helpMessage = "needs to be a boolean.", + minimumProtocolVersion = Some(DeletionVectorsTableFeature.minProtocolVersion)) /** * Whether this table will automatically optimize the layout of files during writes. diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala index 2e26a161bb1..15993598d65 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta +import java.net.URI + import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -49,12 +51,21 @@ class DeltaParquetFileFormat( val isSplittable: Boolean = true, val disablePushDowns: Boolean = false, val tablePath: Option[String] = None, - val broadcastDvMap: Option[Broadcast[Map[String, DeletionVectorDescriptor]]] = None, + val broadcastDvMap: Option[Broadcast[Map[URI, DeletionVectorDescriptor]]] = None, val broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None) extends ParquetFileFormat { // Validate either we have all arguments for DV enabled read or none of them. - require(!(broadcastHadoopConf.isDefined ^ broadcastDvMap.isDefined ^ tablePath.isDefined ^ - !isSplittable ^ disablePushDowns)) + if (broadcastHadoopConf.isDefined) { + require( + broadcastHadoopConf.isDefined && broadcastDvMap.isDefined && + tablePath.isDefined && !isSplittable && disablePushDowns, + "Wrong arguments for Delta table scan with deletion vectors") + } else { + require( + broadcastHadoopConf.isEmpty && broadcastDvMap.isEmpty && + tablePath.isEmpty && isSplittable && !disablePushDowns, + "Wrong arguments for Delta table scan with no deletion vectors") + } val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode val referenceSchema: StructType = metadata.schema @@ -147,7 +158,7 @@ class DeltaParquetFileFormat( def copyWithDVInfo( tablePath: String, - broadcastDvMap: Broadcast[Map[String, DeletionVectorDescriptor]], + broadcastDvMap: Broadcast[Map[URI, DeletionVectorDescriptor]], broadcastHadoopConf: Broadcast[SerializableConfiguration]): DeltaParquetFileFormat = { new DeltaParquetFileFormat( metadata, @@ -169,10 +180,10 @@ class DeltaParquetFileFormat( isRowDeletedColumnIdx: Int, useOffHeapBuffers: Boolean): Iterator[Object] = { val filePath = partitionedFile.filePath - val absolutePath = new Path(filePath).toString + val pathUri = new Path(filePath).toUri // Fetch the DV descriptor from the broadcast map and create a row index filter - val dvDescriptor = broadcastDvMap.get.value.get(absolutePath) + val dvDescriptor = broadcastDvMap.get.value.get(pathUri) val rowIndexFilter = DeletedRowsMarkingFilter.createInstance( dvDescriptor.getOrElse(DeletionVectorDescriptor.EMPTY), broadcastHadoopConf.get.value.value, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala index fd4e302ff24..fb5647b82fb 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala @@ -47,6 +47,9 @@ object DeltaUDF { def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction = createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f)) + def booleanFromByte(x: Byte => Boolean): UserDefinedFunction = + createUdfFromTemplateUnsafe(booleanFromByteTemplate, x, udf(x)) + private lazy val stringFromStringTemplate = udf[String, String](identity).asInstanceOf[SparkUserDefinedFunction] @@ -64,6 +67,9 @@ object DeltaUDF { private lazy val booleanFromMapTemplate = udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction] + private lazy val booleanFromByteTemplate = + udf((_: Byte) => true).asInstanceOf[SparkUserDefinedFunction] + /** * Return a `UserDefinedFunction` for the given `f` from `template` if * `INTERNAL_UDF_OPTIMIZATION_ENABLED` is enabled. Otherwise, `orElse` will be called to create a diff --git a/core/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/core/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala new file mode 100644 index 00000000000..3126985c32c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala @@ -0,0 +1,182 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import java.net.URI + +import org.apache.spark.sql.delta.RowIndexFilter +import org.apache.spark.sql.delta.DeltaParquetFileFormat._ +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable +import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +/** + * Plan transformer to inject a filter that removes the rows marked as deleted according to + * deletion vectors. For tables with no deletion vectors, this transformation has no effect. + * + * It modifies for plan for tables with deletion vectors as follows: + * Before rule: -> Delta Scan (key, value). + * - Here we are reading `key`, `value`` columns from the Delta table + * After rule: + * -> + * Project(key, value) -> + * Filter (udf(__skip_row == 0) -> + * Delta Scan (key, value, __skip_row) + * - Here we insert a new column `__skip_row` in Delta scan. This value is populated by the + * Parquet reader using the DV corresponding to the Parquet file read + * (See [[DeltaParquetFileFormat]]) and it contains 0 if we want to keep the row. + * The scan created also disables Parquet file splitting and filter pushdowns, because + * in order to generate the __skip_row, we need to read the rows in a file consecutively + * to generate the row index. This is a cost we need to pay until we upgrade to latest + * Apache Spark which contains Parquet reader changes that automatically generate the + * row_index irrespective of the file splitting and filter pushdowns. + * - The scan created also contains a broadcast variable of Parquet File -> DV File map. + * The Parquet reader created uses this map to find the DV file corresponding to the data file. + * - Filter created filters out rows with __skip_row equals to 0 + * - And at the end we have a Project to keep the plan node output same as before the rule is + * applied. + */ +trait PreprocessTableWithDVs extends SubqueryTransformerHelper { + def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = { + transformSubqueryExpressions(plan) { + case ScanWithDeletionVectors(dvScan) => dvScan + } + } +} + +object ScanWithDeletionVectors { + def unapply(a: LogicalRelation): Option[LogicalPlan] = a match { + case scan @ LogicalRelation( + relation @ HadoopFsRelation( + index: TahoeFileIndex, _, _, _, format: DeltaParquetFileFormat, _), _, _, _) => + dvEnabledScanFor(scan, relation, format, index) + case _ => None + } + + def dvEnabledScanFor( + scan: LogicalRelation, + hadoopRelation: HadoopFsRelation, + fileFormat: DeltaParquetFileFormat, + index: TahoeFileIndex): Option[LogicalPlan] = { + // If the table has no DVs enabled, no change needed + if (!deletionVectorsReadable(index.protocol, index.metadata)) return None + + require(!index.isInstanceOf[TahoeLogFileIndex], + "Cannot work with a non-pinned table snapshot of the TahoeFileIndex") + + // If the table has no DVs enabled, no change needed + if (!deletionVectorsReadable(index.protocol, index.metadata)) return None + + // See if the relation is already modified to include DV reads as part of + // a previous invocation of this rule on this table + if (fileFormat.hasDeletionVectorMap()) return None + + // See if any files actually have a DV + val spark = SparkSession.getActiveSession.get + val filePathToDVBroadcastMap = createBroadcastDVMap(spark, index) + if (filePathToDVBroadcastMap.value.isEmpty) return None + + // Get the list of columns in the output of the `LogicalRelation` we are + // trying to modify. At the end of the plan, we need to return a + // `LogicalRelation` that has the same output as this `LogicalRelation` + val planOutput = scan.output + + val newScan = createScanWithSkipRowColumn( + spark, scan, fileFormat, index, filePathToDVBroadcastMap, hadoopRelation) + + // On top of the scan add a filter that filters out the rows which have + // skip row column value non-zero + val rowIndexFilter = createRowIndexFilterNode(newScan) + + // Now add a project on top of the row index filter node to + // remove the skip row column + Some(Project(planOutput, rowIndexFilter)) + } + /** + * Helper method that creates a new `LogicalRelation` for existing scan that outputs + * an extra column which indicates whether the row needs to be skipped or not. + */ + private def createScanWithSkipRowColumn( + spark: SparkSession, + inputScan: LogicalRelation, + fileFormat: DeltaParquetFileFormat, + tahoeFileIndex: TahoeFileIndex, + filePathToDVBroadcastMap: Broadcast[Map[URI, DeletionVectorDescriptor]], + hadoopFsRelation: HadoopFsRelation): LogicalRelation = { + // Create a new `LogicalRelation` that has modified `DeltaFileFormat` and output with an extra + // column to indicate whether to skip the row or not + + // Add a column for SKIP_ROW to the base output. Value of 0 means the row needs be kept, any + // other values mean the row needs be skipped. + val skipRowField = IS_ROW_DELETED_STRUCT_FIELD + val newScanOutput = inputScan.output :+ + AttributeReference(skipRowField.name, skipRowField.dataType)() + val newScanSchema = StructType(inputScan.schema).add(skipRowField) + + val hadoopConfBroadcast = spark.sparkContext.broadcast( + new SerializableConfiguration(tahoeFileIndex.deltaLog.newDeltaHadoopConf())) + + val newFileFormat = fileFormat.copyWithDVInfo( + tahoeFileIndex.path.toString, filePathToDVBroadcastMap, hadoopConfBroadcast) + val newRelation = hadoopFsRelation.copy( + fileFormat = newFileFormat, + dataSchema = newScanSchema)(hadoopFsRelation.sparkSession) + + // Create a new scan LogicalRelation + inputScan.copy(relation = newRelation, output = newScanOutput) + } + + private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = { + val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME) + require(skipRowColumnRefs.size == 1, + s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME") + val skipRowColumnRef = skipRowColumnRefs.head + + val keepRow = DeltaUDF.booleanFromByte( _ == RowIndexFilter.KEEP_ROW_VALUE) + .asNondeterministic() // To avoid constant folding the filter based on stats. + + val filterExp = keepRow(new Column(skipRowColumnRef)).expr + Filter(filterExp, newScan) + } + + private def createBroadcastDVMap( + spark: SparkSession, + tahoeFileIndex: TahoeFileIndex): Broadcast[Map[URI, DeletionVectorDescriptor]] = { + // Given there is no way to find the final filters, just select all files in the + // file index and create the DV map. + val filesWithDVs = + tahoeFileIndex.matchingFiles(Seq(TrueLiteral), Seq(TrueLiteral)) + .filter(_.deletionVector != null) + val filePathToDVMap = filesWithDVs + .map(x => + absolutePath(tahoeFileIndex.path.toString, x.path).toUri -> x.deletionVector) + .toMap + spark.sparkContext.broadcast(filePathToDVMap) + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 32f13c571d0..a0d1295f0ab 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -137,7 +137,8 @@ class Snapshot( col("add.modificationTime"), col("add.dataChange"), col(ADD_STATS_TO_USE_COL_NAME).as("stats"), - col("add.tags") + col("add.tags"), + col("add.deletionVector") ))) .withColumn("remove", when( col("remove.path").isNotNull, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/SubqueryTransformerHelper.scala b/core/src/main/scala/org/apache/spark/sql/delta/SubqueryTransformerHelper.scala new file mode 100644 index 00000000000..97224863bcb --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/SubqueryTransformerHelper.scala @@ -0,0 +1,61 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery, SupportsSubquery} + +/** + * Trait to allow processing a special transformation of [[SubqueryExpression]] + * instances in a query plan. + */ +trait SubqueryTransformerHelper { + + /** + * Transform all nodes matched by the rule in the query plan rooted at given `plan`. + * It traverses the tree starting from the leaves, whenever a [[SubqueryExpression]] + * expression is encountered, given [[rule]] is applied to the subquery plan `plan` + * in [[SubqueryExpression]] starting from the `plan` root until leaves. + * + * This is slightly different behavior compared to [[QueryPlan.transformUpWithSubqueries]] + * or [[QueryPlan.transformDownWithSubqueries]] + * + * It requires that the given plan already gone through [[OptimizeSubqueries]] and the + * root node denoting a subquery is removed and optimized appropriately. + */ + def transformSubqueryExpressions(plan: LogicalPlan) + (rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = { + require(!isSubqueryRoot(plan)) + transformSubqueries(plan, rule) transform (rule) + } + + /** Is the give plan a subquery root. */ + def isSubqueryRoot(plan: LogicalPlan): Boolean = { + plan.isInstanceOf[Subquery] || plan.isInstanceOf[SupportsSubquery] + } + + private def transformSubqueries( + plan: LogicalPlan, + rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = { + import org.apache.spark.sql.delta.implicits._ + + plan transformAllExpressionsUp { + case subquery: SubqueryExpression => + subquery.withNewPlan(transformSubqueryExpressions(subquery.plan)(rule)) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 9e2c9f00e99..3644133ac71 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -196,7 +196,8 @@ object TableFeature { IdentityColumnsTableFeature, GeneratedColumnsTableFeature, InvariantsTableFeature, - ColumnMappingTableFeature) + ColumnMappingTableFeature, + DeletionVectorsTableFeature) if (DeltaUtils.isTesting) { features ++= Set( TestLegacyWriterFeature, @@ -295,6 +296,16 @@ object IdentityColumnsTableFeature } } +object DeletionVectorsTableFeature + extends ReaderWriterFeature(name = "deletionVectors") + with FeatureAutomaticallyEnabledByMetadata { + override def metadataRequiresFeatureToBeEnabled( + metadata: Metadata, + spark: SparkSession): Boolean = { + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata) + } +} + /** * Features below are for testing only, and are being registered to the system only in the testing * environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration. diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index 37a5102d814..93e4841d77b 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -389,7 +389,8 @@ case class AddFile( modificationTime: Long, override val dataChange: Boolean, stats: String = null, - override val tags: Map[String, String] = null + override val tags: Map[String, String] = null, + deletionVector: DeletionVectorDescriptor = null ) extends FileAction { require(path.nonEmpty) @@ -526,7 +527,8 @@ case class RemoveFile( partitionValues: Map[String, String] = null, @JsonDeserialize(contentAs = classOf[java.lang.Long]) size: Option[Long] = None, - override val tags: Map[String, String] = null + override val tags: Map[String, String] = null, + deletionVector: DeletionVectorDescriptor = null ) extends FileAction { override def wrap: SingleAction = SingleAction(remove = this) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeletionVectorUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeletionVectorUtils.scala new file mode 100644 index 00000000000..c1f80cbe44e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeletionVectorUtils.scala @@ -0,0 +1,86 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.commands + +import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeltaConfigs, Snapshot, SnapshotDescriptor} +import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.files.TahoeFileIndex + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.FileIndex +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf + +trait DeletionVectorUtils { + + /** + * Run a query on the delta log to determine if the given snapshot contains no deletion vectors. + * Return `false` if it does contain deletion vectors. + */ + def isTableDVFree(spark: SparkSession, snapshot: Snapshot): Boolean = { + val dvsReadable = deletionVectorsReadable(snapshot) + + if (dvsReadable) { + val dvCount = snapshot.allFiles + .filter(col("deletionVector").isNotNull) + .limit(1) + .count() + + dvCount == 0L + } else { + true + } + } + + /** + * Returns true if persistent deletion vectors are enabled and + * readable with the current reader version. + */ + def fileIndexSupportsReadingDVs(fileIndex: FileIndex): Boolean = fileIndex match { + case index: TahoeFileIndex => deletionVectorsReadable(index) + case _ => false + } + + def deletionVectorsWritable( + snapshot: SnapshotDescriptor, + newProtocol: Option[Protocol] = None, + newMetadata: Option[Metadata] = None): Boolean = { + def protocol = newProtocol.getOrElse(snapshot.protocol) + def metadata = newMetadata.getOrElse(snapshot.metadata) + protocol.isFeatureEnabled(DeletionVectorsTableFeature) && + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata) + } + + def deletionVectorsReadable( + snapshot: SnapshotDescriptor, + newProtocol: Option[Protocol] = None, + newMetadata: Option[Metadata] = None): Boolean = { + deletionVectorsReadable( + newProtocol.getOrElse(snapshot.protocol), + newMetadata.getOrElse(snapshot.metadata)) + } + + def deletionVectorsReadable( + protocol: Protocol, + metadata: Metadata): Boolean = { + protocol.isFeatureEnabled(DeletionVectorsTableFeature) && + metadata.format.provider == "parquet" // DVs are only supported on parquet tables. + } +} + +// To access utilities from places where mixing in a trait is inconvenient. +object DeletionVectorUtils extends DeletionVectorUtils diff --git a/core/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala b/core/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala index f5bb592f53f..057f7bcbace 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuery.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.delta.DeltaTable import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.delta.stats.DeltaScanGenerator -import org.apache.spark.sql.functions.{col, count, sum, when} +import org.apache.spark.sql.functions.{coalesce, col, count, lit, sum, when} trait OptimizeMetadataOnlyDeltaQuery { def optimizeQueryWithMetadata(plan: LogicalPlan): LogicalPlan = { @@ -48,10 +48,13 @@ trait OptimizeMetadataOnlyDeltaQuery { /** Return the number of rows in the table or `None` if we cannot calculate it from stats */ private def extractGlobalCount(tahoeLogFileIndex: TahoeLogFileIndex): Option[Long] = { - // TODO Update this to work with DV (https://github.com/delta-io/delta/issues/1485) + // account for deleted rows according to deletion vectors + val dvCardinality = coalesce(col("deletionVector.cardinality"), lit(0)) + val numLogicalRecords = (col("stats.numRecords") - dvCardinality).as("numLogicalRecords") + val row = getDeltaScanGenerator(tahoeLogFileIndex).filesWithStatsForScan(Nil) .agg( - sum("stats.numRecords"), + sum(numLogicalRecords), // Calculate the number of files missing `numRecords` count(when(col("stats.numRecords").isNull, 1))) .first diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala index 36b597d6108..bad64a24cb7 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala @@ -51,7 +51,8 @@ import org.apache.spark.sql.types.StructType trait PrepareDeltaScanBase extends Rule[LogicalPlan] with PredicateHelper with DeltaLogging - with OptimizeMetadataOnlyDeltaQuery { self: PrepareDeltaScan => + with OptimizeMetadataOnlyDeltaQuery + with PreprocessTableWithDVs { self: PrepareDeltaScan => /** * Tracks the first-access snapshots of other logs planned by this rule. The snapshots are @@ -139,33 +140,15 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan] // delta scans. val deltaScans = new mutable.HashMap[LogicalPlan, DeltaScan]() - /* - * We need to first prepare the scans in the subqueries of a node. Otherwise, because of the - * short-circuiting nature of the pattern matching in the transform method, if a - * PhysicalOperation node is matched, its subqueries that may contain other PhysicalOperation - * nodes will be skipped. - */ - def transformSubqueries(plan: LogicalPlan): LogicalPlan = { - import org.apache.spark.sql.delta.implicits._ - - plan transformAllExpressionsUp { - case subquery: SubqueryExpression => - subquery.withNewPlan(transform(subquery.plan)) - } - } - - def transform(plan: LogicalPlan): LogicalPlan = - transformSubqueries(plan) transform { - case scan @ DeltaTableScan(canonicalizedPlanWithRemovedProjections, filters, fileIndex, + transformSubqueryExpressions(plan) { + case scan @ DeltaTableScan(planWithRemovedProjections, filters, fileIndex, limit, delta) => val scanGenerator = getDeltaScanGenerator(fileIndex) - val preparedScan = deltaScans.getOrElseUpdate(canonicalizedPlanWithRemovedProjections, + val preparedScan = deltaScans.getOrElseUpdate(planWithRemovedProjections, filesForScan(scanGenerator, limit, filters, delta)) val preparedIndex = getPreparedIndex(preparedScan, fileIndex) optimizeGeneratedColumns(scan, preparedIndex, filters, limit, delta) } - - transform(plan) } protected def optimizeGeneratedColumns( @@ -204,9 +187,9 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan] val shouldPrepareDeltaScan = ( spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING) ) - if (shouldPrepareDeltaScan) { + val updatedPlan = if (shouldPrepareDeltaScan) { // Should not be applied to subqueries to avoid duplicate delta jobs. - val isSubquery = plan.isInstanceOf[Subquery] || plan.isInstanceOf[SupportsSubquery] + val isSubquery = isSubqueryRoot(plan) // Should not be applied to DataSourceV2 write plans, because they'll be planned later // through a V1 fallback and only that later planning takes place within the transaction. val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand] @@ -217,7 +200,6 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan] if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED)) { plan = optimizeQueryWithMetadata(plan) } - prepareDeltaScan(plan) } else { // If this query is running inside an active transaction and is touching the same table @@ -233,6 +215,7 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan] // It will fall back to just partition pruning at planning time. plan } + preprocessTablesWithDVs(updatedPlan) } /** diff --git a/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000000.json b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000000.json new file mode 100644 index 00000000000..872324f47a5 --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000000.json @@ -0,0 +1,23 @@ +{"commitInfo":{"timestamp":1674064770682,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"20","numOutputRows":"2000","numOutputBytes":"20157"},"engineInfo":"Databricks-Runtime/","txnId":"f0ddc566-dfe6-4bd8-b264-ce100f9362ef"}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true"},"createdTime":1674064767118}} +{"add":{"path":"part-00000-f5c18e7b-d1bf-4ba5-85dd-e63ddc5931bf-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064769860,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":4},\"maxValues\":{\"value\":1967},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860000","MIN_INSERTION_TIME":"1674064769860000","MAX_INSERTION_TIME":"1674064769860000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00001-5dbf0ba2-220a-4770-8e26-18a77cf875f0-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064769860,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":18},\"maxValues\":{\"value\":1988},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860001","MIN_INSERTION_TIME":"1674064769860001","MAX_INSERTION_TIME":"1674064769860001","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00002-5459a52f-3fd3-4b79-83a6-e7f57db28650-c000.snappy.parquet","partitionValues":{},"size":1007,"modificationTime":1674064770019,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":16},\"maxValues\":{\"value\":1977},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860002","MIN_INSERTION_TIME":"1674064769860002","MAX_INSERTION_TIME":"1674064769860002","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00003-0e842060-9e04-4896-ba21-029309ab8736-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770019,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":5},\"maxValues\":{\"value\":1982},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860003","MIN_INSERTION_TIME":"1674064769860003","MAX_INSERTION_TIME":"1674064769860003","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00004-a72dbdec-2d0e-43d8-a756-4d0d63ef9fcb-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770100,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":1999},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860004","MIN_INSERTION_TIME":"1674064769860004","MAX_INSERTION_TIME":"1674064769860004","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00005-0972979f-852d-4f3e-8f64-bf0bf072de5f-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770100,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":8},\"maxValues\":{\"value\":1914},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860005","MIN_INSERTION_TIME":"1674064769860005","MAX_INSERTION_TIME":"1674064769860005","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00006-227c6a1e-0180-4feb-8816-19eccf7939f5-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770207,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":30},\"maxValues\":{\"value\":1992},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860006","MIN_INSERTION_TIME":"1674064769860006","MAX_INSERTION_TIME":"1674064769860006","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00007-7c37e5e3-abb2-419e-8cba-eba4eeb3b11a-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770207,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":40},\"maxValues\":{\"value\":1990},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860007","MIN_INSERTION_TIME":"1674064769860007","MAX_INSERTION_TIME":"1674064769860007","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00008-1a0b4375-bbcc-4f3c-8e51-ecb551c89430-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770265,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":13},\"maxValues\":{\"value\":1897},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860008","MIN_INSERTION_TIME":"1674064769860008","MAX_INSERTION_TIME":"1674064769860008","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00009-52689115-1770-4f15-b98d-b942db5b7359-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770265,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":12},\"maxValues\":{\"value\":1987},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860009","MIN_INSERTION_TIME":"1674064769860009","MAX_INSERTION_TIME":"1674064769860009","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00010-7f35fa1b-7993-4aff-8f60-2b76f1eb3f2c-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770319,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":19},\"maxValues\":{\"value\":1993},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860010","MIN_INSERTION_TIME":"1674064769860010","MAX_INSERTION_TIME":"1674064769860010","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00011-fce7841f-be9a-43b8-b283-9e2308ef5487-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770319,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":11},\"maxValues\":{\"value\":1984},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860011","MIN_INSERTION_TIME":"1674064769860011","MAX_INSERTION_TIME":"1674064769860011","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00012-9b83c213-31ff-4b2c-a5d9-be1a2bc2431d-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770372,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":33},\"maxValues\":{\"value\":1995},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860012","MIN_INSERTION_TIME":"1674064769860012","MAX_INSERTION_TIME":"1674064769860012","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00013-c6b05dd2-0143-4e9f-a231-1a2d08a83a0e-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770372,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":20},\"maxValues\":{\"value\":1974},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860013","MIN_INSERTION_TIME":"1674064769860013","MAX_INSERTION_TIME":"1674064769860013","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00014-41a4f51e-62cd-41f5-bb03-afba1e70ea29-c000.snappy.parquet","partitionValues":{},"size":1007,"modificationTime":1674064770427,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":3},\"maxValues\":{\"value\":1996},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860014","MIN_INSERTION_TIME":"1674064769860014","MAX_INSERTION_TIME":"1674064769860014","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00015-f2f141bb-fa8f-4553-a5db-d1b8d682153b-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770427,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":1997},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860015","MIN_INSERTION_TIME":"1674064769860015","MAX_INSERTION_TIME":"1674064769860015","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00016-d8f58ffc-8bff-4e12-b709-e628f9bf2553-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770477,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":2},\"maxValues\":{\"value\":1986},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860016","MIN_INSERTION_TIME":"1674064769860016","MAX_INSERTION_TIME":"1674064769860016","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00017-45bac3c9-7eb8-42cb-bb51-fc5b4dd0be10-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770476,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":22},\"maxValues\":{\"value\":1998},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860017","MIN_INSERTION_TIME":"1674064769860017","MAX_INSERTION_TIME":"1674064769860017","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00018-9d74a51b-b800-4e4d-a258-738e585a78a5-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770529,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":6},\"maxValues\":{\"value\":1983},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860018","MIN_INSERTION_TIME":"1674064769860018","MAX_INSERTION_TIME":"1674064769860018","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00019-a9bb3ce8-afba-47ec-8451-13edcd855b15-c000.snappy.parquet","partitionValues":{},"size":1007,"modificationTime":1674064770528,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":36},\"maxValues\":{\"value\":1969},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064769860019","MIN_INSERTION_TIME":"1674064769860019","MAX_INSERTION_TIME":"1674064769860019","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json new file mode 100644 index 00000000000..d486e40ba0f --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json @@ -0,0 +1,11 @@ +{"commitInfo":{"timestamp":1674064789962,"operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.delta.`/private/var/folders/g3/hcd28y8s71s0yh7whh443wz00000gp/T/spark-f3dd4a29-dc57-42eb-b752-84179135f5b8`.value IN (0, 180, 300, 700, 1800))\"]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"0","numCopiedRows":"0","numDeletionVectorsAdded":"5","numDeletionVectorsRemoved":"0","numAddedChangeFiles":"0","executionTimeMs":"12828","numDeletedRows":"5","scanTimeMs":"12323","numAddedFiles":"0","rewriteTimeMs":"487"},"engineInfo":"Databricks-Runtime/","txnId":"5327cd46-c25b-4127-88fd-5b3c2402691b"}} +{"remove":{"path":"part-00001-5dbf0ba2-220a-4770-8e26-18a77cf875f0-c000.snappy.parquet","deletionTimestamp":1674064789957,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1008,"tags":{"INSERTION_TIME":"1674064769860001","MIN_INSERTION_TIME":"1674064769860001","MAX_INSERTION_TIME":"1674064769860001","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00003-0e842060-9e04-4896-ba21-029309ab8736-c000.snappy.parquet","deletionTimestamp":1674064789957,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1008,"tags":{"INSERTION_TIME":"1674064769860003","MIN_INSERTION_TIME":"1674064769860003","MAX_INSERTION_TIME":"1674064769860003","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00012-9b83c213-31ff-4b2c-a5d9-be1a2bc2431d-c000.snappy.parquet","deletionTimestamp":1674064789957,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1008,"tags":{"INSERTION_TIME":"1674064769860012","MIN_INSERTION_TIME":"1674064769860012","MAX_INSERTION_TIME":"1674064769860012","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00015-f2f141bb-fa8f-4553-a5db-d1b8d682153b-c000.snappy.parquet","deletionTimestamp":1674064789957,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1008,"tags":{"INSERTION_TIME":"1674064769860015","MIN_INSERTION_TIME":"1674064769860015","MAX_INSERTION_TIME":"1674064769860015","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00019-a9bb3ce8-afba-47ec-8451-13edcd855b15-c000.snappy.parquet","deletionTimestamp":1674064789957,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1007,"tags":{"INSERTION_TIME":"1674064769860019","MIN_INSERTION_TIME":"1674064769860019","MAX_INSERTION_TIME":"1674064769860019","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00001-5dbf0ba2-220a-4770-8e26-18a77cf875f0-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064769860,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":18},\"maxValues\":{\"value\":1988},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1674064769860001","MIN_INSERTION_TIME":"1674064769860001","MAX_INSERTION_TIME":"1674064769860001","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":85,"sizeInBytes":34,"cardinality":1}}} +{"add":{"path":"part-00003-0e842060-9e04-4896-ba21-029309ab8736-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770019,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":5},\"maxValues\":{\"value\":1982},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1674064769860003","MIN_INSERTION_TIME":"1674064769860003","MAX_INSERTION_TIME":"1674064769860003","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":169,"sizeInBytes":34,"cardinality":1}}} +{"add":{"path":"part-00012-9b83c213-31ff-4b2c-a5d9-be1a2bc2431d-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770372,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":33},\"maxValues\":{\"value\":1995},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1674064769860012","MIN_INSERTION_TIME":"1674064769860012","MAX_INSERTION_TIME":"1674064769860012","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":1,"sizeInBytes":34,"cardinality":1}}} +{"add":{"path":"part-00015-f2f141bb-fa8f-4553-a5db-d1b8d682153b-c000.snappy.parquet","partitionValues":{},"size":1008,"modificationTime":1674064770427,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":1997},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1674064769860015","MIN_INSERTION_TIME":"1674064769860015","MAX_INSERTION_TIME":"1674064769860015","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":43,"sizeInBytes":34,"cardinality":1}}} +{"add":{"path":"part-00019-a9bb3ce8-afba-47ec-8451-13edcd855b15-c000.snappy.parquet","partitionValues":{},"size":1007,"modificationTime":1674064770528,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"value\":36},\"maxValues\":{\"value\":1969},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1674064769860019","MIN_INSERTION_TIME":"1674064769860019","MAX_INSERTION_TIME":"1674064769860019","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":127,"sizeInBytes":34,"cardinality":1}}} diff --git a/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000002.json b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000002.json new file mode 100644 index 00000000000..752769167f3 --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1674064791599,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"2","numOutputBytes":"600"},"engineInfo":"Databricks-Runtime/","txnId":"fb0a7015-0096-4d74-821b-3507163c17fa"}} +{"add":{"path":"part-00000-51219d56-88a7-41cc-be5d-eada75aceb4f-c000.snappy.parquet","partitionValues":{},"size":600,"modificationTime":1674064791593,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"value\":300},\"maxValues\":{\"value\":700},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064791593000","MIN_INSERTION_TIME":"1674064791593000","MAX_INSERTION_TIME":"1674064791593000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000003.json b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000003.json new file mode 100644 index 00000000000..e3ddf79c740 --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000003.json @@ -0,0 +1,13 @@ +{"commitInfo":{"timestamp":1674064797400,"operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.delta.`/private/var/folders/g3/hcd28y8s71s0yh7whh443wz00000gp/T/spark-f3dd4a29-dc57-42eb-b752-84179135f5b8`.value IN (300, 250, 350, 900, 1353, 1567, 1800))\"]"},"readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"0","numCopiedRows":"0","numDeletionVectorsAdded":"6","numDeletionVectorsRemoved":"3","numAddedChangeFiles":"0","executionTimeMs":"4726","numDeletedRows":"6","scanTimeMs":"4057","numAddedFiles":"0","rewriteTimeMs":"667"},"engineInfo":"Databricks-Runtime/","txnId":"d50de74c-f8c8-4e68-b120-267504045e9d"}} +{"remove":{"path":"part-00000-51219d56-88a7-41cc-be5d-eada75aceb4f-c000.snappy.parquet","deletionTimestamp":1674064797399,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":600,"tags":{"INSERTION_TIME":"1674064791593000","MIN_INSERTION_TIME":"1674064791593000","MAX_INSERTION_TIME":"1674064791593000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00001-5dbf0ba2-220a-4770-8e26-18a77cf875f0-c000.snappy.parquet","deletionTimestamp":1674064797399,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1008,"tags":{"INSERTION_TIME":"1674064769860001","MIN_INSERTION_TIME":"1674064769860001","MAX_INSERTION_TIME":"1674064769860001","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":85,"sizeInBytes":34,"cardinality":1}}} +{"remove":{"path":"part-00012-9b83c213-31ff-4b2c-a5d9-be1a2bc2431d-c000.snappy.parquet","deletionTimestamp":1674064797399,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1008,"tags":{"INSERTION_TIME":"1674064769860012","MIN_INSERTION_TIME":"1674064769860012","MAX_INSERTION_TIME":"1674064769860012","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":1,"sizeInBytes":34,"cardinality":1}}} +{"remove":{"path":"part-00014-41a4f51e-62cd-41f5-bb03-afba1e70ea29-c000.snappy.parquet","deletionTimestamp":1674064797399,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1007,"tags":{"INSERTION_TIME":"1674064769860014","MIN_INSERTION_TIME":"1674064769860014","MAX_INSERTION_TIME":"1674064769860014","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00018-9d74a51b-b800-4e4d-a258-738e585a78a5-c000.snappy.parquet","deletionTimestamp":1674064797399,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1008,"tags":{"INSERTION_TIME":"1674064769860018","MIN_INSERTION_TIME":"1674064769860018","MAX_INSERTION_TIME":"1674064769860018","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00019-a9bb3ce8-afba-47ec-8451-13edcd855b15-c000.snappy.parquet","deletionTimestamp":1674064797399,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":1007,"tags":{"INSERTION_TIME":"1674064769860019","MIN_INSERTION_TIME":"1674064769860019","MAX_INSERTION_TIME":"1674064769860019","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"m9JzgVlI!?Oy<+3x+y^b","offset":127,"sizeInBytes":34,"cardinality":1}}} +{"add":{"path":"part-00000-51219d56-88a7-41cc-be5d-eada75aceb4f-c000.snappy.parquet","partitionValues":{},"size":600,"modificationTime":1674064791593,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"value\":300},\"maxValues\":{\"value\":700},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1674064791593000","MIN_INSERTION_TIME":"1674064791593000","MAX_INSERTION_TIME":"1674064791593000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"UGM+pBY.mtVeP","txnId":"4016704a-babb-44a8-ae8b-c53303465742"}} +{"add":{"path":"part-00000-7c52eadd-8da7-4782-a5d5-621cd92cab11-c000.snappy.parquet","partitionValues":{},"size":600,"modificationTime":1674064798704,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"value\":900},\"maxValues\":{\"value\":1567},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1674064798704000","MIN_INSERTION_TIME":"1674064798704000","MAX_INSERTION_TIME":"1674064798704000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/core/src/test/resources/delta/table-with-dv-large/deletion_vector_44ccbf3f-b223-4581-9cd8-a7e569120ada.bin b/core/src/test/resources/delta/table-with-dv-large/deletion_vector_44ccbf3f-b223-4581-9cd8-a7e569120ada.bin new file mode 100644 index 00000000000..e729ea4e696 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/deletion_vector_44ccbf3f-b223-4581-9cd8-a7e569120ada.bin differ diff --git a/core/src/test/resources/delta/table-with-dv-large/deletion_vector_afcbf9f8-7558-4a5a-b1e2-7432c30bf452.bin b/core/src/test/resources/delta/table-with-dv-large/deletion_vector_afcbf9f8-7558-4a5a-b1e2-7432c30bf452.bin new file mode 100644 index 00000000000..e45492fbf48 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/deletion_vector_afcbf9f8-7558-4a5a-b1e2-7432c30bf452.bin differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00000-51219d56-88a7-41cc-be5d-eada75aceb4f-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00000-51219d56-88a7-41cc-be5d-eada75aceb4f-c000.snappy.parquet new file mode 100644 index 00000000000..e3cb2ff9e2d Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00000-51219d56-88a7-41cc-be5d-eada75aceb4f-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00000-7c52eadd-8da7-4782-a5d5-621cd92cab11-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00000-7c52eadd-8da7-4782-a5d5-621cd92cab11-c000.snappy.parquet new file mode 100644 index 00000000000..8f856e0a336 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00000-7c52eadd-8da7-4782-a5d5-621cd92cab11-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00000-f5c18e7b-d1bf-4ba5-85dd-e63ddc5931bf-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00000-f5c18e7b-d1bf-4ba5-85dd-e63ddc5931bf-c000.snappy.parquet new file mode 100644 index 00000000000..eb7f6909018 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00000-f5c18e7b-d1bf-4ba5-85dd-e63ddc5931bf-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00001-5dbf0ba2-220a-4770-8e26-18a77cf875f0-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00001-5dbf0ba2-220a-4770-8e26-18a77cf875f0-c000.snappy.parquet new file mode 100644 index 00000000000..c57f1d7871d Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00001-5dbf0ba2-220a-4770-8e26-18a77cf875f0-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00002-5459a52f-3fd3-4b79-83a6-e7f57db28650-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00002-5459a52f-3fd3-4b79-83a6-e7f57db28650-c000.snappy.parquet new file mode 100644 index 00000000000..fbafa84440a Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00002-5459a52f-3fd3-4b79-83a6-e7f57db28650-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00003-0e842060-9e04-4896-ba21-029309ab8736-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00003-0e842060-9e04-4896-ba21-029309ab8736-c000.snappy.parquet new file mode 100644 index 00000000000..55d4f847138 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00003-0e842060-9e04-4896-ba21-029309ab8736-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00004-a72dbdec-2d0e-43d8-a756-4d0d63ef9fcb-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00004-a72dbdec-2d0e-43d8-a756-4d0d63ef9fcb-c000.snappy.parquet new file mode 100644 index 00000000000..38b1af17dcd Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00004-a72dbdec-2d0e-43d8-a756-4d0d63ef9fcb-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00005-0972979f-852d-4f3e-8f64-bf0bf072de5f-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00005-0972979f-852d-4f3e-8f64-bf0bf072de5f-c000.snappy.parquet new file mode 100644 index 00000000000..9391c846a62 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00005-0972979f-852d-4f3e-8f64-bf0bf072de5f-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00006-227c6a1e-0180-4feb-8816-19eccf7939f5-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00006-227c6a1e-0180-4feb-8816-19eccf7939f5-c000.snappy.parquet new file mode 100644 index 00000000000..ea18948b1ad Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00006-227c6a1e-0180-4feb-8816-19eccf7939f5-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00007-7c37e5e3-abb2-419e-8cba-eba4eeb3b11a-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00007-7c37e5e3-abb2-419e-8cba-eba4eeb3b11a-c000.snappy.parquet new file mode 100644 index 00000000000..e640051c8e4 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00007-7c37e5e3-abb2-419e-8cba-eba4eeb3b11a-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00008-1a0b4375-bbcc-4f3c-8e51-ecb551c89430-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00008-1a0b4375-bbcc-4f3c-8e51-ecb551c89430-c000.snappy.parquet new file mode 100644 index 00000000000..c95daac1d18 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00008-1a0b4375-bbcc-4f3c-8e51-ecb551c89430-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00009-52689115-1770-4f15-b98d-b942db5b7359-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00009-52689115-1770-4f15-b98d-b942db5b7359-c000.snappy.parquet new file mode 100644 index 00000000000..1d17a4cd184 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00009-52689115-1770-4f15-b98d-b942db5b7359-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00010-7f35fa1b-7993-4aff-8f60-2b76f1eb3f2c-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00010-7f35fa1b-7993-4aff-8f60-2b76f1eb3f2c-c000.snappy.parquet new file mode 100644 index 00000000000..e011b569bc5 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00010-7f35fa1b-7993-4aff-8f60-2b76f1eb3f2c-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00011-fce7841f-be9a-43b8-b283-9e2308ef5487-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00011-fce7841f-be9a-43b8-b283-9e2308ef5487-c000.snappy.parquet new file mode 100644 index 00000000000..83360c68c2a Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00011-fce7841f-be9a-43b8-b283-9e2308ef5487-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00012-9b83c213-31ff-4b2c-a5d9-be1a2bc2431d-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00012-9b83c213-31ff-4b2c-a5d9-be1a2bc2431d-c000.snappy.parquet new file mode 100644 index 00000000000..1426316cc86 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00012-9b83c213-31ff-4b2c-a5d9-be1a2bc2431d-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00013-c6b05dd2-0143-4e9f-a231-1a2d08a83a0e-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00013-c6b05dd2-0143-4e9f-a231-1a2d08a83a0e-c000.snappy.parquet new file mode 100644 index 00000000000..680b7371166 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00013-c6b05dd2-0143-4e9f-a231-1a2d08a83a0e-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00014-41a4f51e-62cd-41f5-bb03-afba1e70ea29-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00014-41a4f51e-62cd-41f5-bb03-afba1e70ea29-c000.snappy.parquet new file mode 100644 index 00000000000..267e8de72c4 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00014-41a4f51e-62cd-41f5-bb03-afba1e70ea29-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00015-f2f141bb-fa8f-4553-a5db-d1b8d682153b-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00015-f2f141bb-fa8f-4553-a5db-d1b8d682153b-c000.snappy.parquet new file mode 100644 index 00000000000..65feba93d8b Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00015-f2f141bb-fa8f-4553-a5db-d1b8d682153b-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00016-d8f58ffc-8bff-4e12-b709-e628f9bf2553-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00016-d8f58ffc-8bff-4e12-b709-e628f9bf2553-c000.snappy.parquet new file mode 100644 index 00000000000..61ef5982f65 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00016-d8f58ffc-8bff-4e12-b709-e628f9bf2553-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00017-45bac3c9-7eb8-42cb-bb51-fc5b4dd0be10-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00017-45bac3c9-7eb8-42cb-bb51-fc5b4dd0be10-c000.snappy.parquet new file mode 100644 index 00000000000..0a6a0b69ed6 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00017-45bac3c9-7eb8-42cb-bb51-fc5b4dd0be10-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00018-9d74a51b-b800-4e4d-a258-738e585a78a5-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00018-9d74a51b-b800-4e4d-a258-738e585a78a5-c000.snappy.parquet new file mode 100644 index 00000000000..52bd23e20c4 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00018-9d74a51b-b800-4e4d-a258-738e585a78a5-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-large/part-00019-a9bb3ce8-afba-47ec-8451-13edcd855b15-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-large/part-00019-a9bb3ce8-afba-47ec-8451-13edcd855b15-c000.snappy.parquet new file mode 100644 index 00000000000..9ba4278e875 Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-large/part-00019-a9bb3ce8-afba-47ec-8451-13edcd855b15-c000.snappy.parquet differ diff --git a/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000000.crc b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000000.crc new file mode 100644 index 00000000000..d9aac2aa2da --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000000.crc @@ -0,0 +1 @@ +{"txnId":"d54c00f5-9500-4ed5-b1b5-9f463861f4d3","tableSizeBytes":818,"numFiles":1,"numDeletedRecordsOpt":0,"numDeletionVectorsOpt":0,"numMetadata":1,"numProtocol":1,"setTransactions":[],"metadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\"}}]}","partitionColumns":[],"configuration":{"delta.columnMapping.mode":"name","delta.enableDeletionVectors":"true","delta.columnMapping.maxColumnId":"1"},"createdTime":1673461406485},"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors","columnMapping"],"writerFeatures":["deletionVectors","columnMapping"]},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[818,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"deletedRecordCountsHistogramOpt":{"deletedRecordCounts":[1,0,0,0,0,0,0,0,0,0]},"allFiles":[{"path":"r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet","partitionValues":{},"size":818,"modificationTime":1673461408778,"dataChange":false,"stats":"{\"numRecords\":10,\"minValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"maxValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":9},\"nullCount\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1673461408778000","MIN_INSERTION_TIME":"1673461408778000","MAX_INSERTION_TIME":"1673461408778000","OPTIMIZE_TARGET_SIZE":"268435456"}}]} diff --git a/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000000.json b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000000.json new file mode 100644 index 00000000000..94de42620d4 --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1673461409137,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"818"},"engineInfo":"Databricks-Runtime/","txnId":"d54c00f5-9500-4ed5-b1b5-9f463861f4d3"}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors","columnMapping"],"writerFeatures":["deletionVectors","columnMapping"]}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\"}}]}","partitionColumns":[],"configuration":{"delta.columnMapping.mode":"name","delta.enableDeletionVectors":"true","delta.columnMapping.maxColumnId":"1"},"createdTime":1673461406485}} +{"add":{"path":"r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet","partitionValues":{},"size":818,"modificationTime":1673461408778,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"maxValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":9},\"nullCount\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1673461408778000","MIN_INSERTION_TIME":"1673461408778000","MAX_INSERTION_TIME":"1673461408778000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000001.crc b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000001.crc new file mode 100644 index 00000000000..127032042dc --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000001.crc @@ -0,0 +1 @@ +{"txnId":"3943baa4-30a0-44a4-a4f4-e5e92d2ab08b","tableSizeBytes":818,"numFiles":1,"numDeletedRecordsOpt":2,"numDeletionVectorsOpt":1,"numMetadata":1,"numProtocol":1,"setTransactions":[],"metadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\"}}]}","partitionColumns":[],"configuration":{"delta.columnMapping.mode":"name","delta.enableDeletionVectors":"true","delta.columnMapping.maxColumnId":"1"},"createdTime":1673461406485},"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors","columnMapping"],"writerFeatures":["deletionVectors","columnMapping"]},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[818,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"deletedRecordCountsHistogramOpt":{"deletedRecordCounts":[0,1,0,0,0,0,0,0,0,0]},"allFiles":[{"path":"r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet","partitionValues":{},"size":818,"modificationTime":1673461408778,"dataChange":false,"stats":"{\"numRecords\":10,\"minValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"maxValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":9},\"nullCount\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1673461408778000","MIN_INSERTION_TIME":"1673461408778000","MAX_INSERTION_TIME":"1673461408778000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"WYbkwCTB$gH)J7t?$/sK","offset":1,"sizeInBytes":36,"cardinality":2}}]} diff --git a/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000001.json b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000001.json new file mode 100644 index 00000000000..392d28d6a33 --- /dev/null +++ b/core/src/test/resources/delta/table-with-dv-small/_delta_log/00000000000000000001.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1673461427387,"operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.delta.`/private/var/folders/g3/hcd28y8s71s0yh7whh443wz00000gp/T/spark-cb573b98-e75d-460f-9769-efd9e9bfeffc`.value IN (0, 9))\"]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"0","numCopiedRows":"0","numDeletionVectorsAdded":"1","numDeletionVectorsRemoved":"0","numAddedChangeFiles":"0","executionTimeMs":"11114","numDeletedRows":"2","scanTimeMs":"10589","numAddedFiles":"0","rewriteTimeMs":"508"},"engineInfo":"Databricks-Runtime/","txnId":"3943baa4-30a0-44a4-a4f4-e5e92d2ab08b"}} +{"remove":{"path":"r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet","deletionTimestamp":1673461427383,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":818,"tags":{"INSERTION_TIME":"1673461408778000","MIN_INSERTION_TIME":"1673461408778000","MAX_INSERTION_TIME":"1673461408778000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet","partitionValues":{},"size":818,"modificationTime":1673461408778,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"maxValues\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":9},\"nullCount\":{\"col-4f064e48-f371-433a-b851-9e73c78fa9fc\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1673461408778000","MIN_INSERTION_TIME":"1673461408778000","MAX_INSERTION_TIME":"1673461408778000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"WYbkwCTB$gH)J7t?$/sK","offset":1,"sizeInBytes":36,"cardinality":2}}} diff --git a/core/src/test/resources/delta/table-with-dv-small/deletion_vector_b6a98cdd-7843-470d-8897-708cdffa38c5.bin b/core/src/test/resources/delta/table-with-dv-small/deletion_vector_b6a98cdd-7843-470d-8897-708cdffa38c5.bin new file mode 100644 index 00000000000..f1a01e661cd Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-small/deletion_vector_b6a98cdd-7843-470d-8897-708cdffa38c5.bin differ diff --git a/core/src/test/resources/delta/table-with-dv-small/r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet b/core/src/test/resources/delta/table-with-dv-small/r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet new file mode 100644 index 00000000000..29adffe4f0e Binary files /dev/null and b/core/src/test/resources/delta/table-with-dv-small/r4/part-00000-5521fc5e-6e49-4437-8b2d-ce6a1a94a34a-c000.snappy.parquet differ diff --git a/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala index c44c39c4f97..ba1a2571f71 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala @@ -226,6 +226,112 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta expectedJson = """{"remove":{"path":"part=p1/f1","deletionTimestamp":11,"dataChange":true,""" + """"extendedFileMetadata":true,"partitionValues":{"x":"2"},"size":10}}""".stripMargin) + private def deletionVectorWithRelativePath: DeletionVectorDescriptor = + DeletionVectorDescriptor.onDiskWithRelativePath( + id = UUID.randomUUID(), + randomPrefix = "a1", + sizeInBytes = 10, + cardinality = 2, + offset = Some(10)) + + private def deletionVectorWithAbsolutePath: DeletionVectorDescriptor = + DeletionVectorDescriptor.onDiskWithAbsolutePath( + path = "/test.dv", + sizeInBytes = 10, + cardinality = 2, + offset = Some(10)) + + private def deletionVectorInline: DeletionVectorDescriptor = + DeletionVectorDescriptor.inlineInLog(Array(1, 2, 3, 4), 1) + + roundTripCompare("Add with deletion vector - relative path", + AddFile( + path = "test", + partitionValues = Map.empty, + size = 1, + modificationTime = 1, + dataChange = true, + tags = Map.empty, + deletionVector = deletionVectorWithRelativePath)) + roundTripCompare("Add with deletion vector - absolute path", + AddFile( + path = "test", + partitionValues = Map.empty, + size = 1, + modificationTime = 1, + dataChange = true, + tags = Map.empty, + deletionVector = deletionVectorWithAbsolutePath)) + roundTripCompare("Add with deletion vector - inline", + AddFile( + path = "test", + partitionValues = Map.empty, + size = 1, + modificationTime = 1, + dataChange = true, + tags = Map.empty, + deletionVector = deletionVectorInline)) + + roundTripCompare("Remove with deletion vector - relative path", + RemoveFile( + path = "test", + deletionTimestamp = Some(1L), + extendedFileMetadata = Some(true), + partitionValues = Map.empty, + dataChange = true, + size = Some(1L), + tags = Map.empty, + deletionVector = deletionVectorWithRelativePath)) + roundTripCompare("Remove with deletion vector - absolute path", + RemoveFile( + path = "test", + deletionTimestamp = Some(1L), + extendedFileMetadata = Some(true), + partitionValues = Map.empty, + dataChange = true, + size = Some(1L), + tags = Map.empty, + deletionVector = deletionVectorWithAbsolutePath)) + roundTripCompare("Remove with deletion vector - inline", + RemoveFile( + path = "test", + deletionTimestamp = Some(1L), + extendedFileMetadata = Some(true), + partitionValues = Map.empty, + dataChange = true, + size = Some(1L), + tags = Map.empty, + deletionVector = deletionVectorInline)) + + // These make sure we don't accidentally serialise something we didn't mean to. + testActionSerDe( + name = "AddFile (with deletion vector) - json serialization/deserialization", + action = AddFile( + path = "test", + partitionValues = Map.empty, + size = 1, + modificationTime = 1, + dataChange = true, + tags = Map.empty, + deletionVector = deletionVectorWithAbsolutePath), + expectedJson = + """ + |{"add":{ + |"path":"test", + |"partitionValues":{}, + |"size":1, + |"modificationTime":1, + |"dataChange":true, + |"tags":{}, + |"deletionVector":{ + |"storageType":"p", + |"pathOrInlineDv":"/test.dv", + |"offset":10, + |"sizeInBytes":10, + |"cardinality":2}} + |}""".stripMargin.replaceAll("\n", "") + ) + testActionSerDe( "AddCDCFile (without tags) - json serialization/deserialization", diff --git a/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala index 23e095b4f5a..6bae748fd22 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala @@ -214,13 +214,14 @@ class CheckpointsSuite extends QueryTest test("checkpoint does not contain remove.tags and remove.numRecords") { withTempDir { tempDir => - var expectedRemoveFileSchema = Seq( + val expectedRemoveFileSchema = Seq( "path", "deletionTimestamp", "dataChange", "extendedFileMetadata", "partitionValues", - "size") + "size", + "deletionVector") val tablePath = tempDir.getAbsolutePath // Append rows [0, 9] to table and merge tablePath. spark.range(end = 10).write.format("delta").mode("overwrite").save(tablePath) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala index 61f389cbd31..044987f24c4 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala @@ -64,7 +64,7 @@ class DeltaParquetFileFormatSuite extends QueryTest val fs = addFilePath.getFileSystem(hadoopConf) val broadcastDvMap = spark.sparkContext.broadcast( - Map(fs.getFileStatus(addFilePath).getPath().toString() -> dv) + Map(fs.getFileStatus(addFilePath).getPath().toUri -> dv) ) val broadcastHadoopConf = spark.sparkContext.broadcast( @@ -73,7 +73,7 @@ class DeltaParquetFileFormatSuite extends QueryTest val deltaParquetFormat = new DeltaParquetFileFormat( metadata, isSplittable = false, - disablePushDowns = false, + disablePushDowns = true, Some(tablePath), Some(broadcastDvMap), Some(broadcastHadoopConf)) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala new file mode 100644 index 00000000000..bacc482f51a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala @@ -0,0 +1,197 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.deletionvectors + +import java.io.File + +import org.apache.spark.sql.delta.{DeltaLog, DeltaTestUtilsForTempViews} +import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor.EMPTY +import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite._ +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.util.JsonUtils +import com.fasterxml.jackson.databind.node.ObjectNode +import org.apache.commons.io.FileUtils + +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Subquery} +import org.apache.spark.sql.test.SharedSparkSession + +class DeletionVectorsSuite extends QueryTest + with SharedSparkSession + with DeltaSQLCommandTest + with DeltaTestUtilsForTempViews { + import testImplicits._ + + test(s"read Delta table with deletion vectors") { + def verifyVersion(version: Int, expectedData: Seq[Int]): Unit = { + checkAnswer( + spark.read.format("delta").option("versionAsOf", version.toString).load(table1Path), + expectedData.toDF()) + } + // Verify all versions of the table + verifyVersion(0, expectedTable1DataV0) + verifyVersion(1, expectedTable1DataV1) + verifyVersion(2, expectedTable1DataV2) + verifyVersion(3, expectedTable1DataV3) + verifyVersion(4, expectedTable1DataV4) + } + + test("throw error when non-pinned TahoeFileIndex snapshot is used") { + // Corner case where we still have non-pinned TahoeFileIndex when data skipping is disabled + withSQLConf(DeltaSQLConf.DELTA_STATS_SKIPPING.key -> "false") { + def assertError(dataFrame: DataFrame): Unit = { + val ex = intercept[IllegalArgumentException] { + dataFrame.collect() + } + assert(ex.getMessage contains + "Cannot work with a non-pinned table snapshot of the TahoeFileIndex") + } + assertError(spark.read.format("delta").load(table1Path)) + // assertError(spark.read.format("delta").option("versionAsOf", "2").load(table1Path)) + } + } + + test("read Delta table with deletion vectors with a filter") { + checkAnswer( + spark.read.format("delta").load(table1Path).where("value in (300, 787, 239)"), + // 300 is removed in the final table + Seq(787, 239).toDF()) + } + + test("read Delta table with DV for a select files") { + val deltaLog = DeltaLog.forTable(spark, table1Path) + val snapshot = deltaLog.unsafeVolatileSnapshot + + // Select a subset of files with DVs and specific value range, this is just to test + // that reading these files will respect the DVs + var rowCount = 0L + var deletedRowCount = 0L + val selectFiles = snapshot.allFiles.collect().filter( + addFile => { + val stats = JsonUtils.mapper.readTree(addFile.stats).asInstanceOf[ObjectNode] + // rowCount += stats.get("rowCount") + val min = stats.get("minValues").get("value").toString + val max = stats.get("maxValues").get("value").toString + val selected = (min == "18" && max == "1988") || + (min == "33" && max == "1995") || (min == "13" && max == "1897") + // TODO: these steps will be easier and also change (depending upon tightBounds value) once + // we expose more methods on AddFile as part of the data skipping changes with DVs + if (selected) { + rowCount += stats.get("numRecords").asInt(0) + deletedRowCount += Option(addFile.deletionVector).getOrElse(EMPTY).cardinality + } + selected + } + ).toSeq + assert(selectFiles.filter(_.deletionVector != null).size > 1) // make at least one file has DV + + assert(deltaLog.createDataFrame(snapshot, selectFiles).count() == rowCount - deletedRowCount) + } + + for (optimizeMetadataQuery <- BOOLEAN_DOMAIN) + test("read Delta tables with DVs in subqueries: " + + s"metadataQueryOptimizationEnabled=$optimizeMetadataQuery") { + withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> + optimizeMetadataQuery.toString) { + val table1 = s"delta.`${new File(table1Path).getAbsolutePath}`" + val table2 = s"delta.`${new File(table2Path).getAbsolutePath}`" + + def assertQueryResult(query: String, expected1: Int, expected2: Int): Unit = { + val df = spark.sql(query) + assertPlanContains(df, Subquery.getClass.getSimpleName.stripSuffix("$")) + val actual = df.collect()(0) // fetch only row in the result + assert(actual === Row(expected1, expected2)) + } + + // same table used twice in the query + val query1 = s"SELECT (SELECT COUNT(*) FROM $table1), (SELECT COUNT(*) FROM $table1)" + assertQueryResult(query1, expectedTable1DataV4.size, expectedTable1DataV4.size) + + // two tables used in the query + val query2 = s"SELECT (SELECT COUNT(*) FROM $table1), (SELECT COUNT(*) FROM $table2)" + assertQueryResult(query2, expectedTable1DataV4.size, expectedTable2DataV1.size) + } + } + + test("insert into Delta table with DVs") { + withTempDir { tempDir => + val source1 = new File(table1Path) + val source2 = new File(table2Path) + val target = new File(tempDir, "insertTest") + + // Copy the source2 DV table to a temporary directory + FileUtils.copyDirectory(source1, target) + + // Insert data from source2 into source1 (copied to target) + // This blind append generates a plan with `V2WriteCommand` which is a corner + // case in `PrepareDeltaScan` rule + val insertDf = spark.sql(s"INSERT INTO TABLE delta.`${target.getAbsolutePath}` " + + s"SELECT * FROM delta.`${source2.getAbsolutePath}`") + // [[AppendData]] is one of the [[V2WriteCommand]] subtypes + assertPlanContains(insertDf, AppendData.getClass.getSimpleName.stripSuffix("$")) + + val dataInTarget = spark.sql(s"SELECT * FROM delta.`${target.getAbsolutePath}`") + + // Make sure the number of rows is correct. + for (metadataQueryOptimization <- BOOLEAN_DOMAIN) { + withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> + metadataQueryOptimization.toString) { + assert(dataInTarget.count() == expectedTable2DataV1.size + expectedTable1DataV4.size) + } + } + + // Make sure the contents are the same + checkAnswer( + dataInTarget, + spark.sql( + s"SELECT * FROM delta.`${source1.getAbsolutePath}` UNION ALL " + + s"SELECT * FROM delta.`${source2.getAbsolutePath}`") + ) + } + } + + private def assertPlanContains(queryDf: DataFrame, expected: String): Unit = { + val optimizedPlan = queryDf.queryExecution.analyzed.toString() + assert(optimizedPlan.contains(expected), s"Plan is missing `$expected`: $optimizedPlan") + } +} + +object DeletionVectorsSuite { + val table1Path = "src/test/resources/delta/table-with-dv-large" + // Table at version 0: contains [0, 2000) + val expectedTable1DataV0 = Seq.range(0, 2000) + // Table at version 1: removes rows with id = 0, 180, 300, 700, 1800 + val v1Removed = Set(0, 180, 300, 700, 1800) + val expectedTable1DataV1 = expectedTable1DataV0.filterNot(e => v1Removed.contains(e)) + // Table at version 2: inserts rows with id = 300, 700 + val v2Added = Set(300, 700) + val expectedTable1DataV2 = expectedTable1DataV1 ++ v2Added + // Table at version 3: removes rows with id = 300, 250, 350, 900, 1353, 1567, 1800 + val v3Removed = Set(300, 250, 350, 900, 1353, 1567, 1800) + val expectedTable1DataV3 = expectedTable1DataV2.filterNot(e => v3Removed.contains(e)) + // Table at version 4: inserts rows with id = 900, 1567 + val v4Added = Set(900, 1567) + val expectedTable1DataV4 = expectedTable1DataV3 ++ v4Added + + val table2Path = "src/test/resources/delta/table-with-dv-small" + // Table at version 0: contains 0 - 9 + val expectedTable2DataV0 = Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + // Table at version 1: removes rows 0 and 9 + val expectedTable2DataV1 = Seq(1, 2, 3, 4, 5, 6, 7, 8) +}