Skip to content

Commit

Permalink
SPARK-1757 Failing test for saving null primitives with .saveAsParquetFile()
Browse files Browse the repository at this point in the history

https://issues.apache.org/jira/browse/SPARK-1757

The first test succeeds, but the second test fails with the following exception:

```
[info] - save and load case class RDD with Nones as parquet *** FAILED *** (14 milliseconds)
[info]   java.lang.RuntimeException: Unsupported datatype StructType(List())
[info]   at scala.sys.package$.error(package.scala:27)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetRelation.scala:201)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$1.apply(ParquetRelation.scala:235)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetRelation.scala:234)
[info]   at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetRelation.scala:267)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:143)
[info]   at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:122)
[info]   at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:139)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
[info]   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:264)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:265)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:268)
[info]   at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:66)
[info]   at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:98)
```

Author: Andrew Ash <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes #690 from ash211/rdd-parquet-save and squashes the following commits:

747a0b9 [Andrew Ash] Merge pull request #1 from marmbrus/pr/690
54bd00e [Michael Armbrust] Need to put Option first since Option <: Seq.
8f3f281 [Andrew Ash] SPARK-1757 Add failing test for saving SparkSQL Schemas with Option[?] fields as parquet
  • Loading branch information
ash211 authored and rxin committed May 13, 2014
1 parent 9cf9f18 commit 156df87
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ object ScalaReflection {

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor(tpe: `Type`): DataType = tpe match {
case t if t <:< typeOf[Option[_]] =>
val TypeRef(_, _, Seq(optType)) = t
schemaFor(optType)
case t if t <:< typeOf[Product] =>
val params = t.member("<init>": TermName).asMethod.paramss
StructType(
Expand All @@ -59,9 +62,6 @@ object ScalaReflection {
case t if t <:< typeOf[String] => StringType
case t if t <:< typeOf[Timestamp] => TimestampType
case t if t <:< typeOf[BigDecimal] => DecimalType
case t if t <:< typeOf[Option[_]] =>
val TypeRef(_, _, Seq(optType)) = t
schemaFor(optType)
case t if t <:< typeOf[java.lang.Integer] => IntegerType
case t if t <:< typeOf[java.lang.Long] => LongType
case t if t <:< typeOf[java.lang.Double] => DoubleType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ import org.apache.spark.sql.test.TestSQLContext._

/**
 * Two-field record pairing an integer key with a string value.
 * NOTE(review): presumably the row type for RDD-backed test tables; its usage is not visible here.
 */
case class TestRDDEntry(key: Int, value: String)

/**
 * Record whose fields are all boxed Java primitives (`java.lang.*`), so every field can hold
 * `null`. The "nulls as parquet" test builds an instance with all five fields set to `null`.
 */
case class NullReflectData(
intField: java.lang.Integer,
longField: java.lang.Long,
floatField: java.lang.Float,
doubleField: java.lang.Double,
booleanField: java.lang.Boolean)

/**
 * Record whose fields are all Scala primitives wrapped in `Option`.
 * Row type of the "Nones as parquet" round-trip test.
 */
case class OptionalReflectData(
intField: Option[Int],
longField: Option[Long],
floatField: Option[Float],
doubleField: Option[Double],
booleanField: Option[Boolean])

class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
import TestData._
TestData // Load test data tables.
Expand Down Expand Up @@ -195,5 +209,35 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
Utils.deleteRecursively(ParquetTestData.testDir)
ParquetTestData.writeFile()
}

// Round-trips a single row whose boxed-primitive fields are all null through a parquet file
// and checks that every column is read back as null.
test("save and load case class RDD with nulls as parquet") {
  val data = NullReflectData(null, null, null, null, null)
  val rdd = sparkContext.parallelize(data :: Nil)

  val file = getTempFilePath("parquet")
  try {
    val path = file.toString
    rdd.saveAsParquetFile(path)
    val readFile = parquetFile(path)

    val savedRows = readFile.collect()
    // NullReflectData has five fields, so the loaded row must be five nulls.
    assert(savedRows(0) === Seq.fill(5)(null))
  } finally {
    // Remove the temporary output even when the save, load, or assertion above fails,
    // so a failing run does not leave parquet files behind.
    Utils.deleteRecursively(file)
  }
}

// Round-trips a single row whose Option fields are all None through a parquet file
// and checks that every column is read back as null.
test("save and load case class RDD with Nones as parquet") {
  // Use None rather than null: the test is about Option-typed fields with no value,
  // and a null reference in place of an Option never exercises that case.
  val data = OptionalReflectData(None, None, None, None, None)
  val rdd = sparkContext.parallelize(data :: Nil)

  val file = getTempFilePath("parquet")
  try {
    val path = file.toString
    rdd.saveAsParquetFile(path)
    val readFile = parquetFile(path)

    val savedRows = readFile.collect()
    // OptionalReflectData has five fields; each None is expected to load as null.
    assert(savedRows(0) === Seq.fill(5)(null))
  } finally {
    // Remove the temporary output even when the save, load, or assertion above fails,
    // so a failing run does not leave parquet files behind.
    Utils.deleteRecursively(file)
  }
}
}

0 comments on commit 156df87

Please sign in to comment.