diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala
index 869c8a5b8f1db..ad0f65442b914 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala
@@ -17,34 +17,58 @@
 package org.apache.spark.sql.hive
 
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.{DataFrame, SaveMode}
 
 package object orc {
+  /**
+   * ::Experimental::
+   *
+   * Extra ORC file loading functionality on [[HiveContext]] through implicit conversion.
+   *
+   * @since 1.4.0
+   */
+  @Experimental
   implicit class OrcContext(sqlContext: HiveContext) {
+    /**
+     * ::Experimental::
+     *
+     * Loads specified ORC files, returning the result as a [[DataFrame]].
+     *
+     * @since 1.4.0
+     */
+    @Experimental
     @scala.annotation.varargs
-    def orcFile(path: String, paths: String*): DataFrame = {
-      val pathArray: Array[String] = {
-        if (paths.isEmpty) {
-          Array(path)
-        } else {
-          paths.toArray ++ Array(path)
-        }
-      }
-
-      val orcRelation = OrcRelation(pathArray, Map.empty)(sqlContext)
+    def orcFile(paths: String*): DataFrame = {
+      val orcRelation = OrcRelation(paths.toArray, Map.empty)(sqlContext)
       sqlContext.baseRelationToDataFrame(orcRelation)
     }
   }
 
+  /**
+   * ::Experimental::
+   *
+   * Extra ORC file writing functionality on [[DataFrame]] through implicit conversion.
+   *
+   * @since 1.4.0
+   */
+  @Experimental
   implicit class OrcDataFrame(dataFrame: DataFrame) {
+    /**
+     * ::Experimental::
+     *
+     * Saves the contents of this [[DataFrame]] as an ORC file, preserving the schema. Files that
+     * are written out using this method can be read back in as a [[DataFrame]] using
+     * [[OrcContext.orcFile()]].
+     *
+     * @since 1.4.0
+     */
+    @Experimental
     def saveAsOrcFile(path: String, mode: SaveMode = SaveMode.Overwrite): Unit = {
       dataFrame.save(path, source = classOf[DefaultSource].getCanonicalName, mode)
     }
   }
 
-  // Flags for orc copression, predicates pushdown, etc.
-  val orcDefaultCompressVar = "hive.exec.orc.default.compress"
-  // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
-  val SARG_PUSHDOWN = "sarg.pushdown"
+  private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 3d52c31eca9f7..abc4c92d91da8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.orc
 
 import java.io.File
 
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.Row
@@ -216,7 +217,8 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
 
   // Following codec is supported in hive-0.13.1, ignore it now
   ignore("Other Compression options for writing to an Orcfile - 0.13.1 and above") {
-    TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "SNAPPY")
+    val conf = TestHive.sparkContext.hadoopConfiguration
+    conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.name(), "SNAPPY")
     var tempDir = getTempFilePath("orcTest").getCanonicalPath
     val rdd = sparkContext.parallelize(1 to 100)
       .map(i => TestRDDEntry(i, s"val_$i"))
@@ -225,14 +227,14 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     assert(actualCodec == CompressionKind.SNAPPY)
     Utils.deleteRecursively(new File(tempDir))
 
-    TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "NONE")
+    conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.name(), "NONE")
     tempDir = getTempFilePath("orcTest").getCanonicalPath
     rdd.toDF().saveAsOrcFile(tempDir)
     actualCodec = OrcFileOperator.getFileReader(tempDir).getCompression
     assert(actualCodec == CompressionKind.NONE)
     Utils.deleteRecursively(new File(tempDir))
 
-    TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "LZO")
+    conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.name(), "LZO")
     tempDir = getTempFilePath("orcTest").getCanonicalPath
     rdd.toDF().saveAsOrcFile(tempDir)
     actualCodec = OrcFileOperator.getFileReader(tempDir).getCompression
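
Below is a minimal usage sketch (not part of the patch) showing how the two implicits added above are meant to work together. The names `sc`, `hiveContext`, and `people`, and the `/tmp/people.orc` path are illustrative assumptions, not taken from the patch:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.orc._  // brings the OrcContext and OrcDataFrame implicits into scope

    // assumes an existing SparkContext `sc`, e.g. from the spark-shell
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._

    // build a small DataFrame to round-trip through ORC
    val people = sc.parallelize(1 to 100).map(i => (i, s"name_$i")).toDF("id", "name")

    // OrcDataFrame implicit: write out as ORC (SaveMode.Overwrite is the default)
    people.saveAsOrcFile("/tmp/people.orc", SaveMode.Overwrite)

    // OrcContext implicit: read the files back; the varargs signature accepts one or more paths
    val loaded = hiveContext.orcFile("/tmp/people.orc")
    loaded.show()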