diff --git a/PROTOCOL.md b/PROTOCOL.md index ee5b7700bc2..91d5a0f0caa 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -217,7 +217,7 @@ their content, e.g. `"hello%20world"` * Connect path segments by `+`, e.g. `"b"+"y"` * Connect path and value pairs by `=`, e.g. `"b"+"y"=null` * Sort canonicalized path/value pairs using a byte-order sort on paths. The byte-order sort can be done by converting paths to byte array using UTF-8 charset\ - and then comparing them, e.g. `"a" < "b"."x" < "b"."y"` + and then comparing them, e.g. `"a" < "b"+"x" < "b"+"y"` * Separate ordered pairs by `,`, e.g. `"a"=10,"b"+"x"="https%3A%2F%2Fdelta.io","b"+"y"=null` 5. Array values (e.g. `[null, "hi ho", 2.71]`) are canonicalized as if they were objects, except the "name" has numeric type instead of string type, and gives the (0-based) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala index 92fcae0e85a..2e26a161bb1 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala @@ -53,7 +53,7 @@ class DeltaParquetFileFormat( val broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None) extends ParquetFileFormat { // Validate either we have all arguments for DV enabled read or none of them. - require(!(broadcastHadoopConf.isDefined ^ broadcastDvMap.isDefined ^ tablePath .isDefined ^ + require(!(broadcastHadoopConf.isDefined ^ broadcastDvMap.isDefined ^ tablePath.isDefined ^ !isSplittable ^ disablePushDowns)) val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode @@ -185,7 +185,7 @@ class DeltaParquetFileFormat( val newIter = iterator.map { row => val newRow = row match { - case batch: ColumnarBatch => + case batch: ColumnarBatch => // When vectorized Parquet reader is enabled val size = batch.numRows() // Create a new vector for the `is row deleted` column. We can't use the one from Parquet // reader as it set the [[WritableColumnVector.isAllNulls]] to true and it can't be reset @@ -210,7 +210,7 @@ class DeltaParquetFileFormat( } newBatch - case rest: InternalRow => + case rest: InternalRow => // When vectorized Parquet reader is disabled // Temporary vector variable used to get DV values from RowIndexFilter // Currently the RowIndexFilter only supports writing into a columnar vector // and doesn't have methods to get DV value for a specific row index. diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala index ac5e6ae1f22..39044120e52 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala @@ -448,10 +448,15 @@ case class MergeIntoCommand( 1 }.asNondeterministic() - // Skip data based on the merge condition - val targetOnlyPredicates = - splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet)) - val dataSkippedFiles = deltaTxn.filterFiles(targetOnlyPredicates) + // Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses. 
+ val dataSkippedFiles = + if (notMatchedBySourceClauses.isEmpty) { + val targetOnlyPredicates = + splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet)) + deltaTxn.filterFiles(targetOnlyPredicates) + } else { + deltaTxn.filterFiles() + } // UDF to increment metrics val incrSourceRowCountExpr = makeMetricUpdateUDF("numSourceRows") diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala index 209595f03d6..f0dd09f046f 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala @@ -223,7 +223,7 @@ case class WriteIntoDelta( CharVarcharUtils.replaceCharVarcharWithStringInSchema( replaceCharWithVarchar(CharVarcharUtils.getRawSchema(data.schema)).asInstanceOf[StructType]) } - var finalSchema = schemaInCatalog.getOrElse(dataSchema) + val finalSchema = schemaInCatalog.getOrElse(dataSchema) updateMetadata(data.sparkSession, txn, finalSchema, partitionColumns, configuration, isOverwriteOperation, rearrangeOnly) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala index 41d0c5c8890..2cd32fd17ec 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala @@ -60,7 +60,8 @@ trait ImplicitMetadataOperation extends DeltaLogging { // To support the new column mapping mode, we drop existing metadata on data schema // so that all the column mapping related properties can be reinitialized in // OptimisticTransaction.updateMetadata - val dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(schema.asNullable) + val dataSchema = + DeltaColumnMapping.dropColumnMappingMetadata(schema.asNullable) val mergedSchema = mergeSchema(txn, dataSchema, isOverwriteMode, canOverwriteSchema) val normalizedPartitionCols = normalizePartitionColumns(spark, partitionColumns, dataSchema) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala index d526714e40f..c44c39c4f97 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/ActionSerializerSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils // scalastyle:off: removeFile class ActionSerializerSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest { @@ -367,6 +368,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta expectedJson: String, extraSettings: Seq[(String, String)] = Seq.empty, testTags: Seq[org.scalatest.Tag] = Seq.empty): Unit = { + import org.apache.spark.sql.delta.test.DeltaTestImplicits._ test(name, testTags: _*) { withTempDir { tempDir => val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath)) @@ -383,7 +385,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta val protocol = Protocol( minReaderVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION), 
minWriterVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION)) - deltaLog.startTransaction().commit(Seq(protocol, Metadata()), ManualUpdate) + deltaLog.startTransaction().commitManually(protocol, Metadata()) // Commit the actual action. val version = deltaLog.startTransaction().commit(Seq(action), ManualUpdate) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala index 6918fb80fa1..2ce455c6f7f 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala @@ -36,62 +36,68 @@ class DeltaParquetFileFormatSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest { import testImplicits._ - test("Read with DV") { - withTempDir { tempDir => - val tablePath = tempDir.toString - - // Generate a table with one parquet file containing multiple row groups. - generateData(tablePath) - - val deltaLog = DeltaLog.forTable(spark, tempDir) - val metadata = deltaLog.snapshot.metadata - - // Add additional field that has the deleted row flag to existing data schema - val readingSchema = metadata.schema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD) - - val addFilePath = new Path( - tempDir.toString, - deltaLog.snapshot.allFiles.collect()(0).path) - assertParquetHasMultipleRowGroups(addFilePath) - - val dv = generateDV(tablePath, 0, 200, 300, 756, 10352, 19999) - - val fs = addFilePath.getFileSystem(hadoopConf) - val broadcastDvMap = spark.sparkContext.broadcast( - Map(fs.getFileStatus(addFilePath).getPath().toString() -> dv) - ) - - val broadcastHadoopConf = spark.sparkContext.broadcast( - new SerializableConfiguration(hadoopConf)) - - val deltaParquetFormat = new DeltaParquetFileFormat( - metadata, - isSplittable = false, - disablePushDowns = false, - Some(tablePath), - Some(broadcastDvMap), - Some(broadcastHadoopConf)) - - val fileIndex = DeltaLogFileIndex(deltaParquetFormat, fs, addFilePath :: Nil) - - val relation = HadoopFsRelation( - fileIndex, - fileIndex.partitionSchema, - readingSchema, - bucketSpec = None, - deltaParquetFormat, - options = Map.empty)(spark) - val plan = LogicalRelation(relation) - - // Select some rows that are deleted and some rows not deleted - // Deleted row `value`: 0, 200, 300, 756, 10352, 19999 - // Not deleted row `value`: 7, 900 - checkDatasetUnorderly( - Dataset.ofRows(spark, plan) - .filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)") - .as[(Int, Int)], - (0, 1), (7, 0), (200, 1), (300, 1), (756, 1), (900, 0), (10352, 1), (19999, 1) - ) + // Read with deletion vectors has separate code paths based on vectorized Parquet + // reader is enabled or not. Test both the combinations + for (enableVectorizedParquetReader <- Seq("true", "false")) { + test(s"read with DVs (vectorized Parquet reader enabled=$enableVectorizedParquetReader)") { + withTempDir { tempDir => + spark.conf.set("spark.sql.parquet.enableVectorizedReader", enableVectorizedParquetReader) + + val tablePath = tempDir.toString + + // Generate a table with one parquet file containing multiple row groups. 
+ generateData(tablePath) + + val deltaLog = DeltaLog.forTable(spark, tempDir) + val metadata = deltaLog.snapshot.metadata + + // Add additional field that has the deleted row flag to existing data schema + val readingSchema = metadata.schema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD) + + val addFilePath = new Path( + tempDir.toString, + deltaLog.snapshot.allFiles.collect()(0).path) + assertParquetHasMultipleRowGroups(addFilePath) + + val dv = generateDV(tablePath, 0, 200, 300, 756, 10352, 19999) + + val fs = addFilePath.getFileSystem(hadoopConf) + val broadcastDvMap = spark.sparkContext.broadcast( + Map(fs.getFileStatus(addFilePath).getPath().toString() -> dv) + ) + + val broadcastHadoopConf = spark.sparkContext.broadcast( + new SerializableConfiguration(hadoopConf)) + + val deltaParquetFormat = new DeltaParquetFileFormat( + metadata, + isSplittable = false, + disablePushDowns = false, + Some(tablePath), + Some(broadcastDvMap), + Some(broadcastHadoopConf)) + + val fileIndex = DeltaLogFileIndex(deltaParquetFormat, fs, addFilePath :: Nil) + + val relation = HadoopFsRelation( + fileIndex, + fileIndex.partitionSchema, + readingSchema, + bucketSpec = None, + deltaParquetFormat, + options = Map.empty)(spark) + val plan = LogicalRelation(relation) + + // Select some rows that are deleted and some rows not deleted + // Deleted row `value`: 0, 200, 300, 756, 10352, 19999 + // Not deleted row `value`: 7, 900 + checkDatasetUnorderly( + Dataset.ofRows(spark, plan) + .filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)") + .as[(Int, Int)], + (0, 1), (7, 0), (200, 1), (300, 1), (756, 1), (900, 0), (10352, 1), (19999, 1) + ) + } } } diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala index 784e9e65d14..c867ad81ecc 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaDetailSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta import java.io.File import java.io.FileNotFoundException + // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION} import org.apache.spark.sql.delta.test.DeltaSQLCommandTest diff --git a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala index 7056c15a043..f850648f850 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala @@ -266,6 +266,42 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase { ), cdc = Seq((2, 20, "update_preimage"), (2, 21, "update_postimage"))) + testExtendedMergeWithCDC("not matched by source update and delete with skipping")( + source = (0, 0) :: (1, 1) :: (2, 2) :: (5, 5) :: Nil, + target = (1, 10) :: (2, 20) :: (4, 40) :: (5, 50) :: Nil, + mergeOn = "s.key = t.key and t.key > 4", + updateNotMatched(condition = "t.key = 1", set = "t.value = t.value + 1"), + deleteNotMatched(condition = "t.key = 4"))( + result = Seq( + (1, 11), // Not matched by source based on merge condition, updated + (2, 20), // Not matched by source based on merge condition, no change + // (4, 40), Not matched by source, deleted + (5, 50) // 
Matched, no change + ), + cdc = Seq( + (1, 10, "update_preimage"), + (1, 11, "update_postimage"), + (4, 40, "delete"))) + + testExtendedMergeWithCDC( + "matched delete and not matched by source update with skipping")( + source = (0, 0) :: (1, 1) :: (2, 2) :: (5, 5) :: (6, 6) :: Nil, + target = (1, 10) :: (2, 20) :: (4, 40) :: (5, 50) :: (6, 60) :: Nil, + mergeOn = "s.key = t.key and t.key > 4", + delete(condition = "t.key = 5"), + updateNotMatched(condition = "t.key = 1", set = "t.value = t.value + 1"))( + result = Seq( + (1, 11), // Not matched by source based on merge condition, updated + (2, 20), // Not matched by source based on merge condition, no change + (4, 40), // Not matched by source, no change + // (5, 50), Matched, deleted + (6, 60) // Matched, no change + ), + cdc = Seq( + (1, 10, "update_preimage"), + (1, 11, "update_postimage"), + (5, 50, "delete"))) + testExtendedMergeWithCDC("not matched by source update + delete clauses")( source = (0, 0) :: (1, 1) :: (5, 5) :: Nil, target = (1, 10) :: (2, 20) :: (7, 70) :: Nil, diff --git a/core/src/test/scala/org/apache/spark/sql/delta/UpdateMetricsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/UpdateMetricsSuite.scala index cf1628f8bfe..0d1213f5893 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/UpdateMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/UpdateMetricsSuite.scala @@ -21,6 +21,7 @@ import com.databricks.spark.util.DatabricksLogging import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + import org.apache.spark.sql.{Dataset, QueryTest} import org.apache.spark.sql.functions.expr import org.apache.spark.sql.test.SharedSparkSession diff --git a/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala b/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala index 2bdf899a78e..4b2e5790c64 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala @@ -1463,6 +1463,26 @@ trait DataSkippingDeltaTestsBase extends QueryTest } } + test("Data skipping should always return files from latest commit version") { + withTempDir { dir => + // If this test is flacky it is broken + Seq("aaa").toDF().write.format("delta").save(dir.getCanonicalPath) + val (log, snapshot) = DeltaLog.forTableWithSnapshot(spark, dir.getPath) + val addFile = snapshot.allFiles.collect().head + val fileWithStat = snapshot.getSpecificFilesWithStats(Seq(addFile.path)).head + // Ensure the stats has actual stats, not {} + assert(fileWithStat.stats.size > 2) + log.startTransaction().commitManually(addFile.copy(stats = "{}")) + + // Delta dedup should always keep AddFile from newer version so + // getSpecificFilesWithStats should return the AddFile with empty stats + log.update() + val newfileWithStat = + log.unsafeVolatileSnapshot.getSpecificFilesWithStats(Seq(addFile.path)).head + assert(newfileWithStat.stats === "{}") + } + } + protected def expectedStatsForFile(index: Int, colName: String, deltaLog: DeltaLog): String = { s"""{"numRecords":1,"minValues":{"$colName":$index},"maxValues":{"$colName":$index},""" + s""""nullCount":{"$colName":0}}""".stripMargin diff --git a/run-tests.py b/run-tests.py index 13e3c442d1d..e1cdbe277cc 100755 --- a/run-tests.py +++ b/run-tests.py @@ -46,6 +46,13 @@ def run_sbt_tests(root_dir, coverage, 
scala_version=None):
     cmd += ["++ %s test" % scala_version]
     if coverage:
         cmd += ["coverageAggregate", "coverageOff"]
+    cmd += ["-v"] # show java options used
+
+    # https://docs.oracle.com/javase/7/docs/technotes/guides/vm/G1.html
+    # a GC that is optimized for larger multiprocessor machines with large memory
+    cmd += ["-J-XX:+UseG1GC"]
+    # 4x the default heap size (set in delta/build.sbt)
+    cmd += ["-J-Xmx4G"]
     run_cmd(cmd, env={"DELTA_TESTING": "1"}, stream_output=True)
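
Note on the MergeIntoCommand hunk above: the new branch uses the merge condition's target-only predicates for file skipping only when the statement has no NOT MATCHED BY SOURCE clauses; those clauses must consider every target row, so in their presence all target files are kept. The following is a minimal, self-contained Scala sketch of that rule, for illustration only. Conjunct, FileStats, and pruneFiles are hypothetical stand-ins for Catalyst predicates, per-file column statistics, and OptimisticTransaction.filterFiles; this is not the Delta or Spark API.

object MergeFilePruningSketch {
  // Per-file column statistics; only max values are needed for the "t.key > 4" example.
  final case class FileStats(path: String, maxValues: Map[String, Int])

  // A conjunct of the merge condition, reduced to the columns it references and a
  // stats-based "could this file contain a matching row?" check.
  final case class Conjunct(references: Set[String], mayMatch: FileStats => Boolean)

  def pruneFiles(
      conjuncts: Seq[Conjunct],
      targetColumns: Set[String],
      hasNotMatchedBySourceClause: Boolean,
      files: Seq[FileStats]): Seq[String] = {
    if (hasNotMatchedBySourceClause) {
      // NOT MATCHED BY SOURCE must see every target row, so no file may be skipped.
      files.map(_.path)
    } else {
      // Mirrors `filter(_.references.subsetOf(target.outputSet))`: only conjuncts that
      // reference target columns exclusively are safe to use for data skipping.
      val targetOnly = conjuncts.filter(_.references.subsetOf(targetColumns))
      files.filter(f => targetOnly.forall(_.mayMatch(f))).map(_.path)
    }
  }

  def main(args: Array[String]): Unit = {
    // Two target files; the target-only conjunct "t.key > 4" can rule out the first one.
    val files = Seq(
      FileStats("part-0", Map("key" -> 2)),
      FileStats("part-1", Map("key" -> 50)))
    val keyGreaterThan4 = Conjunct(Set("key"), f => f.maxValues("key") > 4)

    // No NOT MATCHED BY SOURCE clauses: skipping applies, prints List(part-1).
    println(pruneFiles(Seq(keyGreaterThan4), Set("key"),
      hasNotMatchedBySourceClause = false, files))
    // With NOT MATCHED BY SOURCE clauses: scan everything, prints List(part-0, part-1).
    println(pruneFiles(Seq(keyGreaterThan4), Set("key"),
      hasNotMatchedBySourceClause = true, files))
  }
}

The new MergeIntoNotMatchedBySourceSuite cases above exercise the non-skipping branch: with `t.key > 4` in the merge condition, rows such as (1, 10) and (4, 40) must still be updated or deleted by the NOT MATCHED BY SOURCE clauses, which target-only skipping would otherwise have pruned away.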