
Commit

Polishes the ORC data source
liancheng committed May 16, 2015
1 parent 2650a42 commit d734496
Showing 14 changed files with 591 additions and 328 deletions.
7 changes: 6 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -43,6 +43,8 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"

val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"

val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"

val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
@@ -143,6 +145,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def parquetUseDataSourceApi =
getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean

private[spark] def orcFilterPushDown =
getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean

/** When true, uses verifyPartitionPath to prune partition paths that do not exist. */
private[spark] def verifyPartitionPath =
getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
@@ -254,7 +259,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def dataFrameRetainGroupColumns: Boolean =
getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
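For context on the spark.sql.orc.filterPushdown flag added above: a minimal usage sketch in Scala, with the session setup and file path invented for illustration (any existing HiveContext would do).

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// Hypothetical setup; reuse an existing HiveContext if you already have one.
val sc = new SparkContext("local[2]", "OrcFilterPushdownExample")
val sqlContext = new HiveContext(sc)

// The new flag defaults to "false"; turn it on explicitly for this session.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

// Illustrative path; simple predicates like this one become candidates for ORC-level pushdown.
val people = sqlContext.read.format("orc").load("/tmp/people.orc")
people.filter("age > 21").show()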
@@ -21,10 +21,9 @@ import java.io.File

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.util.Utils
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.{DataFrame, SaveMode}

/**
* A helper trait that provides convenient facilities for Parquet testing.
@@ -33,54 +32,9 @@ import org.apache.spark.util.Utils
* convenient to use tuples rather than special case classes when writing test cases/suites.
* Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
*/
private[sql] trait ParquetTest {
val sqlContext: SQLContext

private[sql] trait ParquetTest extends SQLTestUtils {
import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
import sqlContext.{conf, sparkContext}

protected def configuration = sparkContext.hadoopConfiguration

/**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
* configurations.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
(keys, values).zipped.foreach(conf.setConf)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConf(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}

/**
* Generates a temporary path without creating the actual file/directory, then passes it to `f`. If
* a file/directory is created there by `f`, it will be deleted after `f` returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}
import sqlContext.sparkContext

/**
* Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
@@ -105,13 +59,6 @@ private[sql] trait ParquetTest {
withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
}
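As a usage sketch (the test name and data are illustrative), a suite that mixes in ParquetTest would typically call withParquetDataFrame like this:

// Inside a suite mixing in ParquetTest; hypothetical test case.
test("round-trips tuples through Parquet") {
  withParquetDataFrame((1 to 3).map(i => (i, s"val_$i"))) { df =>
    // The tuples were written to a temporary Parquet file and read back via sqlContext.read.parquet.
    assert(df.count() === 3)
  }
}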

/**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
}

/**
* Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
* temporary table named `tableName`, then calls `f`. The temporary table together with the
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.test

import java.io.File

import scala.util.Try

import org.apache.spark.sql.SQLContext
import org.apache.spark.util.Utils

trait SQLTestUtils {
val sqlContext: SQLContext

import sqlContext.{conf, sparkContext}

protected def configuration = sparkContext.hadoopConfiguration

/**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
* configurations.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
(keys, values).zipped.foreach(conf.setConf)
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConf(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}

/**
* Generates a temporary path without creating the actual file/directory, then passes it to `f`. If
* a file/directory is created there by `f`, it will be deleted after `f` returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
*
* @todo Probably this method should be moved to a more general place
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
try f(dir) finally Utils.deleteRecursively(dir)
}

/**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
}
}
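A hedged sketch of how a suite might mix in this trait (the suite name, data, and throwaway local context are invented for illustration; real suites would normally reuse a shared test context):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.test.SQLTestUtils
import org.scalatest.FunSuite

class ExampleSuite extends FunSuite with SQLTestUtils {
  // A throwaway local context purely for illustration.
  val sqlContext: SQLContext = new SQLContext(new SparkContext("local[2]", "SQLTestUtilsExample"))

  test("withTempTable drops the temporary table after the body runs") {
    sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).registerTempTable("t")
    withTempTable("t") {
      assert(sqlContext.table("t").count() === 2)
    }
    // "t" is no longer registered at this point.
  }
}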
@@ -17,12 +17,11 @@

package org.apache.spark.sql.hive.orc


import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._

import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.hive.HiveInspectors

/**
* We can consolidate TableReader.unwrappers and HiveInspectors.wrapperFor to use
@@ -28,28 +28,25 @@ import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging{

def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)
OrcFile.createReader(fs, orcFiles(0))

// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}

def readSchema(path: String, conf: Option[Configuration]): StructType = {
val reader = getFileReader(path, conf)
val readerInspector: StructObjectInspector = reader.getObjectInspector
.asInstanceOf[StructObjectInspector]
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
val reader = getFileReader(path, conf)
val readerInspector: StructObjectInspector = reader.getObjectInspector
.asInstanceOf[StructObjectInspector]
readerInspector
getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
}

def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
@@ -66,6 +63,7 @@ private[orc] object OrcFileOperator extends Logging{
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}

paths
}
}
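For reference, a minimal sketch of what readSchema boils down to when driven directly against the Hive ORC API (the file path is invented for illustration):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.OrcFile
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

val conf = new Configuration()
val orcPath = new Path("/tmp/part-00000.orc")  // illustrative ORC file
val reader = OrcFile.createReader(orcPath.getFileSystem(conf), orcPath)
val inspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
// Hive type string such as struct<name:string,age:int>, which
// HiveMetastoreTypes.toDataType converts to a Catalyst StructType.
println(inspector.getTypeName)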
