diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 5cc41a83cc792..f0df19112ae37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -33,6 +33,7 @@ private[spark] object SQLConf { val DIALECT = "spark.sql.dialect" val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString" val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata" + val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec" // This is only used for the thriftserver val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool" @@ -78,6 +79,9 @@ trait SQLConf { /** When true tables cached using the in-memory columnar caching will be compressed. */ private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean + /** The compression codec for writing to a Parquetfile */ + private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy") + /** The number of rows that will be */ private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 1713ae6fb5d93..5ae768293a22e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -100,8 +100,13 @@ private[sql] object ParquetRelation { // The compression type type CompressionType = parquet.hadoop.metadata.CompressionCodecName - // The default compression - val defaultCompression = CompressionCodecName.GZIP + // The parquet compression short names + val shortParquetCompressionCodecNames = Map( + "NONE" -> CompressionCodecName.UNCOMPRESSED, + "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED, + "SNAPPY" -> CompressionCodecName.SNAPPY, + "GZIP" -> CompressionCodecName.GZIP, + "LZO" -> CompressionCodecName.LZO) /** * Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that @@ -141,9 +146,8 @@ private[sql] object ParquetRelation { conf: Configuration, sqlContext: SQLContext): ParquetRelation = { val path = checkPath(pathString, allowExisting, conf) - if (conf.get(ParquetOutputFormat.COMPRESSION) == null) { - conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name()) - } + conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse( + sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name()) ParquetRelation.enableLogForwarding() ParquetTypesConverter.writeMetaData(attributes, path, conf) new ParquetRelation(path.toString, Some(conf), sqlContext) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 172dcd6aa0ee3..28f43b36832ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -186,6 +186,100 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString) } + test("Compression options for writing to a Parquetfile") { + val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec + import scala.collection.JavaConversions._ + + val file = getTempFilePath("parquet") + val path = file.toString + val rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + + // test default compression codec + rdd.saveAsParquetFile(path) + var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) + .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct + assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + + parquetFile(path).registerTempTable("tmp") + checkAnswer( + sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"), + (5, "val_5") :: + (7, "val_7") :: Nil) + + Utils.deleteRecursively(file) + + // test uncompressed parquet file with property value "UNCOMPRESSED" + TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED") + + rdd.saveAsParquetFile(path) + actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) + .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct + assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + + parquetFile(path).registerTempTable("tmp") + checkAnswer( + sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"), + (5, "val_5") :: + (7, "val_7") :: Nil) + + Utils.deleteRecursively(file) + + // test uncompressed parquet file with property value "none" + TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none") + + rdd.saveAsParquetFile(path) + actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) + .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct + assert(actualCodec === "UNCOMPRESSED" :: Nil) + + parquetFile(path).registerTempTable("tmp") + checkAnswer( + sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"), + (5, "val_5") :: + (7, "val_7") :: Nil) + + Utils.deleteRecursively(file) + + // test gzip compression codec + TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip") + + rdd.saveAsParquetFile(path) + actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) + .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct + assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + + parquetFile(path).registerTempTable("tmp") + checkAnswer( + sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"), + (5, "val_5") :: + (7, "val_7") :: Nil) + + Utils.deleteRecursively(file) + + // test snappy compression codec + TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy") + + rdd.saveAsParquetFile(path) + actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) + .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct + assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + + parquetFile(path).registerTempTable("tmp") + checkAnswer( + sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"), + (5, "val_5") :: + (7, "val_7") :: Nil) + + Utils.deleteRecursively(file) + + // TODO: Lzo requires additional external setup steps so leave it out for now + // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169 + + // Set it back. + TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec) + } + test("Read/Write All Types with non-primitive type") { val tempDir = getTempFilePath("parquetTest").getCanonicalPath val range = (0 to 255)