Skip to content

Commit

Permalink
Update tests to use dummy time config
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jan 6, 2016
1 parent 08b026e commit 75ac218
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
test("from conf with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
// TODO(josh): Update these exmaples to use a different configuration.
myConf.set("spark.cleaner.periodicGC.interval", "10s")
myConf.set("spark.dummyTimeConfig", "10s")
ssc = new StreamingContext(myConf, batchDuration)
assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10)
assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
}

test("from existing SparkContext") {
Expand All @@ -94,27 +94,27 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo

test("from existing SparkContext with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
myConf.set("spark.cleaner.periodicGC.interval", "10s")
myConf.set("spark.dummyTimeConfig", "10s")
ssc = new StreamingContext(myConf, batchDuration)
assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10)
assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
}

test("from checkpoint") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
myConf.set("spark.cleaner.periodicGC.interval", "10s")
myConf.set("spark.dummyTimeConfig", "10s")
val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register()
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
assert(
Utils.timeStringAsSeconds(cp.sparkConfPairs
.toMap.getOrElse("spark.cleaner.periodicGC.interval", "-1")) === 10)
.toMap.getOrElse("spark.dummyTimeConfig", "-1")) === 10)
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
assert(
newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10)
newCp.createSparkConf().getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
ssc = new StreamingContext(null, newCp, null)
assert(ssc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "-1") === 10)
assert(ssc.conf.getTimeAsSeconds("spark.dummyTimeConfig", "-1") === 10)
}

test("checkPoint from conf") {
Expand Down Expand Up @@ -290,7 +290,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo

test("stop gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
conf.set("spark.cleaner.periodicGC.interval", "3600s")
conf.set("spark.dummyTimeConfig", "3600s")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
logInfo("==================================\n\n\n")
Expand Down

0 comments on commit 75ac218

Please sign in to comment.