Skip to content

Commit

Permalink
Remove metadata in parquet format sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Oct 10, 2024
1 parent ae98b87 commit df607fb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,28 @@ class DeltaSharingDataSourceTypeWideningSuite
}
}

/** Short-hand for the type widening metadata for column `value` for the test table above. */
private val typeWideningMetadata: Metadata =
new MetadataBuilder()
.putMetadataArray(
"delta.typeChanges", Array(
new MetadataBuilder()
.putLong("tableVersion", 2)
.putString("fromType", "short")
.putString("toType", "integer")
.build()))
.build()
for (responseFormat <- Seq(
DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
DeltaSharingOptions.RESPONSE_FORMAT_PARQUET
)) {
// Type widening metadata for column `value` for the test table above.
// The server strips the metadata in Parquet format sharing. This is fine since it's not used
// on the read path anyway.
val typeWideningMetadata: Metadata =
if (responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) {
new MetadataBuilder()
.putMetadataArray(
"delta.typeChanges", Array(
new MetadataBuilder()
.putLong("tableVersion", 2)
.putString("fromType", "short")
.putString("toType", "integer")
.build()))
.build()
} else {
Metadata.empty
}

for (responseFormat <- Seq(DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
DeltaSharingOptions.RESPONSE_FORMAT_PARQUET)) {
test(s"Delta sharing with type widening, responseFormat=$responseFormat") {
withTestTable { tableName =>
testReadingDeltaShare(
Expand All @@ -121,23 +129,23 @@ class DeltaSharingDataSourceTypeWideningSuite
testReadingDeltaShare(
tableName,
versionAsOf = Some(3),
responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
responseFormat,
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))

testReadingDeltaShare(
tableName,
versionAsOf = Some(2),
responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
responseFormat,
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(1, 2).toDF("value"))

testReadingDeltaShare(
tableName,
versionAsOf = Some(1),
responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
responseFormat,
expectedSchema = new StructType()
.add("value", ShortType),
expectedResult = Seq(1, 2).toDF("value"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.delta.sharing.spark
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sharing.client.{
DeltaSharingClient,
Expand All @@ -29,11 +30,13 @@ import io.delta.sharing.client.model.{
AddFile => ClientAddFile,
DeltaTableFiles,
DeltaTableMetadata,
Metadata,
SingleAction,
Table
}

import org.apache.spark.SparkEnv
import org.apache.spark.sql.types.{DataType, Metadata => SparkMetadata, MetadataBuilder, StructType}
import org.apache.spark.storage.BlockId

/**
Expand Down Expand Up @@ -105,7 +108,7 @@ private[spark] class TestClientForDeltaFormatSharing(
DeltaTableMetadata(
version = versionAsOf.getOrElse(getTableVersion(table)),
protocol = protocol,
metadata = metadata,
metadata = cleanUpTableSchema(metadata),
respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
)
} else {
Expand Down Expand Up @@ -182,7 +185,7 @@ private[spark] class TestClientForDeltaFormatSharing(
DeltaTableFiles(
versionAsOf.getOrElse(getTableVersion(table)),
protocol,
metadata,
metadata = cleanUpTableSchema(metadata),
files.toSeq,
respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET
)
Expand Down Expand Up @@ -267,6 +270,27 @@ private[spark] class TestClientForDeltaFormatSharing(
override def getForStreaming(): Boolean = forStreaming

override def getProfileProvider: DeltaSharingProfileProvider = profileProvider

/**
 * Strips all per-field metadata from the table schema carried by the given Delta metadata
 * action, keeping only the `comment` entry when one is present. Used for Parquet format
 * sharing, where the server does not return field metadata such as type widening information.
 *
 * @param metadata Delta metadata action whose `schemaString` (JSON schema) will be rewritten.
 * @return a copy of `metadata` with the cleaned schema serialized back into `schemaString`.
 */
private def cleanUpTableSchema(metadata: Metadata): Metadata = {
  // The schema travels as a JSON string on the metadata action; parse it back to a StructType.
  val parsedSchema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]
  val strippedSchema = SchemaMergingUtils.transformColumns(parsedSchema) {
    case (_, field, _) =>
      // Preserve the column comment if one exists; drop every other metadata entry.
      val keptMetadata =
        if (field.metadata.contains("comment")) {
          new MetadataBuilder()
            .putString("comment", field.metadata.getString("comment"))
            .build()
        } else {
          SparkMetadata.empty
        }
      field.copy(metadata = keptMetadata)
  }
  metadata.copy(schemaString = strippedSchema.json)
}
}

object TestClientForDeltaFormatSharing {
Expand Down

0 comments on commit df607fb

Please sign in to comment.