
[SPARK-34897][SQL] Support reconcile schemas based on index after nested column pruning #31993

Closed
wants to merge 9 commits

Conversation

Member

@wangyum wangyum commented Mar 29, 2021

What changes were proposed in this pull request?

Nested column pruning currently removes unreferenced top-level `StructField`s from the data schema. For example:

```scala
spark.sql(
  """
    |CREATE TABLE t1 (
    |  _col0 INT,
    |  _col1 STRING,
    |  _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
    |USING ORC
    |""".stripMargin)

spark.sql("INSERT INTO t1 values(1, '2', struct('a', 'b', 'c', 10L))")

spark.sql("SELECT _col0, _col2.c1 FROM t1").show
```

Before this PR, the returned schema is `_col0` INT,`_col2` STRUCT<`c1`: STRING> and it will throw an exception:

```
java.lang.AssertionError: assertion failed: The given data schema struct<_col0:int,_col2:struct<c1:string>> has less fields than the actual ORC physical schema, no idea which columns were dropped, fail to read.
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.requestedColumnIds(OrcUtils.scala:160)
```

After this PR, the returned schema is `_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING>.

The final schema is `_col0` INT,`_col2` STRUCT<`c1`: STRING> after the complete column pruning:

```scala
val readDataColumns =
  dataColumns
    .filter(requiredAttributes.contains)
    .filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
```

```scala
val neededFieldNames = neededOutput.map(_.name).toSet
r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
```
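The name-based final pruning quoted above can be sketched with a minimal Python model (illustrative only; the real logic is the quoted Scala, and the dict-based schema and the function name `final_prune` are hypothetical):

```python
# Minimal Python model (not Spark code) of the final name-based pruning:
# keep only the top-level fields whose names appear in the required output.
def final_prune(pruned_schema, needed_names):
    needed = set(needed_names)
    return {name: typ for name, typ in pruned_schema.items() if name in needed}

# Schema returned by nested pruning after this PR: every top-level field
# survives, and _col2 is narrowed to its requested inner field c1.
pruned_schema = {"_col0": "int", "_col1": "string", "_col2": {"c1": "string"}}

print(final_prune(pruned_schema, ["_col0", "_col2"]))
# -> {'_col0': 'int', '_col2': {'c1': 'string'}}
```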

Why are the changes needed?

Fix bug.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

@wangyum wangyum requested a review from cloud-fan March 29, 2021 09:21
@github-actions github-actions bot added the SQL label Mar 29, 2021
@cloud-fan
Contributor

isn't it a bug? cc @viirya

@SparkQA

SparkQA commented Mar 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41226/

@SparkQA

SparkQA commented Mar 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41226/

@SparkQA

SparkQA commented Mar 29, 2021

Test build #136644 has finished for PR 31993 at commit 2a3f136.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya viirya left a comment

Hmm, this seems to be a special case where nested column pruning doesn't work for ORC. For this case, we need to send the entire unpruned data schema to ORC.

@viirya
Member

viirya commented Mar 30, 2021

As the nested column pruning rule runs far from the point where we get the physical information of the ORC files, and this should be a narrow case, it looks okay to me to inform users of a possible workaround here.

@wangyum
Member Author

wangyum commented Mar 30, 2021

It is a Hive ORC table in our production environment.

@cloud-fan
Contributor

Can we automatically disable nested column pruning on the executor side when we find that the ORC file schema is in the by-position style?

@wangyum
Member Author

wangyum commented Mar 31, 2021

Can we disable column pruning when it is a Hive ORC table?

```scala
private def canPruneRelation(fsRelation: HadoopFsRelation) =
  fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] ||
    fsRelation.fileFormat.isInstanceOf[OrcFileFormat]
```

Update canPruneRelation to:

```scala
private def canPruneRelation(fsRelation: HadoopFsRelation) = {
  fsRelation.fileFormat match {
    case _: ParquetFileFormat => true
    case _: OrcFileFormat =>
      fsRelation.location match {
        case c: CatalogFileIndex =>
          !c.table.provider.contains(DDLUtils.HIVE_PROVIDER)
        case _ => true
      }
    case _ => false // keep the match exhaustive for other file formats
  }
}
```
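For illustration, the proposed gate can be modeled in a few lines of Python (a sketch under the assumption that Hive-provider ORC tables are the by-position case; the names here are hypothetical, and the real check compares against `DDLUtils.HIVE_PROVIDER`):

```python
# Hypothetical Python model of the proposed canPruneRelation gate:
# always allow nested pruning for Parquet, and for ORC only when the
# table does not come from the Hive provider (whose ORC files may use
# by-position field names like _col0, _col1, ...).
def can_prune_relation(file_format, table_provider=None):
    if file_format == "parquet":
        return True
    if file_format == "orc":
        return table_provider != "hive"
    return False  # other formats do not support nested column pruning here

print(can_prune_relation("parquet"))            # -> True
print(can_prune_relation("orc", "hive"))        # -> False
print(can_prune_relation("orc", "datasource"))  # -> True
```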

@cloud-fan
Contributor

Sorry, I may be missing something. Why is it only a problem in nested column pruning but not in column pruning?

@wangyum wangyum changed the title [SPARK-34897][SQL] Add workaround to error message when OrcUtils.requestedColumnIds fails [SPARK-34897][SQL] Support reconcile schemas based on index after nested column pruning Apr 11, 2021
@wangyum
Member Author

wangyum commented Apr 11, 2021

Sorry, I may be missing something. Why is it only a problem in nested column pruning but not in column pruning?

Nested column pruning removes the field:

```scala
def pruneDataSchema(
    dataSchema: StructType,
    requestedRootFields: Seq[RootField]): StructType = {
  // Merge the requested root fields into a single schema. Note the ordering of the fields
  // in the resulting schema may differ from their ordering in the logical relation's
  // original schema
  val mergedSchema = requestedRootFields
    .map { case root: RootField => StructType(Array(root.field)) }
    .reduceLeft(_ merge _)
  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
  val mergedDataSchema =
    StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
  // Sort the fields of mergedDataSchema according to their order in dataSchema,
  // recursively. This makes mergedDataSchema a pruned schema of dataSchema
  sortLeftFieldsByRight(mergedDataSchema, dataSchema).asInstanceOf[StructType]
}
```
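The merge step above can be illustrated with a small Python model (a sketch, not Spark code; schemas are modeled as plain dicts and `merge` only loosely mimics `StructType.merge`):

```python
# Loose Python model of merging the requested root fields: each requested
# field becomes a one-field schema, and schemas are merged left-to-right,
# unioning the inner fields of structs. Not the real StructType.merge.
from functools import reduce

def merge(left, right):
    out = dict(left)
    for name, typ in right.items():
        if name in out and isinstance(out[name], dict) and isinstance(typ, dict):
            out[name] = merge(out[name], typ)  # union nested struct fields
        else:
            out[name] = typ
    return out

# Requested root fields for something like SELECT _col2.c1, _col0, _col2.c2:
requested_root_fields = [
    {"_col2": {"c1": "string"}},
    {"_col0": "int"},
    {"_col2": {"c2": "string"}},
]
merged_schema = reduce(merge, requested_root_fields)
print(merged_schema)
# -> {'_col2': {'c1': 'string', 'c2': 'string'}, '_col0': 'int'}
# Note the field order differs from the original schema, which is why the
# Scala code sorts the result with sortLeftFieldsByRight afterwards.
```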

@SparkQA

SparkQA commented Apr 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41754/

@SparkQA

SparkQA commented Apr 11, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41754/

@SparkQA

SparkQA commented Apr 11, 2021

Test build #137176 has finished for PR 31993 at commit e64eb75.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -89,14 +93,12 @@ object PushDownUtils extends PredicateHelper {
     } else {
       new StructType()
     }
-    r.pruneColumns(prunedSchema)
+    val neededFieldNames = neededOutput.map(_.name).toSet
+    r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
```
Member Author

Move the filter logic from SchemaPruning to PushDownUtils to support data source V2 column pruning.

@wangyum wangyum requested a review from viirya April 13, 2021 10:38
```diff
     val mergedDataSchema =
-      StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
+      StructType(dataSchema.map(s => mergedSchema.find(_.name.equals(s.name)).getOrElse(s)))
```
Contributor

what's the actual difference? can you give a simple example?

Contributor

It seems we don't prune anything from the root fields now.

Contributor

If this is the case, please update the documentation of this method.

Member Author

@wangyum wangyum Apr 14, 2021

```scala
spark.sql(
  """
    |CREATE TABLE t1 (
    |  _col0 INT,
    |  _col1 STRING,
    |  _col2 STRUCT<c1: STRING, c2: STRING, c3: STRING, c4: BIGINT>)
    |USING ORC
    |""".stripMargin)

spark.sql("SELECT _col0, _col2.c1 FROM t1").show
```

The original schema is:

`_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING, `c2`: STRING, `c3`: STRING, `c4`: BIGINT> 

Before this PR, the pruneDataSchema returns:

`_col0` INT,`_col2` STRUCT<`c1`: STRING>

After this PR, the pruneDataSchema returns:

`_col0` INT,`_col1` STRING,`_col2` STRUCT<`c1`: STRING>

It only prunes nested fields.
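The difference can be sketched with a minimal Python model (illustrative only; schemas are plain dicts, and both function names are hypothetical):

```python
# Minimal Python model (not Spark code) of pruneDataSchema's behavior
# before and after this PR. A schema is a dict: field name -> type,
# where a struct type is itself a dict of inner fields.

def prune_before(data_schema, requested):
    """Old behavior: keep only the requested top-level fields."""
    return {name: typ for name, typ in requested.items()
            if name in data_schema}

def prune_after(data_schema, requested):
    """New behavior: keep every top-level field, but replace the
    requested ones with their pruned (nested-only) versions."""
    return {name: requested.get(name, typ)
            for name, typ in data_schema.items()}

data_schema = {
    "_col0": "int",
    "_col1": "string",
    "_col2": {"c1": "string", "c2": "string", "c3": "string", "c4": "bigint"},
}
# SELECT _col0, _col2.c1 requests _col0 and only c1 inside _col2.
requested = {"_col0": "int", "_col2": {"c1": "string"}}

print(prune_before(data_schema, requested))
# -> {'_col0': 'int', '_col2': {'c1': 'string'}}   (drops _col1)
print(prune_after(data_schema, requested))
# -> {'_col0': 'int', '_col1': 'string', '_col2': {'c1': 'string'}}
```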

Contributor

What's wrong with the previous behavior? We can't sacrifice performance for all cases only because the ORC by-ordinal case is problematic.

Member Author

is it because column pruning will be done by other rules so we don't need to consider it here?

Yes.

```scala
val readDataColumns =
  dataColumns
    .filter(requiredAttributes.contains)
    .filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
```

```scala
val neededFieldNames = neededOutput.map(_.name).toSet
r.pruneColumns(StructType(prunedSchema.filter(f => neededFieldNames.contains(f.name))))
```

Contributor

Can you provide the full code workflow to explain why this causes issues in ORC? I'm still not very sure.

Member Author

1. Prune the nested schema:

```scala
def pruneDataSchema(
    dataSchema: StructType,
    requestedRootFields: Seq[RootField]): StructType = {
  // Merge the requested root fields into a single schema. Note the ordering of the fields
  // in the resulting schema may differ from their ordering in the logical relation's
  // original schema
  val mergedSchema = requestedRootFields
    .map { case root: RootField => StructType(Array(root.field)) }
    .reduceLeft(_ merge _)
  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
  val mergedDataSchema =
    StructType(mergedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
  // Sort the fields of mergedDataSchema according to their order in dataSchema,
  // recursively. This makes mergedDataSchema a pruned schema of dataSchema
  sortLeftFieldsByRight(mergedDataSchema, dataSchema).asInstanceOf[StructType]
}
```

2. Use this pruned nested schema to build the `dataSchema` in the relation:

```scala
if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
  val prunedRelation = leafNodeBuilder(prunedDataSchema)
  val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
  Some(buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
    projectionOverSchema))
```

3. `readDataColumns` does the complete column pruning:

```scala
val readDataColumns =
  dataColumns
    .filter(requiredAttributes.contains)
    .filterNot(partitionColumns.contains)
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
val outputAttributes = readDataColumns ++ partitionColumns
val scan =
  FileSourceScanExec(
    fsRelation,
    outputAttributes,
    outputSchema,
    partitionKeyFilters.toSeq,
    bucketSet,
    None,
    dataFilters,
    table.map(_.identifier))
```

4. `dataSchema` comes from `relation.dataSchema`; it is the pruned nested schema:

```scala
lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
```

5. `OrcUtils.requestedColumnIds` uses this pruned nested schema:

```scala
val resultedColPruneInfo =
  Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
    OrcUtils.requestedColumnIds(
      isCaseSensitive, dataSchema, requiredSchema, reader, conf)
  }
```

Member

It is because requestedColumnIds checks whether the given data schema has fewer fields than the physical schema in the ORC file.

Under nested column pruning, Spark lets the data source use the pruned schema as the data schema to read files; e.g. Spark prunes _col1 in the above example. But the ORC file has three top-level fields _col0, _col1, and _col2, so the check in requestedColumnIds fails in this case.
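A rough Python model of the failing check (a sketch of the assertion's logic only, not the actual Spark/ORC code; the function name and list-based schemas are hypothetical):

```python
# Rough Python model of the assertion in OrcUtils.requestedColumnIds.
# For by-position ORC schemas Spark matches data-schema fields to physical
# fields by index, so the data schema must cover every top-level physical
# field or there is no way to tell which columns were dropped.

def requested_column_ids(data_fields, physical_fields):
    if len(data_fields) < len(physical_fields):
        raise AssertionError(
            "The given data schema has less fields than the actual ORC "
            "physical schema, no idea which columns were dropped, fail to read.")
    # Match by position: the i-th data field reads the i-th physical column.
    return list(range(len(data_fields)))

physical = ["_col0", "_col1", "_col2"]

# Before the PR: nested pruning dropped _col1 from the data schema.
try:
    requested_column_ids(["_col0", "_col2"], physical)
except AssertionError as e:
    print("fails:", e)

# After the PR: all top-level fields survive, so the check passes.
print(requested_column_ids(["_col0", "_col1", "_col2"], physical))  # -> [0, 1, 2]
```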

Member

is it because column pruning will be done by other rules so we don't need to consider it here?

Yes.

Hmm? In PushDownUtils.pruneColumns, if nested column pruning is enabled, Spark only runs the nested column pruning path, not the quoted L96-97.

@SparkQA

SparkQA commented Apr 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42024/

@SparkQA

SparkQA commented Apr 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42024/

@SparkQA

SparkQA commented Apr 16, 2021

Test build #137449 has finished for PR 31993 at commit a966bac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42235/

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42235/

@SparkQA

SparkQA commented Apr 21, 2021

Test build #137707 has finished for PR 31993 at commit 6112c9d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
* and given requested field are "a", the field "b" is pruned in the returned schema.
* Note that schema field ordering at original schema is still preserved in pruned schema.
* Prunes the nested schema by the requested fields. For example, if the schema is:
* `id int, struct<a:int, b:int>`, and given requested field are "a", the field "b" is pruned
```
Contributor

top-level columns need to have a name: `id int, s struct<a:int, b:int>`

Contributor

and given requested field are "a" -> and given requested field "s.a"

Contributor

the field "b" is pruned -> the inner field "b" ...

```
* Note that schema field ordering at original schema is still preserved in pruned schema.
* Prunes the nested schema by the requested fields. For example, if the schema is:
* `id int, struct<a:int, b:int>`, and given requested field are "a", the field "b" is pruned
* in the returned schema: `id int, struct<a:int>`.
```
Contributor

ditto, `id int, s struct<a:int>`

@cloud-fan
Contributor

@wangyum there are conflicts

```
# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
```
Comment on lines +89 to +130
```scala
val upperCaseSchema = StructType.fromDDL("A struct<A:int, B:int>, B int")
val lowerCaseSchema = StructType.fromDDL("a struct<a:int, b:int>, b int")
val upperCaseRequestedFields = Seq(StructField("A", StructType.fromDDL("A int")))
val lowerCaseRequestedFields = Seq(StructField("a", StructType.fromDDL("a int")))

Seq(true, false).foreach { isCaseSensitive =>
  withSQLConf(CASE_SENSITIVE.key -> isCaseSensitive.toString) {
    if (isCaseSensitive) {
      // Schema is case-sensitive
      val requestedFields = getRootFields(StructField("id", IntegerType))
      val prunedSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("ID int, name String"), requestedFields)
      assert(prunedSchema == StructType(Seq.empty))
      // Root fields are case-sensitive
      val rootFieldsSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("id int, name String"),
        getRootFields(StructField("ID", IntegerType)))
      assert(rootFieldsSchema == StructType(StructType(Seq.empty)))
      testPrunedSchema(
        upperCaseSchema,
        upperCaseRequestedFields,
        StructType.fromDDL("A struct<A:int>, B int"))
      testPrunedSchema(
        upperCaseSchema,
        lowerCaseRequestedFields,
        upperCaseSchema)

      testPrunedSchema(
        lowerCaseSchema,
        upperCaseRequestedFields,
        lowerCaseSchema)
      testPrunedSchema(
        lowerCaseSchema,
        lowerCaseRequestedFields,
        StructType.fromDDL("a struct<a:int>, b int"))
    } else {
      // Schema is case-insensitive
      val prunedSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("ID int, name String"),
        getRootFields(StructField("id", IntegerType)))
      assert(prunedSchema == StructType(StructField("ID", IntegerType) :: Nil))
      // Root fields are case-insensitive
      val rootFieldsSchema = SchemaPruning.pruneDataSchema(
        StructType.fromDDL("id int, name String"),
        getRootFields(StructField("ID", IntegerType)))
      assert(rootFieldsSchema == StructType(StructField("id", IntegerType) :: Nil))
      Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
        testPrunedSchema(
          upperCaseSchema,
          requestedFields,
          StructType.fromDDL("A struct<A:int>, B int"))
      }

      Seq(upperCaseRequestedFields, lowerCaseRequestedFields).foreach { requestedFields =>
        testPrunedSchema(
          lowerCaseSchema,
          requestedFields,
          StructType.fromDDL("a struct<a:int>, b int"))
      }
    }
  }
})
}
```
Member Author

Contributor

Tests LGTM, thanks for adding more scenarios.

@wangyum
Member Author

wangyum commented Apr 21, 2021

@wangyum there are conflicts

Fixed.

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42257/

@SparkQA

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42257/

@cloud-fan
Contributor

how far shall we backport? to 3.0?

@wangyum
Member Author

wangyum commented Apr 21, 2021

Yes, to 3.0.

@SparkQA

SparkQA commented Apr 21, 2021

Test build #137730 has finished for PR 31993 at commit 4d0b510.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya viirya left a comment

lgtm

@viirya
Member

viirya commented Apr 21, 2021

Thanks! Merging to master.

@viirya viirya closed this in e609395 Apr 21, 2021
@viirya
Member

viirya commented Apr 21, 2021

@wangyum There are conflicts in 3.1/3.0. Can you create backport PRs? Thanks.

@wangyum wangyum deleted the SPARK-34897 branch April 22, 2021 03:16
wangyum added a commit that referenced this pull request Apr 23, 2021
…r nested column pruning

This PR backports #31993 to branch-3.1; the original PR description is the same as above.

Closes #32279 from wangyum/SPARK-34897-3.1.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
wangyum added a commit that referenced this pull request Apr 24, 2021
…r nested column pruning

This PR backports #31993 to branch-3.0; the original PR description is the same as above.

Closes #32310 from wangyum/SPARK-34897-3.0.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
…r nested column pruning

This PR backports apache#31993 to branch-3.1; the original PR description is the same as above.

Closes apache#32279 from wangyum/SPARK-34897-3.1.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
…r nested column pruning

This PR backports apache#31993 to branch-3.1; the original PR description is the same as above.

Closes apache#32279 from wangyum/SPARK-34897-3.1.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>