[SPARK-25557][SQL] Nested column predicate pushdown for ORC #28761

Closed · wants to merge 9 commits
Changes from 3 commits
@@ -2080,9 +2080,9 @@ object SQLConf {
.doc("A comma-separated list of data source short names or fully qualified data source " +
"implementation class names for which Spark tries to push down predicates for nested " +
"columns and/or names containing `dots` to data sources. This configuration is only " +
"effective with file-based data source in DSv1. Currently, Parquet implements " +
"both optimizations while ORC only supports predicates for names containing `dots`. The " +
"other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
"effective with file-based data source in DSv1. Currently, Parquet and ORC implement " +
"both optimizations. The other data sources don't support this feature yet. So the " +
"default value is 'parquet,orc'.")
.version("3.0.0")
.stringConf
.createWithDefault("parquet,orc")
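For context, a minimal sketch of how a user would narrow this list; the config key below is assumed from SQLConf and is not shown in this hunk:

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming the key is
// "spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources"
// (the key name is not visible in this hunk). Restrict nested predicate
// pushdown to Parquet only, e.g. to compare plans with and without ORC support.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.conf.set(
  "spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources", "parquet")
```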
@@ -668,6 +668,8 @@ abstract class PushableColumnBase {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
def helper(e: Expression): Option[Seq[String]] = e match {
case a: Attribute =>
// An attribute whose name contains a dot "." is supported only when
// nested predicate pushdown is enabled.
if (nestedPredicatePushdownEnabled || !a.name.contains(".")) {
Some(Seq(a.name))
} else {
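A small sketch of the name handling this relies on: with the MultipartIdentifierHelper import used above, `quoted` joins name parts with dots and back-quotes any part that itself contains a dot, so a nested field `b` inside struct `a` and a top-level column literally named "a.b" stay distinguishable:

```scala
// REPL-style sketch, with Spark on the classpath.
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

val nested = Seq("a", "b").quoted      // "a.b"          -> field b inside struct a
val dotted = Seq("a.b", "c.d").quoted  // "`a.b`.`c.d`"  -> column names that contain dots
```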
@@ -17,8 +17,11 @@

package org.apache.spark.sql.execution.datasources.orc

import java.util.Locale

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType}
import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType, StructField, StructType}

/**
* Methods that can be shared when upgrading the built-in Hive.
@@ -37,12 +40,44 @@ trait OrcFiltersBase {
}

/**
* Return true if this is a searchable type in ORC.
* Both CharType and VarcharType are cleaned at AstBuilder.
* This method returns a map which contains ORC field name and data type. Each key
* represents a column; `dots` are used as separators for nested columns. If any part
* of the names contains `dots`, it is quoted to avoid confusion. See
* `org.apache.spark.sql.connector.catalog.quote` for implementation details.
 */
[Review comment — Member] quote -> quoted.
protected[sql] def isSearchableType(dataType: DataType) = dataType match {
case BinaryType => false
case _: AtomicType => true
case _ => false
protected[sql] def getNameToOrcFieldMap(
[Review comment — dongjoon-hyun, Member, Aug 6, 2020]
  1. OrcField looks a little mismatched because this function returns a DataType instead of a field. Currently, it sounds like ToOrcField.
  2. According to the behavior of this function, it ignores BinaryType, complex types, and UserDefinedType. Also, the function description doesn't mention the limitation at all. To be clearer, we had better keep Searchable in the function name, like the previous isSearchableType.
schema: StructType,
caseSensitive: Boolean): Map[String, DataType] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

def getPrimitiveFields(
fields: Seq[StructField],
parentFieldNames: Array[String] = Array.empty): Seq[(String, DataType)] = {
fields.flatMap { f =>
f.dataType match {
case st: StructType =>
getPrimitiveFields(st.fields.toSeq, parentFieldNames :+ f.name)
case BinaryType => None
case _: AtomicType =>
Some(((parentFieldNames :+ f.name).toSeq.quoted, f.dataType))
case _ => None
}
}
}

val primitiveFields = getPrimitiveFields(schema.fields)
if (caseSensitive) {
primitiveFields.toMap
} else {
// Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
// mode, just skip pushdown for these fields, they will trigger Exception when reading,
// See: SPARK-25175.
[Review comment — Member] is matched -> are matched?
val dedupPrimitiveFields =
primitiveFields
[Review comment — Member] indentation?
- val dedupPrimitiveFields =
- primitiveFields
+ val dedupPrimitiveFields = primitiveFields
.groupBy(_._1.toLowerCase(Locale.ROOT))
.filter(_._2.size == 1)
.mapValues(_.head._2)
CaseInsensitiveMap(dedupPrimitiveFields)
}
}
}
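To make the result shape concrete, a small sketch with an illustrative (hypothetical) schema, following the implementation above: leaf atomic fields are keyed by their dot-separated path, binary and other non-searchable leaves are skipped, and names containing dots are back-quoted.

```scala
import org.apache.spark.sql.types._

// Hypothetical schema, not from the PR itself.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("a", StructType(Seq(
    StructField("b", StringType),
    StructField("bin", BinaryType)))),  // BinaryType leaf: not searchable, skipped
  StructField("c.d", LongType)))        // top-level name containing a dot

// Expected mapping in case-sensitive mode, per the implementation above:
//   Map("id" -> IntegerType, "a.b" -> StringType, "`c.d`" -> LongType)
```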
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -60,10 +61,8 @@ case class OrcScanBuilder(
// changed `hadoopConf` in executors.
OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
}
val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
// TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so they are removed.
val newFilters = filters.filter(!_.containsNestedColumn)
_pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, newFilters).toArray
val dataTypeMap = OrcFilters.getNameToOrcFieldMap(schema, SQLConf.get.caseSensitiveAnalysis)
_pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray
}
filters
}
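A minimal sketch of the user-visible effect of wiring the new map into the scan builder, assuming a local session and a hypothetical output path; the exact PushedFilters output in explain() depends on the build:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Write a small ORC file with a nested struct column, then filter on the
// nested field. With this change the predicate on a.id should show up in the
// scan's pushed filters instead of being dropped.
val path = "/tmp/orc-nested-pushdown-demo"  // hypothetical path
Seq((1, "x"), (2, "y")).toDF("id", "name")
  .selectExpr("struct(id, name) AS a")
  .write.mode("overwrite").orc(path)

spark.read.orc(path).filter("a.id = 1").explain(true)
```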
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.types.StructType

/**
 * Defines shared helpers for nested column predicate pushdown tests, e.g. `ParquetFilterSuite`.
 */
trait NestedColumnPredicateTest {
/**
* Takes a single-level `inputDF` DataFrame and generates multi-level nested
* DataFrames as new test data.
*
* This method accepts a function that runs the actual test. The given function takes three
* parameters: a DataFrame that ranges from zero-nested to multi-level nested,
* the name of the primitive field as a string, and a function that produces the expected
* result for the collected column.
*/
protected def withNestedDataFrame(inputDF: DataFrame)
(runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
assert(inputDF.schema.fields.length == 1)
assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
val df = inputDF.toDF("temp")
Seq(
(
df.withColumnRenamed("temp", "a"),
"a", // zero nesting
(x: Any) => x),
(
df.withColumn("a", struct(df("temp") as "b")).drop("temp"),
"a.b", // one level nesting
(x: Any) => Row(x)),
(
df.withColumn("a", struct(struct(df("temp") as "c") as "b")).drop("temp"),
"a.b.c", // two level nesting
(x: Any) => Row(Row(x))
),
(
df.withColumnRenamed("temp", "a.b"),
"`a.b`", // zero nesting with column name containing `dots`
(x: Any) => x
),
(
df.withColumn("a.b", struct(df("temp") as "c.d") ).drop("temp"),
"`a.b`.`c.d`", // one level nesting with column names containing `dots`
(x: Any) => Row(x)
)
).foreach { case (df, colName, resultFun) =>
runTest(df, colName, resultFun)
}
}
}
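A sketch of how a format suite might drive this helper; the suite name and test body are hypothetical, and real suites such as ParquetFilterSuite additionally assert on the pushed-down filter classes:

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.execution.datasources.NestedColumnPredicateTest
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite: for every generated nesting level, filter on the
// produced column name and check the collected top-level column against the
// value wrapped by resultFun.
class NestedPredicateDemoSuite
  extends QueryTest with NestedColumnPredicateTest with SharedSparkSession {
  import testImplicits._

  test("filtering works at every nesting level") {
    withNestedDataFrame(Seq(1, 2, 3).toDF()) { case (df, colName, resultFun) =>
      checkAnswer(df.filter(s"$colName = 2"), Seq(Row(resultFun(2))))
    }
  }
}
```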
@@ -78,12 +78,16 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
(f: String => Unit): Unit = withDataSourceFile(data)(f)

/**
* Writes `data` to a Orc file and reads it back as a `DataFrame`,
* Writes the `df` DataFrame to an Orc file and reads it back as a `DataFrame`,
* which is then passed to `f`. The Orc file will be deleted after `f` returns.
*/
protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
(data: Seq[T], testVectorized: Boolean = true)
(f: DataFrame => Unit): Unit = withDataSourceDataFrame(data, testVectorized)(f)
protected def withOrcDataFrame(df: DataFrame, testVectorized: Boolean = true)
(f: DataFrame => Unit): Unit = {
withTempPath { file =>
df.write.format(dataSourceName).save(file.getCanonicalPath)
readFile(file.getCanonicalPath, testVectorized)(f)
}
}

/**
* Writes `data` to a Orc file, reads it back as a `DataFrame` and registers it as a
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, NestedColumnPredicateTest, PushableColumnAndNestedColumn}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.functions._
@@ -62,7 +62,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
* dependent on this configuration, don't forget you better explicitly set this configuration
* within the test.
*/
abstract class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSparkSession {
abstract class ParquetFilterSuite
extends QueryTest with ParquetTest with NestedColumnPredicateTest with SharedSparkSession {

protected def createParquetFilters(
schema: MessageType,
@@ -105,44 +106,6 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
}

/**
* Takes single level `inputDF` dataframe to generate multi-level nested
* dataframes as new test data.
*/
private def withNestedDataFrame(inputDF: DataFrame)
(runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
assert(inputDF.schema.fields.length == 1)
assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
val df = inputDF.toDF("temp")
Seq(
(
df.withColumnRenamed("temp", "a"),
"a", // zero nesting
(x: Any) => x),
(
df.withColumn("a", struct(df("temp") as "b")).drop("temp"),
"a.b", // one level nesting
(x: Any) => Row(x)),
(
df.withColumn("a", struct(struct(df("temp") as "c") as "b")).drop("temp"),
"a.b.c", // two level nesting
(x: Any) => Row(Row(x))
),
(
df.withColumnRenamed("temp", "a.b"),
"`a.b`", // zero nesting with column name containing `dots`
(x: Any) => x
),
(
df.withColumn("a.b", struct(df("temp") as "c.d") ).drop("temp"),
"`a.b`.`c.d`", // one level nesting with column names containing `dots`
(x: Any) => Row(x)
)
).foreach { case (df, colName, resultFun) =>
runTest(df, colName, resultFun)
}
}

private def testTimestampPushdown(data: Seq[String], java8Api: Boolean): Unit = {
implicit class StringToTs(s: String) {
def ts: Timestamp = Timestamp.valueOf(s)
@@ -27,7 +27,7 @@ import org.apache.orc.storage.serde2.io.HiveDecimalWritable

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -68,11 +68,9 @@ private[sql] object OrcFilters extends OrcFiltersBase {
* Create ORC filter as a SearchArgument instance.
*/
def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
val dataTypeMap = OrcFilters.getNameToOrcFieldMap(schema, SQLConf.get.caseSensitiveAnalysis)
// Combines all convertible filters using `And` to produce a single conjunction
// TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so they are removed.
val newFilters = filters.filter(!_.containsNestedColumn)
val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, newFilters))
val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
conjunctionOptional.map { conjunction =>
// Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
// The input predicate is fully convertible. There should not be any empty result in the
@@ -231,37 +229,37 @@ private[sql] object OrcFilters extends OrcFiltersBase {
// Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
// in order to distinguish predicate pushdown for nested columns.
[Review comment — Member] Since we removed quoteIfNeeded in this file completely, I believe we can remove this old comment (231-232) together in both files, v1.2 (here) and v2.3.
expression match {
case EqualTo(name, value) if isSearchableType(dataTypeMap(name)) =>
case EqualTo(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().equals(name, getType(name), castedValue).end())

case EqualNullSafe(name, value) if isSearchableType(dataTypeMap(name)) =>
case EqualNullSafe(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().nullSafeEquals(name, getType(name), castedValue).end())

case LessThan(name, value) if isSearchableType(dataTypeMap(name)) =>
case LessThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().lessThan(name, getType(name), castedValue).end())

case LessThanOrEqual(name, value) if isSearchableType(dataTypeMap(name)) =>
case LessThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startAnd().lessThanEquals(name, getType(name), castedValue).end())

case GreaterThan(name, value) if isSearchableType(dataTypeMap(name)) =>
case GreaterThan(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startNot().lessThanEquals(name, getType(name), castedValue).end())

case GreaterThanOrEqual(name, value) if isSearchableType(dataTypeMap(name)) =>
case GreaterThanOrEqual(name, value) if dataTypeMap.contains(name) =>
val castedValue = castLiteralValue(value, dataTypeMap(name))
Some(builder.startNot().lessThan(name, getType(name), castedValue).end())

case IsNull(name) if isSearchableType(dataTypeMap(name)) =>
case IsNull(name) if dataTypeMap.contains(name) =>
Some(builder.startAnd().isNull(name, getType(name)).end())

case IsNotNull(name) if isSearchableType(dataTypeMap(name)) =>
case IsNotNull(name) if dataTypeMap.contains(name) =>
Some(builder.startNot().isNull(name, getType(name)).end())

case In(name, values) if isSearchableType(dataTypeMap(name)) =>
case In(name, values) if dataTypeMap.contains(name) =>
val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(name)))
Some(builder.startAnd().in(name, getType(name),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())
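A sketch of the end result of these guards, as it would be exercised from a suite in the same package (OrcFilters is private[sql]); the schema and the printed SearchArgument form are illustrative:

```scala
import org.apache.spark.sql.sources.EqualTo
import org.apache.spark.sql.types._

// With the new name-to-type map, a filter on the nested field "a.b" is now
// considered convertible and yields a SearchArgument, where previously it was
// filtered out as a nested predicate.
val schema = StructType(Seq(
  StructField("a", StructType(Seq(StructField("b", IntegerType))))))

val sarg = OrcFilters.createFilter(schema, Seq(EqualTo("a.b", 1)))
// Expected to be defined, printing something like:
//   leaf-0 = (EQUALS a.b 1), expr = leaf-0
println(sarg)
```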