diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b20766172d9cd..badb250fa2650 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -188,7 +188,7 @@ class SparkContext(config: SparkConf) extends Logging {
   val appName = conf.get("spark.app.name")
 
   val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
-  val eventLogDir = {
+  val eventLogDir: Option[String] = {
     if (isEventLogEnabled) {
       Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
     } else {
@@ -291,7 +291,7 @@ class SparkContext(config: SparkConf) extends Logging {
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
-  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, eventLogDir)
+  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
@@ -1493,10 +1493,7 @@ object SparkContext extends Logging {
   }
 
   /** Creates a task scheduler based on a given master URL. Extracted for testing. */
-  private def createTaskScheduler(
-      sc: SparkContext,
-      master: String,
-      eventLogDir: Option[String]): TaskScheduler = {
+  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
     // Regular expression used for local[N] and local[*] master formats
     val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1542,7 +1539,7 @@ object SparkContext extends Logging {
       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
-        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, eventLogDir)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, sc.eventLogDir)
         scheduler.initialize(backend)
         scheduler
 
@@ -1559,7 +1556,7 @@ object SparkContext extends Logging {
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
         val masterUrls = localCluster.start()
-        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, eventLogDir)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, sc.eventLogDir)
         scheduler.initialize(backend)
         backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
           localCluster.stop()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 72974a16015eb..9b4a241a7a442 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -735,7 +735,7 @@ private[spark] class Master(
   }
 
   /** Generate a new app ID given a app's submission date */
-  def newApplicationId(submitDate: Date) = {
+  def newApplicationId(submitDate: Date): String = {
     val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
     nextAppNumber += 1
     appId
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index e0945099ddb06..bff11482f5334 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -59,11 +59,6 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   val logDir = EventLoggingListener.getLogDirName(logBaseDir, appId)
-  private val name = {
-    val splitPath = logDir.split("/")
-    splitPath(splitPath.length-1)
-  }
-
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
     shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
 
@@ -74,7 +69,7 @@ private[spark] class EventLoggingListener(
    * Return only the unique application directory without the base directory.
    */
  def getApplicationLogDir(): String = {
-    name
+    logDir.split("/").last
  }
 
  /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 7807c2271fc63..bb0ba9ee092c2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,6 +41,6 @@ private[spark] trait SchedulerBackend {
    *
    * @return The application ID, if the backend does not provide an ID.
    */
-  def applicationId() = appId
+  def applicationId(): String = appId
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 412f24aa5404d..1eb9425d0bd4b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -129,7 +129,7 @@ private[spark] class SparkDeploySchedulerBackend(
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
 
-  override def applicationId() =
+  override def applicationId(): String =
     Option(appId).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 23f17c2ce50b2..e823e43c38978 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -76,7 +76,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   var nextMesosTaskId = 0
 
-  @volatile var appId: FrameworkID = _
+  @volatile var appId: String = _
 
   def newMesosTaskId(): Int = {
     val id = nextMesosTaskId
@@ -169,8 +169,8 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    appId = frameworkId
-    logInfo("Registered as framework ID " + frameworkId.getValue)
+    appId = frameworkId.getValue
+    logInfo("Registered as framework ID " + appId)
     registeredLock.synchronized {
       isRegistered = true
       registeredLock.notifyAll()
@@ -313,8 +313,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     slaveLost(d, s)
   }
 
-  override def applicationId =
-    Option(appId).map(_.getValue).getOrElse {
+  override def applicationId(): String =
+    Option(appId).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d1352d2b254a6..d6321bcc98a81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -62,7 +62,7 @@ private[spark] class MesosSchedulerBackend(
 
   var classLoader: ClassLoader = null
 
-  @volatile var appId: FrameworkID = _
+  @volatile var appId: String = _
 
   override def start() {
     synchronized {
@@ -170,8 +170,8 @@ private[spark] class MesosSchedulerBackend(
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
     val oldClassLoader = setClassLoader()
     try {
-      appId = frameworkId
-      logInfo("Registered as framework ID " + frameworkId.getValue)
+      appId = frameworkId.getValue
+      logInfo("Registered as framework ID " + appId)
       registeredLock.synchronized {
         isRegistered = true
         registeredLock.notifyAll()
@@ -353,8 +353,8 @@ private[spark] class MesosSchedulerBackend(
   // TODO: query Mesos for number of cores
   override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
 
-  override def applicationId() =
-    Option(appId).map(_.getValue).getOrElse {
+  override def applicationId(): String =
+    Option(appId).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 70c337b3522eb..58b78f041cd85 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -116,6 +116,6 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
 
-  override def applicationId = appId
+  override def applicationId(): String = appId
 
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4298e40b44872..4076ebc6fc8d5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -40,11 +40,6 @@ object MimaExcludes {
       ) ++
       MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
       MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
-      Seq(
-        // Ignore SparkContext.createTaskScheduler because it's a private method.
-        ProblemFilters.exclude[MissingMethodProblem](
-          "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
-      ) ++
       Seq(
         // Added normL1 and normL2 to trait MultivariateStatisticalSummary
         ProblemFilters.exclude[MissingMethodProblem](
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 5dd3ef437697f..6bb4b82316ad4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -155,7 +155,7 @@ private[spark] class YarnClientSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
-  override def applicationId() =
+  override def applicationId(): String =
     Option(appId).map(_.toString).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 25bfabdd97c48..3d042907ee428 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -48,7 +48,7 @@ private[spark] class YarnClusterSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
-  override def applicationId() =
+  override def applicationId(): String =
     /**
      * In YARN Cluster mode, spark.yarn.app.id is expect to be set
      * before user application is launched.
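
Reviewer note: the idiom this patch converges on is worth spelling out. Every backend now declares an explicit `applicationId(): String` return type, stores the plain ID string in a `@volatile` field (on Mesos, `frameworkId.getValue` rather than the `FrameworkID` protobuf), and falls back to the trait's constructed default via `Option(appId).getOrElse`. Below is a minimal, self-contained sketch of that pattern; `DemoSchedulerBackend`, `DemoBackend`, and `register` are hypothetical names for illustration, not classes from this patch.

```scala
// Minimal sketch (not Spark code) of the pattern this patch standardizes.
trait DemoSchedulerBackend {
  // Fallback ID, mirroring SchedulerBackend's constructed default.
  private val defaultAppId = "spark-application-" + System.currentTimeMillis

  // Explicit return type, as the patch now requires on every override.
  def applicationId(): String = defaultAppId
}

class DemoBackend extends DemoSchedulerBackend {
  // Set asynchronously by a registration callback; read from user threads.
  @volatile private var appId: String = _

  // Hypothetical stand-in for e.g. Mesos delivering a FrameworkID:
  // store the plain String (frameworkId.getValue), not the protobuf wrapper.
  def register(frameworkId: String): Unit = {
    appId = frameworkId
  }

  // Option(null) is None, so this falls back cleanly before registration.
  override def applicationId(): String =
    Option(appId).getOrElse {
      println("Application ID is not initialized yet.")
      super.applicationId()
    }
}

object Demo extends App {
  val backend = new DemoBackend
  println(backend.applicationId())  // fallback ID before registration
  backend.register("framework-20140926-0001")
  println(backend.applicationId())  // registered ID afterwards
}
```

Because `Option(null)` is `None`, a caller that races with registration gets the stable fallback ID instead of a `NullPointerException`, and keeping the field a `String` makes `applicationId()` uniform across the standalone, local, Mesos, and YARN backends.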