Commit 16a9f01

Added data types to be returned to some methods
Modified the signature of SparkContext#createTaskScheduler
sarutak committed Oct 2, 2014
1 parent 6434b06 commit 16a9f01
Showing 11 changed files with 22 additions and 35 deletions.
13 changes: 5 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -188,7 +188,7 @@ class SparkContext(config: SparkConf) extends Logging {
   val appName = conf.get("spark.app.name")
 
   val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
-  val eventLogDir = {
+  val eventLogDir: Option[String] = {
     if (isEventLogEnabled) {
       Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
     } else {
@@ -291,7 +291,7 @@ class SparkContext(config: SparkConf) extends Logging {
   executorEnvs("SPARK_USER") = sparkUser
 
   // Create and start the scheduler
-  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, eventLogDir)
+  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
@@ -1493,10 +1493,7 @@ object SparkContext extends Logging {
   }
 
   /** Creates a task scheduler based on a given master URL. Extracted for testing. */
-  private def createTaskScheduler(
-      sc: SparkContext,
-      master: String,
-      eventLogDir: Option[String]): TaskScheduler = {
+  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
     // Regular expression used for local[N] and local[*] master formats
     val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1542,7 +1539,7 @@
       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
-        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, eventLogDir)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, sc.eventLogDir)
         scheduler.initialize(backend)
         scheduler
 
@@ -1559,7 +1556,7 @@
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
         val masterUrls = localCluster.start()
-        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, eventLogDir)
+        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls, sc.eventLogDir)
         scheduler.initialize(backend)
         backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
           localCluster.stop()
2 changes: 1 addition & 1 deletion Master.scala
@@ -735,7 +735,7 @@ private[spark] class Master(
   }
 
   /** Generate a new app ID given a app's submission date */
-  def newApplicationId(submitDate: Date) = {
+  def newApplicationId(submitDate: Date): String = {
     val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
     nextAppNumber += 1
     appId
7 changes: 1 addition & 6 deletions EventLoggingListener.scala
@@ -59,11 +59,6 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   val logDir = EventLoggingListener.getLogDirName(logBaseDir, appId)
-  private val name = {
-    val splitPath = logDir.split("/")
-    splitPath(splitPath.length-1)
-  }
-
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
     shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
 
@@ -74,7 +69,7 @@
    * Return only the unique application directory without the base directory.
    */
   def getApplicationLogDir(): String = {
-    name
+    logDir.split("/").last
   }
 
   /**
2 changes: 1 addition & 1 deletion SchedulerBackend.scala
@@ -41,6 +41,6 @@ private[spark] trait SchedulerBackend {
    *
    * @return The application ID, if the backend does not provide an ID.
    */
-  def applicationId() = appId
+  def applicationId(): String = appId
 
 }
2 changes: 1 addition & 1 deletion SparkDeploySchedulerBackend.scala
@@ -129,7 +129,7 @@ private[spark] class SparkDeploySchedulerBackend(
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
 
-  override def applicationId() =
+  override def applicationId(): String =
     Option(appId).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
10 changes: 5 additions & 5 deletions CoarseMesosSchedulerBackend.scala
@@ -76,7 +76,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   var nextMesosTaskId = 0
 
-  @volatile var appId: FrameworkID = _
+  @volatile var appId: String = _
 
   def newMesosTaskId(): Int = {
     val id = nextMesosTaskId
@@ -169,8 +169,8 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    appId = frameworkId
-    logInfo("Registered as framework ID " + frameworkId.getValue)
+    appId = frameworkId.getValue
+    logInfo("Registered as framework ID " + appId)
     registeredLock.synchronized {
       isRegistered = true
       registeredLock.notifyAll()
@@ -313,8 +313,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     slaveLost(d, s)
   }
 
-  override def applicationId =
-    Option(appId).map(_.getValue).getOrElse {
+  override def applicationId(): String =
+    Option(appId).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
     }
10 changes: 5 additions & 5 deletions MesosSchedulerBackend.scala
@@ -62,7 +62,7 @@ private[spark] class MesosSchedulerBackend(
 
   var classLoader: ClassLoader = null
 
-  @volatile var appId: FrameworkID = _
+  @volatile var appId: String = _
 
   override def start() {
     synchronized {
@@ -170,8 +170,8 @@ private[spark] class MesosSchedulerBackend(
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
     val oldClassLoader = setClassLoader()
     try {
-      appId = frameworkId
-      logInfo("Registered as framework ID " + frameworkId.getValue)
+      appId = frameworkId.getValue
+      logInfo("Registered as framework ID " + appId)
       registeredLock.synchronized {
         isRegistered = true
         registeredLock.notifyAll()
@@ -353,8 +353,8 @@ private[spark] class MesosSchedulerBackend(
   // TODO: query Mesos for number of cores
   override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
 
-  override def applicationId() =
-    Option(appId).map(_.getValue).getOrElse {
+  override def applicationId(): String =
+    Option(appId).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
     }
2 changes: 1 addition & 1 deletion LocalBackend.scala
@@ -116,6 +116,6 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
 
-  override def applicationId = appId
+  override def applicationId(): String = appId
 
 }
5 changes: 0 additions & 5 deletions project/MimaExcludes.scala
@@ -40,11 +40,6 @@ object MimaExcludes {
         ) ++
         MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
         MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
-        Seq(
-          // Ignore SparkContext.createTaskScheduler because it's a private method.
-          ProblemFilters.exclude[MissingMethodProblem](
-            "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
-        ) ++
         Seq(
           // Added normL1 and normL2 to trait MultivariateStatisticalSummary
           ProblemFilters.exclude[MissingMethodProblem](
2 changes: 1 addition & 1 deletion YarnClientSchedulerBackend.scala
@@ -155,7 +155,7 @@ private[spark] class YarnClientSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
-  override def applicationId() =
+  override def applicationId(): String =
     Option(appId).map(_.toString).getOrElse {
       logWarning("Application ID is not initialized yet.")
       super.applicationId
2 changes: 1 addition & 1 deletion YarnClusterSchedulerBackend.scala
@@ -48,7 +48,7 @@ private[spark] class YarnClusterSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
-  override def applicationId() =
+  override def applicationId(): String =
     /**
      * In YARN Cluster mode, spark.yarn.app.id is expect to be set
      * before user application is launched.
