[SPARK-34897][SQL] Support reconcile schemas based on index after nested column pruning #31993

Closed · wants to merge 9 commits
@@ -21,8 +21,9 @@ import org.apache.spark.sql.types._

object SchemaPruning {
/**
* Filters the schema by the requested fields. For example, if the schema is struct<a:int, b:int>,
* and given requested field are "a", the field "b" is pruned in the returned schema.
* Prunes the nested schema by the requested fields. For example, if the schema is
* struct<a:int, b:int>, and given requested field are "a", the field "b" is pruned in the
@cloud-fan (Contributor), Apr 19, 2021:

the example is incorrect now. This method doesn't prune top-level fields.

Member (Author):

Change the example to id int, struct<a:int, b:int> .

* returned schema.
cloud-fan marked this conversation as resolved.
* Note that schema field ordering at original schema is still preserved in pruned schema.
*/
def pruneDataSchema(
@@ -32,11 +33,10 @@ object SchemaPruning {
// in the resulting schema may differ from their ordering in the logical relation's
// original schema
val mergedSchema = requestedRootFields
.map { case root: RootField => StructType(Array(root.field)) }
.map { root: RootField => StructType(Array(root.field)) }
.reduceLeft(_ merge _)
val dataSchemaFieldNames = dataSchema.fieldNames.toSet
val mergedDataSchema =
StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
StructType(dataSchema.map(s => mergedSchema.find(_.name.equals(s.name)).getOrElse(s)))
Contributor:

what's the actual difference? can you give a simple example?

Contributor:

It seems we don't prune anything from the root fields now.

Contributor:

if this is the case please update the document of this method.

@wangyum (Member, Author), Apr 14, 2021:

spark.sql(
  """
    |CREATE TABLE t1 (
    |  _col0 INT,
    |  _col1 STRING,
    |  _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
    |USING ORC
    |""".stripMargin)


spark.sql("SELECT _col0, _col2.c1 FROM t1").show

The original schema is:

`_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING, `c2`: STRING, `c3`: STRING, `c4`: BIGINT> 

Before this PR, pruneDataSchema returns:

`_col0` INT,`_col2` STRUCT<`c1`: STRING>

After this PR, pruneDataSchema returns:

`_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING>

It only prunes nested schemas.
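
For illustration, here is a minimal sketch of the behavior change in terms of the pruneDataSchema API quoted later in this thread. The import paths and the REPL-style usage are assumptions for the sketch, not part of this PR:

import org.apache.spark.sql.types._
// Assumed import location for SchemaPruning; the exact package may differ by Spark branch.
import org.apache.spark.sql.catalyst.expressions.SchemaPruning
import org.apache.spark.sql.catalyst.expressions.SchemaPruning.RootField

val dataSchema = StructType.fromDDL(
  "_col0 INT, _col1 STRING, _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>")

// Root fields requested by `SELECT _col0, _col2.c1 FROM t1`.
val requestedRootFields = Seq(
  RootField(StructField("_col0", IntegerType), derivedFromAtt = true),
  RootField(StructField("_col2", StructType.fromDDL("c1 STRING")), derivedFromAtt = true))

val pruned = SchemaPruning.pruneDataSchema(dataSchema, requestedRootFields)
// Before this PR: _col0 INT, _col2 STRUCT<c1: STRING>                (_col1 dropped)
// After this PR:  _col0 INT, _col1 STRING, _col2 STRUCT<c1: STRING>  (only nested fields pruned)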

Contributor:

What's wrong with the previous behavior? We can't sacrifice performance in all cases just because the ORC read-by-ordinal case is problematic.

Member (Author):

> is it because column pruning will be done by other rules so we don't need to consider it here?

Yes.

val readDataColumns =
  dataColumns
    .filter(requiredAttributes.contains)
    .filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

val neededFieldNames = neededOutput.map(_.name).toSet
r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))

Contributor:

Can you provide the full code workflow to explain why this causes issues in ORC? I'm still not very sure.

Member (Author):

  1. Prune the nested schema:

         def pruneDataSchema(
             dataSchema: StructType,
             requestedRootFields: Seq[RootField]): StructType = {
           // Merge the requested root fields into a single schema. Note the ordering of the fields
           // in the resulting schema may differ from their ordering in the logical relation's
           // original schema
           val mergedSchema = requestedRootFields
             .map { case root: RootField => StructType(Array(root.field)) }
             .reduceLeft(_ merge _)
           val dataSchemaFieldNames = dataSchema.fieldNames.toSet
           val mergedDataSchema =
             StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
           // Sort the fields of mergedDataSchema according to their order in dataSchema,
           // recursively. This makes mergedDataSchema a pruned schema of dataSchema
           sortLeftFieldsByRight(mergedDataSchema, dataSchema).asInstanceOf[StructType]
         }

  2. Use this pruned nested schema to build the dataSchema of the relation:

         if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
           val prunedRelation = leafNodeBuilder(prunedDataSchema)
           val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
           Some(buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
             projectionOverSchema))

  3. readDataColumns performs the complete top-level column pruning:

         val readDataColumns =
           dataColumns
             .filter(requiredAttributes.contains)
             .filterNot(partitionColumns.contains)
         val outputSchema = readDataColumns.toStructType
         logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
         val outputAttributes = readDataColumns ++ partitionColumns
         val scan =
           FileSourceScanExec(
             fsRelation,
             outputAttributes,
             outputSchema,
             partitionKeyFilters.toSeq,
             bucketSet,
             None,
             dataFilters,
             table.map(_.identifier))

  4. dataSchema comes from relation.dataSchema, which is the pruned nested schema:

         lazy val inputRDD: RDD[InternalRow] = {
           val readFile: (PartitionedFile) => Iterator[InternalRow] =
             relation.fileFormat.buildReaderWithPartitionValues(
               sparkSession = relation.sparkSession,
               dataSchema = relation.dataSchema,
               partitionSchema = relation.partitionSchema,
               requiredSchema = requiredSchema,
               filters = pushedDownFilters,
               options = relation.options,
               hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  5. OrcUtils.requestedColumnIds uses this pruned nested schema:

         val resultedColPruneInfo =
           Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
             OrcUtils.requestedColumnIds(
               isCaseSensitive, dataSchema, requiredSchema, reader, conf)
           }

Member:

It is because requestedColumnIds checks whether the given data schema has fewer fields than the physical schema in the ORC file.

Under nested column pruning, Spark lets the data source use the pruned schema as the data schema to read files. E.g., Spark prunes _col1 in the above example. But the ORC file has three top-level fields _col0, _col1, and _col2, so the check in requestedColumnIds fails in that case.
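
As a rough sketch of the check being described (paraphrased, not the exact OrcUtils source; names such as orcFieldNames and checkReadByOrdinal are assumptions):

import org.apache.spark.sql.types.StructType

// Hive-written ORC files only carry placeholder names (_col0, _col1, ...), so columns are
// matched to the data schema by index. If a top-level field was pruned away, the data schema
// has fewer fields than the file and there is no way to tell which column was dropped.
def checkReadByOrdinal(orcFieldNames: Seq[String], dataSchema: StructType): Unit = {
  if (orcFieldNames.forall(_.startsWith("_col"))) {
    assert(orcFieldNames.length <= dataSchema.length,
      s"The given data schema ${dataSchema.catalogString} has fewer fields than the " +
        "actual ORC physical schema, no idea which columns were dropped, fail to read.")
  }
}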

Member:

> is it because column pruning will be done by other rules so we don't need to consider it here?
>
> Yes.

Hmm? In PushDownUtils.pruneColumns, if you enable nested column pruning, Spark will only run the path of nested column pruning, not the quoted L96-97.

// Sort the fields of mergedDataSchema according to their order in dataSchema,
// recursively. This makes mergedDataSchema a pruned schema of dataSchema
sortLeftFieldsByRight(mergedDataSchema, dataSchema).asInstanceOf[StructType]
@@ -24,22 +24,28 @@ class SchemaPruningSuite extends SparkFunSuite {
test("prune schema by the requested fields") {
def testPrunedSchema(
schema: StructType,
requestedFields: StructField*): Unit = {
requestedFields: Seq[StructField],
expectedSchema: StructType): Unit = {
val requestedRootFields = requestedFields.map { f =>
// `derivedFromAtt` doesn't affect the result of pruned schema.
SchemaPruning.RootField(field = f, derivedFromAtt = true)
}
val expectedSchema = SchemaPruning.pruneDataSchema(schema, requestedRootFields)
assert(expectedSchema == StructType(requestedFields))
val prunedSchema = SchemaPruning.pruneDataSchema(schema, requestedRootFields)
assert(prunedSchema === expectedSchema)
}

testPrunedSchema(StructType.fromDDL("a int, b int"), StructField("a", IntegerType))
testPrunedSchema(StructType.fromDDL("a int, b int"), StructField("b", IntegerType))
testPrunedSchema(
StructType.fromDDL("a int, b int"),
Seq(StructField("a", IntegerType)),
StructType.fromDDL("a int, b int"))

val structOfStruct = StructType.fromDDL("a struct<a:int, b:int>, b int")
testPrunedSchema(structOfStruct, StructField("a", StructType.fromDDL("a int, b int")))
testPrunedSchema(structOfStruct, StructField("b", IntegerType))
testPrunedSchema(structOfStruct, StructField("a", StructType.fromDDL("b int")))
testPrunedSchema(structOfStruct,
Seq(StructField("a", StructType.fromDDL("a int")), StructField("b", IntegerType)),
StructType.fromDDL("a struct<a:int>, b int"))
testPrunedSchema(structOfStruct,
Seq(StructField("a", StructType.fromDDL("a int"))),
StructType.fromDDL("a struct<a:int>, b int"))

val arrayOfStruct = StructField("a", ArrayType(StructType.fromDDL("a int, b int, c string")))
val mapOfStruct = StructField("d", MapType(StructType.fromDDL("a int, b int, c string"),
@@ -49,14 +55,31 @@ class SchemaPruningSuite extends SparkFunSuite {
arrayOfStruct :: StructField("b", structOfStruct) :: StructField("c", IntegerType) ::
mapOfStruct :: Nil)

testPrunedSchema(complexStruct, StructField("a", ArrayType(StructType.fromDDL("b int"))),
StructField("b", StructType.fromDDL("a int")))
testPrunedSchema(complexStruct,
StructField("a", ArrayType(StructType.fromDDL("b int, c string"))),
StructField("b", StructType.fromDDL("b int")))
Seq(StructField("a", ArrayType(StructType.fromDDL("b int"))),
StructField("b", StructType.fromDDL("a int"))),
StructType(
StructField("a", ArrayType(StructType.fromDDL("b int"))) ::
StructField("b", StructType.fromDDL("a int")) ::
StructField("c", IntegerType) ::
mapOfStruct :: Nil))
testPrunedSchema(complexStruct,
Seq(StructField("a", ArrayType(StructType.fromDDL("b int, c string"))),
StructField("b", StructType.fromDDL("b int"))),
StructType(
StructField("a", ArrayType(StructType.fromDDL("b int, c string"))) ::
StructField("b", StructType.fromDDL("b int")) ::
StructField("c", IntegerType) ::
mapOfStruct :: Nil))

val selectFieldInMap = StructField("d", MapType(StructType.fromDDL("a int, b int"),
StructType.fromDDL("e int, f string")))
testPrunedSchema(complexStruct, StructField("c", IntegerType), selectFieldInMap)
testPrunedSchema(complexStruct,
Seq(StructField("c", IntegerType), selectFieldInMap),
StructType(
arrayOfStruct ::
StructField("b", structOfStruct) ::
StructField("c", IntegerType) ::
selectFieldInMap :: Nil))
}
}
@@ -81,6 +81,10 @@ object PushDownUtils extends PredicateHelper {
relation: DataSourceV2Relation,
projects: Seq[NamedExpression],
filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
val exprs = projects ++ filters
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)

scanBuilder match {
case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
val rootFields = SchemaPruning.identifyRootFields(projects, filters)
@@ -89,14 +93,12 @@
} else {
new StructType()
}
r.pruneColumns(prunedSchema)
val neededFieldNames = neededOutput.map(_.name).toSet
r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
Member (Author):

Move the filter logic from SchemaPruning to PushDownUtils to support data source V2 column pruning.
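
In isolation, the step that moves here is just an intersection of the nested-pruned schema with the top-level columns that the projects/filters actually reference; a minimal standalone sketch (the helper name is illustrative, not from this PR):

import org.apache.spark.sql.types.StructType

// Keep only the top-level fields of the nested-pruned schema that are actually referenced.
def pruneTopLevel(prunedSchema: StructType, neededFieldNames: Set[String]): StructType =
  StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name)))

// e.g. pruneTopLevel(StructType.fromDDL("a struct<a:int>, b int"), Set("a"))
//      returns StructType.fromDDL("a struct<a:int>")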

val scan = r.build()
scan -> toOutputAttrs(scan.readSchema(), relation)

case r: SupportsPushDownRequiredColumns =>
val exprs = projects ++ filters
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)
r.pruneColumns(neededOutput.toStructType)
val scan = r.build()
// always project, in case the relation's output has been updated and doesn't match
@@ -633,4 +633,20 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
}
}
}

test("SPARK-34897: Support reconcile schemas based on index after nested column pruning") {
withTable("t1") {
spark.sql(
"""
|CREATE TABLE t1 (
| _col0 INT,
| _col1 STRING,
| _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
|USING ORC
|""".stripMargin)

spark.sql("INSERT INTO t1 values(1, '2', struct('a', 'b', 'c', 10L))")
checkAnswer(spark.sql("SELECT _col0, _col2.c1 FROM t1"), Seq(Row(1, "a")))
}
}
}