From a8583ee752bf738c568bee39dd9dc130878d2ad7 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Wed, 26 Feb 2014 19:25:57 -0500 Subject: [PATCH 1/5] Create a saveAsNewAPIHadoopDataset method --- .../apache/spark/api/java/JavaPairRDD.scala | 10 +- .../apache/spark/rdd/PairRDDFunctions.scala | 104 ++++++++++-------- 2 files changed, 69 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 0ff428c120353..000d97cdd167e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -26,7 +26,7 @@ import com.google.common.base.Optional import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job} import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.Partitioner._ @@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) } + /** + * Output the RDD to any Hadoop-supported storage system, using + * a org.apache.hadoop.mapreduce.Job object for that storage system. + */ + def saveAsNewAPIHadoopDataset(job: Job) { + rdd.saveAsNewAPIHadoopDataset(job) + } + /** Output the RDD to any Hadoop-supported file system. */ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]]( path: String, diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index b0d322fe27bd5..e6d08b8856f36 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -29,13 +29,21 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLog +<<<<<<< HEAD import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.{FileSystem, Path} +======= +import org.apache.hadoop.conf.Configuration +>>>>>>> Create a saveAsNewAPIHadoopDataset method import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil} +<<<<<<< HEAD import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} +======= + +>>>>>>> Create a saveAsNewAPIHadoopDataset method // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark. 
import org.apache.hadoop.mapred.SparkHadoopWriter @@ -603,50 +611,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val job = new NewAPIHadoopJob(conf) job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) - - val wrappedConf = new SerializableWritable(job.getConfiguration) - val outpath = new Path(path) - NewFileOutputFormat.setOutputPath(job, outpath) - val jobFormat = outputFormatClass.newInstance - jobFormat.checkOutputSpecs(job) - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - val jobtrackerID = formatter.format(new Date()) - val stageId = self.id - def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt - /* "reduce task" */ - val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, - attemptNumber) - val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) - val format = outputFormatClass.newInstance - format match { - case c: Configurable => c.setConf(wrappedConf.value) - case _ => () - } - val committer = format.getOutputCommitter(hadoopContext) - committer.setupTask(hadoopContext) - val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] - while (iter.hasNext) { - val (k, v) = iter.next() - writer.write(k, v) - } - writer.close(hadoopContext) - committer.commitTask(hadoopContext) - return 1 - } - - /* apparently we need a TaskAttemptID to construct an OutputCommitter; - * however we're only going to use this local OutputCommitter for - * setupJob/commitJob, so we just use a dummy "map" task. - */ - val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) - val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) - val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) - jobCommitter.setupJob(jobTaskContext) - self.context.runJob(self, writeShard _) - jobCommitter.commitJob(jobTaskContext) + job.setOutputFormatClass(outputFormatClass) + job.getConfiguration.set("mapred.output.dir", path) + saveAsNewAPIHadoopDataset(job) } /** @@ -692,6 +659,55 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) saveAsHadoopDataset(conf) } + /** + * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop + * Job object for that storage system. The Job should set an OutputFormat and any output paths + * required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop + * MapReduce job. + */ + def saveAsNewAPIHadoopDataset(job: NewAPIHadoopJob) { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val jobtrackerID = formatter.format(new Date()) + val stageId = self.id + val wrappedConf = new SerializableWritable(job.getConfiguration) + val outfmt = job.getOutputFormatClass + val outputFormatInstance = outfmt.newInstance() + + if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { + // FileOutputFormat ignores the filesystem parameter + val conf = job.getConfiguration + outputFormatInstance.checkOutputSpecs(job) + } + + def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. 
+ val attemptNumber = (context.attemptId % Int.MaxValue).toInt + /* "reduce task" */ + val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, + attemptNumber) + val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) + val format = outfmt.newInstance + val committer = format.getOutputCommitter(hadoopContext) + committer.setupTask(hadoopContext) + val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] + while (iter.hasNext) { + val (k, v) = iter.next() + writer.write(k, v) + } + writer.close(hadoopContext) + committer.commitTask(hadoopContext) + return 1 + } + val jobFormat = outfmt.newInstance + val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) + val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) + val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) + jobCommitter.setupJob(jobTaskContext) + self.context.runJob(self, writeShard _).sum + jobCommitter.commitJob(jobTaskContext) + } + /** * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for * that storage system. The JobConf should set an OutputFormat and any output paths required From 7643c88be2903a1e39f60282c22332f6bb37889b Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 28 Feb 2014 09:32:59 -0500 Subject: [PATCH 2/5] change the parameter type back to Configuration code sync --- .../apache/spark/api/java/JavaPairRDD.scala | 6 ++-- .../apache/spark/rdd/PairRDDFunctions.scala | 32 ++++++++----------- .../scala/org/apache/spark/FileSuite.scala | 6 ++-- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 000d97cdd167e..9596dbaf75488 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -560,10 +560,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Output the RDD to any Hadoop-supported storage system, using - * a org.apache.hadoop.mapreduce.Job object for that storage system. + * a Configuration object for that storage system. */ - def saveAsNewAPIHadoopDataset(job: Job) { - rdd.saveAsNewAPIHadoopDataset(job) + def saveAsNewAPIHadoopDataset(conf: Configuration) { + rdd.saveAsNewAPIHadoopDataset(conf) } /** Output the RDD to any Hadoop-supported file system. 
*/ diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index e6d08b8856f36..8734bc65aa3df 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -29,21 +29,13 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLog -<<<<<<< HEAD import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.fs.{FileSystem, Path} -======= -import org.apache.hadoop.conf.Configuration ->>>>>>> Create a saveAsNewAPIHadoopDataset method +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil} -<<<<<<< HEAD +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -======= - ->>>>>>> Create a saveAsNewAPIHadoopDataset method // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark. import org.apache.hadoop.mapred.SparkHadoopWriter @@ -613,7 +605,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) job.setOutputValueClass(valueClass) job.setOutputFormatClass(outputFormatClass) job.getConfiguration.set("mapred.output.dir", path) - saveAsNewAPIHadoopDataset(job) + saveAsNewAPIHadoopDataset(job.getConfiguration) } /** @@ -661,22 +653,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) /** * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop - * Job object for that storage system. The Job should set an OutputFormat and any output paths + * Configuration object for that storage system. The Conf should set an OutputFormat and any output paths * required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop * MapReduce job. 
*/ - def saveAsNewAPIHadoopDataset(job: NewAPIHadoopJob) { + def saveAsNewAPIHadoopDataset(conf: Configuration) { + val job = new NewAPIHadoopJob(conf) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id val wrappedConf = new SerializableWritable(job.getConfiguration) val outfmt = job.getOutputFormatClass - val outputFormatInstance = outfmt.newInstance() + val jobFormat = outfmt.newInstance - if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) { + if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) { // FileOutputFormat ignores the filesystem parameter - val conf = job.getConfiguration - outputFormatInstance.checkOutputSpecs(job) + jobFormat.checkOutputSpecs(job) } def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = { @@ -688,6 +680,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) attemptNumber) val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) val format = outfmt.newInstance + format match { + case c: Configurable => c.setConf(wrappedConf.value) + case _ => () + } val committer = format.getOutputCommitter(hadoopContext) committer.setupTask(hadoopContext) val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] @@ -699,7 +695,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) committer.commitTask(hadoopContext) return 1 } - val jobFormat = outfmt.newInstance + val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 76173608e9f70..5cd86b39234f0 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.mapred.FileAlreadyExistsException import org.scalatest.FunSuite import org.apache.spark.SparkContext._ -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} class FileSuite extends FunSuite with LocalSparkContext { @@ -244,8 +244,8 @@ class FileSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val tempdir = Files.createTempDir() val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) - randomRDD.saveAsTextFile(tempdir.getPath + "/output") - assert(new File(tempdir.getPath + "/output/part-00000").exists() === true) + randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath + "/output") + assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true) intercept[FileAlreadyExistsException] { randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath) } From 95a692935431b8e6fb875fcdb5605acf7250006a Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 1 Mar 2014 23:20:41 -0500 Subject: [PATCH 3/5] code clean --- core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 8734bc65aa3df..ffc01ee1824f5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -700,7 +700,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) - self.context.runJob(self, writeShard _).sum + self.context.runJob(self, writeShard _) jobCommitter.commitJob(jobTaskContext) } From a8d11ba7da222cb35d6aa611deae7bab82783653 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 2 Mar 2014 18:28:42 -0500 Subject: [PATCH 4/5] style fix......... --- .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index ffc01ee1824f5..447deafff53cd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -653,9 +653,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) /** * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop - * Configuration object for that storage system. The Conf should set an OutputFormat and any output paths - * required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop - * MapReduce job. + * Configuration object for that storage system. The Conf should set an OutputFormat and any + * output paths required (e.g. a table name to write to) in the same way as it would be + * configured for a Hadoop MapReduce job. */ def saveAsNewAPIHadoopDataset(conf: Configuration) { val job = new NewAPIHadoopJob(conf) From 6ba0c83e06b17c9bbf388bccd258766aabfa3771 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 18 Mar 2014 09:44:15 -0400 Subject: [PATCH 5/5] add test cases for saveAsHadoopDataSet (new&old API) --- .../scala/org/apache/spark/FileSuite.scala | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 5cd86b39234f0..01af94077144a 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -24,11 +24,12 @@ import scala.io.Source import com.google.common.io.Files import org.apache.hadoop.io._ import org.apache.hadoop.io.compress.DefaultCodec -import org.apache.hadoop.mapred.FileAlreadyExistsException +import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.apache.hadoop.mapreduce.Job import org.scalatest.FunSuite import org.apache.spark.SparkContext._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} class FileSuite extends FunSuite with LocalSparkContext { @@ -236,7 +237,7 @@ class FileSuite extends FunSuite with LocalSparkContext { val tempdir = Files.createTempDir() val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) intercept[FileAlreadyExistsException] { - randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath) } } @@ -244,10 +245,36 @@ class FileSuite extends FunSuite with 
LocalSparkContext { sc = new SparkContext("local", "test") val tempdir = Files.createTempDir() val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) - randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath + "/output") + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output") assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true) intercept[FileAlreadyExistsException] { - randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath) + randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath) } } + + test ("save Hadoop Dataset through old Hadoop API") { + sc = new SparkContext("local", "test") + val tempdir = Files.createTempDir() + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + val job = new JobConf() + job.setOutputKeyClass(classOf[String]) + job.setOutputValueClass(classOf[String]) + job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName) + job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old") + randomRDD.saveAsHadoopDataset(job) + assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true) + } + + test ("save Hadoop Dataset through new Hadoop API") { + sc = new SparkContext("local", "test") + val tempdir = Files.createTempDir() + val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) + val job = new Job(sc.hadoopConfiguration) + job.setOutputKeyClass(classOf[String]) + job.setOutputValueClass(classOf[String]) + job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]]) + job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new") + randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration) + assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true) + } }
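
---

For readers following the series, the sketch below shows how the new-API dataset save introduced by these patches is intended to be called from user code. It is a minimal, illustrative example and not part of the patches themselves: the application name, sample data, and output directory are made up, and it assumes the 0.9-era `SparkContext` constructor plus the `mapreduce`-package `TextOutputFormat` already used in the tests above.

    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // brings PairRDDFunctions into scope

    object SaveAsNewAPIHadoopDatasetExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "saveAsNewAPIHadoopDataset example")
        val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b"), ("key3", "c")), 1)

        // Describe the target storage system through a new-API Job, exactly as a
        // Hadoop MapReduce job would be configured...
        val job = new Job(sc.hadoopConfiguration)
        job.setOutputKeyClass(classOf[String])
        job.setOutputValueClass(classOf[String])
        job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
        job.getConfiguration.set("mapred.output.dir", "/tmp/saveAsNewAPIHadoopDataset-example")

        // ...then hand only the Configuration to Spark. saveAsNewAPIHadoopDataset
        // rebuilds a Job internally and drives the OutputFormat's RecordWriter and
        // OutputCommitter once per partition, committing the job at the end.
        pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)

        sc.stop()
      }
    }

Taking a plain `Configuration` rather than a `Job` mirrors the existing old-API `saveAsHadoopDataset(conf: JobConf)`, which appears to be the motivation behind patch 2's signature change.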