Uses TaskAttemptContext rather than Configuration in OutputWriter.init
liancheng committed May 12, 2015
1 parent 0bc6ad1 commit be0c268
Showing 3 changed files with 10 additions and 14 deletions.
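
The change at a glance: OutputWriter.init now receives the Hadoop TaskAttemptContext instead of a bare Configuration. Nothing is lost, because the context exposes the task-side Configuration via getConfiguration, and the task attempt identity becomes available as well. Below is a minimal sketch of a writer targeting the new signature; the class name and comment bodies are illustrative, not part of this commit, and write/close are the other OutputWriter members documented in the second file below.

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.OutputWriter
import org.apache.spark.sql.types.StructType

class ExampleOutputWriter extends OutputWriter {
  override def init(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): Unit = {
    // The Configuration formerly passed in directly is still reachable:
    val conf = context.getConfiguration
    // The task attempt identity was not reachable from the old signature:
    val attemptId = context.getTaskAttemptID
    // ... open an output stream for `path` using `conf` ...
  }

  override def write(row: Row): Unit = {
    // ... serialize `row` to the underlying stream ...
  }

  override def close(): Unit = {
    // ... flush and close the stream ...
  }
}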
@@ -208,14 +208,14 @@ private[sql] abstract class BaseWriterContainer(
   protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))

   // This is only used on driver side.
-  @transient private var jobContext: JobContext = job
+  @transient private val jobContext: JobContext = job

   // The following fields are initialized and used on both driver and executor side.
   @transient private var outputCommitter: OutputCommitter = _
   @transient private var jobId: JobID = _
   @transient private var taskId: TaskID = _
   @transient private var taskAttemptId: TaskAttemptID = _
-  @transient private var taskAttemptContext: TaskAttemptContext = _
+  @transient protected var taskAttemptContext: TaskAttemptContext = _

   protected val outputPath = {
     assert(
@@ -266,11 +266,7 @@ private[sql] abstract class BaseWriterContainer(
   // Called on executor side when writing rows
   def outputWriterForRow(row: Row): OutputWriter

-  protected def initWriters(): Unit = {
-    val writer = outputWriterClass.newInstance()
-    writer.init(outputPath, dataSchema, serializableConf.value)
-    mutable.Map(outputPath -> writer)
-  }
+  protected def initWriters(): Unit

   def commitTask(): Unit = {
     SparkHadoopMapRedUtil.commitTask(
@@ -302,7 +298,7 @@ private[sql] class DefaultWriterContainer(

   override protected def initWriters(): Unit = {
     writer = relation.outputWriterClass.newInstance()
-    writer.init(outputPath, dataSchema, serializableConf.value)
+    writer.init(outputPath, dataSchema, taskAttemptContext)
   }

   override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -346,7 +342,7 @@ private[sql] class DynamicPartitionWriterContainer(
     outputWriters.getOrElseUpdate(partitionPath, {
       val path = new Path(outputPath, partitionPath.stripPrefix(Path.SEPARATOR))
       val writer = outputWriterClass.newInstance()
-      writer.init(path.toString, dataSchema, serializableConf.value)
+      writer.init(path.toString, dataSchema, taskAttemptContext)
       writer
     })
   }
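With the default implementation gone, initWriters() becomes an abstract member of BaseWriterContainer, and each concrete container decides how to create and initialize its writers. That is also why taskAttemptContext is widened from private to protected above: the subclass implementations (DefaultWriterContainer and DynamicPartitionWriterContainer) pass it to OutputWriter.init in place of serializableConf.value. Incidentally, the removed default body was dead code: it built a mutable.Map(outputPath -> writer) that was immediately discarded, since the method returns Unit.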
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapreduce.{OutputFormat, OutputCommitter}
+import org.apache.hadoop.mapreduce.{TaskAttemptContext, OutputFormat, OutputCommitter}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -263,12 +263,12 @@ abstract class OutputWriter {
    * @param path The file path to which this [[OutputWriter]] is supposed to write.
    * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
    *        schema if the corresponding relation is partitioned.
-   * @param conf Hadoop configuration inherited from driver side.
+   * @param context The Hadoop MapReduce task context.
    */
   def init(
       path: String,
       dataSchema: StructType,
-      conf: Configuration): Unit = ()
+      context: TaskAttemptContext): Unit = ()

   /**
    * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
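For third-party OutputWriter implementations the migration is mechanical: change the third parameter from conf: Configuration to context: TaskAttemptContext, and where the old Configuration is still needed, obtain it with context.getConfiguration (a standard Hadoop mapreduce accessor).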
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.OutputFormat
+import org.apache.hadoop.mapreduce.{TaskAttemptContext, OutputFormat}
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 import org.scalatest.BeforeAndAfter

@@ -56,7 +56,7 @@ class SimpleFSBasedSource extends FSBasedRelationProvider {
 }

 class SimpleOutputWriter extends OutputWriter {
-  override def init(path: String, dataSchema: StructType, conf: Configuration): Unit = {
+  override def init(path: String, dataSchema: StructType, context: TaskAttemptContext): Unit = {
     TestResult.synchronized {
       TestResult.writerPaths += path
     }
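The updated test writer above receives a TaskAttemptContext, so a test that drives an OutputWriter outside a real Spark job now has to manufacture one. A minimal sketch, assuming the Hadoop 2 mapreduce API (TaskAttemptContextImpl does not exist under Hadoop 1, where TaskAttemptContext is a class rather than an interface); the harness object, output path, and schema are made up for illustration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object OutputWriterHarness {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // A synthetic task attempt identity is good enough for a unit test.
    val attemptId = new TaskAttemptID("local", 1, TaskType.MAP, 0, 0)
    val context = new TaskAttemptContextImpl(conf, attemptId)

    val schema = StructType(StructField("value", StringType) :: Nil)
    val writer = new SimpleOutputWriter()
    writer.init("/tmp/simple-output-writer-test", schema, context)
  }
}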
