[SPARK-2883] [SQL] ORC data source for Spark SQL
This PR updates PR #6135 authored by zhzhan from Hortonworks.

----

This PR implements a Spark SQL data source for accessing ORC files.

> **NOTE**
>
> Although ORC is now an Apache TLP, its codebase is still tightly coupled with Hive. That's why the new ORC data source lives under the `org.apache.spark.sql.hive` package and must be used with `HiveContext`. However, it doesn't require an existing Hive installation to access ORC files.
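
As a rough usage sketch of the point above (a minimal example, not taken from this PR; the path and sample data are hypothetical, and the short name `"orc"` comes from the data source registration added later in this diff):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// A HiveContext is required, but no existing Hive installation or metastore is needed.
val sc = new SparkContext(new SparkConf().setAppName("orc-sketch"))
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// Write a small DataFrame as ORC and read it back through the data source API.
val df = sc.parallelize(1 to 10).map(i => (i, s"val_$i")).toDF("key", "value")
df.write.format("orc").save("/tmp/orc-sample")                      // illustrative path
val loaded = hiveContext.read.format("orc").load("/tmp/orc-sample")
loaded.printSchema()
```

The main features of the new data source are listed below.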

1.  Saving/loading ORC files without contacting the Hive metastore

1.  Support for complex data types (i.e. array, map, and struct)

1.  Awareness of common optimizations provided by Spark SQL (see the query sketch after this list):

    - Column pruning
    - Partition pruning
    - Filter push-down

1.  Schema evolution support
1.  Hive metastore table conversion
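
Continuing the sketch above, a hedged illustration of how the listed optimizations surface through ordinary DataFrame code (`date` and `key` are hypothetical column names, and partitioned ORC writes are assumed to go through the data source API's common `partitionBy` support):

```scala
import org.apache.spark.sql.functions.col

// Build a small partitioned dataset for illustration.
val events = sc.parallelize(1 to 10).map(i => (i, s"2015-05-${10 + i % 2}")).toDF("key", "date")

// Partitioned write: one directory per value of `date`.
events.write.format("orc").partitionBy("date").save("/tmp/orc-partitioned")

// Only `key` is read (column pruning), only matching partition directories are
// scanned (partition pruning), and the `key > 5` predicate can be pushed down
// to the ORC reader once spark.sql.orc.filterPushdown is enabled (see below).
hiveContext.read.format("orc").load("/tmp/orc-partitioned")
  .where(col("date") === "2015-05-10" && col("key") > 5)
  .select("key")
  .show()
```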

This PR also includes initial work done by scwf from Huawei (PR #3753).

Author: Zhan Zhang <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #6194 from liancheng/polishing-orc and squashes the following commits:

55ecd96 [Cheng Lian] Reorganizes ORC test suites
d4afeed [Cheng Lian] Addresses comments
21ada22 [Cheng Lian] Adds @since and @Experimental annotations
128bd3b [Cheng Lian] ORC filter bug fix
d734496 [Cheng Lian] Polishes the ORC data source
2650a42 [Zhan Zhang] resolve review comments
3c9038e [Zhan Zhang] resolve review comments
7b3c7c5 [Zhan Zhang] save mode fix
f95abfd [Zhan Zhang] reuse test suite
7cc2c64 [Zhan Zhang] predicate fix
4e61c16 [Zhan Zhang] minor change
305418c [Zhan Zhang] orc data source support
zhzhan authored and marmbrus committed May 18, 2015
1 parent 9c7e802 commit aa31e43
Showing 14 changed files with 1,477 additions and 76 deletions.
7 changes: 6 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -43,6 +43,8 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"

val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"

val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"

val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
@@ -143,6 +145,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def parquetUseDataSourceApi =
getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean

private[spark] def orcFilterPushDown =
getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean

/** When true uses verifyPartitionPath to prune the path which is not exists. */
private[spark] def verifyPartitionPath =
getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
@@ -254,7 +259,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def dataFrameRetainGroupColumns: Boolean =
getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
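
For reference, a minimal sketch of toggling the new flag at runtime, using the `hiveContext` from the earlier sketch (the default stays `false`, per the diff above):

```scala
// Programmatic API...
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

// ...or an equivalent SQL statement; both update the same SQLConf entry.
hiveContext.sql("SET spark.sql.orc.filterPushdown=true")
```
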
@@ -21,10 +21,9 @@ import java.io.File

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.util.Utils
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.{DataFrame, SaveMode}

/**
* A helper trait that provides convenient facilities for Parquet testing.
@@ -33,54 +32,9 @@ import org.apache.spark.util.Utils
* convenient to use tuples rather than special case classes when writing test cases/suites.
* Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
*/
private[sql] trait ParquetTest {
val sqlContext: SQLContext

private[sql] trait ParquetTest extends SQLTestUtils {
import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
import sqlContext.{conf, sparkContext}

protected def configuration = sparkContext.hadoopConfiguration

/**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
* configurations.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
(keys, values).zipped.foreach(conf.setConf)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConf(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}

/**
* Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
* a file/directory is created there by `f`, it will be delete after `f` returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}
import sqlContext.sparkContext

/**
* Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
@@ -105,13 +59,6 @@ private[sql] trait ParquetTest {
withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
}

/**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
}

/**
* Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
* temporary table named `tableName`, then call `f`. The temporary table together with the
18 changes: 12 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -188,18 +188,20 @@ private[sql] class DDLParser(
private[sql] object ResolvedDataSource {

private val builtinSources = Map(
"jdbc" -> classOf[org.apache.spark.sql.jdbc.DefaultSource],
"json" -> classOf[org.apache.spark.sql.json.DefaultSource],
"parquet" -> classOf[org.apache.spark.sql.parquet.DefaultSource]
"jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
"json" -> "org.apache.spark.sql.json.DefaultSource",
"parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
"orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
)

/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
val loader = Utils.getContextOrSparkClassLoader

if (builtinSources.contains(provider)) {
return builtinSources(provider)
return loader.loadClass(builtinSources(provider))
}

val loader = Utils.getContextOrSparkClassLoader
try {
loader.loadClass(provider)
} catch {
@@ -208,7 +210,11 @@ private[sql] object ResolvedDataSource {
loader.loadClass(provider + ".DefaultSource")
} catch {
case cnf: java.lang.ClassNotFoundException =>
sys.error(s"Failed to load class for data source: $provider")
if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
sys.error("The ORC data source must be used with Hive support enabled.")
} else {
sys.error(s"Failed to load class for data source: $provider")
}
}
}
}
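
A self-contained sketch of the resolution order implemented above — registered short name first, then the provider string itself, then `provider + ".DefaultSource"` — with illustrative object and value names. Registering `"orc"` by class *name* rather than by `classOf` lets sql/core point at a class that lives in the hive module without a compile-time dependency:

```scala
import scala.util.Try

object DataSourceLookupSketch {
  // Short names mapped to fully qualified DefaultSource class names.
  private val builtin = Map(
    "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
    "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource")

  def lookup(provider: String): Class[_] = {
    val loader = Thread.currentThread().getContextClassLoader
    val candidates = builtin.get(provider).toSeq ++ Seq(provider, provider + ".DefaultSource")
    candidates.view
      .flatMap(name => Try(loader.loadClass(name)).toOption)
      .headOption
      .getOrElse(sys.error(s"Failed to load class for data source: $provider"))
  }
}
```
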
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.test

import java.io.File

import scala.util.Try

import org.apache.spark.sql.SQLContext
import org.apache.spark.util.Utils

trait SQLTestUtils {
val sqlContext: SQLContext

import sqlContext.{conf, sparkContext}

protected def configuration = sparkContext.hadoopConfiguration

/**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
* configurations.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
(keys, values).zipped.foreach(conf.setConf)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConf(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}

/**
* Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
* a file/directory is created there by `f`, it will be delete after `f` returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}

/**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
}
}
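
A hedged usage sketch of the extracted helpers, as they might appear inside a suite that mixes in `SQLTestUtils` and supplies `sqlContext` (the data and table name are hypothetical):

```scala
import sqlContext.implicits._

withSQLConf("spark.sql.orc.filterPushdown" -> "true") {
  withTempPath { file =>
    // The temporary path is deleted automatically after this block returns.
    val df = sqlContext.sparkContext.parallelize(1 to 10).map(Tuple1.apply).toDF("i")
    df.write.format("orc").save(file.getCanonicalPath)

    sqlContext.read.format("orc").load(file.getCanonicalPath).registerTempTable("t")
    withTempTable("t") {
      // The temporary table "t" is dropped after this block returns.
      assert(sqlContext.table("t").count() == 10)
    }
  }
}
```
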
@@ -18,8 +18,8 @@
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _}
import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.{io => hadoopIo}

@@ -122,7 +122,7 @@ import scala.collection.JavaConversions._
* even a normal java object (POJO)
* UnionObjectInspector: (tag: Int, object data) (TODO: not supported by SparkSQL yet)
*
* 3) ConstantObjectInspector:
* 3) ConstantObjectInspector:
* Constant object inspector can be either primitive type or Complex type, and it bundles a
* constant value as its property, usually the value is created when the constant object inspector
* constructed.
@@ -133,7 +133,7 @@ import scala.collection.JavaConversions._
}
}}}
* Hive provides 3 built-in constant object inspectors:
* Primitive Object Inspectors:
* Primitive Object Inspectors:
* WritableConstantStringObjectInspector
* WritableConstantHiveVarcharObjectInspector
* WritableConstantHiveDecimalObjectInspector
@@ -147,9 +147,9 @@ import scala.collection.JavaConversions._
* WritableConstantByteObjectInspector
* WritableConstantBinaryObjectInspector
* WritableConstantDateObjectInspector
* Map Object Inspector:
* Map Object Inspector:
* StandardConstantMapObjectInspector
* List Object Inspector:
* List Object Inspector:
* StandardConstantListObjectInspector]]
* Struct Object Inspector: Hive doesn't provide the built-in constant object inspector for Struct
* Union Object Inspector: Hive doesn't provide the built-in constant object inspector for Union
@@ -250,9 +250,9 @@ private[hive] trait HiveInspectors {
poi.getWritableConstantValue.getHiveDecimal)
case poi: WritableConstantTimestampObjectInspector =>
poi.getWritableConstantValue.getTimestamp.clone()
case poi: WritableConstantIntObjectInspector =>
case poi: WritableConstantIntObjectInspector =>
poi.getWritableConstantValue.get()
case poi: WritableConstantDoubleObjectInspector =>
case poi: WritableConstantDoubleObjectInspector =>
poi.getWritableConstantValue.get()
case poi: WritableConstantBooleanObjectInspector =>
poi.getWritableConstantValue.get()
@@ -306,7 +306,7 @@ private[hive] trait HiveInspectors {
// In order to keep backward-compatible, we have to copy the
// bytes with old apis
val bw = x.getPrimitiveWritableObject(data)
val result = new Array[Byte](bw.getLength())
val result = new Array[Byte](bw.getLength())
System.arraycopy(bw.getBytes(), 0, result, 0, bw.getLength())
result
case x: DateObjectInspector if x.preferWritable() =>
@@ -394,6 +394,30 @@ private[hive] trait HiveInspectors {
identity[Any]
}

/**
* Builds specific unwrappers ahead of time according to object inspector
* types to avoid pattern matching and branching costs per row.
*/
def unwrapperFor(field: HiveStructField): (Any, MutableRow, Int) => Unit =
field.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
case oi: ByteObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
case oi: ShortObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
case oi: IntObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
case oi: LongObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
case oi: FloatObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
case oi: DoubleObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
case oi =>
(value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrap(value, oi)
}

/**
* Converts native catalyst types to the types expected by Hive
* @param a the value to be wrapped
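
A hedged sketch of how these precompiled unwrappers might be driven per row when scanning Hive/ORC data. The wiring below is illustrative and not lifted from this PR: `unwrappers` would come from `fieldRefs.map(unwrapperFor)` in code that mixes in `HiveInspectors`, and `MutableRow` is an internal Catalyst class.

```scala
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.sql.catalyst.expressions.MutableRow

// Reuse the per-field unwrappers for every raw row instead of pattern matching per value.
def fillRow(
    rawRow: AnyRef,
    soi: StructObjectInspector,
    mutableRow: MutableRow,
    unwrappers: Seq[(Any, MutableRow, Int) => Unit]): MutableRow = {
  val fieldRefs = soi.getAllStructFieldRefs
  var i = 0
  while (i < fieldRefs.size) {
    val fieldValue = soi.getStructFieldData(rawRow, fieldRefs.get(i))
    if (fieldValue == null) mutableRow.setNullAt(i) else unwrappers(i)(fieldValue, mutableRow, i)
    i += 1
  }
  mutableRow
}
```
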
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging {
def getFileReader(pathStr: String, config: Option[Configuration] = None): Reader = {
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)

// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}

def readSchema(path: String, conf: Option[Configuration]): StructType = {
val reader = getFileReader(path, conf)
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
}

def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))

if (paths == null || paths.size == 0) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}

paths
}
}
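
A quick hedged sketch of exercising these helpers (the path is illustrative, and since the object is `private[orc]` the caller would live in the same package):

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()

// Derive the Spark SQL schema from the first ORC file found under the path.
val schema = OrcFileOperator.readSchema("/tmp/orc-sample", Some(conf))
println(schema.treeString)

// List the data files, skipping metadata/hidden files (names starting with "_" or ".").
OrcFileOperator.listOrcFiles("/tmp/orc-sample", conf).foreach(println)
```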