Adds @since and @Experimental annotations
liancheng committed May 16, 2015
1 parent 128bd3b commit 21ada22
Showing 2 changed files with 43 additions and 17 deletions.
52 changes: 38 additions & 14 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala
@@ -17,34 +17,58 @@

package org.apache.spark.sql.hive

+ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.{DataFrame, SaveMode}

package object orc {
+ /**
+  * ::Experimental::
+  *
+  * Extra ORC file loading functionality on [[HiveContext]] through implicit conversion.
+  *
+  * @since 1.4.0
+  */
+ @Experimental
implicit class OrcContext(sqlContext: HiveContext) {
+ /**
+  * ::Experimental::
+  *
+  * Loads specified ORC files, returning the result as a [[DataFrame]].
+  *
+  * @since 1.4.0
+  */
+ @Experimental
@scala.annotation.varargs
- def orcFile(path: String, paths: String*): DataFrame = {
-   val pathArray: Array[String] = {
-     if (paths.isEmpty) {
-       Array(path)
-     } else {
-       paths.toArray ++ Array(path)
-     }
-   }
-
-   val orcRelation = OrcRelation(pathArray, Map.empty)(sqlContext)
+ def orcFile(paths: String*): DataFrame = {
+   val orcRelation = OrcRelation(paths.toArray, Map.empty)(sqlContext)
sqlContext.baseRelationToDataFrame(orcRelation)
}
}
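For illustration, a minimal usage sketch of the reader side added here, assuming a Spark 1.4-era HiveContext and hypothetical ORC file paths; importing the orc package object brings the implicit OrcContext conversion into scope:

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.orc._

    // Assumes an existing SparkContext `sc`; both ORC paths are placeholders.
    val sqlContext = new HiveContext(sc)

    // The implicit OrcContext conversion adds orcFile to HiveContext;
    // the varargs signature accepts one or more paths.
    val df = sqlContext.orcFile("/data/events/part-00000.orc", "/data/events/part-00001.orc")
    df.registerTempTable("events")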

+ /**
+  * ::Experimental::
+  *
+  * Extra ORC file writing functionality on [[DataFrame]] through implicit conversion.
+  *
+  * @since 1.4.0
+  */
+ @Experimental
implicit class OrcDataFrame(dataFrame: DataFrame) {
+ /**
+  * ::Experimental::
+  *
+  * Saves the contents of this [[DataFrame]] as an ORC file, preserving the schema. Files that
+  * are written out using this method can be read back in as a [[DataFrame]] using
+  * [[OrcContext.orcFile()]].
+  *
+  * @since 1.4.0
+  */
+ @Experimental
def saveAsOrcFile(path: String, mode: SaveMode = SaveMode.Overwrite): Unit = {
dataFrame.save(path, source = classOf[DefaultSource].getCanonicalName, mode)
}
}
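Similarly, a hedged sketch of the writer side under the same assumptions; the default mode is SaveMode.Overwrite, and files written this way can be read back with orcFile as the scaladoc above states:

    import org.apache.spark.sql.SaveMode

    // Hypothetical: a DataFrame over an existing Hive table named "people".
    val people = sqlContext.table("people")

    // Default mode is SaveMode.Overwrite; an explicit mode can be passed instead.
    people.saveAsOrcFile("/tmp/people.orc")
    people.saveAsOrcFile("/tmp/people.orc", SaveMode.Ignore) // no-op: the path now exists

    // Round trip: the schema is preserved, so this yields an equivalent DataFrame.
    val restored = sqlContext.orcFile("/tmp/people.orc")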

- // Flags for ORC compression, predicate pushdown, etc.
- val orcDefaultCompressVar = "hive.exec.orc.default.compress"
-
// This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
- val SARG_PUSHDOWN = "sarg.pushdown"
+ private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
}
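Finally, a small sketch of how the default ORC compression codec could be switched before writing, mirroring what the updated test below does with TestHive; the configuration key is the one the removed orcDefaultCompressVar pointed at, and the codec names (ZLIB, SNAPPY, NONE, LZO) are Hive's:

    // Assumes the same sqlContext and df as in the sketches above;
    // SNAPPY trades compression ratio for speed.
    sqlContext.sparkContext.hadoopConfiguration.set("hive.exec.orc.default.compress", "SNAPPY")
    df.saveAsOrcFile("/tmp/events_snappy.orc")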
8 changes: 5 additions & 3 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.orc

import java.io.File

+ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.Row
@@ -216,7 +217,8 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {

// The following codecs are only supported in hive-0.13.1 and above, so this test is ignored for now
ignore("Other Compression options for writing to an Orcfile - 0.13.1 and above") {
- TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "SNAPPY")
+ val conf = TestHive.sparkContext.hadoopConfiguration
+ conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.name(), "SNAPPY")
var tempDir = getTempFilePath("orcTest").getCanonicalPath
val rdd = sparkContext.parallelize(1 to 100)
.map(i => TestRDDEntry(i, s"val_$i"))
@@ -225,14 +227,14 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
assert(actualCodec == CompressionKind.SNAPPY)
Utils.deleteRecursively(new File(tempDir))

- TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "NONE")
+ conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.name(), "NONE")
tempDir = getTempFilePath("orcTest").getCanonicalPath
rdd.toDF().saveAsOrcFile(tempDir)
actualCodec = OrcFileOperator.getFileReader(tempDir).getCompression
assert(actualCodec == CompressionKind.NONE)
Utils.deleteRecursively(new File(tempDir))

- TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "LZO")
+ conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.name(), "LZO")
tempDir = getTempFilePath("orcTest").getCanonicalPath
rdd.toDF().saveAsOrcFile(tempDir)
actualCodec = OrcFileOperator.getFileReader(tempDir).getCompression
