Skip to content

Commit

Permalink
More strict schema checking
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed May 12, 2015
1 parent b746ab5 commit f18dec2
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,32 @@ private[sql] case class InsertIntoFSBasedRelation(
df: DataFrame,
partitionColumns: Array[String]): Unit = {

require(
df.schema == relation.schema,
s"""DataFrame must have the same schema as the relation to which is inserted.
|DataFrame schema: ${df.schema}
|Relation schema: ${relation.schema}
""".stripMargin)

val sqlContext = df.sqlContext

val (partitionRDD, dataRDD) = {
val fieldNames = relation.schema.fieldNames
val (partitionCols, dataCols) = fieldNames.partition(partitionColumns.contains)
val dataCols = fieldNames.filterNot(partitionColumns.contains)
val df = sqlContext.createDataFrame(
DataFrame(sqlContext, query).queryExecution.toRdd,
relation.schema,
needsConversion = false)

assert(
partitionCols.sameElements(partitionColumns), {
val insertionPartitionCols = partitionColumns.mkString(",")
val relationPartitionCols =
relation.partitionSpec.partitionColumns.fieldNames.mkString(",")
s"""Partition columns mismatch.
|Expected: $relationPartitionCols
|Actual: $insertionPartitionCols
""".stripMargin
})

val partitionDF = df.select(partitionCols.head, partitionCols.tail: _*)
val partitionColumnsInSpec = relation.partitionSpec.partitionColumns.map(_.name)
require(
partitionColumnsInSpec.sameElements(partitionColumns),
s"""Partition columns mismatch.
|Expected: ${partitionColumnsInSpec.mkString(", ")}
|Actual: ${partitionColumns.mkString(", ")}
""".stripMargin)

val partitionDF = df.select(partitionColumns.head, partitionColumns.tail: _*)
val dataDF = df.select(dataCols.head, dataCols.tail: _*)

(partitionDF.rdd, dataDF.rdd)
Expand Down

0 comments on commit f18dec2

Please sign in to comment.