diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 429199f2075c6..39a66b21eb500 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark -import org.scalatest.BeforeAndAfter import org.scalatest.FunSuite import org.scalatest.concurrent.Timeouts._ import org.scalatest.Matchers @@ -30,16 +29,10 @@ class NotSerializableClass class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {} -class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter - with LocalSparkContext { +class DistributedSuite extends FunSuite with Matchers with LocalSparkContext { val clusterUrl = "local-cluster[2,1,512]" - after { - System.clearProperty("spark.reducer.maxMbInFlight") - System.clearProperty("spark.storage.memoryFraction") - } - test("task throws not serializable exception") { // Ensures that executors do not crash when an exn is not serializable. If executors crash, // this test will hang. Correct behavior is that executors don't crash but fail tasks @@ -85,15 +78,14 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter } test("groupByKey where map output sizes exceed maxMbInFlight") { - System.setProperty("spark.reducer.maxMbInFlight", "1") - sc = new SparkContext(clusterUrl, "test") + val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1") + sc = new SparkContext(clusterUrl, "test", conf) // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output // file should be about 2.5 MB val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000))) val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect() assert(groups.length === 16) assert(groups.map(_._2).sum === 2000) - // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block } test("accumulators") { @@ -211,7 +203,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter } test("compute without caching when no partitions fit in memory") { - System.setProperty("spark.storage.memoryFraction", "0.0001") sc = new SparkContext(clusterUrl, "test") // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory @@ -219,12 +210,11 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter assert(data.count() === 4000000) assert(data.count() === 4000000) assert(data.count() === 4000000) - System.clearProperty("spark.storage.memoryFraction") } test("compute when only some partitions fit in memory") { - System.setProperty("spark.storage.memoryFraction", "0.01") - sc = new SparkContext(clusterUrl, "test") + val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01") + sc = new SparkContext(clusterUrl, "test", conf) // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions // to make sure that *some* of them do fit though @@ -232,7 +222,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter assert(data.count() === 4000000) assert(data.count() === 4000000) assert(data.count() === 4000000) - System.clearProperty("spark.storage.memoryFraction") } test("passing environment variables to cluster") { diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index 379c2a6ea4b55..1cae5ed875ac8 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -32,10 +32,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext { @transient var tmpFile: File = _ @transient var tmpJarUrl: String = _ + def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false") + override def beforeEach() { super.beforeEach() resetSparkContext() - System.setProperty("spark.authenticate", "false") } override def beforeAll() { @@ -53,7 +54,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext { val jarFile = new File(testTempDir, "test.jar") val jarStream = new FileOutputStream(jarFile) val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) - System.setProperty("spark.authenticate", "false") val jarEntry = new JarEntry(textFile.getName) jar.putNextEntry(jarEntry) @@ -75,7 +75,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { } test("Distributing files locally") { - sc = new SparkContext("local[4]", "test") + sc = new SparkContext("local[4]", "test", newConf) sc.addFile(tmpFile.toString) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) val result = sc.parallelize(testData).reduceByKey { @@ -109,7 +109,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test("Distributing files locally using URL as input") { // addFile("file:///....") - sc = new SparkContext("local[4]", "test") + sc = new SparkContext("local[4]", "test", newConf) sc.addFile(new File(tmpFile.toString).toURI.toString) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) val result = sc.parallelize(testData).reduceByKey { @@ -123,7 +123,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { } test ("Dynamically adding JARS locally") { - sc = new SparkContext("local[4]", "test") + sc = new SparkContext("local[4]", "test", newConf) sc.addJar(tmpJarUrl) val testData = Array((1, 1)) sc.parallelize(testData).foreach { x => @@ -134,7 +134,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { } test("Distributing files on a standalone cluster") { - sc = new SparkContext("local-cluster[1,1,512]", "test") + sc = new SparkContext("local-cluster[1,1,512]", "test", newConf) sc.addFile(tmpFile.toString) val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) val result = sc.parallelize(testData).reduceByKey { @@ -148,7 +148,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { } test ("Dynamically adding JARS on a standalone cluster") { - sc = new SparkContext("local-cluster[1,1,512]", "test") + sc = new SparkContext("local-cluster[1,1,512]", "test", newConf) sc.addJar(tmpJarUrl) val testData = Array((1,1)) sc.parallelize(testData).foreach { x => @@ -159,7 +159,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { } test ("Dynamically adding JARS on a standalone cluster using local: URL") { - sc = new SparkContext("local-cluster[1,1,512]", "test") + sc = new SparkContext("local-cluster[1,1,512]", "test", newConf) sc.addJar(tmpJarUrl.replace("file", "local")) val testData = Array((1,1)) sc.parallelize(testData).foreach { x => diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index a57430e829ced..a5d2b426df077 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -41,12 +41,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter override def afterEach() { super.afterEach() resetSparkContext() - System.clearProperty("spark.scheduler.mode") } test("local mode, FIFO scheduler") { - System.setProperty("spark.scheduler.mode", "FIFO") - sc = new SparkContext("local[2]", "test") + val conf = new SparkConf().set("spark.scheduler.mode", "FIFO") + sc = new SparkContext("local[2]", "test", conf) testCount() testTake() // Make sure we can still launch tasks. @@ -54,10 +53,10 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter } test("local mode, fair scheduler") { - System.setProperty("spark.scheduler.mode", "FAIR") + val conf = new SparkConf().set("spark.scheduler.mode", "FAIR") val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.scheduler.allocation.file", xmlPath) - sc = new SparkContext("local[2]", "test") + conf.set("spark.scheduler.allocation.file", xmlPath) + sc = new SparkContext("local[2]", "test", conf) testCount() testTake() // Make sure we can still launch tasks. @@ -65,8 +64,8 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter } test("cluster mode, FIFO scheduler") { - System.setProperty("spark.scheduler.mode", "FIFO") - sc = new SparkContext("local-cluster[2,1,512]", "test") + val conf = new SparkConf().set("spark.scheduler.mode", "FIFO") + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) testCount() testTake() // Make sure we can still launch tasks. @@ -74,10 +73,10 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter } test("cluster mode, fair scheduler") { - System.setProperty("spark.scheduler.mode", "FAIR") + val conf = new SparkConf().set("spark.scheduler.mode", "FAIR") val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.scheduler.allocation.file", xmlPath) - sc = new SparkContext("local-cluster[2,1,512]", "test") + conf.set("spark.scheduler.allocation.file", xmlPath) + sc = new SparkContext("local-cluster[2,1,512]", "test", conf) testCount() testTake() // Make sure we can still launch tasks. diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index d8e4765edffbd..96cb8e48644d9 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -36,19 +36,15 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex conf.set("spark.test.noStageRetry", "true") test("groupByKey without compression") { - try { - System.setProperty("spark.shuffle.compress", "false") - sc = new SparkContext("local", "test", conf) - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) - val groups = pairs.groupByKey(4).collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } finally { - System.setProperty("spark.shuffle.compress", "true") - } + val myConf = conf.clone().set("spark.shuffle.compress", "false") + sc = new SparkContext("local", "test", myConf) + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) + val groups = pairs.groupByKey(4).collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) } test("shuffle non-zero block size") { diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 5d018ea9868a7..790976a5ac308 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -19,27 +19,20 @@ package org.apache.spark import org.scalatest.FunSuite import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer} +import org.apache.spark.util.ResetSystemProperties import com.esotericsoftware.kryo.Kryo -class SparkConfSuite extends FunSuite with LocalSparkContext { +class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties { test("loading from system properties") { - try { - System.setProperty("spark.test.testProperty", "2") - val conf = new SparkConf() - assert(conf.get("spark.test.testProperty") === "2") - } finally { - System.clearProperty("spark.test.testProperty") - } + System.setProperty("spark.test.testProperty", "2") + val conf = new SparkConf() + assert(conf.get("spark.test.testProperty") === "2") } test("initializing without loading defaults") { - try { - System.setProperty("spark.test.testProperty", "2") - val conf = new SparkConf(false) - assert(!conf.contains("spark.test.testProperty")) - } finally { - System.clearProperty("spark.test.testProperty") - } + System.setProperty("spark.test.testProperty", "2") + val conf = new SparkConf(false) + assert(!conf.contains("spark.test.testProperty")) } test("named set methods") { @@ -117,23 +110,17 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { test("nested property names") { // This wasn't supported by some external conf parsing libraries - try { - System.setProperty("spark.test.a", "a") - System.setProperty("spark.test.a.b", "a.b") - System.setProperty("spark.test.a.b.c", "a.b.c") - val conf = new SparkConf() - assert(conf.get("spark.test.a") === "a") - assert(conf.get("spark.test.a.b") === "a.b") - assert(conf.get("spark.test.a.b.c") === "a.b.c") - conf.set("spark.test.a.b", "A.B") - assert(conf.get("spark.test.a") === "a") - assert(conf.get("spark.test.a.b") === "A.B") - assert(conf.get("spark.test.a.b.c") === "a.b.c") - } finally { - System.clearProperty("spark.test.a") - System.clearProperty("spark.test.a.b") - System.clearProperty("spark.test.a.b.c") - } + System.setProperty("spark.test.a", "a") + System.setProperty("spark.test.a.b", "a.b") + System.setProperty("spark.test.a.b.c", "a.b.c") + val conf = new SparkConf() + assert(conf.get("spark.test.a") === "a") + assert(conf.get("spark.test.a.b") === "a.b") + assert(conf.get("spark.test.a.b.c") === "a.b.c") + conf.set("spark.test.a.b", "A.B") + assert(conf.get("spark.test.a") === "a") + assert(conf.get("spark.test.a.b") === "A.B") + assert(conf.get("spark.test.a.b.c") === "a.b.c") } test("register kryo classes through registerKryoClasses") { diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 0390a2e4f1dbb..8ae4f243ec1ae 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend class SparkContextSchedulerCreationSuite extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging { - def createTaskScheduler(master: String): TaskSchedulerImpl = { + def createTaskScheduler(master: String): TaskSchedulerImpl = + createTaskScheduler(master, new SparkConf()) + + def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = { // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the // real schedulers, so we don't want to create a full SparkContext with the desired scheduler. - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) val createTaskSchedulerMethod = PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler) val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master) @@ -102,19 +105,13 @@ class SparkContextSchedulerCreationSuite } test("local-default-parallelism") { - val defaultParallelism = System.getProperty("spark.default.parallelism") - System.setProperty("spark.default.parallelism", "16") - val sched = createTaskScheduler("local") + val conf = new SparkConf().set("spark.default.parallelism", "16") + val sched = createTaskScheduler("local", conf) sched.backend match { case s: LocalBackend => assert(s.defaultParallelism() === 16) case _ => fail() } - - Option(defaultParallelism) match { - case Some(v) => System.setProperty("spark.default.parallelism", v) - case _ => System.clearProperty("spark.default.parallelism") - } } test("simr") { @@ -155,9 +152,10 @@ class SparkContextSchedulerCreationSuite testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler") } - def testMesos(master: String, expectedClass: Class[_]) { + def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) { + val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString) try { - val sched = createTaskScheduler(master) + val sched = createTaskScheduler(master, conf) assert(sched.backend.getClass === expectedClass) } catch { case e: UnsatisfiedLinkError => @@ -168,17 +166,14 @@ class SparkContextSchedulerCreationSuite } test("mesos fine-grained") { - System.setProperty("spark.mesos.coarse", "false") - testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend]) + testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false) } test("mesos coarse-grained") { - System.setProperty("spark.mesos.coarse", "true") - testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend]) + testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true) } test("mesos with zookeeper") { - System.setProperty("spark.mesos.coarse", "false") - testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend]) + testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false) } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 9e454ddcc52a6..58ecb06df4a01 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -23,55 +23,37 @@ import org.apache.hadoop.io.BytesWritable class SparkContextSuite extends FunSuite with LocalSparkContext { - /** Allows system properties to be changed in tests */ - private def withSystemProperty[T](property: String, value: String)(block: => T): T = { - val originalValue = System.getProperty(property) - try { - System.setProperty(property, value) - block - } finally { - if (originalValue == null) { - System.clearProperty(property) - } else { - System.setProperty(property, originalValue) - } - } - } - test("Only one SparkContext may be active at a time") { // Regression test for SPARK-4180 - withSystemProperty("spark.driver.allowMultipleContexts", "false") { - val conf = new SparkConf().setAppName("test").setMaster("local") - sc = new SparkContext(conf) - // A SparkContext is already running, so we shouldn't be able to create a second one - intercept[SparkException] { new SparkContext(conf) } - // After stopping the running context, we should be able to create a new one - resetSparkContext() - sc = new SparkContext(conf) - } + val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.driver.allowMultipleContexts", "false") + sc = new SparkContext(conf) + // A SparkContext is already running, so we shouldn't be able to create a second one + intercept[SparkException] { new SparkContext(conf) } + // After stopping the running context, we should be able to create a new one + resetSparkContext() + sc = new SparkContext(conf) } test("Can still construct a new SparkContext after failing to construct a previous one") { - withSystemProperty("spark.driver.allowMultipleContexts", "false") { - // This is an invalid configuration (no app name or master URL) - intercept[SparkException] { - new SparkContext(new SparkConf()) - } - // Even though those earlier calls failed, we should still be able to create a new context - sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test")) + val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false") + // This is an invalid configuration (no app name or master URL) + intercept[SparkException] { + new SparkContext(conf) } + // Even though those earlier calls failed, we should still be able to create a new context + sc = new SparkContext(conf.setMaster("local").setAppName("test")) } test("Check for multiple SparkContexts can be disabled via undocumented debug option") { - withSystemProperty("spark.driver.allowMultipleContexts", "true") { - var secondSparkContext: SparkContext = null - try { - val conf = new SparkConf().setAppName("test").setMaster("local") - sc = new SparkContext(conf) - secondSparkContext = new SparkContext(conf) - } finally { - Option(secondSparkContext).foreach(_.stop()) - } + var secondSparkContext: SparkContext = null + try { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.driver.allowMultipleContexts", "true") + sc = new SparkContext(conf) + secondSparkContext = new SparkContext(conf) + } finally { + Option(secondSparkContext).foreach(_.stop()) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index eb7bd7ab3986e..5eda2d41f0e6d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -23,11 +23,13 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark._ import org.apache.spark.deploy.SparkSubmit._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ResetSystemProperties, Utils} import org.scalatest.FunSuite import org.scalatest.Matchers -class SparkSubmitSuite extends FunSuite with Matchers { +// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch +// of properties that neeed to be cleared after tests. +class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties { def beforeAll() { System.setProperty("spark.testing", "true") } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index abe0dc35b07e2..e5b6f72b802c2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -27,9 +27,10 @@ import org.scalatest.Matchers import org.apache.spark.{LocalSparkContext, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.executor.TaskMetrics +import org.apache.spark.util.ResetSystemProperties -class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers - with BeforeAndAfter with BeforeAndAfterAll { +class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter + with BeforeAndAfterAll with ResetSystemProperties { /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 @@ -38,10 +39,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers sc = new SparkContext("local", "SparkListenerSuite") } - override def afterAll() { - System.clearProperty("spark.akka.frameSize") - } - test("basic creation and shutdown of LiveListenerBus") { val counter = new BasicJobCounter val bus = new LiveListenerBus diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 5768a3a733f00..3aab5a156ee77 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} -import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv} import org.apache.spark.storage.TaskResultBlockId /** @@ -55,27 +55,20 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule /** * Tests related to handling task results (both direct and indirect). */ -class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll - with LocalSparkContext { +class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { - override def beforeAll { - // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small - // as we can make it) so the tests don't take too long. - System.setProperty("spark.akka.frameSize", "1") - } - - override def afterAll { - System.clearProperty("spark.akka.frameSize") - } + // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small + // as we can make it) so the tests don't take too long. + def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1") test("handling results smaller than Akka frame size") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x) assert(result === 2) } test("handling results larger than Akka frame size") { - sc = new SparkContext("local", "test") + sc = new SparkContext("local", "test", conf) val akkaFrameSize = sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) @@ -89,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA test("task retried if result missing from block manager") { // Set the maximum number of task failures to > 0, so that the task set isn't aborted // after the result is missing. - sc = new SparkContext("local[1,2]", "test") + sc = new SparkContext("local[1,2]", "test", conf) // If this test hangs, it's probably because no resource offers were made after the task // failed. val scheduler: TaskSchedulerImpl = sc.taskScheduler match { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 7532da88c6065..40aaf9dd1f1e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -162,12 +162,12 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin } test("Fair Scheduler Test") { - sc = new SparkContext("local", "TaskSchedulerImplSuite") + val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() + val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath) + sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) val taskScheduler = new TaskSchedulerImpl(sc) val taskSet = FakeTask.createTaskSet(1) - val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - System.setProperty("spark.scheduler.allocation.file", xmlPath) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) schedulableBuilder.buildPools() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 5554efbcbadf8..ffe6f039145ea 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -33,7 +33,7 @@ import akka.util.Timeout import org.mockito.Mockito.{mock, when} -import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester} +import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ @@ -44,18 +44,17 @@ import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat -import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} +import org.apache.spark.util._ -class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter - with PrivateMethodTester { +class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach + with PrivateMethodTester with ResetSystemProperties { private val conf = new SparkConf(false) var store: BlockManager = null var store2: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null - var oldArch: String = null conf.set("spark.authenticate", "false") val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) @@ -79,13 +78,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter manager } - before { + override def beforeEach(): Unit = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem( "test", "localhost", 0, conf = conf, securityManager = securityMgr) this.actorSystem = actorSystem // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case - oldArch = System.setProperty("os.arch", "amd64") + System.setProperty("os.arch", "amd64") conf.set("os.arch", "amd64") conf.set("spark.test.useCompressedOops", "true") conf.set("spark.driver.port", boundPort.toString) @@ -100,7 +99,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter SizeEstimator invokePrivate initialize() } - after { + override def afterEach(): Unit = { if (store != null) { store.stop() store = null @@ -113,14 +112,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter actorSystem.awaitTermination() actorSystem = null master = null - - if (oldArch != null) { - conf.set("os.arch", oldArch) - } else { - System.clearProperty("os.arch") - } - - System.clearProperty("spark.test.useCompressedOops") } test("StorageLevel object caching") { diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index 7bca1711ae226..6bbf72e929dcb 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.BlockManagerId /** * Test the AkkaUtils with various security settings. */ -class AkkaUtilsSuite extends FunSuite with LocalSparkContext { +class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemProperties { test("remote fetch security bad password") { val conf = new SparkConf diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala new file mode 100644 index 0000000000000..d4b92f33dd9e6 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.util.Properties + +import org.scalatest.{BeforeAndAfterEach, Suite} + +/** + * Mixin for automatically resetting system properties that are modified in ScalaTest tests. + * This resets the properties after each individual test. + * + * The order in which fixtures are mixed in affects the order in which they are invoked by tests. + * If we have a suite `MySuite extends FunSuite with Foo with Bar`, then + * Bar's `super` is Foo, so Bar's beforeEach() will and afterEach() methods will be invoked first + * by the rest runner. + * + * This means that ResetSystemProperties should appear as the last trait in test suites that it's + * mixed into in order to ensure that the system properties snapshot occurs as early as possible. + * ResetSystemProperties calls super.afterEach() before performing its own cleanup, ensuring that + * the old properties are restored as late as possible. + * + * See the "Composing fixtures by stacking traits" section at + * http://www.scalatest.org/user_guide/sharing_fixtures for more details about this pattern. + */ +private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite => + var oldProperties: Properties = null + + override def beforeEach(): Unit = { + oldProperties = new Properties(System.getProperties) + super.beforeEach() + } + + override def afterEach(): Unit = { + try { + super.afterEach() + } finally { + System.setProperties(oldProperties) + oldProperties = null + } + } +} diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala index 0ea2d13a83505..7424c2e91d4f2 100644 --- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala @@ -17,9 +17,7 @@ package org.apache.spark.util -import org.scalatest.BeforeAndAfterAll -import org.scalatest.FunSuite -import org.scalatest.PrivateMethodTester +import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite, PrivateMethodTester} class DummyClass1 {} @@ -46,20 +44,12 @@ class DummyString(val arr: Array[Char]) { } class SizeEstimatorSuite - extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { + extends FunSuite with BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties { - var oldArch: String = _ - var oldOops: String = _ - - override def beforeAll() { + override def beforeEach() { // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case - oldArch = System.setProperty("os.arch", "amd64") - oldOops = System.setProperty("spark.test.useCompressedOops", "true") - } - - override def afterAll() { - resetOrClear("os.arch", oldArch) - resetOrClear("spark.test.useCompressedOops", oldOops) + System.setProperty("os.arch", "amd64") + System.setProperty("spark.test.useCompressedOops", "true") } test("simple classes") { @@ -122,7 +112,7 @@ class SizeEstimatorSuite } test("32-bit arch") { - val arch = System.setProperty("os.arch", "x86") + System.setProperty("os.arch", "x86") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() @@ -131,14 +121,13 @@ class SizeEstimatorSuite assertResult(48)(SizeEstimator.estimate(DummyString("a"))) assertResult(48)(SizeEstimator.estimate(DummyString("ab"))) assertResult(56)(SizeEstimator.estimate(DummyString("abcdefgh"))) - resetOrClear("os.arch", arch) } // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors // (Sun vs IBM). Use a DummyString class to make tests deterministic. test("64-bit arch with no compressed oops") { - val arch = System.setProperty("os.arch", "amd64") - val oops = System.setProperty("spark.test.useCompressedOops", "false") + System.setProperty("os.arch", "amd64") + System.setProperty("spark.test.useCompressedOops", "false") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() @@ -146,16 +135,5 @@ class SizeEstimatorSuite assertResult(64)(SizeEstimator.estimate(DummyString("a"))) assertResult(64)(SizeEstimator.estimate(DummyString("ab"))) assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh"))) - - resetOrClear("os.arch", arch) - resetOrClear("spark.test.useCompressedOops", oops) - } - - def resetOrClear(prop: String, oldValue: String) { - if (oldValue != null) { - System.setProperty(prop, oldValue) - } else { - System.clearProperty(prop) - } } } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index f9d4bea823f7c..4544382094f96 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -31,7 +31,7 @@ import org.scalatest.FunSuite import org.apache.spark.SparkConf -class UtilsSuite extends FunSuite { +class UtilsSuite extends FunSuite with ResetSystemProperties { test("bytesToString") { assert(Utils.bytesToString(10) === "10.0 B") diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index adecd934358c4..1b53f3edbe92e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -28,11 +28,9 @@ object BroadcastTest { val bcName = if (args.length > 2) args(2) else "Http" val blockSize = if (args.length > 3) args(3) else "4096" - System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + - "BroadcastFactory") - System.setProperty("spark.broadcast.blockSize", blockSize) val sparkConf = new SparkConf().setAppName("Broadcast Test") - + .set("spark.broadcast.factory", s"org.apache.spark.broadcast.${bcName}BroaddcastFactory") + .set("spark.broadcast.blockSize", blockSize) val sc = new SparkContext(sparkConf) val slices = if (args.length > 0) args(0).toInt else 2 diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 6e1f01900071b..1e24da7f5f60c 100644 --- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming; +import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.After; import org.junit.Before; @@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); ssc.checkpoint("checkpoint"); } diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 6e1f01900071b..1e24da7f5f60c 100644 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming; +import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.After; import org.junit.Before; @@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); ssc.checkpoint("checkpoint"); } diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 6e1f01900071b..1e24da7f5f60c 100644 --- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming; +import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.After; import org.junit.Before; @@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); ssc.checkpoint("checkpoint"); } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 6e1f01900071b..1e24da7f5f60c 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming; +import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.After; import org.junit.Before; @@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); ssc.checkpoint("checkpoint"); } diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 6e1f01900071b..1e24da7f5f60c 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -17,6 +17,7 @@ package org.apache.spark.streaming; +import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.After; import org.junit.Before; @@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); ssc.checkpoint("checkpoint"); } diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index db58eb642b56d..15ee95070a3d3 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -21,7 +21,7 @@ import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong import org.apache.spark.executor.ShuffleWriteMetrics -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.util.Utils @@ -49,13 +49,13 @@ object StoragePerfTester { val writeData = "1" * recordLength val executor = Executors.newFixedThreadPool(numMaps) - System.setProperty("spark.shuffle.compress", "false") - System.setProperty("spark.shuffle.sync", "true") - System.setProperty("spark.shuffle.manager", - "org.apache.spark.shuffle.hash.HashShuffleManager") + val conf = new SparkConf() + .set("spark.shuffle.compress", "false") + .set("spark.shuffle.sync", "true") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager") // This is only used to instantiate a BlockManager. All thread scheduling is done manually. - val sc = new SparkContext("local[4]", "Write Tester") + val sc = new SparkContext("local[4]", "Write Tester", conf) val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager] def writeOutputBytes(mapId: Int, total: AtomicLong) = {