From ae98b87810616c472506d7ee69b65e95a01ea62a Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Fri, 13 Sep 2024 09:20:36 +0200
Subject: [PATCH 1/5] Add type widening delta sharing tests

---
 .../sharing/spark/DeltaSharingUtils.scala     |  15 +-
 ...taSharingDataSourceTypeWideningSuite.scala | 147 ++++++++++++++++++
 .../TestClientForDeltaFormatSharing.scala     |  20 ++-
 3 files changed, 165 insertions(+), 17 deletions(-)
 create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala

diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala
index e77290855b6..c03924f13b1 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala
@@ -22,15 +22,7 @@ import java.util.{TimeZone, UUID}
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.delta.{
-  ColumnMappingTableFeature,
-  DeletionVectorsTableFeature,
-  DeltaLog,
-  DeltaParquetFileFormat,
-  SnapshotDescriptor,
-  TimestampNTZTableFeature
-}
-import org.apache.spark.sql.delta.VariantTypeTableFeature
+import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import com.google.common.hash.Hashing
 import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient}
@@ -51,13 +43,18 @@ object DeltaSharingUtils extends Logging {
     DeletionVectorsTableFeature.name,
     ColumnMappingTableFeature.name,
     TimestampNTZTableFeature.name,
+    TypeWideningPreviewTableFeature.name,
+    TypeWideningTableFeature.name,
     VariantTypeTableFeature.name
   )
+
   val SUPPORTED_READER_FEATURES: Seq[String] = Seq(
     DeletionVectorsTableFeature.name,
     ColumnMappingTableFeature.name,
     TimestampNTZTableFeature.name,
+    TypeWideningPreviewTableFeature.name,
+    TypeWideningTableFeature.name,
     VariantTypeTableFeature.name
   )
 
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
new file mode 100644
index 00000000000..9177d1d4e8b
--- /dev/null
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
@@ -0,0 +1,147 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.sharing.spark
+
+import org.apache.spark.sql.delta.DeltaConfigs
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils
+import org.apache.spark.sql.types._
+
+// Unit tests to verify that type widening works with delta sharing.
+class DeltaSharingDataSourceTypeWideningSuite
+  extends QueryTest
+  with DeltaSQLCommandTest
+  with DeltaSharingTestSparkUtils
+  with DeltaSharingDataSourceDeltaTestUtils {
+
+  import testImplicits._
+
+  protected override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, true.toString)
+  }
+
+  /** Sets up delta sharing mocks to read a table and validates results. */
+  private def testReadingDeltaShare(
+      tableName: String,
+      versionAsOf: Option[Long],
+      responseFormat: String,
+      expectedSchema: StructType,
+      expectedResult: DataFrame): Unit = {
+    withTempDir { tempDir =>
+      val sharedTableName =
+        if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
+          "type_widening_shared_delta_table"
+        } else {
+          "type_widening_shared_parquet_table"
+        }
+      prepareMockedClientMetadata(tableName, sharedTableName)
+      prepareMockedClientGetTableVersion(tableName, sharedTableName, versionAsOf)
+      if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
+        prepareMockedClientAndFileSystemResult(tableName, sharedTableName, versionAsOf)
+      } else {
+        assert(responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)
+        prepareMockedClientAndFileSystemResultForParquet(tableName, sharedTableName, versionAsOf)
+      }
+
+      var reader = spark.read
+        .format("deltaSharing")
+        .option("responseFormat", responseFormat)
+      versionAsOf.foreach { version =>
+        reader = reader.option("versionAsOf", version)
+      }
+
+      withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+        val profileFile = prepareProfileFile(tempDir)
+        val result = reader.load(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
+        assert(result.schema === expectedSchema)
+        checkAnswer(result, expectedResult)
+      }
+    }
+  }
+
+  /** Creates a table and applies a type change to it. */
+  private def withTestTable(testBody: String => Unit): Unit = {
+    val deltaTableName = "type_widening_table"
+    withTable(deltaTableName) {
+      sql(s"CREATE TABLE $deltaTableName (value SMALLINT) USING DELTA")
+      sql(s"INSERT INTO $deltaTableName VALUES (1), (2)")
+      sql(s"ALTER TABLE $deltaTableName CHANGE COLUMN value TYPE INT")
+      sql(s"INSERT INTO $deltaTableName VALUES (3), (${Int.MaxValue})")
+      sql(s"INSERT INTO $deltaTableName VALUES (4), (5)")
+      testBody(deltaTableName)
+    }
+  }
+
+  /** Short-hand for the type widening metadata for column `value` for the test table above. */
+  private val typeWideningMetadata: Metadata =
+    new MetadataBuilder()
+      .putMetadataArray(
+        "delta.typeChanges", Array(
+          new MetadataBuilder()
+            .putLong("tableVersion", 2)
+            .putString("fromType", "short")
+            .putString("toType", "integer")
+            .build()))
+      .build()
+
+  for (responseFormat <- Seq(DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+      DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)) {
+    test(s"Delta sharing with type widening, responseFormat=$responseFormat") {
+      withTestTable { tableName =>
+        testReadingDeltaShare(
+          tableName,
+          versionAsOf = None,
+          responseFormat,
+          expectedSchema = new StructType()
+            .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+          expectedResult = Seq(1, 2, 3, Int.MaxValue, 4, 5).toDF("value"))
+      }
+    }
+
+    test(s"Delta sharing with type widening, time travel, responseFormat=$responseFormat") {
+      withTestTable { tableName =>
+        testReadingDeltaShare(
+          tableName,
+          versionAsOf = Some(3),
+          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+          expectedSchema = new StructType()
+            .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+          expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))
+
+        testReadingDeltaShare(
+          tableName,
+          versionAsOf = Some(2),
+          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+          expectedSchema = new StructType()
+            .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+          expectedResult = Seq(1, 2).toDF("value"))
+
+        testReadingDeltaShare(
+          tableName,
+          versionAsOf = Some(1),
+          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+          expectedSchema = new StructType()
+            .add("value", ShortType),
+          expectedResult = Seq(1, 2).toDF("value"))
+      }
+    }
+  }
+}
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
index a3d3d52a85d..816e668d5a4 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
@@ -18,6 +18,7 @@ package io.delta.sharing.spark
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.util.JsonUtils
 import io.delta.sharing.client.{
   DeltaSharingClient,
@@ -59,16 +60,19 @@ private[spark] class TestClientForDeltaFormatSharing(
     tokenRenewalThresholdInSeconds: Int = 600)
   extends DeltaSharingClient {
 
+  private val supportedReaderFeatures: Seq[String] = Seq(
+    DeletionVectorsTableFeature,
+    ColumnMappingTableFeature,
+    TimestampNTZTableFeature,
+    TypeWideningPreviewTableFeature,
+    TypeWideningTableFeature
+  ).map(_.name)
+
   assert(
     responseFormat == DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET ||
-      (
-        readerFeatures.contains("deletionVectors") &&
-        readerFeatures.contains("columnMapping") &&
-        readerFeatures.contains("timestampNtz") &&
-        readerFeatures.contains("variantType-preview")
-      ),
-    "deletionVectors, columnMapping, timestampNtz, variantType-preview should be supported in " +
-      "all types of queries."
+      supportedReaderFeatures.forall(readerFeatures.split(",").contains),
+    s"${supportedReaderFeatures.diff(readerFeatures.split(",")).mkString(", ")} " +
+      s"should be supported in all types of queries."
   )
 
   import TestClientForDeltaFormatSharing._
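
Note: the `typeWideningMetadata` value asserted by the new suite mirrors the
`delta.typeChanges` field metadata that Delta records on a widened column (here,
`value` changed from `short` to `integer` at table version 2). As a rough sketch
of how a recipient could list that history on a schema read back from a share
(an illustrative helper, using only the standard Spark `Metadata` API, nothing
from this patch):

    import org.apache.spark.sql.types.{Metadata, StructType}

    // Collect (column name, type change) pairs from the `delta.typeChanges`
    // field metadata; each entry carries the same keys the test builds:
    // "tableVersion", "fromType" and "toType".
    def listTypeChanges(schema: StructType): Seq[(String, Metadata)] =
      schema.fields.toSeq
        .filter(_.metadata.contains("delta.typeChanges"))
        .flatMap { field =>
          field.metadata.getMetadataArray("delta.typeChanges").toSeq.map(field.name -> _)
        }
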
From df607fbf5831c76245f60af78e38a529052ab805 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Thu, 10 Oct 2024 17:30:38 +0200
Subject: [PATCH 2/5] Remove metadata in parquet format sharing

---
 ...taSharingDataSourceTypeWideningSuite.scala | 40 +++++++++++--------
 .../TestClientForDeltaFormatSharing.scala     | 28 ++++++++++++-
 2 files changed, 50 insertions(+), 18 deletions(-)

diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
index 9177d1d4e8b..93c3ba968b7 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
@@ -90,20 +90,28 @@ class DeltaSharingDataSourceTypeWideningSuite
     }
   }
 
-  /** Short-hand for the type widening metadata for column `value` for the test table above. */
-  private val typeWideningMetadata: Metadata =
-    new MetadataBuilder()
-      .putMetadataArray(
-        "delta.typeChanges", Array(
-          new MetadataBuilder()
-            .putLong("tableVersion", 2)
-            .putString("fromType", "short")
-            .putString("toType", "integer")
-            .build()))
-      .build()
+  for (responseFormat <- Seq(
+    DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+    DeltaSharingOptions.RESPONSE_FORMAT_PARQUET
+  )) {
+    // Type widening metadata for column `value` for the test table above.
+    // The server strips the metadata in Parquet format sharing; this is ok since it's not used on
+    // the read path anyway.
+    val typeWideningMetadata: Metadata =
+      if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
+        new MetadataBuilder()
+          .putMetadataArray(
+            "delta.typeChanges", Array(
+              new MetadataBuilder()
+                .putLong("tableVersion", 2)
+                .putString("fromType", "short")
+                .putString("toType", "integer")
+                .build()))
+          .build()
+      } else {
+        Metadata.empty
+      }
 
-  for (responseFormat <- Seq(DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
-      DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)) {
     test(s"Delta sharing with type widening, responseFormat=$responseFormat") {
       withTestTable { tableName =>
         testReadingDeltaShare(
@@ -121,7 +129,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         testReadingDeltaShare(
           tableName,
           versionAsOf = Some(3),
-          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+          responseFormat,
           expectedSchema = new StructType()
             .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
           expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))
@@ -129,7 +137,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         testReadingDeltaShare(
           tableName,
           versionAsOf = Some(2),
-          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+          responseFormat,
           expectedSchema = new StructType()
             .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
           expectedResult = Seq(1, 2).toDF("value"))
@@ -137,7 +145,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         testReadingDeltaShare(
           tableName,
           versionAsOf = Some(1),
-          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+          responseFormat,
           expectedSchema = new StructType()
             .add("value", ShortType),
           expectedResult = Seq(1, 2).toDF("value"))
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
index 816e668d5a4..555995b5583 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
@@ -19,6 +19,7 @@ package io.delta.sharing.spark
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.util.JsonUtils
 import io.delta.sharing.client.{
   DeltaSharingClient,
@@ -29,11 +30,13 @@ import io.delta.sharing.client.model.{
   AddFile => ClientAddFile,
   DeltaTableFiles,
   DeltaTableMetadata,
+  Metadata,
   SingleAction,
   Table
 }
 
 import org.apache.spark.SparkEnv
+import org.apache.spark.sql.types.{DataType, Metadata => SparkMetadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.BlockId
 
 /**
@@ -105,7 +108,7 @@ private[spark] class TestClientForDeltaFormatSharing(
       DeltaTableMetadata(
         version = versionAsOf.getOrElse(getTableVersion(table)),
         protocol = protocol,
-        metadata = metadata,
+        metadata = cleanUpTableSchema(metadata),
         respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
       )
     } else {
@@ -182,7 +185,7 @@ private[spark] class TestClientForDeltaFormatSharing(
       DeltaTableFiles(
         versionAsOf.getOrElse(getTableVersion(table)),
         protocol,
-        metadata,
+        metadata = cleanUpTableSchema(metadata),
         files.toSeq,
         respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
       )
@@ -267,6 +270,27 @@ private[spark] class TestClientForDeltaFormatSharing(
   override def getForStreaming(): Boolean = forStreaming
 
   override def getProfileProvider: DeltaSharingProfileProvider = profileProvider
+
+  /**
+   * Removes all field metadata except for comments from the table schema in the given Delta
+   * metadata action. Used for Parquet format sharing.
+   */
+  private def cleanUpTableSchema(metadata: Metadata): Metadata = {
+    val schema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]
+    val cleanedSchema = SchemaMergingUtils.transformColumns(schema) {
+      case (_, field, _) =>
+        val cleanedMetadata = if (field.metadata.contains("comment")) {
+          new MetadataBuilder()
+            .putString("comment", field.metadata.getString("comment"))
+            .build()
+        } else {
+          SparkMetadata.empty
+
+        }
+        field.copy(metadata = cleanedMetadata)
+    }
+    metadata.copy(schemaString = cleanedSchema.json)
+  }
 }
 
 object TestClientForDeltaFormatSharing {
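
Note: `cleanUpTableSchema` operates on the JSON `schemaString` carried by the Delta
metadata action: parse it, rewrite the per-field metadata, and serialize it back. A
minimal sketch of that round trip, with an illustrative schemaString that is not
taken from the patch:

    import org.apache.spark.sql.types.{DataType, StructType}

    // A toy schemaString for the test table after the type change; the
    // "delta.typeChanges" entry is what the cleanup drops for Parquet sharing.
    val schemaString =
      """{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,
        |"metadata":{"delta.typeChanges":[{"tableVersion":2,"fromType":"short",
        |"toType":"integer"}]}}]}""".stripMargin.replaceAll("\n", "")
    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
    // After stripping, the field's metadata serializes as {}, which is why the
    // suite above expects Metadata.empty for the Parquet response format.
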
From 4d1174e1fc16c9ce74f441b2e75118e30ff15201 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Wed, 23 Oct 2024 17:11:17 +0200
Subject: [PATCH 3/5] Add tests for jsonPredicateHints

---
 ...taSharingDataSourceTypeWideningSuite.scala | 92 ++++++++++++++++++-
 .../TestClientForDeltaFormatSharing.scala     |  3 +-
 2 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
index 93c3ba968b7..49ffdfe51da 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
@@ -20,8 +20,10 @@ import org.apache.spark.sql.delta.DeltaConfigs
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.{Column, DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 
 // Unit tests to verify that type widening works with delta sharing.
@@ -43,14 +45,18 @@ class DeltaSharingDataSourceTypeWideningSuite
       tableName: String,
       versionAsOf: Option[Long],
       responseFormat: String,
+      filter: Column = new Column(Literal.TrueLiteral),
       expectedSchema: StructType,
+      expectedJsonPredicate: Seq[String] = Seq.empty,
       expectedResult: DataFrame): Unit = {
     withTempDir { tempDir =>
       val sharedTableName =
         if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
-          "type_widening_shared_delta_table"
+          tableName + "shared_delta_table"
         } else {
-          "type_widening_shared_parquet_table"
+          // The mock test client expects the table name to contain 'shared_parquet_table' for
+          // parquet format sharing.
+          tableName + "shared_parquet_table"
         }
       prepareMockedClientMetadata(tableName, sharedTableName)
       prepareMockedClientGetTableVersion(tableName, sharedTableName, versionAsOf)
@@ -68,18 +74,31 @@ class DeltaSharingDataSourceTypeWideningSuite
         reader = reader.option("versionAsOf", version)
       }
 
+      TestClientForDeltaFormatSharing.jsonPredicateHints.clear()
       withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
         val profileFile = prepareProfileFile(tempDir)
-        val result = reader.load(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
+        val result = reader
+          .load(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
+          .filter(filter)
         assert(result.schema === expectedSchema)
         checkAnswer(result, expectedResult)
+        assert(getJsonPredicateHints(tableName) === expectedJsonPredicate)
       }
     }
   }
 
+  /** Fetches JSON predicates passed to the test client when reading a table. */
+  private def getJsonPredicateHints(tableName: String): Seq[String] = {
+    TestClientForDeltaFormatSharing
+      .jsonPredicateHints
+      .filterKeys(_.contains(tableName))
+      .values
+      .toSeq
+  }
+
   /** Creates a table and applies a type change to it. */
   private def withTestTable(testBody: String => Unit): Unit = {
-    val deltaTableName = "type_widening_table"
+    val deltaTableName = "type_widening"
     withTable(deltaTableName) {
       sql(s"CREATE TABLE $deltaTableName (value SMALLINT) USING DELTA")
       sql(s"INSERT INTO $deltaTableName VALUES (1), (2)")
@@ -151,5 +170,68 @@ class DeltaSharingDataSourceTypeWideningSuite
           expectedResult = Seq(1, 2).toDF("value"))
       }
     }
+
+    test(s"jsonPredicateHints on non-partition column after type widening, " +
+      s"responseFormat=$responseFormat") {
+      withTestTable { tableName =>
+        testReadingDeltaShare(
+          tableName,
+          versionAsOf = None,
+          responseFormat,
+          filter = col("value") === Int.MaxValue,
+          expectedSchema = new StructType()
+            .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+          expectedResult = Seq(Int.MaxValue).toDF("value"),
+          expectedJsonPredicate = Seq(
+            """
+              |{"op":"and","children":[
+              |  {"op":"not","children":[
+              |    {"op":"isNull","children":[
+              |      {"op":"column","name":"value","valueType":"int"}]}]},
+              |  {"op":"equal","children":[
+              |    {"op":"column","name":"value","valueType":"int"},
+              |    {"op":"literal","value":"2147483647","valueType":"int"}]}]}
+            """.stripMargin.replaceAll("\n", "").replaceAll(" ", ""))
+        )
+      }
+    }
+
+    test(s"jsonPredicateHints on partition column after type widening, " +
+      s"responseFormat=$responseFormat") {
+      val deltaTableName = "type_widening_partitioned"
+      withTable(deltaTableName) {
+        sql(
+          s"""
+             |CREATE TABLE $deltaTableName (part SMALLINT, value SMALLINT)
+             |USING DELTA
+             |PARTITIONED BY (part)
+           """.stripMargin
+        )
+        sql(s"INSERT INTO $deltaTableName VALUES (1, 1), (2, 2)")
+        sql(s"ALTER TABLE $deltaTableName CHANGE COLUMN part TYPE INT")
+        sql(s"INSERT INTO $deltaTableName VALUES (3, 3), (${Int.MaxValue}, 4)")
+
+        testReadingDeltaShare(
+          deltaTableName,
+          versionAsOf = None,
+          responseFormat,
+          filter = col("part") === Int.MaxValue,
+          expectedSchema = new StructType()
+            .add("part", IntegerType, nullable = true, metadata = typeWideningMetadata)
+            .add("value", ShortType),
+          expectedResult = Seq((Int.MaxValue, 4)).toDF("part", "value"),
+          expectedJsonPredicate = Seq(
+            """
+              |{"op":"and","children":[
+              |  {"op":"not","children":[
+              |    {"op":"isNull","children":[
+              |      {"op":"column","name":"part","valueType":"int"}]}]},
+              |  {"op":"equal","children":[
+              |    {"op":"column","name":"part","valueType":"int"},
+              |    {"op":"literal","value":"2147483647","valueType":"int"}]}]}
+            """.stripMargin.replaceAll("\n", "").replaceAll(" ", ""))
+        )
+      }
+    }
   }
 }
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
index 555995b5583..8913ac5fe91 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
@@ -68,7 +68,8 @@ private[spark] class TestClientForDeltaFormatSharing(
     ColumnMappingTableFeature,
     TimestampNTZTableFeature,
     TypeWideningPreviewTableFeature,
-    TypeWideningTableFeature
+    TypeWideningTableFeature,
+    VariantTypeTableFeature
   ).map(_.name)
 
   assert(
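
Note: in both new tests the predicate hint serializes the column and the literal with
valueType "int", even for files written while the column was still a SMALLINT, because
the filter is resolved against the table's current, widened read schema. In sketch form:

    import org.apache.spark.sql.functions.col

    // Filter on the widened column; Int.MaxValue does not fit in the original
    // SMALLINT, so this only works because the shared schema is already INT.
    val filter = col("value") === Int.MaxValue
    // Serialized hint (compacted in the tests):
    //   {"op":"equal","children":[
    //     {"op":"column","name":"value","valueType":"int"},
    //     {"op":"literal","value":"2147483647","valueType":"int"}]}
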
From 20e8b54eb697a67f3b5f32817b8b5925efeb4021 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Wed, 23 Oct 2024 18:23:32 +0200
Subject: [PATCH 4/5] Add jsonPredicateHints tests

---
 .../DeltaSharingDataSourceTypeWideningSuite.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
index 49ffdfe51da..6018f2a59e2 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
@@ -45,7 +45,7 @@ class DeltaSharingDataSourceTypeWideningSuite
       tableName: String,
       versionAsOf: Option[Long],
       responseFormat: String,
-      filter: Column = new Column(Literal.TrueLiteral),
+      filter: Option[Column] = None,
       expectedSchema: StructType,
       expectedJsonPredicate: Seq[String] = Seq.empty,
       expectedResult: DataFrame): Unit = {
@@ -77,9 +77,11 @@ class DeltaSharingDataSourceTypeWideningSuite
       TestClientForDeltaFormatSharing.jsonPredicateHints.clear()
       withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
         val profileFile = prepareProfileFile(tempDir)
-        val result = reader
+        var result = reader
           .load(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
-          .filter(filter)
+        filter.foreach { f =>
+          result = result.filter(f)
+        }
         assert(result.schema === expectedSchema)
         checkAnswer(result, expectedResult)
         assert(getJsonPredicateHints(tableName) === expectedJsonPredicate)
@@ -178,7 +180,7 @@ class DeltaSharingDataSourceTypeWideningSuite
           tableName,
           versionAsOf = None,
           responseFormat,
-          filter = col("value") === Int.MaxValue,
+          filter = Some(col("value") === Int.MaxValue),
           expectedSchema = new StructType()
             .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
           expectedResult = Seq(Int.MaxValue).toDF("value"),
@@ -215,7 +217,7 @@ class DeltaSharingDataSourceTypeWideningSuite
           deltaTableName,
           versionAsOf = None,
           responseFormat,
-          filter = col("part") === Int.MaxValue,
+          filter = Some(col("part") === Int.MaxValue),
           expectedSchema = new StructType()
             .add("part", IntegerType, nullable = true, metadata = typeWideningMetadata)
             .add("value", ShortType),
From 6900461b6d46af179f2f59b2435549d38c63ecf7 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Mon, 4 Nov 2024 13:46:51 +0100
Subject: [PATCH 5/5] Revert "Remove metadata in parquet format sharing"

This reverts commit df607fbf5831c76245f60af78e38a529052ab805.
---
 ...taSharingDataSourceTypeWideningSuite.scala | 42 ++++++++-----------
 .../TestClientForDeltaFormatSharing.scala     | 28 +------------
 2 files changed, 19 insertions(+), 51 deletions(-)

diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
index 6018f2a59e2..5ce3665de6d 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
@@ -111,28 +111,20 @@ class DeltaSharingDataSourceTypeWideningSuite
     }
   }
 
-  for (responseFormat <- Seq(
-    DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
-    DeltaSharingOptions.RESPONSE_FORMAT_PARQUET
-  )) {
-    // Type widening metadata for column `value` for the test table above.
-    // The server strips the metadata in Parquet format sharing; this is ok since it's not used on
-    // the read path anyway.
-    val typeWideningMetadata: Metadata =
-      if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
-        new MetadataBuilder()
-          .putMetadataArray(
-            "delta.typeChanges", Array(
-              new MetadataBuilder()
-                .putLong("tableVersion", 2)
-                .putString("fromType", "short")
-                .putString("toType", "integer")
-                .build()))
-          .build()
-      } else {
-        Metadata.empty
-      }
-
+  /** Short-hand for the type widening metadata for column `value` for the test table above. */
+  private val typeWideningMetadata: Metadata =
+    new MetadataBuilder()
+      .putMetadataArray(
+        "delta.typeChanges", Array(
+          new MetadataBuilder()
+            .putLong("tableVersion", 2)
+            .putString("fromType", "short")
+            .putString("toType", "integer")
+            .build()))
+      .build()
+
+  for (responseFormat <- Seq(DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+      DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)) {
     test(s"Delta sharing with type widening, responseFormat=$responseFormat") {
       withTestTable { tableName =>
         testReadingDeltaShare(
@@ -150,7 +142,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         testReadingDeltaShare(
           tableName,
           versionAsOf = Some(3),
-          responseFormat,
+          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
           expectedSchema = new StructType()
             .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
           expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))
@@ -158,7 +150,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         testReadingDeltaShare(
           tableName,
           versionAsOf = Some(2),
-          responseFormat,
+          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
           expectedSchema = new StructType()
             .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
           expectedResult = Seq(1, 2).toDF("value"))
@@ -166,7 +158,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         testReadingDeltaShare(
           tableName,
           versionAsOf = Some(1),
-          responseFormat,
+          responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
           expectedSchema = new StructType()
             .add("value", ShortType),
           expectedResult = Seq(1, 2).toDF("value"))
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
index 8913ac5fe91..74670e42c81 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
@@ -19,7 +19,6 @@ package io.delta.sharing.spark
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.util.JsonUtils
 import io.delta.sharing.client.{
   DeltaSharingClient,
@@ -30,13 +29,11 @@ import io.delta.sharing.client.model.{
   AddFile => ClientAddFile,
   DeltaTableFiles,
   DeltaTableMetadata,
-  Metadata,
   SingleAction,
   Table
 }
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.sql.types.{DataType, Metadata => SparkMetadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.BlockId
 
 /**
@@ -109,7 +106,7 @@ private[spark] class TestClientForDeltaFormatSharing(
       DeltaTableMetadata(
         version = versionAsOf.getOrElse(getTableVersion(table)),
         protocol = protocol,
-        metadata = cleanUpTableSchema(metadata),
+        metadata = metadata,
         respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
       )
     } else {
@@ -186,7 +183,7 @@ private[spark] class TestClientForDeltaFormatSharing(
       DeltaTableFiles(
         versionAsOf.getOrElse(getTableVersion(table)),
         protocol,
-        metadata = cleanUpTableSchema(metadata),
+        metadata,
         files.toSeq,
         respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
       )
@@ -271,27 +268,6 @@ private[spark] class TestClientForDeltaFormatSharing(
   override def getForStreaming(): Boolean = forStreaming
 
   override def getProfileProvider: DeltaSharingProfileProvider = profileProvider
-
-  /**
-   * Removes all field metadata except for comments from the table schema in the given Delta
-   * metadata action. Used for Parquet format sharing.
-   */
-  private def cleanUpTableSchema(metadata: Metadata): Metadata = {
-    val schema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]
-    val cleanedSchema = SchemaMergingUtils.transformColumns(schema) {
-      case (_, field, _) =>
-        val cleanedMetadata = if (field.metadata.contains("comment")) {
-          new MetadataBuilder()
-            .putString("comment", field.metadata.getString("comment"))
-            .build()
-        } else {
-          SparkMetadata.empty
-
-        }
-        field.copy(metadata = cleanedMetadata)
-    }
-    metadata.copy(schemaString = cleanedSchema.json)
-  }
 }
 
 object TestClientForDeltaFormatSharing {
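
Note: patch 5 reverts patch 2, so the series lands with the test client serving table
metadata unmodified in both response formats, and the delta.typeChanges assertions hold
for Parquet format sharing as well. For context, the reader pattern these suites
exercise looks as follows against a real share (the profile path and the
share/schema/table coordinates are placeholders):

    // Read a shared, type-widened table; versionAsOf = 1 predates the
    // SMALLINT -> INT change, so that snapshot still reads back as short.
    val df = spark.read
      .format("deltaSharing")
      .option("responseFormat", "delta") // or "parquet"
      .option("versionAsOf", 1)
      .load("/path/to/profile.share#share1.default.type_widening_table")
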