
Commit 558db46
Address comments.
viirya committed Aug 7, 2020
1 parent 0747fcd commit 558db46
Showing 6 changed files with 14 additions and 17 deletions.
@@ -43,9 +43,11 @@ trait OrcFiltersBase {
    * This method returns a map which contains ORC field name and data type. Each key
    * represents a column; `dots` are used as separators for nested columns. If any part
    * of the names contains `dots`, it is quoted to avoid confusion. See
-   * `org.apache.spark.sql.connector.catalog.quote` for implementation details.
+   * `org.apache.spark.sql.connector.catalog.quoted` for implementation details.
+   *
+   * BinaryType, UserDefinedType, ArrayType and MapType are ignored.
    */
-  protected[sql] def getNameToOrcFieldMap(
+  protected[sql] def isSearchableType(
       schema: StructType,
       caseSensitive: Boolean): Map[String, DataType] = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
@@ -69,11 +71,10 @@
     if (caseSensitive) {
       primitiveFields.toMap
     } else {
-      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
+      // Don't consider ambiguity here, i.e. more than one field are matched in case insensitive
       // mode, just skip pushdown for these fields, they will trigger Exception when reading,
       // See: SPARK-25175.
-      val dedupPrimitiveFields =
-        primitiveFields
+      val dedupPrimitiveFields = primitiveFields
         .groupBy(_._1.toLowerCase(Locale.ROOT))
         .filter(_._2.size == 1)
         .mapValues(_.head._2)
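
For readers skimming the hunk above: the Scaladoc describes a map from dot-separated column paths to their data types. Below is a minimal sketch of that flattening, using hypothetical helper names (`quoteIfNeeded`, `nameToTypeMap`) rather than Spark's actual implementation:

import org.apache.spark.sql.types._

// Sketch only: flatten a StructType into dot-separated column paths,
// backtick-quoting any name part that itself contains a dot (mirroring
// CatalogV2Implicits-style quoting) and skipping the ignored types.
def quoteIfNeeded(part: String): String =
  if (part.contains(".") || part.contains("`")) s"`${part.replace("`", "``")}`" else part

def nameToTypeMap(schema: StructType, prefix: Seq[String] = Nil): Map[String, DataType] =
  schema.fields.flatMap { f =>
    val path = prefix :+ f.name
    f.dataType match {
      case s: StructType => nameToTypeMap(s, path) // recurse into nested columns
      case BinaryType | _: ArrayType | _: MapType | _: UserDefinedType[_] => Nil // ignored
      case dt => Seq(path.map(quoteIfNeeded).mkString(".") -> dt)
    }
  }.toMap

// For a schema like a INT, b STRUCT<c STRING, `d.e` LONG>, the keys would be
// "a", "b.c", and "b.`d.e`".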
@@ -61,7 +61,7 @@ case class OrcScanBuilder(
       // changed `hadoopConf` in executors.
       OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
     }
-    val dataTypeMap = OrcFilters.getNameToOrcFieldMap(schema, SQLConf.get.caseSensitiveAnalysis)
+    val dataTypeMap = OrcFilters.isSearchableType(schema, SQLConf.get.caseSensitiveAnalysis)
     _pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray
   }
   filters
@@ -78,7 +78,7 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
     (f: String => Unit): Unit = withDataSourceFile(data)(f)
 
   /**
-   * Writes `date` dataframe to a Orc file and reads it back as a `DataFrame`,
+   * Writes `data` to a Orc file and reads it back as a `DataFrame`,
    * which is then passed to `f`. The Orc file will be deleted after `f` returns.
    */
   protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
@@ -149,7 +149,7 @@
    * dataframes as new test data. It tests both non-nested and nested dataframes
    * which are written and read back with Orc datasource.
    *
-   * This is different from [[OrcTest.withOrcDataFrame]] which does not
+   * This is different from [[withOrcDataFrame]] which does not
    * test nested cases.
    */
   protected def withNestedOrcDataFrame[T <: Product: ClassTag: TypeTag](data: Seq[T])
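
A hedged usage sketch of the helper documented above (its full parameter list is elided by the diff; this assumes the conventional `(data)(f: DataFrame => Unit)` shape):

// Sketch: write four rows to a temporary ORC file, read the file back,
// and run assertions on the resulting DataFrame; the temporary file is
// cleaned up once the closure returns.
withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { df =>
  assert(df.count() == 4)
}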
@@ -68,7 +68,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    * Create ORC filter as a SearchArgument instance.
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
-    val dataTypeMap = OrcFilters.getNameToOrcFieldMap(schema, SQLConf.get.caseSensitiveAnalysis)
+    val dataTypeMap = OrcFilters.isSearchableType(schema, SQLConf.get.caseSensitiveAnalysis)
     // Combines all convertible filters using `And` to produce a single conjunction
     val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
     conjunctionOptional.map { conjunction =>
@@ -226,8 +226,6 @@
     // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
     // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
     // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
-    // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
-    // in order to distinguish predicate pushdown for nested columns.
     expression match {
       case EqualTo(name, value) if dataTypeMap.contains(name) =>
         val castedValue = castLiteralValue(value, dataTypeMap(name))
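
To make the NOTE about `startAnd()` concrete: even a single leaf predicate must sit under a parent predicate before `build()` is called. A minimal sketch, assuming the classic `org.apache.hadoop.hive.ql.io.sarg` package (the two Hive shims import the builder from different locations):

import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory}

// Build a SearchArgument for the single leaf predicate `a = 1`; the
// explicit startAnd()/end() pair supplies the required parent predicate.
val sarg = SearchArgumentFactory.newBuilder()
  .startAnd()
  .equals("a", PredicateLeaf.Type.LONG, java.lang.Long.valueOf(1L))
  .end()
  .build()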
@@ -68,7 +68,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
    * Create ORC filter as a SearchArgument instance.
    */
   def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
-    val dataTypeMap = OrcFilters.getNameToOrcFieldMap(schema, SQLConf.get.caseSensitiveAnalysis)
+    val dataTypeMap = OrcFilters.isSearchableType(schema, SQLConf.get.caseSensitiveAnalysis)
     // Combines all convertible filters using `And` to produce a single conjunction
     val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, filters))
     conjunctionOptional.map { conjunction =>
@@ -226,8 +226,6 @@
     // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`
     // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be
     // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
-    // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
-    // in order to distinguish predicate pushdown for nested columns.
     expression match {
       case EqualTo(name, value) if dataTypeMap.contains(name) =>
         val castedValue = castLiteralValue(value, dataTypeMap(name))
@@ -173,7 +173,7 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
 
   test("filter pushdown - double") {
     withNestedOrcDataFrame(
-        (1 to 4).map(i => Tuple1(Option(i.toDouble)))) { case (inputDF, colName, _) =>
+      (1 to 4).map(i => Tuple1(Option(i.toDouble)))) { case (inputDF, colName, _) =>
       implicit val df: DataFrame = inputDF
 
       val doubleAttr = df(colName).expr
@@ -254,8 +254,8 @@
   }
 
   test("filter pushdown - decimal") {
-    withNestedOrcDataFrame((1 to 4)
-      .map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { case (inputDF, colName, _) =>
+    withNestedOrcDataFrame(
+      (1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { case (inputDF, colName, _) =>
       implicit val df: DataFrame = inputDF
 
       val decimalAttr = df(colName).expr
