SHS-NG M4.7: Remove JobProgressListener.
The only remaining use of this class was the SparkStatusTracker, which
was modified to use the new state store. The test code to wait for
executors was moved to TestUtils and now uses the SparkStatusTracker API.

As part of this change I also modified the streaming UI to read the needed
data from the store, which was missed in the previous patch that made
JobProgressListener redundant.
Marcelo Vanzin committed May 1, 2017
1 parent fedc9ec commit d8f3598
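
For context, the user-facing surface this commit rewires is the SparkStatusTracker API, which now reads from the app state store instead of JobProgressListener's internal maps. A minimal sketch of polling it from an application; the master setting, job, and object name are illustrative, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

object StatusPollExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("status-poll"))
    try {
      // Submit work asynchronously so there is something to observe.
      sc.parallelize(1 to 100000, 8).map(_ * 2).countAsync()

      // These calls are now backed by the app state store; they may
      // return nothing if polled before the job has actually started.
      for (jobId <- sc.statusTracker.getActiveJobIds()) {
        sc.statusTracker.getJobInfo(jobId).foreach { info =>
          println(s"Job $jobId: ${info.status} across ${info.stageIds.length} stage(s)")
        }
      }
    } finally {
      sc.stop()
    }
  }
}
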
Showing 17 changed files with 113 additions and 1,438 deletions.
11 changes: 1 addition & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,7 +58,6 @@ import org.apache.spark.status.AppStateStore
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
-import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util._
 
 /**
@@ -197,7 +196,6 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _eventLogDir: Option[URI] = None
   private var _eventLogCodec: Option[String] = None
   private var _env: SparkEnv = _
-  private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
   private var _progressBar: Option[ConsoleProgressBar] = None
   private var _ui: Option[SparkUI] = None
@@ -270,8 +268,6 @@ class SparkContext(config: SparkConf) extends Logging {
     val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
     map.asScala
   }
-
-  private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
 
   def statusTracker: SparkStatusTracker = _statusTracker
 
   private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
@@ -425,11 +421,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
     if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
 
-    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
-    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
-
     // Initialize the app state store and listener before SparkEnv is created so that it gets
     // all events.
     _stateStore = AppStateStore.createTempStore(conf, listenerBus)
@@ -444,7 +435,7 @@
       _conf.set("spark.repl.class.uri", replUri)
     }
 
-    _statusTracker = new SparkStatusTracker(this)
+    _statusTracker = new SparkStatusTracker(this, _stateStore)
 
     _progressBar =
       if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
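
The comment retained above ("Initialize the app state store and listener before SparkEnv is created so that it gets all events") carries the same ordering constraint that used to justify creating JobProgressListener early: a listener only sees events posted to the bus after it is registered. A minimal sketch of that rule at the application level; JobStartCounter is hypothetical, not part of this commit:

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// A listener observes only the events posted after it is added to the
// bus, which is why the state store is wired up before SparkEnv exists.
class JobStartCounter extends SparkListener {
  val started = new AtomicInteger(0)
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    started.incrementAndGet()
  }
}

// Usage (illustrative): register as early as possible.
//   val counter = new JobStartCounter
//   sc.addSparkListener(counter)
//   ... run jobs ...
//   assert(counter.started.get() > 0)
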
79 changes: 37 additions & 42 deletions core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark
 
+import java.util.Arrays
+
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.status.AppStateStore
+import org.apache.spark.status.api.v1.StageStatus
 
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
@@ -33,9 +37,7 @@ import org.apache.spark.scheduler.TaskSchedulerImpl
  *
  * NOTE: this class's constructor should be considered private and may be subject to change.
  */
-class SparkStatusTracker private[spark] (sc: SparkContext) {
-
-  private val jobProgressListener = sc.jobProgressListener
+class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStateStore) {
 
   /**
    * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then
@@ -46,9 +48,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * its result.
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
-    }
+    val expected = Option(jobGroup)
+    store.jobsList(null).filter(_.jobGroup == expected).map(_.jobId).toArray
   }
 
   /**
@@ -57,9 +58,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveStageIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeStages.values.map(_.stageId).toArray
-    }
+    store.stageList(Arrays.asList(StageStatus.ACTIVE)).map(_.stageId).toArray
   }
 
   /**
@@ -68,19 +67,18 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveJobIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeJobs.values.map(_.jobId).toArray
-    }
+    store.jobsList(Arrays.asList(JobExecutionStatus.RUNNING)).map(_.jobId).toArray
   }
 
   /**
    * Returns job information, or `None` if the job info could not be found or was garbage collected.
    */
   def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobIdToData.get(jobId).map { data =>
-        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
-      }
+    try {
+      val job = store.job(jobId)
+      Some(new SparkJobInfoImpl(jobId, job.stageIds.toArray, job.status))
+    } catch {
+      case _: NoSuchElementException => None
     }
   }
 
@@ -89,39 +87,36 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * garbage collected.
    */
   def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
-    jobProgressListener.synchronized {
-      for (
-        info <- jobProgressListener.stageIdToInfo.get(stageId);
-        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
-      ) yield {
-        new SparkStageInfoImpl(
-          stageId,
-          info.attemptId,
-          info.submissionTime.getOrElse(0),
-          info.name,
-          info.numTasks,
-          data.numActiveTasks,
-          data.numCompleteTasks,
-          data.numFailedTasks)
-      }
+    try {
+      val info = store.lastStageAttempt(stageId)
+      Some(new SparkStageInfoImpl(
+        stageId,
+        info.attemptId,
+        info.submissionTime.map(_.getTime()).getOrElse(0L),
+        info.name,
+        info.numTasks,
+        info.numActiveTasks,
+        info.numCompleteTasks,
+        info.numFailedTasks))
+    } catch {
+      case _: NoSuchElementException => None
     }
   }
 
   /**
    * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
-    val executorIdToRunningTasks: Map[String, Int] =
-      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
-
-    sc.getExecutorStorageStatus.map { status =>
-      val bmId = status.blockManagerId
+    store.executorList(true).map { exec =>
+      val (host, port) = exec.hostPort.split(":", 2) match {
+        case Array(h, p) => (h, p.toInt)
+        case Array(h) => (h, -1)
+      }
       new SparkExecutorInfoImpl(
-        bmId.host,
-        bmId.port,
-        status.cacheSize,
-        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
-      )
-    }
+        host,
+        port,
+        exec.maxMemory,
+        exec.activeTasks)
+    }.toArray
   }
 }
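
One idiom above is worth calling out: the store's point lookups (store.job, store.lastStageAttempt) report a missing or garbage-collected entity by throwing NoSuchElementException, and the tracker translates that into an Option at the public API boundary. A self-contained sketch of the same pattern; ThrowingStore and OptionBoundary are stand-ins, not Spark classes:

import scala.util.Try

// Stand-in for a store whose point lookups throw when a key is absent,
// as AppStateStore's job() and lastStageAttempt() do above.
class ThrowingStore(data: Map[Int, String]) {
  def lookup(id: Int): String =
    data.getOrElse(id, throw new NoSuchElementException(s"no entry for $id"))
}

object OptionBoundary {
  def main(args: Array[String]): Unit = {
    val store = new ThrowingStore(Map(1 -> "job-1"))

    // Translate the throwing lookup into an Option at the boundary,
    // mirroring getJobInfo and getStageInfo above.
    def find(id: Int): Option[String] = Try(store.lookup(id)).toOption

    println(find(1)) // Some(job-1)
    println(find(2)) // None
  }
}

This keeps the storage layer's interface minimal (no Option-wrapping on every read) while the public API preserves its Option-based contract.
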
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
 import java.util.Arrays
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -232,6 +232,30 @@ private[spark] object TestUtils {
     }
   }
 
+  /**
+   * Wait until at least `numExecutors` executors are up, or throw a `TimeoutException` if the
+   * waiting time elapses before that many executors are up. Exposed for testing.
+   *
+   * @param numExecutors the minimum number of executors to wait for
+   * @param timeout time to wait in milliseconds
+   */
+  private[spark] def waitUntilExecutorsUp(
+      sc: SparkContext,
+      numExecutors: Int,
+      timeout: Long): Unit = {
+    val finishTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout)
+    while (System.nanoTime() < finishTime) {
+      if (sc.statusTracker.getExecutorInfos.length > numExecutors) {
+        return
+      }
+      // Sleep rather than using wait/notify, because this is used only for testing and
+      // wait/notify add overhead in the general case.
+      Thread.sleep(10)
+    }
+    throw new TimeoutException(
+      s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
+  }

 }
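
A typical use from a test would look like the sketch below. Note that getExecutorInfos appears to count the driver as well, which is why the loop above waits for strictly more than numExecutors entries. The cluster size and timeout are illustrative; since TestUtils is private[spark], this code would live inside the org.apache.spark package:

import org.apache.spark.{SparkConf, SparkContext, TestUtils}

val conf = new SparkConf()
  .setMaster("local-cluster[2,1,1024]") // 2 executors, 1 core, 1024 MB each
  .setAppName("wait-for-executors")
val sc = new SparkContext(conf)
try {
  // Returns once more than 2 entries (driver plus both executors) are
  // visible, or throws TimeoutException after 10 seconds.
  TestUtils.waitUntilExecutorsUp(sc, numExecutors = 2, timeout = 10000)
} finally {
  sc.stop()
}
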


core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -25,7 +25,6 @@ import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.status.api.v1.StageStatus._
 import org.apache.spark.status.api.v1.TaskSorting._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.StageUIData
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class StagesResource extends BaseAppResource {
core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStateStore
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
(Diffs for the remaining 12 changed files are not rendered here.)