Skip to content

Commit

Permalink
Adds Scala/Catalyst row conversion when writing non-partitioned tables
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed May 12, 2015
1 parent fa543f3 commit 0b8cd70
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import parquet.hadoop.util.ContextUtil
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
Expand Down Expand Up @@ -123,9 +124,17 @@ private[sql] case class InsertIntoFSBasedRelation(
writerContainer.executorSideSetup(taskContext)

try {
while (iterator.hasNext) {
val row = iterator.next()
writerContainer.outputWriterForRow(row).write(row)
if (relation.needConversion) {
val converter = CatalystTypeConverters.createToScalaConverter(relation.dataSchema)
while (iterator.hasNext) {
val row = converter(iterator.next()).asInstanceOf[Row]
writerContainer.outputWriterForRow(row).write(row)
}
} else {
while (iterator.hasNext) {
val row = iterator.next()
writerContainer.outputWriterForRow(row).write(row)
}
}
writerContainer.commitTask()
} catch { case cause: Throwable =>
Expand Down

0 comments on commit 0b8cd70

Please sign in to comment.