From 156df87e7ca0e6cda2cc970ecd1466ce06f7576f Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 12 May 2014 19:23:39 -0700 Subject: [PATCH] SPARK-1757 Failing test for saving null primitives with .saveAsParquetFile() https://issues.apache.org/jira/browse/SPARK-1757 The first test succeeds, but the second test fails with exception: ``` [info] - save and load case class RDD with Nones as parquet *** FAILED *** (14 milliseconds) [info] java.lang.RuntimeException: Unsupported datatype StructType(List()) [info] at scala.sys.package$.error(package.scala:27) [info] at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetRelation.scala:201) [info] at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235) [info] at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235) [info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [info] at scala.collection.immutable.List.foreach(List.scala:318) [info] at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [info] at scala.collection.AbstractTraversable.map(Traversable.scala:105) [info] at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetRelation.scala:234) [info] at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetRelation.scala:267) [info] at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:143) [info] at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:122) [info] at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:139) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [info] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) [info] at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:264) [info] at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:264) [info] at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:265) [info] at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:265) [info] at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:268) [info] at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:268) [info] at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:66) [info] at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:98) ``` Author: Andrew Ash Author: Michael Armbrust Closes #690 from ash211/rdd-parquet-save and squashes the following commits: 747a0b9 [Andrew Ash] Merge pull request #1 from marmbrus/pr/690 54bd00e [Michael Armbrust] Need to put Option first since Option <: Seq. 8f3f281 [Andrew Ash] SPARK-1757 Add failing test for saving SparkSQL Schemas with Option[?] fields as parquet --- .../spark/sql/catalyst/ScalaReflection.scala | 6 +-- .../spark/sql/parquet/ParquetQuerySuite.scala | 44 +++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 792ef6cee6f5d..196695a0a188f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -41,6 +41,9 @@ object ScalaReflection { /** Returns a catalyst DataType for the given Scala Type using reflection. */ def schemaFor(tpe: `Type`): DataType = tpe match { + case t if t <:< typeOf[Option[_]] => + val TypeRef(_, _, Seq(optType)) = t + schemaFor(optType) case t if t <:< typeOf[Product] => val params = t.member("": TermName).asMethod.paramss StructType( @@ -59,9 +62,6 @@ object ScalaReflection { case t if t <:< typeOf[String] => StringType case t if t <:< typeOf[Timestamp] => TimestampType case t if t <:< typeOf[BigDecimal] => DecimalType - case t if t <:< typeOf[Option[_]] => - val TypeRef(_, _, Seq(optType)) = t - schemaFor(optType) case t if t <:< typeOf[java.lang.Integer] => IntegerType case t if t <:< typeOf[java.lang.Long] => LongType case t if t <:< typeOf[java.lang.Double] => DoubleType diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index d9c9b9a076ab9..ff1677eb8a480 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -42,6 +42,20 @@ import org.apache.spark.sql.test.TestSQLContext._ case class TestRDDEntry(key: Int, value: String) +case class NullReflectData( + intField: java.lang.Integer, + longField: java.lang.Long, + floatField: java.lang.Float, + doubleField: java.lang.Double, + booleanField: java.lang.Boolean) + +case class OptionalReflectData( + intField: Option[Int], + longField: Option[Long], + floatField: Option[Float], + doubleField: Option[Double], + booleanField: Option[Boolean]) + class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { import TestData._ TestData // Load test data tables. @@ -195,5 +209,35 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { Utils.deleteRecursively(ParquetTestData.testDir) ParquetTestData.writeFile() } + + test("save and load case class RDD with nulls as parquet") { + val data = NullReflectData(null, null, null, null, null) + val rdd = sparkContext.parallelize(data :: Nil) + + val file = getTempFilePath("parquet") + val path = file.toString + rdd.saveAsParquetFile(path) + val readFile = parquetFile(path) + + val rdd_saved = readFile.collect() + assert(rdd_saved(0) === Seq.fill(5)(null)) + Utils.deleteRecursively(file) + assert(true) + } + + test("save and load case class RDD with Nones as parquet") { + val data = OptionalReflectData(null, null, null, null, null) + val rdd = sparkContext.parallelize(data :: Nil) + + val file = getTempFilePath("parquet") + val path = file.toString + rdd.saveAsParquetFile(path) + val readFile = parquetFile(path) + + val rdd_saved = readFile.collect() + assert(rdd_saved(0) === Seq.fill(5)(null)) + Utils.deleteRecursively(file) + assert(true) + } }