Skip to content

Commit

Permalink
Tweaks data schema passed to FSBasedRelations
Browse files Browse the repository at this point in the history
- Only data columns should be passed
- Data schema should be converted to its nullable version (see SPARK-5950 and PR apache#4826)
  • Loading branch information
liancheng committed May 12, 2015
1 parent 43ba50e commit 1f9b1a5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ private[sql] case class InsertIntoFSBasedRelation(
if (needsConversion) {
val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
while (iterator.hasNext) {
val row = converter(iterator.next()).asInstanceOf[Row]
val row = iterator.next()
val partitionPart = partitionProj(row)
val dataPart = dataProj(row)
writerContainer.outputWriterForRow(partitionPart).write(dataPart)
val convertedDataPart = converter(dataPart).asInstanceOf[Row]
writerContainer.outputWriterForRow(partitionPart).write(convertedDataPart)
}
} else {
while (iterator.hasNext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ private[sql] object ResolvedDataSource {
SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
}

val dataSchema = StructType(schema.filterNot(f => partitionColumns.contains(f.name)))

dataSource.createRelation(
sqlContext,
paths,
Expand Down Expand Up @@ -279,7 +281,7 @@ private[sql] object ResolvedDataSource {
schema.find(_.name == col).getOrElse {
throw new RuntimeException(s"Partition column $col not found in schema $schema")
}
})
}).asNullable
}

/** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */
Expand All @@ -305,10 +307,11 @@ private[sql] object ResolvedDataSource {
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
val r = dataSource.createRelation(
sqlContext,
Array(outputPath.toString),
Some(data.schema),
Some(dataSchema.asNullable),
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
sqlContext.executePlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ case class CreateMetastoreDataSourceAsSelect(
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
val resolved = ResolvedDataSource(
sqlContext, Some(query.schema), partitionColumns, provider, optionsWithPath)
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
Expand All @@ -203,14 +203,13 @@ case class CreateMetastoreDataSourceAsSelect(
s"table $tableName and using its data source and options."
val errorMessage =
s"""
|$errorDescription
|== Relations ==
|${sideBySide(
s"== Expected Relation ==" ::
l.toString :: Nil,
s"== Actual Relation ==" ::
createdRelation.toString :: Nil).mkString("\n")}
""".stripMargin
|$errorDescription
|== Relations ==
|${sideBySide(
s"== Expected Relation ==" :: l.toString :: Nil,
s"== Actual Relation ==" :: createdRelation.toString :: Nil
).mkString("\n")}
""".stripMargin
throw new AnalysisException(errorMessage)
}
existingSchema = Some(l.schema)
Expand Down

0 comments on commit 1f9b1a5

Please sign in to comment.