
test when vectorized reader is disabled and comments
GitOrigin-RevId: d6204d5afcedd72f2f05da684cd001374e76b5dd
vkorukanti committed Jan 9, 2023
1 parent 6964f4a commit 38676a5
Showing 12 changed files with 146 additions and 67 deletions.
2 changes: 1 addition & 1 deletion PROTOCOL.md
@@ -217,7 +217,7 @@ their content, e.g. `"hello%20world"`
* Connect path segments by `+`, e.g. `"b"+"y"`
* Connect path and value pairs by `=`, e.g. `"b"+"y"=null`
* Sort canonicalized path/value pairs using a byte-order sort on paths. The byte-order sort can be done by converting paths to byte array using UTF-8 charset\
- and then comparing them, e.g. `"a" < "b"."x" < "b"."y"`
+ and then comparing them, e.g. `"a" < "b"+"x" < "b"+"y"`
* Separate ordered pairs by `,`, e.g. `"a"=10,"b"+"x"="https%3A%2F%2Fdelta.io","b"+"y"=null`

5. Array values (e.g. `[null, "hi ho", 2.71]`) are canonicalized as if they were objects, except the "name" has numeric type instead of string type, and gives the (0-based)
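
The one-line change above fixes the example to use `+` (not `.`) as the path-segment separator. Because the rules are easy to misread, here is an illustrative, non-normative Scala sketch of the canonicalization they describe; the object and method names (`CanonicalizeSketch`, `canonicalize`, `quoteSegment`) are invented for this example and are not part of the protocol or the codebase.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8

// Illustrative only: canonicalize flattened (path segments -> already-canonicalized value) pairs
// following the rules above: quote and URL-encode each path segment, join segments with '+',
// join path and value with '=', byte-order sort by canonical path, and separate pairs with ','.
object CanonicalizeSketch {
  private def quoteSegment(s: String): String =
    "\"" + URLEncoder.encode(s, UTF_8.name()).replace("+", "%20") + "\""

  def canonicalize(pairs: Seq[(Seq[String], String)]): String = {
    val encoded = pairs.map { case (path, value) =>
      val canonicalPath = path.map(quoteSegment).mkString("+")
      canonicalPath -> s"$canonicalPath=$value"
    }
    // Encoded paths are plain ASCII, so natural string order equals a UTF-8 byte-order sort.
    encoded.sortBy(_._1).map(_._2).mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val url = "\"" + URLEncoder.encode("https://delta.io", UTF_8.name()) + "\""
    // Prints: "a"=10,"b"+"x"="https%3A%2F%2Fdelta.io","b"+"y"=null
    println(canonicalize(Seq(
      Seq("b", "y") -> "null",
      Seq("a") -> "10",
      Seq("b", "x") -> url)))
  }
}
```

Running the sketch on the document's own example reproduces the ordered string shown above, with `"a"` sorting before `"b"+"x"` and `"b"+"y"`.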
@@ -53,7 +53,7 @@ class DeltaParquetFileFormat(
val broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None)
extends ParquetFileFormat {
// Validate either we have all arguments for DV enabled read or none of them.
- require(!(broadcastHadoopConf.isDefined ^ broadcastDvMap.isDefined ^ tablePath .isDefined ^
+ require(!(broadcastHadoopConf.isDefined ^ broadcastDvMap.isDefined ^ tablePath.isDefined ^
!isSplittable ^ disablePushDowns))

val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
@@ -185,7 +185,7 @@ class DeltaParquetFileFormat(

val newIter = iterator.map { row =>
val newRow = row match {
- case batch: ColumnarBatch =>
+ case batch: ColumnarBatch => // When vectorized Parquet reader is enabled
val size = batch.numRows()
// Create a new vector for the `is row deleted` column. We can't use the one from Parquet
// reader as it set the [[WritableColumnVector.isAllNulls]] to true and it can't be reset
@@ -210,7 +210,7 @@
}
newBatch

- case rest: InternalRow =>
+ case rest: InternalRow => // When vectorized Parquet reader is disabled
// Temporary vector variable used to get DV values from RowIndexFilter
// Currently the RowIndexFilter only supports writing into a columnar vector
// and doesn't have methods to get DV value for a specific row index.
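
For context on the two branches annotated above: which one runs is governed by Spark's `spark.sql.parquet.enableVectorizedReader` setting, the same flag the new test later in this commit toggles. The snippet below is a minimal, hypothetical driver (the object name and table path are invented) showing how a reader would exercise both paths against the same Delta table.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone driver: read one Delta table with the vectorized Parquet reader
// on and off. With "true" the scan hands DeltaParquetFileFormat ColumnarBatch instances;
// with "false" it falls back to one InternalRow at a time, as the comments above describe.
object VectorizedReaderToggleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dv-read-paths")
      .master("local[2]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    val tablePath = "/tmp/delta/dv-example" // placeholder path, not from the commit
    for (enabled <- Seq("true", "false")) {
      spark.conf.set("spark.sql.parquet.enableVectorizedReader", enabled)
      val count = spark.read.format("delta").load(tablePath).count()
      println(s"vectorized=$enabled -> $count rows")
    }
    spark.stop()
  }
}
```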
@@ -448,10 +448,15 @@ case class MergeIntoCommand(
1
}.asNondeterministic()

- // Skip data based on the merge condition
- val targetOnlyPredicates =
- splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet))
- val dataSkippedFiles = deltaTxn.filterFiles(targetOnlyPredicates)
+ // Prune non-matching files if we don't need to collect them for NOT MATCHED BY SOURCE clauses.
+ val dataSkippedFiles =
+ if (notMatchedBySourceClauses.isEmpty) {
+ val targetOnlyPredicates =
+ splitConjunctivePredicates(condition).filter(_.references.subsetOf(target.outputSet))
+ deltaTxn.filterFiles(targetOnlyPredicates)
+ } else {
+ deltaTxn.filterFiles()
+ }

// UDF to increment metrics
val incrSourceRowCountExpr = makeMetricUpdateUDF("numSourceRows")
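
The rewritten file-skipping block above only prunes target files with the merge condition when the statement has no NOT MATCHED BY SOURCE clauses: those clauses act on target rows without a source match, and such rows can sit in files that condition-based skipping would discard. The sketch below is a hedged illustration using the Delta Scala merge API as of the version that introduced NOT MATCHED BY SOURCE; the object name, table path, data, and column names are invented for the example.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Sketch of a merge that takes the `notMatchedBySourceClauses.nonEmpty` branch above:
// pruning files from the merge condition alone would be incorrect here, because the
// NOT MATCHED BY SOURCE update must see target rows that fail `t.key > 4`.
object MergeNotMatchedBySourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("merge-nmbs")
      .master("local[2]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
    import spark.implicits._

    val source = Seq((5, 500), (6, 600)).toDF("key", "value")
    val target = DeltaTable.forPath(spark, "/tmp/delta/target") // assumed existing Delta table

    target.as("t")
      .merge(source.as("s"), "s.key = t.key AND t.key > 4")
      .whenMatched().updateExpr(Map("value" -> "s.value"))
      .whenNotMatchedBySource("t.key = 1").updateExpr(Map("value" -> "t.value + 1"))
      .execute()

    spark.stop()
  }
}
```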
@@ -223,7 +223,7 @@ case class WriteIntoDelta(
CharVarcharUtils.replaceCharVarcharWithStringInSchema(
replaceCharWithVarchar(CharVarcharUtils.getRawSchema(data.schema)).asInstanceOf[StructType])
}
- var finalSchema = schemaInCatalog.getOrElse(dataSchema)
+ val finalSchema = schemaInCatalog.getOrElse(dataSchema)
updateMetadata(data.sparkSession, txn, finalSchema,
partitionColumns, configuration, isOverwriteOperation, rearrangeOnly)

@@ -60,7 +60,8 @@ trait ImplicitMetadataOperation extends DeltaLogging {
// To support the new column mapping mode, we drop existing metadata on data schema
// so that all the column mapping related properties can be reinitialized in
// OptimisticTransaction.updateMetadata
- val dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(schema.asNullable)
+ val dataSchema =
+ DeltaColumnMapping.dropColumnMappingMetadata(schema.asNullable)
val mergedSchema = mergeSchema(txn, dataSchema, isOverwriteMode, canOverwriteSchema)
val normalizedPartitionCols =
normalizePartitionColumns(spark, partitionColumns, dataSchema)
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
+ import org.apache.spark.util.Utils

// scalastyle:off: removeFile
class ActionSerializerSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
@@ -367,6 +368,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
expectedJson: String,
extraSettings: Seq[(String, String)] = Seq.empty,
testTags: Seq[org.scalatest.Tag] = Seq.empty): Unit = {
+ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
test(name, testTags: _*) {
withTempDir { tempDir =>
val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
@@ -383,7 +385,7 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
val protocol = Protocol(
minReaderVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION),
minWriterVersion = spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION))
- deltaLog.startTransaction().commit(Seq(protocol, Metadata()), ManualUpdate)
+ deltaLog.startTransaction().commitManually(protocol, Metadata())

// Commit the actual action.
val version = deltaLog.startTransaction().commit(Seq(action), ManualUpdate)
@@ -36,62 +36,68 @@ class DeltaParquetFileFormatSuite extends QueryTest
with SharedSparkSession with DeltaSQLCommandTest {
import testImplicits._

test("Read with DV") {
withTempDir { tempDir =>
val tablePath = tempDir.toString

// Generate a table with one parquet file containing multiple row groups.
generateData(tablePath)

val deltaLog = DeltaLog.forTable(spark, tempDir)
val metadata = deltaLog.snapshot.metadata

// Add additional field that has the deleted row flag to existing data schema
val readingSchema = metadata.schema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD)

val addFilePath = new Path(
tempDir.toString,
deltaLog.snapshot.allFiles.collect()(0).path)
assertParquetHasMultipleRowGroups(addFilePath)

val dv = generateDV(tablePath, 0, 200, 300, 756, 10352, 19999)

val fs = addFilePath.getFileSystem(hadoopConf)
val broadcastDvMap = spark.sparkContext.broadcast(
Map(fs.getFileStatus(addFilePath).getPath().toString() -> dv)
)

val broadcastHadoopConf = spark.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))

val deltaParquetFormat = new DeltaParquetFileFormat(
metadata,
isSplittable = false,
disablePushDowns = false,
Some(tablePath),
Some(broadcastDvMap),
Some(broadcastHadoopConf))

val fileIndex = DeltaLogFileIndex(deltaParquetFormat, fs, addFilePath :: Nil)

val relation = HadoopFsRelation(
fileIndex,
fileIndex.partitionSchema,
readingSchema,
bucketSpec = None,
deltaParquetFormat,
options = Map.empty)(spark)
val plan = LogicalRelation(relation)

// Select some rows that are deleted and some rows not deleted
// Deleted row `value`: 0, 200, 300, 756, 10352, 19999
// Not deleted row `value`: 7, 900
checkDatasetUnorderly(
Dataset.ofRows(spark, plan)
.filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)")
.as[(Int, Int)],
(0, 1), (7, 0), (200, 1), (300, 1), (756, 1), (900, 0), (10352, 1), (19999, 1)
)
+ // Read with deletion vectors has separate code paths based on whether the vectorized Parquet
+ // reader is enabled or not. Test both combinations.
+ for (enableVectorizedParquetReader <- Seq("true", "false")) {
+ test(s"read with DVs (vectorized Parquet reader enabled=$enableVectorizedParquetReader)") {
+ withTempDir { tempDir =>
+ spark.conf.set("spark.sql.parquet.enableVectorizedReader", enableVectorizedParquetReader)
+
+ val tablePath = tempDir.toString
+
+ // Generate a table with one parquet file containing multiple row groups.
+ generateData(tablePath)
+
+ val deltaLog = DeltaLog.forTable(spark, tempDir)
+ val metadata = deltaLog.snapshot.metadata
+
+ // Add additional field that has the deleted row flag to existing data schema
+ val readingSchema = metadata.schema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD)
+
+ val addFilePath = new Path(
+ tempDir.toString,
+ deltaLog.snapshot.allFiles.collect()(0).path)
+ assertParquetHasMultipleRowGroups(addFilePath)
+
+ val dv = generateDV(tablePath, 0, 200, 300, 756, 10352, 19999)
+
+ val fs = addFilePath.getFileSystem(hadoopConf)
+ val broadcastDvMap = spark.sparkContext.broadcast(
+ Map(fs.getFileStatus(addFilePath).getPath().toString() -> dv)
+ )
+
+ val broadcastHadoopConf = spark.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf))
+
+ val deltaParquetFormat = new DeltaParquetFileFormat(
+ metadata,
+ isSplittable = false,
+ disablePushDowns = false,
+ Some(tablePath),
+ Some(broadcastDvMap),
+ Some(broadcastHadoopConf))
+
+ val fileIndex = DeltaLogFileIndex(deltaParquetFormat, fs, addFilePath :: Nil)
+
+ val relation = HadoopFsRelation(
+ fileIndex,
+ fileIndex.partitionSchema,
+ readingSchema,
+ bucketSpec = None,
+ deltaParquetFormat,
+ options = Map.empty)(spark)
+ val plan = LogicalRelation(relation)
+
+ // Select some rows that are deleted and some rows not deleted
+ // Deleted row `value`: 0, 200, 300, 756, 10352, 19999
+ // Not deleted row `value`: 7, 900
+ checkDatasetUnorderly(
+ Dataset.ofRows(spark, plan)
+ .filter("value in (0, 7, 200, 300, 756, 900, 10352, 19999)")
+ .as[(Int, Int)],
+ (0, 1), (7, 0), (200, 1), (300, 1), (756, 1), (900, 0), (10352, 1), (19999, 1)
+ )
+ }
}
}
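
One thing to note about the new test: `spark.conf.set` inside the test body changes the shared session for whatever runs afterwards. A possible variant (a sketch, not what the commit does) scopes the flag with `withSQLConf`, which `SharedSparkSession` provides through Spark's `SQLTestUtils` and which restores the previous value when the block exits; the test name suffix and elided body are placeholders.

```scala
import org.apache.spark.sql.internal.SQLConf

// Sketch only: same loop as above, but the vectorized-reader flag is scoped to the test body.
for (enableVectorizedParquetReader <- Seq("true", "false")) {
  test(s"read with DVs (vectorized Parquet reader enabled=$enableVectorizedParquetReader, scoped)") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> enableVectorizedParquetReader) {
      withTempDir { tempDir =>
        // ... body identical to the test above: generate the table, attach the deletion
        // vector through DeltaParquetFileFormat, and check the IS_ROW_DELETED column ...
        ()
      }
    }
  }
}
```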

@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
import java.io.File
import java.io.FileNotFoundException


// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -266,6 +266,42 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase {
),
cdc = Seq((2, 20, "update_preimage"), (2, 21, "update_postimage")))

testExtendedMergeWithCDC("not matched by source update and delete with skipping")(
source = (0, 0) :: (1, 1) :: (2, 2) :: (5, 5) :: Nil,
target = (1, 10) :: (2, 20) :: (4, 40) :: (5, 50) :: Nil,
mergeOn = "s.key = t.key and t.key > 4",
updateNotMatched(condition = "t.key = 1", set = "t.value = t.value + 1"),
deleteNotMatched(condition = "t.key = 4"))(
result = Seq(
(1, 11), // Not matched by source based on merge condition, updated
(2, 20), // Not matched by source based on merge condition, no change
// (4, 40), Not matched by source, deleted
(5, 50) // Matched, no change
),
cdc = Seq(
(1, 10, "update_preimage"),
(1, 11, "update_postimage"),
(4, 40, "delete")))

testExtendedMergeWithCDC(
"matched delete and not matched by source update with skipping")(
source = (0, 0) :: (1, 1) :: (2, 2) :: (5, 5) :: (6, 6) :: Nil,
target = (1, 10) :: (2, 20) :: (4, 40) :: (5, 50) :: (6, 60) :: Nil,
mergeOn = "s.key = t.key and t.key > 4",
delete(condition = "t.key = 5"),
updateNotMatched(condition = "t.key = 1", set = "t.value = t.value + 1"))(
result = Seq(
(1, 11), // Not matched by source based on merge condition, updated
(2, 20), // Not matched by source based on merge condition, no change
(4, 40), // Not matched by source, no change
// (5, 50), Matched, deleted
(6, 60) // Matched, no change
),
cdc = Seq(
(1, 10, "update_preimage"),
(1, 11, "update_postimage"),
(5, 50, "delete")))

testExtendedMergeWithCDC("not matched by source update + delete clauses")(
source = (0, 0) :: (1, 1) :: (5, 5) :: Nil,
target = (1, 10) :: (2, 20) :: (7, 70) :: Nil,
@@ -21,6 +21,7 @@ import com.databricks.spark.util.DatabricksLogging
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.sql.{Dataset, QueryTest}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.test.SharedSparkSession
@@ -1463,6 +1463,26 @@ trait DataSkippingDeltaTestsBase extends QueryTest
}
}

test("Data skipping should always return files from latest commit version") {
withTempDir { dir =>
// If this test is flacky it is broken
Seq("aaa").toDF().write.format("delta").save(dir.getCanonicalPath)
val (log, snapshot) = DeltaLog.forTableWithSnapshot(spark, dir.getPath)
val addFile = snapshot.allFiles.collect().head
val fileWithStat = snapshot.getSpecificFilesWithStats(Seq(addFile.path)).head
// Ensure the stats has actual stats, not {}
assert(fileWithStat.stats.size > 2)
log.startTransaction().commitManually(addFile.copy(stats = "{}"))

// Delta dedup should always keep AddFile from newer version so
// getSpecificFilesWithStats should return the AddFile with empty stats
log.update()
val newfileWithStat =
log.unsafeVolatileSnapshot.getSpecificFilesWithStats(Seq(addFile.path)).head
assert(newfileWithStat.stats === "{}")
}
}

protected def expectedStatsForFile(index: Int, colName: String, deltaLog: DeltaLog): String = {
s"""{"numRecords":1,"minValues":{"$colName":$index},"maxValues":{"$colName":$index},""" +
s""""nullCount":{"$colName":0}}""".stripMargin
7 changes: 7 additions & 0 deletions run-tests.py
@@ -46,6 +46,13 @@ def run_sbt_tests(root_dir, coverage, scala_version=None):
cmd += ["++ %s test" % scala_version]
if coverage:
cmd += ["coverageAggregate", "coverageOff"]
cmd += ["-v"] # show java options used

# https://docs.oracle.com/javase/7/docs/technotes/guides/vm/G1.html
# a GC that is optimized for larger multiprocessor machines with large memory
cmd += ["-J-XX:+UseG1GC"]
# 4x the default heap size (set in delta/built.sbt)
cmd += ["-J-Xmx4G"]
run_cmd(cmd, env={"DELTA_TESTING": "1"}, stream_output=True)

