SPARK-1102: Create a saveAsNewAPIHadoopDataset method #12

Closed · wants to merge 5 commits
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}

/**
* Output the RDD to any Hadoop-supported storage system with the new Hadoop API,
* using a Configuration object for that storage system.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration) {
rdd.saveAsNewAPIHadoopDataset(conf)
}

/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
path: String,
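The Java-API method above is a thin wrapper that forwards the Configuration to the Scala implementation in PairRDDFunctions. The following is a minimal, untested sketch of how it could be driven from Scala via JavaPairRDD.fromRDD, mirroring the configuration used in the new FileSuite test further down; the application name and output directory are placeholders, not part of this PR.

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaPairRDD

val sc = new SparkContext("local", "javaDatasetSketch")
val pairs = JavaPairRDD.fromRDD(sc.parallelize(Seq(("key1", "a"), ("key2", "b"))))

// Configure the output through a new-API Job, exactly as for a Hadoop MapReduce job.
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", "/tmp/javaDatasetSketch")

// Delegates to PairRDDFunctions.saveAsNewAPIHadoopDataset shown in the next file.
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)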
104 changes: 58 additions & 46 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -30,11 +30,11 @@ import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
@@ -603,50 +603,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val job = new NewAPIHadoopJob(conf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)

val wrappedConf = new SerializableWritable(job.getConfiguration)
val outpath = new Path(path)
NewFileOutputFormat.setOutputPath(job, outpath)
val jobFormat = outputFormatClass.newInstance
jobFormat.checkOutputSpecs(job)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outputFormatClass.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}

/* apparently we need a TaskAttemptID to construct an OutputCommitter;
* however we're only going to use this local OutputCommitter for
* setupJob/commitJob, so we just use a dummy "map" task.
*/
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard _)
jobCommitter.commitJob(jobTaskContext)
job.setOutputFormatClass(outputFormatClass)
job.getConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(job.getConfiguration)
}

/**
@@ -692,6 +651,59 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
saveAsHadoopDataset(conf)
}

/**
* Output the RDD to any Hadoop-supported storage system with the new Hadoop API, using a
* Hadoop Configuration object for that storage system. The Configuration should set an
* OutputFormat and any output paths required (e.g. a table name to write to) in the same
* way as it would be configured for a Hadoop MapReduce job.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration) {
val job = new NewAPIHadoopJob(conf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
val wrappedConf = new SerializableWritable(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}

def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}

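/* We need a TaskAttemptID to construct an OutputCommitter, but this local committer is
 * only used for setupJob/commitJob, so a dummy "map" task attempt is sufficient.
 */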
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard _)
jobCommitter.commitJob(jobTaskContext)
}

/**
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
* that storage system. The JobConf should set an OutputFormat and any output paths required
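The doc comment on saveAsNewAPIHadoopDataset notes that the Configuration may point at a non-file sink such as a table. As an illustration only (not part of this PR), here is a minimal, untested sketch assuming HBase's new-API TableOutputFormat from the HBase client libraries; the table name "users", the column family "cf", and the row data are hypothetical.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Assumes an HBase configuration (hbase-site.xml with the ZooKeeper quorum) is on the classpath.
val sc = new SparkContext("local", "hbaseDatasetSketch")
val job = new Job(sc.hadoopConfiguration)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "users")

val puts = sc.parallelize(Seq("alice" -> "42", "bob" -> "37")).map { case (name, age) =>
  val put = new Put(Bytes.toBytes(name))
  // Put.add was current at the time of this PR; newer HBase versions use addColumn.
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age))
  (new ImmutableBytesWritable(Bytes.toBytes(name)), put)
}

puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

Because TableOutputFormat is not a FileOutputFormat, the new checkOutputSpecs guard in saveAsNewAPIHadoopDataset does not apply to this kind of sink.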
39 changes: 33 additions & 6 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,11 +24,12 @@ import scala.io.Source
import com.google.common.io.Files
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

class FileSuite extends FunSuite with LocalSparkContext {

@@ -236,18 +237,44 @@ class FileSuite extends FunSuite with LocalSparkContext {
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}

test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsTextFile(tempdir.getPath + "/output")
assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}

test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
randomRDD.saveAsHadoopDataset(job)
assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
}

test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}
}