Commit: Merge upstream
wangyum committed Apr 21, 2021
1 parent fc904ba commit 4d0b510
Showing 2 changed files with 47 additions and 41 deletions.
SchemaPruning.scala

@@ -23,8 +23,8 @@ import org.apache.spark.sql.types._
 object SchemaPruning extends SQLConfHelper {
   /**
    * Prunes the nested schema by the requested fields. For example, if the schema is:
-   * `id int, struct<a:int, b:int>`, and given requested field are "a", the field "b" is pruned
-   * in the returned schema: `id int, struct<a:int>`.
+   * `id int, s struct<a:int, b:int>`, and given requested field "s.a", the inner field "b"
+   * is pruned in the returned schema: `id int, s struct<a:int>`.
    * Note that:
    * 1. The schema field ordering at original schema is still preserved in pruned schema.
    * 2. The top-level fields are not pruned here.
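
To make the updated Scaladoc example concrete, here is a minimal sketch that exercises SchemaPruning.pruneDataSchema the same way the suite below does. The schemas are taken from the doc comment; the object name PruneDocExample is made up, and the sketch assumes the catalyst classes are on the classpath. As in the suite, `derivedFromAtt` does not affect the pruned result.

import org.apache.spark.sql.catalyst.expressions.SchemaPruning
import org.apache.spark.sql.types._

object PruneDocExample extends App {
  // Data schema from the doc comment: `id int, s struct<a:int, b:int>`.
  val dataSchema = StructType.fromDDL("id int, s struct<a:int, b:int>")

  // Requesting only "s.a" is modeled as a root field "s" whose struct type
  // contains just the nested field "a" (the suite builds RootFields the same way).
  val requestedRootFields = Seq(
    SchemaPruning.RootField(
      field = StructField("s", StructType.fromDDL("a int")),
      derivedFromAtt = true))

  // Expected per the doc comment: `id int, s struct<a:int>` -- the top-level
  // "id" is kept (top-level fields are not pruned here); only nested "b" is dropped.
  println(SchemaPruning.pruneDataSchema(dataSchema, requestedRootFields).toDDL)
}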
@@ -40,7 +40,7 @@ object SchemaPruning extends SQLConfHelper {
       .map { root: RootField => StructType(Array(root.field)) }
       .reduceLeft(_ merge _)
     val mergedDataSchema =
-      StructType(dataSchema.map(s => mergedSchema.find(_.name.equals(s.name)).getOrElse(s)))
+      StructType(dataSchema.map(d => mergedSchema.find(m => resolver(m.name, d.name)).getOrElse(d)))
     // Sort the fields of mergedDataSchema according to their order in dataSchema,
     // recursively. This makes mergedDataSchema a pruned schema of dataSchema
     sortLeftFieldsByRight(mergedDataSchema, dataSchema).asInstanceOf[StructType]
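The behavioral change in the hunk above is that field lookup now goes through the session resolver instead of exact String.equals, so matching honors spark.sql.caseSensitive. Below is a standalone sketch of that idea, not Spark's internal code: ResolverMatchSketch and findField are illustrative names, and the two resolvers stand in for the case-sensitive and case-insensitive resolution Spark picks based on the config.

import org.apache.spark.sql.types._

object ResolverMatchSketch extends App {
  // A resolver decides whether two field names refer to the same column.
  type Resolver = (String, String) => Boolean
  val caseSensitive: Resolver = (a, b) => a == b
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Illustrative helper mirroring `mergedSchema.find(m => resolver(m.name, d.name))`.
  def findField(schema: StructType, name: String, resolver: Resolver): Option[StructField] =
    schema.find(f => resolver(f.name, name))

  val schema = StructType.fromDDL("ID int, name string")

  // Exact equality misses "id" vs "ID"; the case-insensitive resolver matches it,
  // which is what replacing `.equals` with `resolver(...)` enables.
  println(findField(schema, "id", caseSensitive))   // None
  println(findField(schema, "id", caseInsensitive)) // Some(StructField("ID", ...))
}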
SchemaPruningSuite.scala

@@ -18,33 +18,24 @@
 package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.SchemaPruning.RootField
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.internal.SQLConf.CASE_SENSITIVE
 import org.apache.spark.sql.types._

 class SchemaPruningSuite extends SparkFunSuite with SQLHelper {

-  def getRootFields(requestedFields: StructField*): Seq[RootField] = {
-    requestedFields.map { f =>
+  private def testPrunedSchema(
+      schema: StructType,
+      requestedFields: Seq[StructField],
+      expectedSchema: StructType): Unit = {
+    val requestedRootFields = requestedFields.map { f =>
       // `derivedFromAtt` doesn't affect the result of pruned schema.
       SchemaPruning.RootField(field = f, derivedFromAtt = true)
     }
+    val prunedSchema = SchemaPruning.pruneDataSchema(schema, requestedRootFields)
+    assert(prunedSchema === expectedSchema)
   }

   test("prune schema by the requested fields") {
-    def testPrunedSchema(
-        schema: StructType,
-        requestedFields: Seq[StructField],
-        expectedSchema: StructType): Unit = {
-      val requestedRootFields = requestedFields.map { f =>
-        // `derivedFromAtt` doesn't affect the result of pruned schema.
-        SchemaPruning.RootField(field = f, derivedFromAtt = true)
-      }
-      val prunedSchema = SchemaPruning.pruneDataSchema(schema, requestedRootFields)
-      assert(prunedSchema === expectedSchema)
-    }
-
     testPrunedSchema(
       StructType.fromDDL("a int, b int"),
       Seq(StructField("a", IntegerType)),
@@ -95,32 +86,47 @@ class SchemaPruningSuite extends SparkFunSuite with SQLHelper {
   }

   test("SPARK-35096: test case insensitivity of pruned schema") {
-    Seq(true, false).foreach(isCaseSensitive => {
+    val upperCaseSchema = StructType.fromDDL("A struct<A:int, B:int>, B int")
+    val lowerCaseSchema = StructType.fromDDL("a struct<a:int, b:int>, b int")
+    val upperCaseRequestedFields = Seq(StructField("A", StructType.fromDDL("A int")))
+    val lowerCaseRequestedFields = Seq(StructField("a", StructType.fromDDL("a int")))
+
+    Seq(true, false).foreach { isCaseSensitive =>
       withSQLConf(CASE_SENSITIVE.key -> isCaseSensitive.toString) {
         if (isCaseSensitive) {
-          // Schema is case-sensitive
-          val requestedFields = getRootFields(StructField("id", IntegerType))
-          val prunedSchema = SchemaPruning.pruneDataSchema(
-            StructType.fromDDL("ID int, name String"), requestedFields)
-          assert(prunedSchema == StructType(Seq.empty))
-          // Root fields are case-sensitive
-          val rootFieldsSchema = SchemaPruning.pruneDataSchema(
-            StructType.fromDDL("id int, name String"),
-            getRootFields(StructField("ID", IntegerType)))
-          assert(rootFieldsSchema == StructType(StructType(Seq.empty)))
+          testPrunedSchema(
+            upperCaseSchema,
+            upperCaseRequestedFields,
+            StructType.fromDDL("A struct<A:int>, B int"))
+          testPrunedSchema(
+            upperCaseSchema,
+            lowerCaseRequestedFields,
+            upperCaseSchema)
+
+          testPrunedSchema(
+            lowerCaseSchema,
+            upperCaseRequestedFields,
+            lowerCaseSchema)
+          testPrunedSchema(
+            lowerCaseSchema,
+            lowerCaseRequestedFields,
+            StructType.fromDDL("a struct<a:int>, b int"))
         } else {
-          // Schema is case-insensitive
-          val prunedSchema = SchemaPruning.pruneDataSchema(
-            StructType.fromDDL("ID int, name String"),
-            getRootFields(StructField("id", IntegerType)))
-          assert(prunedSchema == StructType(StructField("ID", IntegerType) :: Nil))
-          // Root fields are case-insensitive
-          val rootFieldsSchema = SchemaPruning.pruneDataSchema(
-            StructType.fromDDL("id int, name String"),
-            getRootFields(StructField("ID", IntegerType)))
-          assert(rootFieldsSchema == StructType(StructField("id", IntegerType) :: Nil))
+          Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
+            testPrunedSchema(
+              upperCaseSchema,
+              requestedFields,
+              StructType.fromDDL("A struct<A:int>, B int"))
+          }
+
+          Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
+            testPrunedSchema(
+              lowerCaseSchema,
+              requestedFields,
+              StructType.fromDDL("a struct<a:int>, b int"))
+          }
         }
       }
-    })
+    }
   }
 }
