Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
liancheng committed May 12, 2015
1 parent c4ed4fe commit 51be443
Showing 2 changed files with 20 additions and 20 deletions.
28 changes: 12 additions & 16 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -229,39 +229,35 @@ private[sql] abstract class BaseWriterContainer(

   protected val dataSchema = relation.dataSchema

-  protected val outputCommitterClass: Class[_ <: FileOutputCommitter] =
-    relation.outputCommitterClass
-
   protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass

+  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
+
   def driverSideSetup(): Unit = {
     setupIDs(0, 0, 0)
     setupConf()
     taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(outputCommitterClass, outputPath, taskAttemptContext)
+    relation.prepareForWrite(job)
+    outputFormatClass = job.getOutputFormatClass
+    outputCommitter = newOutputCommitter(taskAttemptContext)
     outputCommitter.setupJob(jobContext)
   }

   def executorSideSetup(taskContext: TaskContext): Unit = {
     setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
     setupConf()
     taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(outputCommitterClass, outputPath, taskAttemptContext)
+    outputCommitter = newOutputCommitter(taskAttemptContext)
     outputCommitter.setupTask(taskAttemptContext)
     initWriters()
   }

-  private def newOutputCommitter(
-      clazz: Class[_ <: FileOutputCommitter],
-      path: String,
-      context: TaskAttemptContext): FileOutputCommitter = {
-    val ctor = outputCommitterClass.getConstructor(classOf[Path], classOf[TaskAttemptContext])
-    ctor.setAccessible(true)
-
-    val hadoopPath = new Path(path)
-    val fs = hadoopPath.getFileSystem(serializableConf.value)
-    val qualified = fs.makeQualified(hadoopPath)
-    ctor.newInstance(qualified, context)
+  private def newOutputCommitter(context: TaskAttemptContext): FileOutputCommitter = {
+    outputFormatClass.newInstance().getOutputCommitter(context) match {
+      case f: FileOutputCommitter => f
+      case f => sys.error(
+        s"FileOutputCommitter or its subclass is expected, but got a ${f.getClass.getName}.")
+    }
   }

   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
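With this change, the committer is no longer built reflectively from a relation-supplied class. driverSideSetup first lets the relation mutate the Job in prepareForWrite, captures job.getOutputFormatClass, and both driver and executors then ask an instance of that output format for its committer; the pattern match in newOutputCommitter rejects anything that is not a FileOutputCommitter. A minimal standalone sketch of that resolution path (the output path and the bare TaskAttemptID are placeholders, and TextOutputFormat is only Hadoop's default, nothing this commit mandates):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{Job, TaskAttemptID}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object CommitterResolutionSketch {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance(new Configuration())
    // Placeholder output path; BaseWriterContainer derives the real one from the relation.
    FileOutputFormat.setOutputPath(job, new Path("/tmp/sketch-output"))
    val context = new TaskAttemptContextImpl(job.getConfiguration, new TaskAttemptID())
    // Mirrors outputFormatClass.newInstance().getOutputCommitter(context) above. With no
    // prepareForWrite override, getOutputFormatClass yields Hadoop's default TextOutputFormat,
    // whose committer is a plain FileOutputCommitter.
    val committer = job.getOutputFormatClass.newInstance().getOutputCommitter(context)
    println(committer.getClass.getName)
  }
}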
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -432,10 +432,14 @@ abstract class FSBasedRelation private[sql](
   }

   /**
-   * The output committer class to use. Default to
-   * [[org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter]].
+   * Client side preparation for data writing can be put here. For example, user defined output
+   * committer can be configured here.
+   *
+   * Note that the only side effect expected here is mutating `job` via its setters. Especially,
+   * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
+   * may cause unexpected behaviors.
    */
-  def outputCommitterClass: Class[_ <: FileOutputCommitter] = classOf[FileOutputCommitter]
+  def prepareForWrite(job: Job): Unit = ()

   /**
    * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
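To illustrate the new hook, here is a hypothetical data source extension (none of these names appear in the commit) that plugs in a user-defined committer. The prepareForWrite override mutates only job, per the Scaladoc contract, and the committer reaches BaseWriterContainer through the job's output format, so it satisfies the FileOutputCommitter pattern match in commands.scala above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat, TextOutputFormat}

// Hypothetical committer; any FileOutputCommitter subclass passes the match
// in BaseWriterContainer.newOutputCommitter.
class LoggingOutputCommitter(output: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(output, context) {
  override def commitTask(context: TaskAttemptContext): Unit = {
    println(s"committing ${context.getTaskAttemptID}")
    super.commitTask(context)
  }
}

// Hypothetical output format whose only purpose is to hand out that committer.
class LoggingOutputFormat extends TextOutputFormat[Any, Any] {
  override def getOutputCommitter(context: TaskAttemptContext): FileOutputCommitter =
    new LoggingOutputCommitter(FileOutputFormat.getOutputPath(context), context)
}

// In a concrete FSBasedRelation subclass, the hook then touches nothing but the job:
//   override def prepareForWrite(job: Job): Unit =
//     job.setOutputFormatClass(classOf[LoggingOutputFormat])

Routing the committer through the OutputFormat, rather than through the removed outputCommitterClass getter, keeps all write-side configuration on the Job, so cached relation instances are never mutated.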
