Enforces that FileOutputFormat must be used
liancheng committed May 12, 2015
1 parent be0c268 · commit 54c3d7b
Showing 3 changed files with 28 additions and 21 deletions.
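This commit tightens the write path of the FS-based data source API: BaseWriterContainer now requires the committer returned by the relation's output format to be a FileOutputCommitter, and output writers are initialized against the committer's work path (outputCommitter.getWorkPath) rather than the final output path, so task output lands in a temporary directory that the committer promotes on commit. The sketch below is an illustrative, self-contained restatement of the check that the diff inlines on both the driver-side and executor-side setup paths; the CommitterCheck object and the requireFileOutputCommitter name are not part of the commit.

import org.apache.hadoop.mapreduce.{OutputFormat, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

object CommitterCheck {
  // Fail fast unless the committer is a FileOutputCommitter; the writer
  // containers rely on its getWorkPath when initializing output writers.
  def requireFileOutputCommitter(
      outputFormat: OutputFormat[_, _],
      context: TaskAttemptContext): FileOutputCommitter = {
    outputFormat.getOutputCommitter(context) match {
      case c: FileOutputCommitter => c
      case _ => sys.error(
        s"Output committer must be ${classOf[FileOutputCommitter].getName} or its subclasses")
    }
  }
}

Correspondingly, FSBasedRelation.outputFormatClass is narrowed from OutputFormat[Void, Row] to FileOutputFormat[Void, Row]; the default FileOutputFormat.getOutputCommitter already returns a FileOutputCommitter, so well-behaved implementations satisfy the new check automatically.
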
@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
import org.apache.hadoop.util.Shell
import parquet.hadoop.util.ContextUtil

@@ -211,7 +211,7 @@ private[sql] abstract class BaseWriterContainer(
@transient private val jobContext: JobContext = job

// The following fields are initialized and used on both driver and executor side.
-@transient private var outputCommitter: OutputCommitter = _
+@transient protected var outputCommitter: FileOutputCommitter = _
@transient private var jobId: JobID = _
@transient private var taskId: TaskID = _
@transient private var taskAttemptId: TaskAttemptID = _
@@ -235,7 +235,11 @@
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
val outputFormat = relation.outputFormatClass.newInstance()
-outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext)
+outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext) match {
+  case c: FileOutputCommitter => c
+  case _ => sys.error(
+    s"Output committer must be ${classOf[FileOutputCommitter].getName} or its subclasses")
+}
outputCommitter.setupJob(jobContext)
}

@@ -244,7 +248,11 @@
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
val outputFormat = outputFormatClass.newInstance()
-outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext)
+outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext) match {
+  case c: FileOutputCommitter => c
+  case _ => sys.error(
+    s"Output committer must be ${classOf[FileOutputCommitter].getName} or its subclasses")
+}
outputCommitter.setupTask(taskAttemptContext)
initWriters()
}
@@ -298,7 +306,7 @@ private[sql] class DefaultWriterContainer(

override protected def initWriters(): Unit = {
writer = relation.outputWriterClass.newInstance()
-writer.init(outputPath, dataSchema, taskAttemptContext)
+writer.init(outputCommitter.getWorkPath.toString, dataSchema, taskAttemptContext)
}

override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -340,7 +348,7 @@ private[sql] class DynamicPartitionWriterContainer(
}.mkString

outputWriters.getOrElseUpdate(partitionPath, {
-val path = new Path(outputPath, partitionPath.stripPrefix(Path.SEPARATOR))
+val path = new Path(outputCommitter.getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
val writer = outputWriterClass.newInstance()
writer.init(path.toString, dataSchema, taskAttemptContext)
writer

@@ -19,8 +19,8 @@ package org.apache.spark.sql.sources

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapreduce.{TaskAttemptContext, OutputFormat, OutputCommitter}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -411,7 +411,7 @@ abstract class FSBasedRelation private[sql](
buildScan(requiredColumns, inputPaths)
}

-def outputFormatClass: Class[_ <: OutputFormat[Void, Row]]
+def outputFormatClass: Class[_ <: FileOutputFormat[Void, Row]]

/**
* This method is responsible for producing a new [[OutputWriter]] for each newly opened output

@@ -20,10 +20,9 @@ package org.apache.spark.sql.sources
import scala.collection.mutable

import com.google.common.base.Objects
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.{TaskAttemptContext, OutputFormat}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.scalatest.BeforeAndAfter

import org.apache.spark.rdd.RDD
@@ -110,7 +109,7 @@ class SimpleFSBasedRelation

override def outputWriterClass: Class[_ <: OutputWriter] = classOf[SimpleOutputWriter]

-override def outputFormatClass: Class[_ <: OutputFormat[Void, Row]] = {
+override def outputFormatClass: Class[_ <: FileOutputFormat[Void, Row]] = {
// This is just a mock, not used within this test suite.
classOf[TextOutputFormat[Void, Row]]
}
@@ -268,7 +267,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 2) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 2)
+assert(TestResult.writerPaths.size === 4)
assert(TestResult.writtenRows === expectedRows.toSet)
}
}
@@ -295,7 +294,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 2) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 2)
+assert(TestResult.writerPaths.size === 4)
assert(TestResult.writtenRows === expectedRows.toSet)
}
}
@@ -328,7 +327,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 4) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 4)
+assert(TestResult.writerPaths.size === 8)
assert(TestResult.writtenRows === expectedRows.toSet)
}
}
@@ -381,7 +380,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 2) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 2)
+assert(TestResult.writerPaths.size === 4)
assert(TestResult.writtenRows === expectedRows.toSet)
}

@@ -443,7 +442,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 2) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 2)
+assert(TestResult.writerPaths.size === 4)
assert(TestResult.writtenRows === expectedRows.toSet)
}

@@ -472,7 +471,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 2) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 2)
+assert(TestResult.writerPaths.size === 4)
assert(TestResult.writtenRows === expectedRows.toSet)
}

@@ -493,7 +492,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 2) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 2)
+assert(TestResult.writerPaths.size === 8)
assert(TestResult.writtenRows === expectedRows.toSet)
}

@@ -535,7 +534,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
val expectedRows = for (i <- 1 to 3; _ <- 1 to 2) yield Row(i, s"val_$i")

TestResult.synchronized {
-assert(TestResult.writerPaths.size === 2)
+assert(TestResult.writerPaths.size === 4)
assert(TestResult.writtenRows === expectedRows.toSet)
}

