
Same treatment for saveAsHadoopFiles

This applies to saveAsHadoopFiles the same SerializableWritable wrapping that saveAsNewAPIHadoopFiles already received, so that the JobConf captured by the save closure can be serialized when the DStream graph is checkpointed, and it splits the recovery test so each save API is covered by its own test.
tdas committed Nov 25, 2014
1 parent b382ea9 commit bb4729a
Showing 2 changed files with 35 additions and 9 deletions.
PairDStreamFunctions.scala
@@ -668,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -701,7 +703,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
-    // Wrap this in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
     val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
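The fix matters because Hadoop's JobConf and Configuration implement the Writable interface but not java.io.Serializable, while checkpointing serializes the entire DStream graph, including the ForeachDStream that foreachRDD registers, with plain Java serialization; a closure holding a raw JobConf therefore fails with NotSerializableException. Spark's SerializableWritable bridges the two protocols. Below is a minimal sketch of that mechanism, separate from the patch itself (the object name is made up; SerializableWritable is the real class the patch uses):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SerializableWritable

    // Illustrative only: shows why the patch wraps the JobConf
    object JobConfSerializationSketch {
      def main(args: Array[String]): Unit = {
        val conf = new JobConf()
        val out = new ObjectOutputStream(new ByteArrayOutputStream())

        // out.writeObject(conf)  // would throw java.io.NotSerializableException:
        //                        // JobConf is a Writable, not Serializable

        // SerializableWritable implements Serializable by delegating to the
        // Writable protocol, and hands the wrapped object back via .value
        val wrapped = new SerializableWritable(conf)
        out.writeObject(wrapped)        // succeeds
        println(wrapped.value eq conf)  // true on the serializing side
      }
    }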
CheckpointSuite.scala
@@ -22,15 +22,18 @@ import java.nio.charset.Charset

 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
-import org.apache.hadoop.io.{Text, IntWritable}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -207,20 +210,19 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
-  test("recovery with saveAsNewAPIHadoopFiles") {
+  test("recovery with saveAsHadoopFiles operation") {
     val tempDir = Files.createTempDir()
     try {
       testCheckpointedOperation(
         Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
         (s: DStream[String]) => {
           val output = s.map(x => (x, 1)).reduceByKey(_ + _)
-          output.saveAsNewAPIHadoopFiles(
+          output.saveAsHadoopFiles(
             tempDir.toURI.toString,
             "result",
             classOf[Text],
             classOf[IntWritable],
             classOf[TextOutputFormat[Text, IntWritable]])
-          (tempDir.toString, "result")
           output
         },
         Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
@@ -231,6 +233,28 @@
     }
   }
 
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
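For context, here is a sketch of the user-facing code path the two tests cover; the app name, host, port, and paths are hypothetical. With checkpointing enabled, the save closure registered by saveAsHadoopFiles becomes part of the serialized DStream graph, which is exactly what failed before the JobConf was wrapped:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    object SaveWithCheckpointSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("save-sketch")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint("/tmp/checkpoint")  // serializes the DStream graph, including save closures

        val counts = ssc.socketTextStream("localhost", 9999)
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)

        // Each batch is written under a path named <prefix>-<batchTimeMs>.<suffix>,
        // the convention produced by rddToFileName in the patch above
        counts.saveAsHadoopFiles(
          "/tmp/output/result", "txt",
          classOf[Text], classOf[IntWritable],
          classOf[TextOutputFormat[Text, IntWritable]])

        ssc.start()
        ssc.awaitTermination()
      }
    }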
