Revert "Remove metadata in parquet format sharing"
This reverts commit df607fb.
johanl-db committed Nov 4, 2024
1 parent 20e8b54 commit 6900461
Showing 2 changed files with 19 additions and 51 deletions.
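For reference, a minimal standalone sketch of the `delta.typeChanges` field metadata that the restored typeWideningMetadata value encodes, using only Spark's org.apache.spark.sql.types API (the val names below are illustrative, not part of the diff):

import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}

// Field metadata recording that column `value` was widened from SHORT to INT at table version 2.
val typeWideningMetadata: Metadata = new MetadataBuilder()
  .putMetadataArray("delta.typeChanges", Array(
    new MetadataBuilder()
      .putLong("tableVersion", 2)
      .putString("fromType", "short")
      .putString("toType", "integer")
      .build()))
  .build()

// In Delta format sharing, the expected read schema carries this metadata on the widened column.
val expectedSchema: StructType = new StructType()
  .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata)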
DeltaSharingDataSourceTypeWideningSuite.scala
@@ -111,28 +111,20 @@ class DeltaSharingDataSourceTypeWideningSuite
}
}

-  for (responseFormat <- Seq(
-    DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
-    DeltaSharingOptions.RESPONSE_FORMAT_PARQUET
-  )) {
-    // Type widening metadata for column `value` for the test table above.
-    // The server strips the metadata in Parquet format sharing, this is ok since it's not used on
-    // the read path anyway.
-    val typeWideningMetadata: Metadata =
-      if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
-        new MetadataBuilder()
-          .putMetadataArray(
-            "delta.typeChanges", Array(
-              new MetadataBuilder()
-                .putLong("tableVersion", 2)
-                .putString("fromType", "short")
-                .putString("toType", "integer")
-                .build()))
-          .build()
-      } else {
-        Metadata.empty
-      }
-
+  /** Short-hand for the type widening metadata for column `value` for the test table above. */
+  private val typeWideningMetadata: Metadata =
+    new MetadataBuilder()
+      .putMetadataArray(
+        "delta.typeChanges", Array(
+          new MetadataBuilder()
+            .putLong("tableVersion", 2)
+            .putString("fromType", "short")
+            .putString("toType", "integer")
+            .build()))
+      .build()
+
+  for (responseFormat <- Seq(DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
+    DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)) {
test(s"Delta sharing with type widening, responseFormat=$responseFormat") {
withTestTable { tableName =>
testReadingDeltaShare(
@@ -150,23 +142,23 @@ class DeltaSharingDataSourceTypeWideningSuite
testReadingDeltaShare(
tableName,
versionAsOf = Some(3),
-        responseFormat,
+        responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))

testReadingDeltaShare(
tableName,
versionAsOf = Some(2),
-        responseFormat,
+        responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(1, 2).toDF("value"))

testReadingDeltaShare(
tableName,
versionAsOf = Some(1),
-        responseFormat,
+        responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
expectedSchema = new StructType()
.add("value", ShortType),
expectedResult = Seq(1, 2).toDF("value"))
TestClientForDeltaFormatSharing.scala
@@ -19,7 +19,6 @@ package io.delta.sharing.spark
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sharing.client.{
DeltaSharingClient,
@@ -30,13 +29,11 @@ import io.delta.sharing.client.model.{
AddFile => ClientAddFile,
DeltaTableFiles,
DeltaTableMetadata,
-  Metadata,
SingleAction,
Table
}

import org.apache.spark.SparkEnv
-import org.apache.spark.sql.types.{DataType, Metadata => SparkMetadata, MetadataBuilder, StructType}
import org.apache.spark.storage.BlockId

/**
@@ -109,7 +106,7 @@ private[spark] class TestClientForDeltaFormatSharing(
DeltaTableMetadata(
version = versionAsOf.getOrElse(getTableVersion(table)),
protocol = protocol,
-        metadata = cleanUpTableSchema(metadata),
+        metadata = metadata,
respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
)
} else {
@@ -186,7 +183,7 @@ private[spark] class TestClientForDeltaFormatSharing(
DeltaTableFiles(
versionAsOf.getOrElse(getTableVersion(table)),
protocol,
-        metadata = cleanUpTableSchema(metadata),
+        metadata,
files.toSeq,
respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
)
@@ -271,27 +268,6 @@ private[spark] class TestClientForDeltaFormatSharing(
override def getForStreaming(): Boolean = forStreaming

override def getProfileProvider: DeltaSharingProfileProvider = profileProvider
-
-  /**
-   * Removes all field metadata except for comments from the table schema in the given Delta
-   * metadata action. Used for Parquet format sharing.
-   */
-  private def cleanUpTableSchema(metadata: Metadata): Metadata = {
-    val schema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]
-    val cleanedSchema = SchemaMergingUtils.transformColumns(schema) {
-      case (_, field, _) =>
-        val cleanedMetadata = if (field.metadata.contains("comment")) {
-          new MetadataBuilder()
-            .putString("comment", field.metadata.getString("comment"))
-            .build()
-        } else {
-          SparkMetadata.empty
-
-        }
-        field.copy(metadata = cleanedMetadata)
-    }
-    metadata.copy(schemaString = cleanedSchema.json)
-  }
}

object TestClientForDeltaFormatSharing {